DataStream

class workflow_utils.datastream.DataStream(config_path: str = '/app/config.json')
async get_messages(*, parameter_name: str | None = None, parameter: int | None = None, count: int = 100) → list[Message]

Get messages from a data stream as a list of Message objects.

Parameters:
  • parameter_name (str, optional) – The name of the input parameter, or None if not used.

  • parameter (int, optional) – The number of the input parameter, or None if not used.

Returns:

The messages from the data stream as a list of Message objects or an empty list if no messages are available

Return type:

list[Message]

async publish_message(data: pd.DataFrame | pl.DataFrame | pd.Series | pl.Series | bytes, *, parameter_name: str | None = None, parameter: int | None = None, schema: TableSchema | None = None) → None

Publish a message to a data stream.

Parameters:
  • data (pd.DataFrame | pl.DataFrame | pl.Series | pd.Series | bytes) – A Polars or Pandas DataFrame or Series, or binary data, to publish on the queue.

  • parameter_name (str, optional) – The name of the return parameter, or None if not used.

  • parameter (int, optional) – The number of the return parameter (1 or greater), or None if not used.

Returns:

None

Return type:

None

class workflow_utils.datastream.Message(data: bytes)

Represents a message from a data stream.

schema() → list[ColumnSchema]

Get the contextualization information of the message as a list of ColumnSchema objects, one per column.

to_binary() → bytes

Convert the message to binary data.

to_pandas() → pd.DataFrame

Convert the message to a Pandas DataFrame.

to_polars() → pl.DataFrame

Convert the message to a Polars DataFrame.

class workflow_utils.datastream.NatsMetadata(jwt: str, jet_stream_name: str, consumer_name: str, seed: str, server_url: str, inbox_prefix: str)
consumer_name: str
inbox_prefix: str
jet_stream_name: str
jwt: str
seed: str
server_url: str

workflow_utils.datastream.datastream_to_pandas(data: bytes) → pd.DataFrame

Convert a byte stream of Arrow data to a Pandas DataFrame.

This function reads a byte stream containing Arrow IPC data and converts it into a Pandas DataFrame.

Parameters:

data (bytes) – Byte stream containing Arrow IPC data.

Returns:

DataFrame containing the data from the byte stream.

Return type:

pd.DataFrame

workflow_utils.datastream.datastream_to_polars(data: bytes) → pl.DataFrame

Convert a byte stream of Arrow IPC data to a Polars DataFrame.

This function reads a byte stream containing Arrow IPC data, converts it to an Arrow Table, and then converts the Arrow Table to a Polars DataFrame.

Parameters:

data (bytes) – The byte stream containing Arrow IPC data.

Returns:

The resulting Polars DataFrame.

Return type:

pl.DataFrame

Raises:

ValueError – If the conversion results in a Polars Series instead of a DataFrame.

workflow_utils.datastream.pandas_to_datastream(df: pd.DataFrame, schema: list[ColumnSchema] | None = None) → bytes

Convert a pandas DataFrame to a serialized datastream in bytes.

This function takes a pandas DataFrame and an optional schema, converts the DataFrame to a PyArrow Table, and then serializes it to an IPC message in bytes format.

Parameters:
  • df (pd.DataFrame) – The pandas DataFrame to be converted.

  • schema (list[ColumnSchema], optional) – Optional list of ColumnSchema to define the schema of the resulting table. If None, an empty schema will be used.

Returns:

The serialized datastream in bytes.

Return type:

bytes

workflow_utils.datastream.polars_to_datastream(df: pl.DataFrame, schema: list[ColumnSchema] | None = None) → bytes

Convert a Polars DataFrame to a serialized data stream in IPC format.

This function takes a Polars DataFrame and an optional schema, converts the DataFrame to an Apache Arrow Table, and then serializes it to an IPC message in bytes format.

Parameters:
  • df (pl.DataFrame) – The Polars DataFrame to be converted.

  • schema (list[ColumnSchema], optional) – An optional list of column schemas to be applied to the DataFrame.

Returns:

The serialized data stream in bytes.

Return type:

bytes