Workflow IO

class workflow_utils.workflow_io.WorkflowIo(config_path: str = '/app/config.json')

Bases: object

argument_type(parameter: int) → ArgumentType

Get the type of an input parameter.

  • Parameters: parameter (int) – The parameter number of the input (1 or greater)
  • Returns: The type of the input parameter, such as INTEGER, STRING, etc.
  • Return type: ArgumentType

enum_value(*, parameter_name: str | None = None, parameter: int | None = None) → str | None

Get the input enum value as a string, either from the parameter or the name of the input.

  • Parameters:
  • parameter_name (str, optional) – The name of the input parameter. If provided, the value is looked up by name.
  • parameter (int, optional) – The parameter number of the input (1 or greater). If provided, the value is looked up by number.
  • Returns: The enum value as a string, or None if not found.
  • Return type: str or None

file_content(*, parameter_name: str | None = None, parameter: int | None = None) → bytes | None

Get the file content as bytes, either from the parameter or the name of the input.

  • Parameters:
  • parameter_name (str, optional) – The name of the input, or None if not used
  • parameter (int, optional) – The parameter number of the input (1 or greater), or None if not used
  • Returns: The file content as bytes or None if not found
  • Return type: Optional[bytes]

float_value(*, parameter_name: str | None = None, parameter: int | None = None) → float | None

Get the input float value, either from the parameter or the name of the input.

  • Parameters:
  • parameter_name (str, optional) – The name of the input, or None if not used
  • parameter (int, optional) – The parameter number of the input (1 or greater), or None if not used
  • Returns: The value as float or None if not found
  • Return type: Optional[float]

get_inputs(input_class: Type[InputT]) → InputT

Get all input values as specified in a data class.

Example of a data class as input:

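from dataclasses import dataclass

# InputsBase and InputType come from workflow_utils (import path not shown here)
@dataclass
class Inputs(InputsBase):
    num_rounds: InputType.INTEGER
    name: InputType.STRING
    config: InputType.JSON
    file_data: InputType.FILE_POLARS_DATAFRAME
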
  • Parameters: input_class (Type[InputsBase]) – The data class (a subclass of InputsBase) that specifies the inputs to read
  • Returns: An instance of input_class with all input values filled in
  • Return type: InputT
  • Raises: ValueError – If the number of inputs received does not match the number specified in input_class
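The count check described for get_inputs can be modeled with the standard dataclasses module. The sketch below is a simplified, hypothetical stand-in (fill_dataclass is not part of workflow_utils): it assumes received values are matched to the data class fields in declaration order, and it uses plain Python types instead of the InputType annotations.

```python
from dataclasses import dataclass, fields
from typing import Any, List, Type, TypeVar

T = TypeVar("T")


def fill_dataclass(input_class: Type[T], values: List[Any]) -> T:
    """Hypothetical stand-in for the matching step of get_inputs.

    Assigns the received values, in order, to the fields declared on
    input_class, and raises ValueError when the counts differ.
    """
    declared = fields(input_class)
    if len(declared) != len(values):
        raise ValueError(
            f"Expected {len(declared)} inputs, received {len(values)}"
        )
    # Pair each declared field with the value at the same position.
    kwargs = {f.name: v for f, v in zip(declared, values)}
    return input_class(**kwargs)


@dataclass
class Inputs:
    num_rounds: int
    name: str


inputs = fill_dataclass(Inputs, [3, "run-a"])
print(inputs)  # Inputs(num_rounds=3, name='run-a')
```

Passing one value for this two-field class raises ValueError, which mirrors the documented failure mode.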

int_value(*, parameter_name: str | None = None, parameter: int | None = None) → int | None

Get the input integer value, either from the parameter or the name of the input.

  • Parameters:
  • parameter_name (str, optional) – The name of the input, or None if not used
  • parameter (int, optional) – The parameter number of the input (1 or greater), or None if not used
  • Returns: The value as an integer, or None if not found
  • Return type: Optional[int]

json_value(*, parameter_name: str | None = None, parameter: int | None = None) → Any

Get the input value parsed from JSON (for example, a dictionary), either from the parameter or the name of the input.

  • Parameters:
  • parameter_name (str, optional) – The name of the input, or None if not used
  • parameter (int, optional) – The parameter number of the input (1 or greater), or None if not used
  • Returns: The parsed JSON value, or None if not found
  • Return type: Any

num_inputs() → int

Get the number of input parameters.

  • Returns: The number of input parameters.
  • Return type: int

num_outputs() → int

Get the number of output parameters.

  • Returns: The number of output parameters.
  • Return type: int

pandas_value(*, parameter_name: str | None = None, parameter: int | None = None, converters: Dict[Any, Any] | None = None, has_header: bool = True, index_col: int | None = 0) → DataFrame | None

Get an input as a pandas DataFrame, either from the parameter or the name of the input.

  • Parameters:
  • parameter_name (str, optional) – The name of the input, or None if not used
  • parameter (int, optional) – The parameter number of the input (1 or greater), or None if not used
  • converters (Dict[Any, Any] | None, optional) – Overrides for how individual columns are parsed, for example {"My Column": str}; None to use the default parsing
  • has_header (bool, optional) – True if the first row is the header
  • index_col (int | None, optional) – The column to use as the row labels of the DataFrame; defaults to 0 (the first column)
  • Returns: The value as a pandas DataFrame, or None if not found
  • Return type: Optional[pd.DataFrame]
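The converters, has_header, and index_col parameters resemble the options of pandas.read_csv. The sketch below assumes the input is CSV text and that pandas_value forwards these options to read_csv, with has_header=False mapping to header=None; read_csv_input is a hypothetical helper, not the library's implementation.

```python
import io
from typing import Any, Dict, Optional

import pandas as pd


def read_csv_input(
    text: str,
    converters: Optional[Dict[Any, Any]] = None,
    has_header: bool = True,
    index_col: Optional[int] = 0,
) -> pd.DataFrame:
    """Hypothetical helper mirroring the pandas_value parameters."""
    return pd.read_csv(
        io.StringIO(text),
        converters=converters,
        # header=0: first row holds column names; None: no header row.
        header=0 if has_header else None,
        # index_col=0 turns the first column into the row labels.
        index_col=index_col,
    )


csv_text = "id,My Column,score\n1,007,0.5\n2,042,0.75\n"

# Without a converter, "My Column" would be parsed as integers (7, 42).
df = read_csv_input(csv_text, converters={"My Column": str})
print(df.loc[1, "My Column"])  # 007
print(df.index.name)  # id
```

The converter is what preserves leading zeros in identifier-like columns, which is the typical reason to override parsing.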

polars_value(*, parameter_name: str | None = None, parameter: int | None = None, converters: Dict[str, Any] | None = None, has_header: bool = True) → DataFrame | None

Get an input as a Polars DataFrame, either from the parameter or the name of the input.

  • Parameters:
  • parameter_name (str, optional) – The name of the input, or None if not used
  • parameter (int, optional) – The parameter number of the input (1 or greater), or None if not used
  • converters (Dict[str, Any] | None, optional) – Overrides for how individual columns are parsed, for example {"My Column": str}; None to use the default parsing
  • has_header (bool, optional) – True if the first row is the header
  • Returns: The value as a Polars DataFrame, or None if not found
  • Return type: Optional[polars.DataFrame]

python_module(*, parameter_name: str | None = None, parameter: int | None = None) → ModuleType | None

Get the input as a Python module.

  • Parameters:
  • parameter_name (str, optional) – The name of the input, or None if not used
  • parameter (int, optional) – The parameter number of the input (1 or greater), or None if not used
  • Returns: The imported Python module, or None if not found
  • Return type: Optional[ModuleType]

read_vector(file: IO[bytes]) → List[float]

Reads a vector of floats from the given file.

  • Parameters: file (IO[bytes]) – The file object to read from.
  • Returns: The vector of floats read from the file.
  • Return type: List[float]

return_arrow(*, parameter_name: str | None = None, parameter: int | None = None, table: DataFrame | Table) → None

return_bytes(*, parameter_name: str | None = None, parameter: int | None = None, content: bytes | str) → None

Write back raw bytes or a string as an output.

  • Parameters:
  • parameter_name (str, optional) – The name of the return parameter, or None if not used.
  • parameter (int, optional) – The parameter number to return (1 or greater), or None if not used.
  • content (bytes | str) – The bytes or string to write back.
  • Returns: None
  • Return type: None

return_csv(*, parameter_name: str | None = None, parameter: int | None = None, content: pl.DataFrame | pd.DataFrame | pl.Series | pd.Series, index_label: str | None = None) → None

Write back a DataFrame or Series as a CSV file.

  • Parameters:
  • parameter_name (str, optional) – The name of the return parameter, or None if not used.
  • parameter (int, optional) – The parameter number to return (1 or greater), or None if not used.
  • content (pl.DataFrame | pd.DataFrame | pl.Series | pd.Series) – A Polars or pandas DataFrame or Series to write back as a CSV file.
  • index_label (str, optional) – The label of the index column in the CSV file.
  • Raises: ValueError – If the content type is invalid.
  • Returns: None
  • Return type: None
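For pandas content, index_label behaves like the parameter of the same name on pandas.DataFrame.to_csv. The sketch below assumes return_csv serializes pandas objects with to_csv and rejects other types with ValueError; to_csv_text is a hypothetical helper, and the Polars branch is left out (Polars frames have no index, so index_label presumably does not apply to them).

```python
from typing import Optional

import pandas as pd


def to_csv_text(content, index_label: Optional[str] = None) -> str:
    """Hypothetical helper: serialize pandas content to CSV text."""
    if isinstance(content, (pd.DataFrame, pd.Series)):
        # With no target path, to_csv returns the CSV as a string.
        return content.to_csv(index_label=index_label)
    raise ValueError(f"Unsupported content type: {type(content).__name__}")


df = pd.DataFrame({"score": [0.5, 0.75]}, index=[1, 2])
print(to_csv_text(df, index_label="id"))
# id,score
# 1,0.5
# 2,0.75
```

Without index_label, the header cell above the index column is left empty for an unnamed index.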

return_excel(*, parameter_name: str | None = None, parameter: int | None = None, content: pl.DataFrame | pd.DataFrame | pl.Series | pd.Series, index_label: str | None = None) → None

Write back a DataFrame or Series as an Excel file.

This method writes the provided content to an Excel file and returns that file as an output. The file is first saved as a named temporary file with the .xlsx extension.

  • Parameters:
  • parameter_name (str, optional) – The name of the return parameter, or None if not used.
  • parameter (int, optional) – The parameter number to return (1 or greater), or None if not used.
  • content (pl.DataFrame | pd.DataFrame | pl.Series | pd.Series) – A Polars or pandas DataFrame or Series to write back as an Excel file.
  • index_label (str, optional) – The label of the index column in the Excel file.
  • Raises: ValueError – If the content type is invalid.
  • Returns: None. The Excel file is returned as an output instead of as a return value.
  • Return type: None

return_file(file_name: str, *, parameter_name: str | None = None, parameter: int | None = None) → None

Write back a file.

  • Parameters:
  • file_name (str) – The path of the file to return.
  • parameter_name (str, optional) – The name of the return parameter, or None if not used.
  • parameter (int, optional) – The parameter number to return (1 or greater), or None if not used.
  • Returns: None
  • Return type: None

return_outputs(output_class: Type[OutputT], *args: pd.DataFrame | pl.DataFrame | bytes | str | pa.Table) → None

Return all outputs as files.

Example of a data class as output:

from dataclasses import dataclass

# OutputBase and OutputType come from workflow_utils (import path not shown here)
@dataclass
class Outputs(OutputBase):
    result1: OutputType.AS_CSV
    result2: OutputType.AS_ARROW

  • Parameters:
  • output_class (Type[OutputBase]) – The data class (a subclass of OutputBase) that specifies the outputs to return
  • args (pd.DataFrame | pl.DataFrame | bytes | str | pa.Table) – The values to return, in the same order as the fields of output_class
  • Returns: None
  • Return type: None
  • Raises: ValueError – If the number of values passed does not match the number of output definitions

return_vector(*, parameter_name: str | None = None, parameter: int | None = None, content: List[str]) → None

Writes the vector to a file and uploads it to the output URL.

  • Parameters:
  • parameter_name (str, optional) – The name of the output parameter. Defaults to None.
  • parameter (int, optional) – The parameter number of the output. Defaults to None.
  • content (List[str]) – The vector content to be written to the file.
  • Returns: None
  • Return type: None

string_value(*, parameter_name: str | None = None, parameter: int | None = None) → str | None

Get the input string value, either from the parameter or the name of the input.

  • Parameters:
  • parameter_name (str, optional) – The name of the input, or None if not used
  • parameter (int, optional) – The parameter number of the input (1 or greater), or None if not used
  • Returns: The value as a string, or None if not found
  • Return type: str or None

usp_file(*, parameter_name: str | None = None, parameter: int | None = None) → str

Get the input as a USP file.

  • Parameters:
  • parameter_name (str, optional) – The name of the input, or None if not used
  • parameter (int, optional) – The parameter number of the input (1 or greater), or None if not used
  • Returns: The path of the USP file
  • Return type: str

value(*, parameter_name: str | None = None, parameter: int | None = None) → str | None

Get the value as a string, either from the parameter number or the name of the input.

  • Parameters:
  • parameter_name (str, optional) – The name of the input, or None if not used
  • parameter (int, optional) – The parameter number of the input (1 or greater), or None if not used
  • Returns: The value as a string or None if not found
  • Return type: Optional[str]

Dataset

class workflow_utils.dataset.Dataset(df: DataFrame, roles: list[Role] | None = None, schema: Schema | None = None)

Bases: object

cast_categorical(column: str) → None

Casts the specified column in the dataset to a categorical data type.

  • Parameters: column (str) – The name of the column to cast.
  • Returns: None
  • Return type: None

contextualize(roles: Dict[str, Role] | None = None, date_format: str | None = None) → None

contextualize_arrow_table(df: DataFrame, batchid: str, btime: str) → Table

Converts the given DataFrame into an Arrow table and adds context information to it, based on the given batch ID and batch time.

  • Parameters:
  • df (pl.DataFrame) – The input DataFrame.
  • batchid (str) – The batch ID.
  • btime (str) – The time of the batch.
  • Returns: The contextualized Arrow table.
  • Return type: pa.Table

contextualize_df(batchid: str, batchtime: str, date_format: str | None = None) → None

Modifies the dataset's DataFrame ‘df’ by adding and modifying certain columns.

  • Parameters:
  • batchid (str) – The name of the batch ID column.
  • batchtime (str) – The name of the batch time column.
  • date_format (str, optional) – The date format for the batch time column. Defaults to None.
  • Returns: None
  • Return type: None

contextualize_schema(roles: Dict[str, Role] | None = None) → None

Modifies the schema of the dataset's DataFrame ‘df’ based on the provided roles.

  • Parameters: roles (Dict[str, Role], optional) – A dictionary mapping column names to their roles.
  • Returns: None
  • Return type: None

copy() → Dataset

copy_metadata(source: Schema) → Schema

Copy metadata from a source schema to the current schema.

  • Parameters: source (pa.Schema) – The source schema.
  • Returns: The modified schema.
  • Return type: pa.Schema

classmethod from_arrow_file(file_path: str) → Dataset

classmethod from_filelike(filelike: bytes) → Dataset

generate_primary_id() → None

Generates a primary ID column by concatenating the batch ID and age columns.

If the “Primary ID” column does not exist, it creates it by concatenating the batch ID and age columns. The batch ID column is obtained using the get_batch_id_column method, and the age column is obtained using the get_age_column method.

If the age column does not exist, the method returns without making any changes.

The primary ID column is created by concatenating the batch ID and age columns with the concat_str function from Polars. The resulting column is then inserted at the beginning of the DataFrame with the insert_column method, and RoleType.PRIMARY_ID.value is inserted at the beginning of the roles list.

  • Returns: None
  • Return type: None

get_age_column() → str | None

Returns the name of the Age column in the dataset schema.

  • Returns: The name of the Age column, or None if the schema has no Age column.
  • Return type: str or None

get_batch_id_column() → str

Returns the name of the BatchId column in the dataset schema.

  • Returns: The name of the BatchId column.
  • Return type: str
  • Raises: ValueError – If the BatchId column is not found in the schema.

get_categorical_columns() → List[str]

Returns a list of column names that contain categorical data.

  • Returns: A list of column names that contain categorical data.
  • Return type: List[str]

get_columns(schema: Schema, roles: list[str]) → List[str]

Returns the names of the columns in the schema that have one of the given roles.

  • Parameters:
  • schema (pa.Schema) – The schema of the dataframe.
  • roles (list [str ]) – The roles to filter by.
  • Returns: The names of the columns.
  • Return type: list[str]

get_data_columns() → List[str]

Returns the names of the Data columns in the dataset schema.

  • Returns: The names of the Data columns.
  • Return type: list[str]

get_non_data_columns() → List[str]

Returns the names of the columns in the dataset schema that are not Data columns.

  • Returns: The names of the non-Data columns.
  • Return type: list[str]

get_phase_id_column() → str

Returns the phase ID column from the dataset schema.

If a column named ‘phaseid’ exists in the schema, it is returned. Otherwise, the first categorical column is returned.

  • Returns: The name of the phase ID column.
  • Return type: str

parse_roles() → None

Parses the roles from the dataset schema.

  • Returns: None
  • Return type: None

remove_metadata(column: str) → Schema

This function removes the metadata from a column in the schema.

  • Parameters: column (str) – The name of the column.
  • Returns: The modified schema.
  • Return type: pyarrow.Schema

remove_role_metadata(column: str) → None

Removes the metadata from the specified column in the dataset schema.

  • Parameters: column (str) – The name of the column to remove metadata from.
  • Returns: None
  • Return type: None

set_role(column: str, role: Role) → Schema

This function sets the role of a column in the schema.

  • Parameters:
  • column (str) – The name of the column.
  • role (Role) – The role to set for the column, for example RoleType.AGE.value.
  • Returns: The modified schema.
  • Return type: pyarrow.Schema

to_table() → Table

This function converts the dataframe ‘df’ to an Arrow table and casts it to the stored schema.

  • Returns: The converted Arrow table.
  • Return type: pyarrow.Table

update_after_removal() → None

This function updates the schema after columns have been removed from the dataframe.

  • Returns: None
  • Return type: None

update_roles() → None

Update the roles of the dataset based on the schema metadata.

This method iterates over each field in the schema and checks if it has metadata. If a field has metadata, a new Role object is created using the metadata and added to the roles list. If a field does not have metadata, a new Role object with an empty ColumnType is added to the roles list.

  • Returns: None

class workflow_utils.dataset.RoleType(value)

Bases: Enum

An enumeration of column roles. Each member's value is a Role whose ColumnType names the role.

AGE = Role(ColumnType='Age')

BATCH_ID = Role(ColumnType='BatchId')

DATA = Role(ColumnType='Data')

PRIMARY_ID = Role(ColumnType='PrimaryId')

SECONDARY_ID = Role(ColumnType='SecondaryId')

workflow_utils.dataset.concat_datasets(datasets: List[Dataset]) → Dataset

This function concatenates a list of datasets into a single dataset.

  • Parameters: datasets (List [Dataset ]) – The list of datasets.
  • Returns: The concatenated dataset.
  • Return type: Dataset

workflow_utils.dataset.parse_arrow_table(table: Table) → Dataset

Parse an Arrow table into a Dataset.

  • Parameters: table (pa.Table) – The Arrow table to parse.
  • Returns: The parsed Dataset object.
  • Return type: Dataset
  • Raises: None
  • Example:
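>>> import pyarrow as pa
>>> from workflow_utils.dataset import parse_arrow_table
>>> table = pa.Table.from_pandas(df)  # df: an existing pandas DataFrame
>>> dataset = parse_arrow_table(table)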

workflow_utils.dataset.parse_roles(schema: Schema) → list[Role]

Parse the roles from the given schema.

  • Parameters: schema (pa.Schema) – The schema to parse roles from.
  • Returns: A list of Role objects parsed from the schema.
  • Return type: list[Role]