Easy helpers

Beneath includes a handful of “easy” helpers, each wrapping several common steps in a single package-level call.

async beneath.easy.consume(table_path: str, cb: Callable[[collections.abc.Mapping], Awaitable], version: Optional[int] = None, replay_only: bool = False, changes_only: bool = False, subscription_path: Optional[str] = None, reset_subscription: bool = False, stop_when_idle: bool = False, max_concurrency: int = 1)

Shorthand for creating a client, creating a table consumer, replaying the table’s history and subscribing to changes.

Parameters
  • table_path (str) – Path to the table to subscribe to. Unless version is set, the consumer subscribes to the table’s primary version.

  • cb (async def fn(record)) – Async function for processing a record.

  • version (int) – The instance version to use for the table. If not set, uses the primary instance.

  • replay_only (bool) – If true, will not read changes, but only replay historical records. Defaults to False.

  • changes_only (bool) – If true, will not replay historical records, but only subscribe to new changes. Defaults to False.

  • subscription_path (str) – Format “ORGANIZATION/PROJECT/NAME”. If set, the consumer will use a checkpointer to save cursors. That means processing will not restart from scratch if the process ends or crashes (as long as you use the same subscription name). To reset a subscription, pass reset_subscription=True.

  • reset_subscription (bool) – If true and subscription_path is set, will reset the consumer and start the subscription from scratch.

  • stop_when_idle (bool) – If true, will return when “caught up” and no new changes are available. Defaults to False.

  • max_concurrency (int) – The maximum number of callbacks to call concurrently. Defaults to 1.
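As a sketch of typical usage (the table path, subscription path, and record contents below are hypothetical), a checkpointed consumer might look like:

```python
import asyncio


async def process(record):
    # record is a Mapping of field names to values
    print(record)


async def main():
    import beneath  # requires the beneath package and an authenticated secret

    await beneath.easy.consume(
        "examples/clock/clock-1m",  # hypothetical table path
        process,
        subscription_path="my-org/my-project/clock-consumer",  # hypothetical
        stop_when_idle=True,  # return once caught up instead of blocking forever
    )


# Run with: asyncio.run(main())
```

Note that with max_concurrency greater than 1, cb may be invoked for several records at once, so it should not assume in-order processing.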

async beneath.easy.load_full(table_path: str, version: Optional[int] = None, filter: Optional[str] = None, to_dataframe=True, max_bytes=26214400, max_records=None, batch_size=1000, warn_max=True) → Iterable[collections.abc.Mapping]

Shorthand for creating a client, finding a table, and reading (filtered) records from it.

Parameters
  • table_path (str) – The path to the table in the format of “USERNAME/PROJECT/TABLE”

  • version (int) – The instance version to read from. If not set, defaults to the table’s primary instance.

  • filter (str) – A filter to apply to the index. Filters allow you to quickly find specific record(s) in the index based on the record key. For details on the filter syntax, see https://about.beneath.dev/docs/reading-writing-data/index-filters/.

  • to_dataframe (bool) – If true, will return the result as a Pandas dataframe. Defaults to true.

  • max_bytes (int) – Sets the maximum number of bytes to read before returning with a warning. Defaults to 25 MB (Avro-encoded).

  • max_records (int) – Sets the maximum number of records to read before returning with a warning. Defaults to unlimited (see max_bytes).

  • batch_size (int) – Sets the number of records to fetch in each network request. Defaults to 1000. One call to load_full may make many network requests (until all records have been loaded or max_bytes or max_records is reached).

  • warn_max (bool) – If true, will emit a warning if max_bytes or max_records were breached and the function returned without loading the full result. Defaults to true.
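A minimal sketch of loading a filtered table into a DataFrame (the table path and filter value are hypothetical; see the filter-syntax link above for the exact format):

```python
import asyncio


async def main():
    import beneath  # requires the beneath package and an authenticated secret

    df = await beneath.easy.load_full(
        "examples/clock/clock-1m",  # hypothetical table path
        filter='{"time": "2021-01-01T00:00:00"}',  # hypothetical key filter
        max_records=10_000,  # stop (with a warning) after 10k records
    )
    # to_dataframe defaults to True, so df is a pandas DataFrame
    print(df.head())


# Run with: asyncio.run(main())
```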

async beneath.easy.query_warehouse(sql: str, analyze: bool = False, max_bytes_scanned: int = 10000000000, to_dataframe=True, max_bytes=26214400, max_records=None, batch_size=1000, warn_max=True) → Union[Iterable[collections.abc.Mapping], beneath.job.Job]

Shorthand for creating a client, running a warehouse (OLAP) query, and reading the result. If analyze=True, the analyzed job is returned instead of the result.

Parameters
  • sql (str) – The analytical SQL query to run. To learn about the query language, see https://about.beneath.dev/docs/reading-writing-data/warehouse-queries/.

  • analyze (bool) – If true, analyzes the query and returns info about referenced tables and expected bytes scanned, but doesn’t actually run the query.

  • max_bytes_scanned (int) – Sets a limit on the number of bytes the query can scan. If exceeded, the job will fail with an error. Defaults to 10GB.

  • to_dataframe (bool) – If true, will return the result as a Pandas dataframe. Defaults to true.

  • max_bytes (int) – Sets the maximum number of bytes to read before returning with a warning. Defaults to 25 MB (Avro-encoded).

  • max_records (int) – Sets the maximum number of records to read before returning with a warning. Defaults to unlimited (see max_bytes).

  • batch_size (int) – Sets the number of records to fetch in each network request. Defaults to 1000. One call to query_warehouse may make many network requests (until all records have been loaded or max_bytes or max_records is reached).

  • warn_max (bool) – If true, will emit a warning if max_bytes or max_records were breached and the function returned without loading the full result. Defaults to true.
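A common pattern is to analyze a query before running it, so you can check the expected bytes scanned first. As a hedged sketch (the table reference and its quoting are hypothetical; see the query-language link above):

```python
import asyncio

# Hypothetical query; table quoting follows the warehouse query docs
SQL = 'select count(*) as n from "examples/clock/clock-1m"'


async def main():
    import beneath  # requires the beneath package and an authenticated secret

    # Dry run: analyze=True returns a Job describing the query without running it
    job = await beneath.easy.query_warehouse(SQL, analyze=True)
    print(job)

    # Run for real, capping the scan at 1 GB instead of the 10 GB default
    result = await beneath.easy.query_warehouse(SQL, max_bytes_scanned=10**9)
    print(result)


# Run with: asyncio.run(main())
```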

async beneath.easy.write_full(table_path: str, records: Union[Iterable[dict], pandas.core.frame.DataFrame], key: Optional[Union[str, List[str]]] = None, description: Optional[str] = None, recreate_on_schema_change=False)

Shorthand for creating a client, inferring a table schema, creating a table, and writing data to the table. It wraps Client.write_full. Calling write_full multiple times for the same table creates and writes to new versions; it does not append data to an existing version.

Parameters
  • table_path (str) – The (desired) path to the table in the format of “USERNAME/PROJECT/TABLE”. The project must already exist.

  • records (list(dict) | pandas.DataFrame) – The full dataset to write, either as a list of records or as a Pandas DataFrame. This function uses beneath.infer_avro to infer a schema for the table based on the records.

  • key (str | list(str)) – The fields to use as the table’s key. If not set, will default to the dataframe index if records is a Pandas DataFrame, or add a column of incrementing numbers if records is a list.

  • description (str) – A description for the table.

  • recreate_on_schema_change (bool) – If true, and there’s an existing table at table_path with a schema that is incompatible with the inferred schema for records, it will delete the existing table and create a new one instead of throwing an error. Defaults to false.
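A minimal sketch of writing a small dataset (the table path and records are hypothetical; the schema is inferred from the records via beneath.infer_avro):

```python
import asyncio

# Hypothetical dataset; the table schema is inferred from these records
records = [
    {"symbol": "AAPL", "price": 170.3},
    {"symbol": "MSFT", "price": 310.1},
]


async def main():
    import beneath  # requires the beneath package and an authenticated secret

    await beneath.easy.write_full(
        "my-org/my-project/prices",  # hypothetical path; the project must exist
        records,
        key="symbol",  # use the symbol field as the table key
        description="Latest price per symbol",
    )


# Run with: asyncio.run(main())
```

Because each call creates a new version, re-running this script replaces the table’s contents rather than appending to them.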