Dataset

Dataset is a class for handling data in workflows. It stores the data in a polars DataFrame and the metadata in a pyarrow schema, and it provides methods for manipulating that metadata.

class workflow_utils.dataset.Dataset(df: DataFrame, roles: list[Role] | None = None, schema: Schema | None = None)

Bases: object

Dataset handles the data and the contextual information about the data.

properties:

df : polars.DataFrame contains the data. The metadata rows are not included in this DataFrame.

roles : list[Role] contains the roles of the columns in the DataFrame. The order matches the order of the columns in the DataFrame.

Example:

>>> dataset.roles
[Role(ColumnType='PrimaryId'), Role(ColumnType='BatchId'), Role(ColumnType='Data'),
Role(ColumnType='Data'), Role(ColumnType='Data'), Role(ColumnType='Data'), Role(ColumnType='Data'),
Role(ColumnType='Data'), Role(ColumnType='Data'), Role(ColumnType='Age')]

schema : pa.Schema is the pyarrow schema of the Dataset. It contains the data type of every column, whether each column is a metadata column, and information about the metadata rows.

meta_data_df : pl.DataFrame contains the metadata rows as a polars DataFrame. It is empty if no metadata rows are present.

cast_categorical(column: str) None

Casts the specified column in the dataset to a categorical data type.

Parameters:

column (str) – The name of the column to be cast.

Returns:

None

Return type:

None

contextualize(roles: dict[str, Role] | None = None, date_format: str | None = None) None

Contextualize the dataset based on the provided roles.

Parameters:
  • roles – A dictionary of roles where the keys are the column names and the values are Role objects.

  • date_format – The date format for the batch time column. Defaults to None.

Example:

>>> dataset.contextualize(roles={"column1": Role(ColumnType=_BATCH_ID_TYPE)}, date_format="%d/%m/%Y%H:%M:%S")

date_format must have the same number of format specifiers as the date strings in the column. For example,

>>> dataset.contextualize(roles={"column1": Role(ColumnType=_BATCH_ID_TYPE)}, date_format="%d/%m/%Y%H:%M")

raises an error if the date strings in the dataset have the format "20/08/2019 13:24:07".

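
The specifier rule above can be illustrated with the standard library alone. This is only an analogy: it uses Python's datetime.strptime rather than the parser Dataset relies on, and the format strings (with a space before the hour) are chosen for strptime, so treat them as assumptions rather than values to pass to contextualize.

```python
from datetime import datetime

sample = "20/08/2019 13:24:07"

# Every component of the date string has a matching specifier: parsing succeeds.
parsed = datetime.strptime(sample, "%d/%m/%Y %H:%M:%S")

# The seconds have no matching specifier: strptime raises ValueError.
try:
    datetime.strptime(sample, "%d/%m/%Y %H:%M")
    mismatch_raised = False
except ValueError:
    mismatch_raised = True
```
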
contextualize_arrow_table(df: DataFrame, batchid: str, btime: str) Table

Contextualizes the Arrow table by adding context information.

Parameters:
  • df (pl.DataFrame) – The input DataFrame.

  • batchid (str) – The batch ID.

  • btime (str) – The time of the batch.

Returns:

The contextualized Arrow table.

Return type:

pa.Table

contextualize_df(batchid: str, batchtime: str, date_format: str | None = None) None

This function modifies the dataframe ‘df’ by adding and modifying certain columns.

Parameters:
  • batchid (str) – The name of the batch id column.

  • batchtime (str) – The name of the batch time column.

  • date_format (str, optional) – The date format for the batch time column. Defaults to None.

Returns:

None

Return type:

None

contextualize_schema(roles: dict[str, Role] | None = None) None

This function modifies the schema of the dataframe ‘df’ based on the roles provided.

Parameters:

roles (dict[str, Role], optional) – A dictionary mapping column names to their roles.

Returns:

None

Return type:

None

copy() Dataset

copy_metadata(source: Schema) Schema

Copy metadata from a source schema to the current schema.

Parameters:

source (pa.Schema) – The source schema.

Returns:

The modified schema.

Return type:

pa.Schema

create_metadata_frame() DataFrame

Creates a DataFrame containing metadata information.

Returns:

A DataFrame containing metadata information.

Return type:

pl.DataFrame

Example:

>>> metadata_df = dataset.create_metadata_frame()

metadata_df will contain a DataFrame with only the metadata rows.

classmethod from_arrow(table: Table) Dataset

Create a Dataset from an Arrow table.

Parameters:

table (pa.Table) – The Arrow table to parse.

Returns:

The parsed Dataset object.

Return type:

Dataset

classmethod from_arrow_file(file_path: str) Dataset

Create a Dataset from an Arrow file.

Parameters:

file_path (str) – The path to the Arrow file.

Returns:

The parsed Dataset object.

Return type:

Dataset

classmethod from_filelike(filelike: bytes) Dataset

Create a Dataset from a file-like object.

Example:

>>> data = BytesIO(arrow_file.read())
>>> dataset = Dataset.from_filelike(data)

generate_primary_id() None

Generates a primary ID column by concatenating the batch ID and age columns.

If the “Primary ID” column does not exist, it creates it by concatenating the batch ID and age columns. The batch ID column is obtained using the get_batch_id_column method, and the age column is obtained using the get_age_column method.

If the age column does not exist, the method returns without making any changes.

The primary ID column is created by concatenating the batch ID and age columns using the polars concat_str function. The resulting column is then inserted at the beginning of the DataFrame using the insert_column method, and RoleType.PRIMARY_ID.value is inserted at the beginning of the roles list.

Returns:

None

Return type:

None

get_age_column() str | None

This function returns the name of the Age column in the schema.

Returns:

The name of the Age column, or None if the schema has no Age column.

Return type:

str | None

get_batch_id_column() str

This function returns the name of the BatchId column in the schema. It takes no arguments and reads the schema stored on the Dataset.

Returns:

The name of the BatchId column.

Return type:

str

Raises:

ValueError – If the BatchId column is not found in the schema.

get_categorical_columns() list[str]

Returns a list of column names that contain categorical data.

Returns:

A list of column names that contain categorical data.

Return type:

list[str]

get_columns(roles: list[str]) list[str]

This function returns the names of the columns in the Dataset that have a certain role.

Parameters:

roles (list[str]) – The roles to filter by.

Returns:

The names of the columns.

Return type:

list[str]

Example:

>>> dataset.get_columns(["PrimaryId", "BatchId"])

returns the names of the columns with role PrimaryId or BatchId.

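
Because the roles list is ordered like the DataFrame columns, filtering by role amounts to pairing the two lists. The following is a standalone sketch of that idea; the Role class and the helper columns_with_roles are hypothetical stand-ins, not the library's implementation.

```python
from dataclasses import dataclass


@dataclass
class Role:
    """Hypothetical stand-in for the library's Role class."""
    ColumnType: str


# Column names and their roles, in matching order.
columns = ["id", "batch", "temp", "pressure", "age"]
roles = [Role("PrimaryId"), Role("BatchId"), Role("Data"), Role("Data"), Role("Age")]


def columns_with_roles(columns, roles, wanted):
    """Return the column names whose role is one of the wanted role names."""
    wanted_set = set(wanted)
    return [name for name, role in zip(columns, roles) if role.ColumnType in wanted_set]


id_columns = columns_with_roles(columns, roles, ["PrimaryId", "BatchId"])
data_columns = columns_with_roles(columns, roles, ["Data"])
```
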
get_data_columns() list[str]

This function returns the names of the Data columns in the schema.

Returns:

The names of the Data columns.

Return type:

list[str]

get_non_data_columns() list[str]

This function returns the names of the columns in the schema that are not Data columns.

Returns:

The names of the non-Data columns.

Return type:

list[str]

get_payload_data() DataFrame

Returns the payload data from the dataset.

Returns:

The payload data as a DataFrame.

Return type:

pl.DataFrame

Example:

>>> payload_data = dataset.get_payload_data()

payload_data will contain only the data columns, without the metadata columns.

get_phase_id_column() str | None

Returns the phase ID column from the dataset schema.

If a column named ‘phaseid’ exists in the schema, it is returned.

Returns:

The name of the phase ID column, or None if the schema has no such column.

Return type:

str | None

get_primary_id_column() str

This function returns the name of the PrimaryId column in the schema. It takes no arguments and reads the schema stored on the Dataset.

Returns:

The name of the PrimaryId column.

Return type:

str

Raises:

ValueError – If the PrimaryId column is not found in the schema.

get_secondary_id_columns() list[str] | None

This function returns the names of the SecondaryId columns in the schema.

Returns:

The names of the SecondaryId columns, or None if no SecondaryId column is found in the schema.

Return type:

list[str] | None

get_text_columns() list[str]

Returns a list of column names that contain text data.

Returns:

A list of column names that contain text data.

Return type:

list[str]

parse_roles() list[Role]

This function parses the roles from the schema.

Returns:

The roles.

Return type:

list[Role]

remove_metadata(column: str) Schema

This function removes the metadata from a column in the schema.

Parameters:

column (str) – The name of the column.

Returns:

The modified schema.

Return type:

pyarrow.Schema

remove_role_metadata(column: str) None

Removes the metadata from the specified column in the dataset schema.

Parameters:

column (str) – The name of the column to remove metadata from.

Returns:

None

set_role(column: str, role: Role) Schema

This function sets the role of a column in the schema.

Parameters:
  • column (str) – The name of the column.

  • role (Role) – The role to set for the column.

Returns:

The modified schema.

Return type:

pyarrow.Schema

Example:

>>> dataset.set_role("Time", Role(ColumnType="Age"))

Time will have the role Age in both roles and schema.

to_table() Table

This function converts the dataframe ‘df’ to an Arrow table and casts it to the stored schema.

Returns:

The converted Arrow table.

Return type:

pyarrow.Table

update_after_removal() None

This function updates the schema after columns have been removed from the dataframe.

Returns:

None

Return type:

None

update_roles() None

Update the roles of the dataset based on the schema metadata.

This method iterates over each field in the schema and checks if it has metadata. If a field has metadata, a new Role object is created using the metadata and added to the roles list. If a field does not have metadata, a new Role object with an empty ColumnType is added to the roles list.

Returns:

None

workflow_utils.dataset.concat_datasets(datasets: list[Dataset]) Dataset

This function concatenates a list of datasets into a single dataset.

Parameters:

datasets (list[Dataset]) – The list of datasets.

Returns:

The concatenated dataset.

Return type:

Dataset

Example:

>>> merged_dataset = concat_datasets([dataset1, dataset2])

merges dataset1 and dataset2 into a single dataset.

workflow_utils.dataset.parse_arrow_table(table: Table) Dataset

Parse an Arrow table into a Dataset.

Parameters:

table (pa.Table) – The Arrow table to parse.

Returns:

The parsed Dataset object.

Return type:

Dataset

Raises:

None

Example:

>>> table = pa.Table.from_pandas(df)
>>> dataset = parse_arrow_table(table)

workflow_utils.dataset.parse_roles(schema: Schema) list[Role]

Parse the roles from the given schema.

Parameters:

schema (pa.Schema) – The schema to parse roles from.

Returns:

A list of Role objects parsed from the schema.

Return type:

list[Role]