DataStream
- class workflow_utils.datastream.DataStream(config_path: str = '/app/config.json')
- async get_messages(*, parameter_name: str | None = None, parameter: int | None = None, count: int = 100) → list[Message]
Get messages from a data stream as a list of Message objects.
- Parameters:
parameter_name (str, optional) – The name of the input parameter, or None if not used.
parameter (int, optional) – The number of the input parameter, or None if not used.
count (int, optional) – The number of messages to get, presumably an upper limit. Defaults to 100.
- Returns:
The messages from the data stream as a list of Message objects, or an empty list if no messages are available.
- Return type:
list[Message]
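A minimal sketch of calling get_messages, assuming that count caps the number of messages returned and that the stream object behaves like DataStream. The helper read_payloads, the input name "input_1", and the FakeStream and FakeMessage stand-ins are hypothetical; they exist only so the sketch runs without a live data stream.

```python
import asyncio


async def read_payloads(stream, input_name: str, count: int = 10) -> list[bytes]:
    """Fetch messages from one named input and return their raw bytes.

    `stream` is expected to behave like workflow_utils.datastream.DataStream.
    """
    # Every argument of get_messages is keyword-only (note the `*` in the signature).
    messages = await stream.get_messages(parameter_name=input_name, count=count)
    if not messages:
        # An empty list means no messages are available.
        return []
    return [message.to_binary() for message in messages]


# Stand-ins (hypothetical, for illustration only) so the sketch runs offline.
class FakeMessage:
    def __init__(self, data: bytes):
        self._data = data

    def to_binary(self) -> bytes:
        return self._data


class FakeStream:
    def __init__(self, payloads):
        self._payloads = list(payloads)

    async def get_messages(self, *, parameter_name=None, parameter=None, count=100):
        # Assumption: `count` is an upper bound on the batch size.
        return [FakeMessage(p) for p in self._payloads[:count]]


payloads = asyncio.run(read_payloads(FakeStream([b"a", b"b", b"c"]), "input_1", count=2))
print(payloads)
```

In a real workflow the FakeStream instance would be replaced by a DataStream, and parameter could be passed instead of parameter_name to address the input by number.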
- async publish_message(data: pd.DataFrame | pl.DataFrame | pl.Series | pd.Series | bytes, *, parameter_name: str | None = None, parameter: int | None = None, schema: TableSchema | None = None) → None
Publish a message to a data stream.
- Parameters:
data (pd.DataFrame | pl.DataFrame | pl.Series | pd.Series | bytes) – A Polars or Pandas DataFrame or Series, or binary data, to publish to the data stream.
parameter_name (str, optional) – The name of the return parameter, or None if not used.
parameter (int, optional) – The number of the return parameter (1 or greater), or None if not used.
- Returns:
None
- Return type:
None
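A short sketch of publishing binary data to a numbered return parameter. It assumes the consumer accepts UTF-8 JSON bytes, which the reference does not state. The helper publish_json and the RecordingStream stand-in are hypothetical and only illustrate the call shape.

```python
import asyncio
import json


async def publish_json(stream, record: dict, *, parameter: int = 1) -> None:
    """Serialize `record` to UTF-8 JSON bytes and publish it.

    `stream` is expected to behave like workflow_utils.datastream.DataStream.
    """
    if parameter < 1:
        # The reference states that parameter numbers are 1 or greater.
        raise ValueError("parameter must be 1 or greater")
    payload = json.dumps(record).encode("utf-8")
    # `data` is positional; the remaining arguments are keyword-only.
    await stream.publish_message(payload, parameter=parameter)


# Stand-in (hypothetical, for illustration only) that records what was published.
class RecordingStream:
    def __init__(self):
        self.published = []

    async def publish_message(self, data, *, parameter_name=None, parameter=None, schema=None):
        self.published.append((data, parameter_name, parameter))


recorder = RecordingStream()
asyncio.run(publish_json(recorder, {"status": "ok"}, parameter=1))
print(recorder.published)
```

Checking the parameter number before the call keeps an invalid value from reaching the stream; whether DataStream performs its own validation is not documented here.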
- class workflow_utils.datastream.Message(data: bytes)
Represents a message from a data stream.
- schema() → list[ColumnSchema]
Get the contextualization information of the message, returned as a list with one ColumnSchema entry per column.
- to_binary() → bytes
Convert the message to binary data.
- to_pandas() → pd.DataFrame
Convert the message to a Pandas DataFrame.
- to_polars() → pl.DataFrame
Convert the message to a Polars DataFrame.
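Because to_binary() returns bytes and publish_message accepts bytes, a message can in principle be forwarded unchanged from an input to a return parameter. The sketch below illustrates that pass-through. The relay helper, the parameter names "raw_in" and "raw_out", and the EchoStream and EchoMessage stand-ins are hypothetical, and the sketch assumes each get_messages call returns messages that have not been returned before.

```python
import asyncio


async def relay(stream, source: str, target: str, count: int = 100) -> int:
    """Forward the raw bytes of each message from input `source` to
    return parameter `target`, and report how many were forwarded."""
    messages = await stream.get_messages(parameter_name=source, count=count)
    for message in messages:
        await stream.publish_message(message.to_binary(), parameter_name=target)
    return len(messages)


# Stand-ins (hypothetical, for illustration only) so the sketch runs offline.
class EchoMessage:
    def __init__(self, data: bytes):
        self._data = data

    def to_binary(self) -> bytes:
        return self._data


class EchoStream:
    def __init__(self, payloads):
        self._pending = list(payloads)
        self.sent = []

    async def get_messages(self, *, parameter_name=None, parameter=None, count=100):
        # Hand out at most `count` pending payloads and remove them from the queue.
        batch, self._pending = self._pending[:count], self._pending[count:]
        return [EchoMessage(p) for p in batch]

    async def publish_message(self, data, *, parameter_name=None, parameter=None, schema=None):
        self.sent.append((parameter_name, data))


echo = EchoStream([b"x", b"y"])
forwarded = asyncio.run(relay(echo, "raw_in", "raw_out"))
print(forwarded, echo.sent)
```

When the payload needs to be transformed rather than forwarded, to_pandas() or to_polars() would replace to_binary(), and the resulting DataFrame can be passed to publish_message directly.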