DataStream

class workflow_utils.datastream.DataStream(config_path: str = '/app/config.json')
async get_messages(*, parameter_name: str | None = None, parameter: int | None = None, count: int = 100) -> list[Message]

Get messages from a data stream as a list of Message objects.

Parameters:
  • parameter_name (str, optional) – The name of the input, or None if not used.

  • parameter (int, optional) – The parameter number of the input, or None if not used.

  • count (int, optional) – Defaults to 100. Presumably the maximum number of messages to return.

Returns:

The messages from the data stream as a list of Message objects, or an empty list if no messages are available.

Return type:

list[Message]
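
The following is a minimal sketch of how get_messages might be polled until the stream is empty. Because the real workflow_utils package may not be installed, StubDataStream and StubMessage are hypothetical stand-ins that only mirror the documented signatures. The input name "input_1", the batch size of 2, and the assumption that count caps the number of returned messages are illustrative and not confirmed by the reference above.

```python
import asyncio


class StubMessage:
    """Hypothetical stand-in for Message; only to_binary() is mimicked."""

    def __init__(self, data: bytes):
        self._data = data

    def to_binary(self) -> bytes:
        return self._data


class StubDataStream:
    """Hypothetical stand-in mirroring the documented get_messages signature."""

    def __init__(self, queued):
        self._queued = list(queued)

    async def get_messages(self, *, parameter_name=None, parameter=None, count=100):
        # Assumption: count is the maximum number of messages per call.
        batch, self._queued = self._queued[:count], self._queued[count:]
        return [StubMessage(payload) for payload in batch]


async def drain(stream, parameter_name):
    """Collect payloads until get_messages returns an empty list."""
    payloads = []
    while True:
        messages = await stream.get_messages(parameter_name=parameter_name, count=2)
        if not messages:  # documented behavior: empty list when nothing is available
            break
        payloads.extend(message.to_binary() for message in messages)
    return payloads


result = asyncio.run(drain(StubDataStream([b"a", b"b", b"c"]), "input_1"))
```

With the real class, the same drain coroutine would presumably be passed a DataStream instance instead of the stub.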

async publish_message(data: pd.DataFrame | pl.DataFrame | pl.Series | pd.Series | bytes, *, parameter_name: str | None = None, parameter: int | None = None, schema: TableSchema | None = None) -> None

Publish a message to a data stream.

Parameters:
  • data (pd.DataFrame | pl.DataFrame | pl.Series | pd.Series | bytes) – A Polars or Pandas DataFrame or Series, or binary data, to publish on the queue.

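  • parameter_name (str, optional) – The name of the return parameter, or None if not used.

  • parameter (int, optional) – The number of the return parameter (1 or greater), or None if not used.

  • schema (TableSchema, optional) – Defaults to None. Presumably describes the columns of the published data.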

Returns:

None

Return type:

None
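
As a rough illustration of the two ways to address a return parameter, the sketch below publishes one message by number and one by name. StubDataStream is a hypothetical stand-in that records calls instead of talking to a queue. The name "summary" and the ValueError raised for numbers below 1 are assumptions made for this example, not documented behavior.

```python
import asyncio


class StubDataStream:
    """Hypothetical stand-in mirroring the documented publish_message signature."""

    def __init__(self):
        self.published = []

    async def publish_message(self, data, *, parameter_name=None, parameter=None, schema=None):
        # Assumption: the stub rejects numbers below 1, since the reference
        # says the parameter number is "1 or greater".
        if parameter is not None and parameter < 1:
            raise ValueError("parameter must be 1 or greater")
        self.published.append((parameter_name, parameter, data))


async def main():
    stream = StubDataStream()
    # Address the return parameter by number ...
    await stream.publish_message(b"\x00\x01", parameter=1)
    # ... or by name (the name "summary" is made up for this example).
    await stream.publish_message(b"report", parameter_name="summary")
    return stream.published


published = asyncio.run(main())
```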

class workflow_utils.datastream.Message(data: bytes)

Represents a message from a data stream.

schema() -> list[ColumnSchema]

Get the contextualization information of the message, returned as a list with one ColumnSchema per column.

to_binary() -> bytes

Convert the message to binary data.

to_pandas() -> pd.DataFrame

Convert the message to a Pandas DataFrame.

to_polars() -> pl.DataFrame

Convert the message to a Polars DataFrame.
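
One way to choose between the three conversion methods at runtime is a small dispatch helper, sketched below. StubMessage is a hypothetical stand-in whose to_pandas and to_polars return tagged tuples, so the example runs without pandas or Polars installed. The helper name convert and the format keys are also assumptions.

```python
class StubMessage:
    """Hypothetical stand-in exposing the documented conversion methods."""

    def __init__(self, data: bytes):
        self._data = data

    def to_binary(self):
        return self._data

    def to_pandas(self):
        # A real Message would return a Pandas DataFrame here.
        return ("pandas", self._data)

    def to_polars(self):
        # A real Message would return a Polars DataFrame here.
        return ("polars", self._data)


def convert(message, fmt: str):
    """Call the conversion method that matches fmt."""
    converters = {
        "binary": message.to_binary,
        "pandas": message.to_pandas,
        "polars": message.to_polars,
    }
    if fmt not in converters:
        raise ValueError(f"unknown format: {fmt!r}")
    return converters[fmt]()


raw = convert(StubMessage(b"payload"), "binary")
```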

class workflow_utils.datastream.NatsMetadata(jwt: str, jet_stream_name: str, consumer_name: str, seed: str, server_url: str, inbox_prefix: str)
consumer_name: str
inbox_prefix: str
jet_stream_name: str
jwt: str
seed: str
server_url: str