DataStream
- class workflow_utils.datastream.DataStream(config_path: str = '/app/config.json')
- async get_messages(*, parameter_name: str | None = None, parameter: int | None = None, count: int = 100) -> list[Message]
Get messages from a data stream as a list of Message objects.
- Parameters:
parameter_name (str, optional) – Either None if not used, or the name of the input parameter.
parameter (int, optional) – Either None if not used, or the number of the input parameter.
- Returns:
The messages from the data stream as a list of Message objects, or an empty list if no messages are available.
- Return type:
list[Message]
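Because get_messages returns an empty list when nothing is available, a consumer typically polls in a loop and backs off between empty results. The following is a minimal sketch that assumes get_messages returns immediately rather than blocking until data arrives. The poll helper, its max_empty_polls and delay arguments, and the FakeStream in-memory stand-in are all hypothetical illustrations and are not part of workflow_utils.

```python
from __future__ import annotations

import asyncio
from typing import Any


async def poll(stream: Any, parameter: int, max_empty_polls: int = 3, delay: float = 0.5) -> list[Any]:
    """Hypothetical helper: collect messages for one input parameter.

    Stops after `max_empty_polls` consecutive calls return an empty list.
    Assumes `stream` exposes the keyword interface of DataStream.get_messages.
    """
    collected: list[Any] = []
    empty_polls = 0
    while empty_polls < max_empty_polls:
        batch = await stream.get_messages(parameter=parameter, count=100)
        if batch:
            collected.extend(batch)
            empty_polls = 0
        else:
            # An empty list means nothing is available right now, so back off briefly.
            empty_polls += 1
            await asyncio.sleep(delay)
    return collected


class FakeStream:
    """Hypothetical in-memory stand-in for DataStream, used only for this demo."""

    def __init__(self, batches: list[list[Any]]) -> None:
        self._batches = list(batches)

    async def get_messages(self, *, parameter_name: str | None = None,
                           parameter: int | None = None, count: int = 100) -> list[Any]:
        # Return the next queued batch, or [] once the queue is exhausted.
        return self._batches.pop(0) if self._batches else []


if __name__ == "__main__":
    stream = FakeStream([[b"a", b"b"], [], [b"c"]])
    print(asyncio.run(poll(stream, parameter=1, delay=0.01)))  # [b'a', b'b', b'c']
```

With a real DataStream instance in place of FakeStream, the collected items would be Message objects instead of raw bytes.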
- async publish_message(data: pd.DataFrame | pl.DataFrame | pd.Series | pl.Series | bytes, *, parameter_name: str | None = None, parameter: int | None = None, schema: TableSchema | None = None) -> None
Publish a message to a data stream.
- Parameters:
data (pd.DataFrame | pl.DataFrame | pl.Series | pd.Series | bytes) – A Pandas or Polars DataFrame or Series, or raw binary data, to publish on the queue.
parameter_name (str, optional) – Either None if not used, or the name of the return parameter.
parameter (int, optional) – Either None if not used, or the number of the return parameter (1 or greater).
- Returns:
None
- Return type:
None
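A common pattern combines get_messages and publish_message to forward data from an input parameter to an output parameter. The following is a minimal pass-through sketch that assumes each message exposes to_binary() (as Message does) and relies on bytes being an accepted data type for publish_message. The relay_once helper and the FakeMessage and FakeStream stand-ins are hypothetical and exist only to make the example runnable without a live stream.

```python
from __future__ import annotations

import asyncio
from typing import Any


async def relay_once(stream: Any, in_parameter: int = 1, out_parameter: int = 1) -> int:
    """Hypothetical helper: forward all currently available messages unchanged.

    Reads from input parameter `in_parameter` and publishes each payload as raw
    bytes to output parameter `out_parameter`. Returns the number forwarded.
    """
    messages = await stream.get_messages(parameter=in_parameter)
    for message in messages:
        # bytes is one of the data types publish_message accepts.
        await stream.publish_message(message.to_binary(), parameter=out_parameter)
    return len(messages)


class FakeMessage:
    """Hypothetical stand-in for Message that only supports to_binary()."""

    def __init__(self, data: bytes) -> None:
        self._data = data

    def to_binary(self) -> bytes:
        return self._data


class FakeStream:
    """Hypothetical in-memory stand-in for DataStream, used only for this demo."""

    def __init__(self, inbox: dict[int, list[FakeMessage]]) -> None:
        self.inbox = inbox
        self.outbox: dict[int, list[bytes]] = {}

    async def get_messages(self, *, parameter_name: str | None = None,
                           parameter: int | None = None, count: int = 100) -> list[FakeMessage]:
        # Hand out (and remove) everything queued for this parameter.
        return self.inbox.pop(parameter, [])

    async def publish_message(self, data: bytes, *, parameter_name: str | None = None,
                              parameter: int | None = None, schema: Any = None) -> None:
        # Record what was published, grouped by output parameter number.
        self.outbox.setdefault(parameter, []).append(data)


if __name__ == "__main__":
    stream = FakeStream({1: [FakeMessage(b"x"), FakeMessage(b"y")]})
    forwarded = asyncio.run(relay_once(stream, in_parameter=1, out_parameter=2))
    print(forwarded, stream.outbox)  # 2 {2: [b'x', b'y']}
```

Passing bytes through avoids a decode and re-encode round trip. To transform the data, convert with to_pandas() or to_polars() and publish the resulting DataFrame instead.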
- class workflow_utils.datastream.Message(data: bytes)
Represents a message from a data stream.
- schema() -> list[ColumnSchema]
Get the contextualization information of the message, returned as a list of ColumnSchema objects, one per column.
- to_binary() -> bytes
Convert the message to binary data.
- to_pandas() -> pd.DataFrame
Convert the message to a Pandas DataFrame.
- to_polars() -> pl.DataFrame
Convert the message to a Polars DataFrame.
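Since get_messages returns a list of Message objects, a frequent next step is to merge their DataFrame views into a single table. The following is a minimal sketch that assumes the per-message frames have compatible columns. The messages_to_frame helper and the FakeMessage stand-in are hypothetical; only the to_pandas() method name comes from the reference above.

```python
import pandas as pd


def messages_to_frame(messages: list) -> pd.DataFrame:
    """Hypothetical helper: combine the Pandas view of each message into one DataFrame.

    Assumes every message exposes to_pandas() (as Message does) and that the
    per-message frames share compatible columns.
    """
    frames = [message.to_pandas() for message in messages]
    if not frames:
        # get_messages can return an empty list, and pd.concat raises on [].
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)


class FakeMessage:
    """Hypothetical stand-in for Message that wraps an existing DataFrame."""

    def __init__(self, frame: pd.DataFrame) -> None:
        self._frame = frame

    def to_pandas(self) -> pd.DataFrame:
        return self._frame


if __name__ == "__main__":
    batch = [FakeMessage(pd.DataFrame({"value": [1, 2]})),
             FakeMessage(pd.DataFrame({"value": [3]}))]
    print(messages_to_frame(batch)["value"].tolist())  # [1, 2, 3]
```

The same approach works with to_polars() and pl.concat if Polars is preferred.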
- class workflow_utils.datastream.NatsMetadata(jwt: str, jet_stream_name: str, consumer_name: str, seed: str, server_url: str, inbox_prefix: str)
- consumer_name: str
- inbox_prefix: str
- jet_stream_name: str
- jwt: str
- seed: str
- server_url: str
- workflow_utils.datastream.datastream_to_pandas(data: bytes) -> pd.DataFrame
Convert a byte stream of Arrow data to a Pandas DataFrame.
This function reads a byte stream containing Arrow IPC data and converts it into a Pandas DataFrame.
- Parameters:
data (bytes) – Byte stream containing Arrow IPC data.
- Returns:
DataFrame containing the data from the byte stream.
- Return type:
pd.DataFrame
- workflow_utils.datastream.datastream_to_polars(data: bytes) -> pl.DataFrame
Convert a byte stream to a Polars DataFrame.
This function reads a byte stream containing Arrow IPC data, converts it to an Arrow Table, and then converts the Arrow Table to a Polars DataFrame.
- Parameters:
data (bytes) – The byte stream containing Arrow IPC data.
- Returns:
The resulting Polars DataFrame.
- Return type:
pl.DataFrame
- Raises:
ValueError – If the conversion results in a Polars Series instead of a DataFrame.
- workflow_utils.datastream.pandas_to_datastream(df: pd.DataFrame, schema: list[ColumnSchema] | None = None) -> bytes
Convert a Pandas DataFrame to a serialized data stream in bytes.
This function takes a Pandas DataFrame and an optional schema, converts the DataFrame to a PyArrow Table, and then serializes it to an IPC message in bytes format.
- Parameters:
df (pd.DataFrame) – The Pandas DataFrame to convert.
schema (list[ColumnSchema], optional) – Optional list of ColumnSchema objects defining the schema of the resulting table. If None, an empty schema is used.
- Returns:
The serialized datastream in bytes.
- Return type:
bytes
- workflow_utils.datastream.polars_to_datastream(df: pl.DataFrame, schema: list[ColumnSchema] | None = None) -> bytes
Convert a Polars DataFrame to a serialized data stream in IPC format.
This function takes a Polars DataFrame and an optional schema, converts the DataFrame to an Apache Arrow Table, and then serializes it to an IPC message in bytes format.
- Parameters:
df (pl.DataFrame) – The Polars DataFrame to convert.
schema (list[ColumnSchema], optional) – An optional list of column schemas to apply to the DataFrame.
- Returns:
The serialized data stream in bytes.
- Return type:
bytes