Workflow IO
class workflow_utils.workflow_io.WorkflowIo(config_path: str = '/app/config.json')
Bases: object
argument_type(parameter: int) → ArgumentType
Get the type of an input parameter.
- Parameters: parameter (int) – The parameter number of the input (1 or greater)
- Returns: The type of the input parameter, such as INTEGER, STRING, etc.
- Return type: ArgumentType
enum_value(*, parameter_name: str | None = None, parameter: int | None = None) → str | None
Get the input enum value as a string, either from the parameter or the name of the input.
- Parameters:
- parameter_name (str, optional) – The name of the input parameter. If provided, the value will be retrieved based on the parameter name.
- parameter (int, optional) – The parameter number of the input. If provided, the value will be retrieved based on the parameter number.
- Returns: The enum value as a string, or None if not found.
- Return type: str or None
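The accessor methods in this class share a keyword-only convention: exactly one of parameter_name or parameter selects an input. A minimal stand-in sketch of that dispatch (lookup and the in-memory inputs list here are illustrative, not the library's config-backed implementation):

```python
# Illustrative sketch of the name-or-number lookup convention used by
# the accessor methods above. `inputs` is a hypothetical in-memory
# stand-in for the real config-backed input storage.
from typing import Optional


def lookup(inputs: list, *,
           parameter_name: Optional[str] = None,
           parameter: Optional[int] = None) -> Optional[str]:
    """Resolve a value by name or by 1-based parameter number."""
    if (parameter_name is None) == (parameter is None):
        raise ValueError("pass exactly one of parameter_name or parameter")
    if parameter is not None:
        if 1 <= parameter <= len(inputs):
            return inputs[parameter - 1][1]
        return None
    for name, value in inputs:
        if name == parameter_name:
            return value
    return None


inputs = [("num_rounds", "10"), ("mode", "fast")]
print(lookup(inputs, parameter=1))            # "10"
print(lookup(inputs, parameter_name="mode"))  # "fast"
```

Passing both selectors (or neither) is ambiguous, which is why raising early is a reasonable design for this kind of API.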
file_content(*, parameter_name: str | None = None, parameter: int | None = None) → bytes | None
Get the file content as bytes, either from the parameter or the name of the input.
- Parameters:
- parameter_name (str, optional) – Either None if not used or the name of the input
- parameter (int, optional) – Either None if not used or the parameter of the input (1 or greater)
- Returns: The file content as bytes or None if not found
- Return type: Optional[bytes]
float_value(*, parameter_name: str | None = None, parameter: int | None = None) → float | None
Get the input float value, either from the parameter or the name of the input.
- Parameters:
- parameter_name (str, optional) – Either None if not used or the name of the input
- parameter (int, optional) – Either None if not used or the parameter number of the input
- Returns: The value as float or None if not found
- Return type: Optional[float]
get_inputs(input_class: Type[InputT]) → InputT
Get all input values as specified in a data class.
Example of a data class as input:
@dataclass
class Inputs(InputsBase):
    num_rounds: InputType.INTEGER
    name: InputType.STRING
    config: InputType.JSON
    file_data: InputType.FILE_POLARS_DATAFRAME
- Parameters: input_class (InputsBase) – A specification of input data to read
- Returns: The same input class but with all values added
- Return type: InputT
- Raises: ValueError – If the number of inputs received and specified do not match
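A hedged sketch of how a get_inputs-style reader can populate such a dataclass spec from positional inputs. InputsBase and the resolver below are minimal stand-ins for illustration, not the real workflow_utils classes:

```python
# Sketch: match raw positional input values to the fields of a
# dataclass spec, raising ValueError on a count mismatch, as the
# Raises entry above describes. Stand-in code, not the library's.
from dataclasses import dataclass, fields


@dataclass
class InputsBase:
    pass


def get_inputs_sketch(input_class, raw_values: list):
    spec_fields = fields(input_class)
    if len(spec_fields) != len(raw_values):
        raise ValueError(
            f"expected {len(spec_fields)} inputs, got {len(raw_values)}"
        )
    # Pair each declared field, in order, with its raw value.
    return input_class(**{f.name: v for f, v in zip(spec_fields, raw_values)})


@dataclass
class Inputs(InputsBase):
    num_rounds: int
    name: str


result = get_inputs_sketch(Inputs, [5, "trial-A"])
print(result)  # Inputs(num_rounds=5, name='trial-A')
```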
int_value(*, parameter_name: str | None = None, parameter: int | None = None) → int | None
Get the input integer value, either from the parameter or the name of the input.
- Parameters:
- parameter_name (str, optional) – Either None if not used or the name of the input
- parameter (int, optional) – Either None if not used or the parameter number of the input
- Returns: The value as integer or None if not found
- Return type: Optional[int]
json_value(*, parameter_name: str | None = None, parameter: int | None = None) → Any
Get the input value as a JSON dictionary, either from the parameter or the name of the input.
- Parameters:
- parameter_name (str, optional) – Either None if not used or the name of the input
- parameter (int, optional) – Either None if not used or the parameter number of the input
- Returns: The value as a JSON dictionary or None if not found
- Return type: Any
num_inputs() → int
Get the number of input parameters.
- Returns: The number of input parameters.
- Return type: int
num_outputs() → int
Get the number of output parameters.
- Returns: The number of output parameters.
- Return type: int
pandas_value(*, parameter_name: str | None = None, parameter: int | None = None, converters: Dict[Any, Any] | None = None, has_header: bool = True, index_col: int | None = 0) → DataFrame | None
Get an input as Pandas DataFrame, either from the parameter or the name of the input.
- Parameters:
- parameter_name (str, optional) – Either None if not used or the name of the input
- parameter (int, optional) – Either None if not used or the parameter number of the input
- converters (Dict[Any, Any], optional) – None, or a mapping that overrides how a column is parsed, for example {"My Column": str}
- has_header (bool, optional) – True if the first row is the header
- index_col (Optional[int], optional) – Column to use as the row labels of the DataFrame
- Returns: The value as Pandas DataFrame or None if not found
- Return type: Optional[pd.DataFrame]
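A sketch of what the converters override does: map a column name to a callable used to parse that column's raw values. The stdlib csv module stands in here for the real Pandas/Polars readers:

```python
# Illustrative stand-in for the `converters` parameter above: each
# named column's raw string is passed through its callable.
import csv
import io


def parse_with_converters(csv_text: str, converters=None):
    converters = converters or {}
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    for row in rows:
        for col, conv in converters.items():
            if col in row:
                row[col] = conv(row[col])
    return rows


data = "My Column,count\nA,1\nB,2\n"
print(parse_with_converters(data, converters={"count": int}))
# [{'My Column': 'A', 'count': 1}, {'My Column': 'B', 'count': 2}]
```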
polars_value(*, parameter_name: str | None = None, parameter: int | None = None, converters: Dict[str, Any] | None = None, has_header: bool = True) → DataFrame | None
Get an input as Polars DataFrame, either from the parameter or the name of the input.
- Parameters:
- parameter_name (str, optional) – Either None if not used or the name of the input
- parameter (int, optional) – Either None if not used or the parameter number of the input
- converters (Dict[str, Any], optional) – None, or a mapping that overrides how a column is parsed, for example {"My Column": str}
- has_header (bool, optional) – True if the first row is the header
- Returns: The value as Polars DataFrame or None if not found
- Return type: Optional[polars.DataFrame]
python_module(*, parameter_name: str | None = None, parameter: int | None = None) → ModuleType | None
Get the input as a Python module.
- Parameters:
- parameter_name (str, optional) – Either None if not used or the name of the input
- parameter (int, optional) – Either None if not used or the parameter number of the input
- Returns: The imported Python module, or None if not found
- Return type: Optional[ModuleType]
read_vector(file: IO[bytes]) → List[float]
Reads a vector of floats from the given file.
- Parameters: file (IO[bytes]) – The file object to read from.
- Returns: The vector of floats read from the file.
- Return type: List[float]
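A sketch of a read_vector-style helper. The on-disk format assumed here (newline-separated ASCII floats) is an illustration only; the real wire format is defined by the library:

```python
# Stand-in for read_vector: parse floats from a bytes file object.
# The newline-separated-ASCII format is an assumption for this sketch.
import io
from typing import IO, List


def read_vector_sketch(file: IO[bytes]) -> List[float]:
    # float() accepts ASCII bytes directly, so no decode step is needed.
    return [float(line) for line in file.read().splitlines() if line.strip()]


buf = io.BytesIO(b"1.5\n2.0\n-3.25\n")
print(read_vector_sketch(buf))  # [1.5, 2.0, -3.25]
```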
return_arrow(*, parameter_name: str | None = None, parameter: int | None = None, table: DataFrame | Table) → None
return_bytes(*, parameter_name: str | None = None, parameter: int | None = None, content: bytes | str) → None
Write back data bytes.
- Parameters:
- parameter_name (str, optional) – Either None if not used or the name of the return parameter.
- parameter (int, optional) – Either None if not used or the parameter number to return, 1 or greater.
- content (bytes | str) – The bytes or string to write back.
- Returns: None
- Return type: None
return_csv(*, parameter_name: str | None = None, parameter: int | None = None, content: pl.DataFrame | pd.DataFrame | pl.Series | pd.Series, index_label: str | None = None) → None
Write back a DataFrame as a CSV file.
- Parameters:
- parameter_name (str, optional) – Either None if not used or the name of the return parameter.
- parameter (int, optional) – Either None if not used or the parameter number to return, 1 or greater.
- content (pl.DataFrame | pd.DataFrame | pl.Series | pd.Series) – A Polars or Pandas DataFrame or Series to write back as a CSV file.
- index_label (str, optional) – The label of the index column in the CSV file.
- Raises: ValueError – If the content type is invalid.
- Returns: None
- Return type: None
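A hedged sketch of the serialization step behind return_csv: turn tabular content into CSV text, with an optional index column, before handing it to the output channel. A plain dict of columns stands in for the real DataFrame types:

```python
# Stand-in for the CSV write-back: serialize columns to CSV text.
# Real code would pass a Polars/Pandas DataFrame; this is a sketch.
import csv
import io


def to_csv_sketch(columns: dict, index_label=None) -> str:
    buf = io.StringIO()
    writer = csv.writer(buf)
    header = list(columns)
    if index_label is not None:
        header = [index_label] + header  # prepend the index column
    writer.writerow(header)
    n_rows = len(next(iter(columns.values())))
    for i in range(n_rows):
        row = [col[i] for col in columns.values()]
        if index_label is not None:
            row = [i] + row
        writer.writerow(row)
    return buf.getvalue()


print(to_csv_sketch({"a": [1, 2], "b": [3, 4]}, index_label="idx"))
```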
return_excel(*, parameter_name: str | None = None, parameter: int | None = None, content: pl.DataFrame | pd.DataFrame | pl.Series | pd.Series, index_label: str | None = None) → None
Write back a DataFrame as an Excel file.
This method writes the provided DataFrame (content) to an Excel file and returns it as an output. The Excel file is temporarily saved as a named temporary file with the .xlsx extension.
- Parameters:
- parameter_name (str, optional) – Either None if not used or the name of the return parameter.
- parameter (int, optional) – Either None if not used or the parameter number to return, 1 or greater.
- content (pl.DataFrame | pd.DataFrame | pl.Series | pd.Series) – A Polars or Pandas DataFrame or Series to write back as an Excel file.
- index_label (str, optional) – The label of the index column in the Excel file.
- Raises: ValueError – If the content type is invalid.
- Returns: None; the Excel file is saved and registered as an output.
- Return type: None
return_file(file_name: str, *, parameter_name: str | None = None, parameter: int | None = None) → None
Write back a file.
- Parameters:
- file_name (str) – The path of the file to return.
- parameter_name (str, optional) – Either None if not used or the name of the return parameter.
- parameter (int, optional) – Either None if not used or the parameter number to return, 1 or greater.
- Returns: None
- Return type: None
return_outputs(output_class: Type[OutputT], *args: pd.DataFrame | pl.DataFrame | bytes | str | pa.Table) → None
Return all outputs as files.
Example of a data class as output:
@dataclass
class Outputs(OutputBase):
    result1: OutputType.AS_CSV
    result2: OutputType.AS_ARROW
- Parameters:
- output_class (OutputBase) – A specification of output data to return
- args (pd.DataFrame | pl.DataFrame | bytes | str | pa.Table) – Ordered values of the data to return, such as a DataFrame or bytes
- Returns: None
- Return type: None
- Raises: ValueError – If the number of output parameters and output definitions are not the same
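A sketch of the return_outputs contract: positional values are matched, in order, against the fields of the output dataclass, and a count mismatch raises ValueError. OutputBase here is a minimal stand-in, and the real method would serialize each value per its declared OutputType rather than just pairing names with values:

```python
# Stand-in for the output-matching logic described above.
from dataclasses import dataclass, fields


@dataclass
class OutputBase:
    pass


def return_outputs_sketch(output_class, *args):
    spec = fields(output_class)
    if len(spec) != len(args):
        raise ValueError(
            f"{len(spec)} outputs declared but {len(args)} values passed"
        )
    # The real method serializes each value per its OutputType;
    # this sketch just pairs declared names with values in order.
    return {f.name: value for f, value in zip(spec, args)}


@dataclass
class Outputs(OutputBase):
    result1: str
    result2: str


print(return_outputs_sketch(Outputs, "csv-bytes", "arrow-bytes"))
```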
return_vector(*, parameter_name: str | None = None, parameter: int | None = None, content: List[str]) → None
Writes the vector to a file and uploads it to the output URL.
- Parameters:
- parameter_name (str, optional) – The name of the output parameter. Defaults to None.
- parameter (int, optional) – The index of the output parameter. Defaults to None.
- content (List[str]) – The vector content to be written to the file.
- Returns: None
- Return type: None
string_value(*, parameter_name: str | None = None, parameter: int | None = None) → str | None
Get the input string value, either from the parameter or the name of the input.
- Parameters:
- parameter_name (str, optional) – Either None if not used or the name of the input
- parameter (int, optional) – Either None if not used or the parameter number of the input
- Returns: The value as string value or None if not found
- Return type: str or None
usp_file(*, parameter_name: str | None = None, parameter: int | None = None) → str
Get the input as a USP file.
- Parameters:
- parameter_name (str, optional) – Either None if not used or the name of the input
- parameter (int, optional) – Either None if not used or the parameter number of the input
- Returns: The path of the USP file
- Return type: str
value(*, parameter_name: str | None = None, parameter: int | None = None) → str | None
Get the value as a string, either from the index or the name of the input.
- Parameters:
- parameter_name (str, optional) – Either None if not used or the name of the input
- parameter (int, optional) – Either None if not used or the parameter of the input (1 or greater)
- Returns: The value as a string or None if not found
- Return type: Optional[str]
Dataset
class workflow_utils.dataset.Dataset(df: DataFrame, roles: list[Role] | None = None, schema: Schema | None = None)
Bases: object
cast_categorical(column: str) → None
Casts the specified column in the dataset to a categorical data type.
- Parameters: column (str) – The name of the column to be cast.
- Returns: None
- Return type: None
contextualize(roles: Dict[str, Role] | None = None, date_format: str | None = None) → None
contextualize_arrow_table(df: DataFrame, batchid: str, btime: str) → Table
Contextualizes the Arrow table by adding context information.
- Parameters:
- df (pl.DataFrame) – The input DataFrame.
- batchid (str) – The batch ID.
- btime (str) – The time of the batch.
- Returns: The contextualized Arrow table.
- Return type: pa.Table
contextualize_df(batchid: str, batchtime: str, date_format: str | None = None) → None
This function modifies the dataframe ‘df’ in place by adding and adjusting batch-context columns.
- Parameters:
- batchid (str) – The name of the batch id column.
- batchtime (str) – The name of the batch time column.
- date_format (str, optional) – The date format for the batch time column. Defaults to None.
- Returns: None
- Return type: None
contextualize_schema(roles: Dict[str, Role] | None = None) → None
This function modifies the schema of the dataframe ‘df’ based on the roles provided.
- Parameters: roles (Dict[str, Role], optional) – A dictionary mapping column names to their roles.
- Returns: None
- Return type: None
copy() → Dataset
copy_metadata(source: Schema) → Schema
Copy metadata from a source schema to the current schema.
- Parameters: source (pa.Schema) – The source schema.
- Returns: The modified schema.
- Return type: pa.Schema
classmethod from_arrow_file(file_path: str) → Dataset
classmethod from_filelike(filelike: bytes) → Dataset
generate_primary_id() → None
Generates a primary ID column by concatenating the batch ID and age columns.
If the “Primary ID” column does not exist, it creates it by concatenating the batch ID and age columns. The batch ID column is obtained using the get_batch_id_column method, and the age column is obtained using the get_age_column method.
If the age column does not exist, the method returns without making any changes.
The primary ID column is created by concatenating the batch ID and age columns using Polars’ concat_str function. The resulting column is then inserted at the beginning of the DataFrame using the insert_column method. The RoleType.PRIMARY_ID.value is also inserted at the beginning of the roles list.
- Returns: None
- Return type: None
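The steps above can be sketched in plain Python. Column names, the "_" separator, and the dict-of-lists table stand-in are assumptions for illustration, not the real Polars implementation:

```python
# Stand-in sketch of generate_primary_id: build "Primary ID" by
# concatenating batch-ID and age values row by row, skipping the work
# when the column already exists or the age column is missing.
from typing import Dict, List


def generate_primary_id_sketch(columns: Dict[str, List[str]],
                               batch_col: str = "BatchId",
                               age_col: str = "Age") -> None:
    if "Primary ID" in columns or age_col not in columns:
        return  # mirrors the method's early returns
    columns["Primary ID"] = [
        f"{b}_{a}" for b, a in zip(columns[batch_col], columns[age_col])
    ]


cols = {"BatchId": ["B1", "B1"], "Age": ["0", "1"]}
generate_primary_id_sketch(cols)
print(cols["Primary ID"])  # ['B1_0', 'B1_1']
```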
get_age_column() → str | None
This function returns the name of the Age column in the schema.
- Returns: The name of the Age column, or None if not found.
- Return type: str or None
get_batch_id_column() → str
This function returns the name of the BatchId column in the schema.
- Returns: The name of the BatchId column.
- Return type: str
- Raises: ValueError – If the BatchId column is not found in the schema.
get_categorical_columns() → List[str]
Returns a list of column names that contain categorical data.
- Returns: A list of column names that contain categorical data.
- Return type: List[str]
get_columns(schema: Schema, roles: list[str]) → List[str]
This function returns the names of the columns in the schema that have a certain role.
- Parameters:
- schema (pa.Schema) – The schema of the dataframe.
- roles (list[str]) – The roles to filter by.
- Returns: The names of the columns.
- Return type: list[str]
get_data_columns() → List[str]
This function returns the names of the Data columns in the schema.
- Returns: The names of the Data columns.
- Return type: list[str]
get_non_data_columns() → List[str]
This function returns the names of the columns in the schema that are not Data columns.
- Returns: The names of the non-Data columns.
- Return type: list[str]
get_phase_id_column() → str
Returns the phase ID column from the dataset schema.
If a column named ‘phaseid’ exists in the schema, it is returned. Otherwise, the first categorical column is returned.
- Returns: The name of the phase ID column.
- Return type: str
parse_roles() → None
This function parses the roles from the schema and stores them on the dataset.
- Returns: None
- Return type: None
remove_metadata(column: str) → Schema
This function removes the metadata from a column in the schema.
- Parameters: column (str) – The name of the column.
- Returns: The modified schema.
- Return type: pyarrow.Schema
remove_role_metadata(column: str) → None
Removes the metadata from the specified column in the dataset schema.
- Parameters: column (str) – The name of the column to remove metadata from.
- Returns: None
- Return type: None
set_role(column: str, role: Role) → Schema
This function sets the role of a column in the schema.
- Parameters:
- column (str) – The name of the column.
- role (Role) – The role to set for the column.
- Returns: The modified schema.
- Return type: pyarrow.Schema
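A hedged sketch of the role-metadata pattern used by set_role and parse_roles: a role is stored in a column's metadata under a ColumnType key and read back later. A plain dict stands in for the pyarrow.Schema used by the real class:

```python
# Stand-in for role metadata on a schema: a dict of per-column
# metadata dicts replaces pyarrow.Schema for this sketch.
from typing import Dict


def set_role_sketch(schema: Dict[str, Dict[str, str]],
                    column: str, role: str) -> None:
    # Attach the role under the ColumnType metadata key.
    schema.setdefault(column, {})["ColumnType"] = role


def parse_roles_sketch(schema: Dict[str, Dict[str, str]]) -> Dict[str, str]:
    # Columns without metadata get an empty role, as update_roles
    # describes for fields lacking metadata.
    return {col: meta.get("ColumnType", "") for col, meta in schema.items()}


schema = {"batch": {}, "temp": {}}
set_role_sketch(schema, "batch", "BatchId")
set_role_sketch(schema, "temp", "Data")
print(parse_roles_sketch(schema))  # {'batch': 'BatchId', 'temp': 'Data'}
```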
to_table() → Table
This function converts the dataframe ‘df’ to an Arrow table and casts it to the stored schema.
- Returns: The converted Arrow table.
- Return type: pyarrow.Table
update_after_removal() → None
This function updates the schema after columns have been removed from the dataframe.
- Returns: None
- Return type: None
update_roles() → None
Update the roles of the dataset based on the schema metadata.
This method iterates over each field in the schema and checks if it has metadata. If a field has metadata, a new Role object is created using the metadata and added to the roles list. If a field does not have metadata, a new Role object with an empty ColumnType is added to the roles list.
- Returns: None
class workflow_utils.dataset.RoleType(value)
Bases: Enum
An enumeration.
AGE = Role(ColumnType='Age')
BATCH_ID = Role(ColumnType='BatchId')
DATA = Role(ColumnType='Data')
PRIMARY_ID = Role(ColumnType='PrimaryId')
SECONDARY_ID = Role(ColumnType='SecondaryId')
workflow_utils.dataset.concat_datasets(datasets: List[Dataset]) → Dataset
This function concatenates a list of datasets into a single dataset.
- Parameters: datasets (List[Dataset]) – The list of datasets.
- Returns: The concatenated dataset.
- Return type: Dataset
workflow_utils.dataset.parse_arrow_table(table: Table) → Dataset
Parse an Arrow table into a Dataset.
- Parameters: table (pa.Table) – The Arrow table to parse.
- Returns: The parsed Dataset object.
- Return type: Dataset
- Example:
>>> table = pa.Table.from_pandas(df)
>>> dataset = parse_arrow_table(table)
workflow_utils.dataset.parse_roles(schema: Schema) → list[Role]
Parse the roles from the given schema.
- Parameters: schema (pa.Schema) – The schema to parse roles from.
- Returns: A list of Role objects parsed from the schema.
- Return type: list[Role]