Consumer

class beneath.Consumer

A consumer replays the records in a table and/or subscribes to new changes. If the consumer is initialized with a project and subscription name, it checkpoints its progress so that it does not reprocess the same data every time the process starts.

instance: beneath.instance.TableInstance

The table instance the consumer is subscribed to.

cursor: beneath.cursor.Cursor

The cursor used to replay and subscribe to the table. You can use it to get the current state of the underlying replay and changes cursors.

async reset()

Resets the consumer’s replay and changes cursors.

async replay(cb: Callable[[collections.abc.Mapping], Awaitable], max_concurrency: int = 1)

Calls the callback with every historical record in the table in the order they were written. Returns when all historical records have been processed.

Parameters
  • cb (async def fn(record)) – Async function for processing a record.

  • max_concurrency (int) – The maximum number of callbacks to call concurrently. Defaults to 1.
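
The callback contract described above can be sketched as follows. This is a minimal illustration, not the library's implementation: FakeConsumer is a hypothetical in-memory stand-in (it is not part of beneath) whose replay method only mirrors the documented signature, and the semaphore is one assumed way of bounding concurrent callbacks.

```python
import asyncio
from collections.abc import Mapping


class FakeConsumer:
    """Hypothetical in-memory stand-in; NOT part of beneath."""

    def __init__(self, records):
        self._records = list(records)

    async def replay(self, cb, max_concurrency: int = 1):
        # Assumption: bound the number of in-flight callbacks with a semaphore.
        sem = asyncio.Semaphore(max_concurrency)

        async def run(record):
            async with sem:
                await cb(record)

        # Returns once every stored record has been passed to the callback.
        await asyncio.gather(*(run(r) for r in self._records))


async def replay_all(consumer, max_concurrency=1):
    seen = []

    async def process(record: Mapping):
        # The callback receives one record (a mapping) at a time.
        seen.append(record["id"])

    await consumer.replay(process, max_concurrency=max_concurrency)
    return seen


records = [{"id": i} for i in range(5)]
sequential = asyncio.run(replay_all(FakeConsumer(records)))
concurrent = asyncio.run(replay_all(FakeConsumer(records), max_concurrency=3))
```

With a real consumer, only the process callback and the call to replay would carry over; everything inside FakeConsumer is illustrative.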

async subscribe(cb: Callable[[collections.abc.Mapping], Awaitable], max_concurrency: int = 1, replay_only: bool = False, changes_only: bool = False, stop_when_idle: bool = False)

Replays the table and subscribes to new changes. Runs forever unless stop_when_idle=True or the instance is finalized. Calls the callback for every record.

Parameters
  • cb (async def fn(record)) – Async function for processing a record.

  • max_concurrency (int) – The maximum number of callbacks to call concurrently. Defaults to 1.

  • replay_only (bool) – If true, will not read changes, but only replay historical records. Defaults to False.

  • changes_only (bool) – If true, will not replay historical records, but only subscribe to new changes. Defaults to False.

  • stop_when_idle (bool) – If true, will return when “caught up” and no new changes are available. Defaults to False.
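
To make the interaction of the flags concrete, here is a hedged sketch. FakeConsumer is again a hypothetical stand-in (not part of beneath) that models the table as a fixed list of historical records plus a fixed list of pending changes. It assumes that replay_only=True returns once the replay is done, and it ignores max_concurrency.

```python
import asyncio


class FakeConsumer:
    """Hypothetical in-memory stand-in; NOT part of beneath."""

    def __init__(self, history, changes):
        self._history = list(history)
        self._changes = list(changes)

    async def subscribe(self, cb, max_concurrency=1, replay_only=False,
                        changes_only=False, stop_when_idle=False):
        # max_concurrency is accepted for signature parity but ignored here.
        if not changes_only:
            for record in self._history:
                await cb(record)
        if replay_only:
            return  # assumption: nothing left to do after the replay
        for record in self._changes:
            await cb(record)
        if not stop_when_idle:
            # Mimic "runs forever": wait for changes that never arrive.
            await asyncio.Event().wait()


async def collect(consumer, **flags):
    seen = []

    async def process(record):
        seen.append(record["id"])

    await consumer.subscribe(process, **flags)
    return seen


consumer = FakeConsumer(history=[{"id": 1}, {"id": 2}], changes=[{"id": 3}])
everything = asyncio.run(collect(consumer, stop_when_idle=True))
history_only = asyncio.run(collect(consumer, replay_only=True))
new_only = asyncio.run(collect(consumer, changes_only=True, stop_when_idle=True))
```

In this sketch, calling subscribe with the default flags would never return, which mirrors the documented "runs forever" behavior; pass stop_when_idle=True when a script should exit after catching up.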

iterate(batches: bool = False, replay_only: bool = False, changes_only: bool = False, stop_when_idle: bool = False)

Replays the table and subscribes to new changes. Runs forever unless stop_when_idle=True or the instance is finalized. Yields every record (or every batch if batches=True).

Parameters
  • batches (bool) – If true, yields batches of records as they’re loaded (instead of individual records). Defaults to False.

  • replay_only (bool) – If true, will not read changes, but only replay historical records. Defaults to False.

  • changes_only (bool) – If true, will not replay historical records, but only subscribe to new changes. Defaults to False.

  • stop_when_idle (bool) – If true, will return when “caught up” and no new changes are available. Defaults to False.
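
The difference between record-wise and batch-wise iteration can be sketched as below. This assumes that iterate returns an asynchronous iterator consumed with async for, which the signature above does not state explicitly. FakeConsumer is a hypothetical stand-in (not part of beneath) that yields from fixed in-memory batches and accepts the remaining flags only for signature parity.

```python
import asyncio


class FakeConsumer:
    """Hypothetical in-memory stand-in; NOT part of beneath."""

    def __init__(self, batches):
        self._batches = [list(b) for b in batches]

    async def iterate(self, batches=False, replay_only=False,
                      changes_only=False, stop_when_idle=False):
        # Assumption: an async generator; the other flags are ignored here.
        for batch in self._batches:
            if batches:
                yield batch  # one list of records per loaded batch
            else:
                for record in batch:
                    yield record  # one record at a time


async def main():
    consumer = FakeConsumer([[{"id": 1}, {"id": 2}], [{"id": 3}]])
    ids = [r["id"] async for r in consumer.iterate(stop_when_idle=True)]
    sizes = [len(b) async for b in
             consumer.iterate(batches=True, stop_when_idle=True)]
    return ids, sizes


ids, batch_sizes = asyncio.run(main())
```

Iterating by batch is mainly useful when records are written onward in bulk; record-wise iteration keeps the loop body simpler.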