Dataset
Dataset is a class for handling data in workflows. It stores the data in a polars DataFrame and the metadata in a pyarrow schema, and provides methods for manipulating the metadata.
- class workflow_utils.dataset.Dataset(df: DataFrame, roles: list[Role] | None = None, schema: Schema | None = None)
Bases: object
Dataset handles the data and the contextual information about the data.
Properties:
df : pl.DataFrame contains the data, stored as a polars DataFrame; the metadata rows are not included in the DataFrame.
roles : list[Role] contains the roles of the columns in the DataFrame; their order matches the order of the columns in the DataFrame.
- Example:
>>> dataset.roles
[Role(ColumnType='PrimaryId'), Role(ColumnType='BatchId'), Role(ColumnType='Data'), Role(ColumnType='Data'), Role(ColumnType='Data'), Role(ColumnType='Data'), Role(ColumnType='Data'), Role(ColumnType='Data'), Role(ColumnType='Data'), Role(ColumnType='Age')]
schema : pa.Schema the pyarrow schema of the Dataset; contains the datatype of every column, whether each column is a metadata column, and information about the metadata rows.
meta_data_df : pl.DataFrame contains the metadata rows as a polars DataFrame, will be empty if no metadata rows are present.
- cast_categorical(column: str) None
Casts the specified column in the dataset to a categorical data type.
- Parameters:
column (str) – The name of the column to be cast.
- Returns:
None
- Return type:
None
- contextualize(roles: dict[str, Role] | None = None, date_format: str | None = None) None
Contextualize the dataset based on the provided roles.
- Parameters:
roles – A dictionary of roles where the keys are the column names and the values are Role objects.
date_format – The date format for the batch time column. Defaults to None.
- Example:
>>> dataset.contextualize(roles={"column1": Role(ColumnType=_BATCH_ID_TYPE)}, date_format="%d/%m/%Y %H:%M:%S")
date_format must have the same format specifiers as the date strings in the column.
>>> dataset.contextualize(roles={"column1": Role(ColumnType=_BATCH_ID_TYPE)}, date_format="%d/%m/%Y %H:%M")
raises an error, as the date strings in the dataset have the format "20/08/2019 13:24:07".
- contextualize_arrow_table(df: DataFrame, batchid: str, btime: str) Table
Contextualizes the Arrow table by adding context information.
- Parameters:
df (pl.DataFrame) – The input DataFrame.
batchid (str) – The batch ID.
btime (str) – The time of the batch.
- Returns:
The contextualized Arrow table.
- Return type:
pa.Table
- contextualize_df(batchid: str, batchtime: str, date_format: str | None = None) None
This function modifies the dataframe ‘df’ by adding and modifying certain columns.
- Parameters:
batchid (str) – The name of the batch id column.
batchtime (str) – The name of the batch time column.
date_format (str, optional) – The date format for the batch time column. Defaults to None.
- Returns:
None
- Return type:
None
- contextualize_schema(roles: dict[str, Role] | None = None) None
This function modifies the schema of the dataframe ‘df’ based on the roles provided.
- Parameters:
roles (dict[str, Role], optional) – A dictionary mapping column names to their roles.
- Returns:
None
- Return type:
None
- copy_metadata(source: Schema) Schema
Copy metadata from a source schema to the current schema.
- Parameters:
source (pa.Schema) – The source schema.
- Returns:
The modified schema.
- Return type:
pa.Schema
- create_metadata_frame() DataFrame
Creates a DataFrame containing metadata information.
- Returns:
A DataFrame containing metadata information.
- Return type:
pl.DataFrame
- Example:
>>> metadata_df = dataset.create_metadata_frame()
metadata_df will contain a DataFrame with only the metadata rows
- classmethod from_arrow(table: Table) Dataset
Create a Dataset from an Arrow table.
- Parameters:
table (pa.Table) – The Arrow table to parse.
- Returns:
The parsed Dataset object.
- Return type:
Dataset
- classmethod from_arrow_file(file_path: str) Dataset
Create a Dataset from an Arrow file.
- Parameters:
file_path (str) – The path to the Arrow file.
- Returns:
The parsed Dataset object.
- Return type:
Dataset
- classmethod from_filelike(filelike: bytes) Dataset
Create a Dataset from a file-like object.
- Example:
>>> data = BytesIO(arrow_file.read())
>>> dataset = Dataset.from_filelike(data)
- generate_primary_id() None
Generates a primary ID column by concatenating the batch ID and age columns.
If the “Primary ID” column does not exist, it creates it by concatenating the batch ID and age columns. The batch ID column is obtained using the get_batch_id_column method, and the age column is obtained using the get_age_column method.
If the age column does not exist, the method returns without making any changes.
The primary ID column is created by concatenating the batch ID and age columns using polars’ concat_str function. The resulting column is then inserted at the beginning of the DataFrame using the insert_column method, and RoleType.PRIMARY_ID.value is inserted at the beginning of the roles list.
- Returns:
None
- Return type:
None
- get_age_column() str | None
This function returns the name of the Age column in the schema.
- Returns:
The name of the Age column, or None if no Age column exists.
- Return type:
str or None
- get_batch_id_column() str
This function returns the name of the BatchId column in the schema.
- Returns:
The name of the BatchId column.
- Return type:
str
- Raises:
ValueError – If the BatchId column is not found in the schema.
- get_categorical_columns() list[str]
Returns a list of column names that contain categorical data.
- Returns:
A list of column names that contain categorical data.
- Return type:
list[str]
- get_columns(roles: list[str]) list[str]
This function returns the names of the columns in the Dataset that have a certain role.
- Parameters:
roles (list[str]) – The roles to filter by.
- Returns:
The names of the columns.
- Return type:
list[str]
- Example:
>>> dataset.get_columns(["PrimaryId", "BatchId"])
returns the names of the columns with role PrimaryId or BatchId
- get_data_columns() list[str]
This function returns the names of the Data columns in the schema.
- Returns:
The names of the Data columns.
- Return type:
list[str]
- get_non_data_columns() list[str]
This function returns the names of the columns in the schema that are not Data columns.
- Returns:
The names of the non-Data columns.
- Return type:
list[str]
- get_payload_data() DataFrame
Returns the payload data from the dataset.
- Returns:
The payload data as a DataFrame.
- Return type:
pl.DataFrame
- Example:
>>> payload_data = dataset.get_payload_data()
payload_data will contain only the data columns without metadata columns
- get_phase_id_column() str | None
Returns the phase ID column from the dataset schema.
If a column named ‘phaseid’ exists in the schema, it is returned.
- Returns:
The name of the phase ID column, or None if it does not exist.
- Return type:
str or None
- get_primary_id_column() str
This function returns the name of the PrimaryId column in the schema.
- Returns:
The name of the PrimaryId column.
- Return type:
str
- Raises:
ValueError – If the PrimaryId column is not found in the schema.
- get_secondary_id_columns() list[str] | None
This function returns the names of the SecondaryId columns in the schema.
- Returns:
The names of the SecondaryId columns.
- Return type:
list[str], or None if no SecondaryId column is found in the schema.
- get_text_columns() list[str]
Returns a list of column names that contain text data.
- Returns:
A list of column names that contain text data.
- Return type:
list[str]
- parse_roles() list[Role]
This function parses the roles from the schema.
- Returns:
The roles.
- Return type:
list[Role]
- remove_metadata(column: str) Schema
This function removes the metadata from a column in the schema.
- Parameters:
column (str) – The name of the column.
- Returns:
The modified schema.
- Return type:
pyarrow.Schema
- remove_role_metadata(column: str) None
Removes the metadata from the specified column in the dataset schema.
- Parameters:
column (str) – The name of the column to remove metadata from.
- Returns:
None
- set_role(column: str, role: Role) Schema
This function sets the role of a column in the schema.
- Parameters:
column (str) – The name of the column.
role (Role) – The role to set for the column.
- Returns:
The modified schema.
- Return type:
pyarrow.Schema
- Example:
>>> dataset.set_role("Time", Role(ColumnType="Age"))
Time will have the role Age in roles and schema
- to_table() Table
This function converts the dataframe ‘df’ to an Arrow table and casts it to the stored schema.
- Returns:
The converted Arrow table.
- Return type:
pyarrow.Table
- update_after_removal() None
This function updates the schema after columns have been removed from the dataframe.
- Returns:
None
- Return type:
None
- update_roles() None
Update the roles of the dataset based on the schema metadata.
This method iterates over each field in the schema and checks if it has metadata. If a field has metadata, a new Role object is created using the metadata and added to the roles list. If a field does not have metadata, a new Role object with an empty ColumnType is added to the roles list.
- Returns:
None
- workflow_utils.dataset.concat_datasets(datasets: list[Dataset]) Dataset
This function concatenates a list of datasets into a single dataset.
- Parameters:
datasets (List[Dataset]) – The list of datasets.
- Returns:
The concatenated dataset.
- Return type:
Dataset
- Example:
>>> merged_dataset = concat_datasets([dataset1, dataset2])
merges dataset1 and dataset2 into a single dataset
- workflow_utils.dataset.parse_arrow_table(table: Table) Dataset
Parse an Arrow table into a Dataset.
- Parameters:
table (pa.Table) – The Arrow table to parse.
- Returns:
The parsed Dataset object.
- Return type:
Dataset
- Example:
>>> table = pa.Table.from_pandas(df)
>>> dataset = parse_arrow_table(table)
- workflow_utils.dataset.parse_roles(schema: Schema) list[Role]
Parse the roles from the given schema.
- Parameters:
schema (pa.Schema) – The schema to parse roles from.
- Returns:
A list of Role objects parsed from the schema.
- Return type:
list[Role]