Easy helpers
Beneath includes a handful of “easy” helpers that wrap several common steps in one package-level operation.
async beneath.easy.consume(table_path: str, cb: Callable[[collections.abc.Mapping], Awaitable], version: Optional[int] = None, replay_only: bool = False, changes_only: bool = False, subscription_path: Optional[str] = None, reset_subscription: bool = False, stop_when_idle: bool = False, max_concurrency: int = 1)

Shorthand for creating a client, creating a table consumer, replaying the table's history, and subscribing to changes.
Parameters:

- table_path (str) – Path to the table to subscribe to. The consumer will subscribe to the table's primary version.
- cb (async def fn(record)) – Async function for processing a record.
- version (int) – The instance version to use for the table. If not set, uses the primary instance.
- replay_only (bool) – If true, will not read changes, but only replay historical records. Defaults to False.
- changes_only (bool) – If true, will not replay historical records, but only subscribe to new changes. Defaults to False.
- subscription_path (str) – Format "ORGANIZATION/PROJECT/NAME". If set, the consumer will use a checkpointer to save cursors, so processing will not restart from scratch if the process ends or crashes (as long as you use the same subscription name). To reset a subscription, call reset on the consumer.
- reset_subscription (bool) – If true and subscription_path is set, will reset the consumer and start the subscription from scratch.
- stop_when_idle (bool) – If true, will return when "caught up" and no new changes are available. Defaults to False.
- max_concurrency (int) – The maximum number of callbacks to call concurrently. Defaults to 1.
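A minimal usage sketch (the table and subscription paths below are hypothetical, and actually running it requires the beneath package and an authenticated Beneath account):

```python
import asyncio

async def process(record):
    # Called once per record, both during replay and when new changes arrive.
    print(dict(record))

async def main():
    import beneath  # requires `pip install beneath` and an authenticated secret

    await beneath.easy.consume(
        "examples/demo/table",  # hypothetical table path
        process,
        # Checkpoint cursors so a restarted process resumes where it left off:
        subscription_path="examples/demo/demo-subscription",
        stop_when_idle=True,  # return once caught up instead of blocking forever
    )

if __name__ == "__main__":
    asyncio.run(main())
```

Passing subscription_path is optional, but without it every run replays the table from scratch.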
async beneath.easy.load_full(table_path: str, version: Optional[int] = None, filter: Optional[str] = None, to_dataframe=True, max_bytes=26214400, max_records=None, batch_size=1000, warn_max=True) → Iterable[collections.abc.Mapping]

Shorthand for creating a client, finding a table, and reading (filtered) records from it.
Parameters:

- table_path (str) – The path to the table in the format of "USERNAME/PROJECT/TABLE".
- version (int) – The instance version to read from. If not set, defaults to the table's primary instance.
- filter (str) – A filter to apply to the index. Filters allow you to quickly find specific record(s) in the index based on the record key. For details on the filter syntax, see https://about.beneath.dev/docs/reading-writing-data/index-filters/.
- to_dataframe (bool) – If true, will return the result as a Pandas dataframe. Defaults to true.
- max_bytes (int) – Sets the maximum number of bytes to read before returning with a warning. Defaults to 25 MB (Avro-encoded).
- max_records (int) – Sets the maximum number of records to read before returning with a warning. Defaults to unlimited (see max_bytes).
- batch_size (int) – Sets the number of records to fetch in each network request. Defaults to 1000. One call to query_index may make many network requests (until all records have been loaded or max_bytes or max_records is breached).
- warn_max (bool) – If true, will emit a warning if max_bytes or max_records were breached and the function returned without loading the full result. Defaults to true.
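As a sketch, reading a table in both forms might look like this (the table path and filter value are hypothetical; running it assumes the beneath package is installed and authenticated):

```python
import asyncio

async def main():
    import beneath  # requires `pip install beneath` and an authenticated secret

    # Load the whole table into a Pandas DataFrame (capped at max_bytes).
    df = await beneath.easy.load_full("examples/demo/table")  # hypothetical path

    # Load only records matching an index filter, as a list of mappings.
    records = await beneath.easy.load_full(
        "examples/demo/table",
        filter='{"symbol": "AAPL"}',  # hypothetical key filter; see the filter docs
        to_dataframe=False,
    )
    return df, records

if __name__ == "__main__":
    asyncio.run(main())
```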
async beneath.easy.query_warehouse(sql: str, analyze: bool = False, max_bytes_scanned: int = 10000000000, to_dataframe=True, max_bytes=26214400, max_records=None, batch_size=1000, warn_max=True) → Union[Iterable[collections.abc.Mapping], beneath.job.Job]

Shorthand for creating a client, running a warehouse (OLAP) query, and reading the result. If analyze=True, the analyzed job is returned instead of the result.
Parameters:

- sql (str) – The analytical SQL query to run. To learn about the query language, see https://about.beneath.dev/docs/reading-writing-data/warehouse-queries/.
- analyze (bool) – If true, analyzes the query and returns info about referenced tables and expected bytes scanned, but doesn't actually run the query.
- max_bytes_scanned (int) – Sets a limit on the number of bytes the query can scan. If exceeded, the job will fail with an error. Defaults to 10 GB.
- to_dataframe (bool) – If true, will return the result as a Pandas dataframe. Defaults to true.
- max_bytes (int) – Sets the maximum number of bytes to read before returning with a warning. Defaults to 25 MB (Avro-encoded).
- max_records (int) – Sets the maximum number of records to read before returning with a warning. Defaults to unlimited (see max_bytes).
- batch_size (int) – Sets the number of records to fetch in each network request. Defaults to 1000. One call to query_warehouse may make many network requests (until all records have been loaded or max_bytes or max_records is breached).
- warn_max (bool) – If true, will emit a warning if max_bytes or max_records were breached and the function returned without loading the full result. Defaults to true.
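A sketch of the analyze-then-run pattern (the SQL string and table reference are hypothetical; assumes the beneath package is installed and authenticated):

```python
import asyncio

async def main():
    import beneath  # requires `pip install beneath` and an authenticated secret

    sql = "SELECT count(*) FROM `examples/demo/table`"  # hypothetical query

    # Dry run: returns a job describing referenced tables and expected
    # bytes scanned, without executing the query.
    job = await beneath.easy.query_warehouse(sql, analyze=True)

    # Run for real, with a tighter scan limit than the 10 GB default.
    df = await beneath.easy.query_warehouse(sql, max_bytes_scanned=10**9)
    return job, df

if __name__ == "__main__":
    asyncio.run(main())
```

Checking the analyzed job first is a cheap way to catch queries that would scan far more data than intended.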
async beneath.easy.write_full(table_path: str, records: Union[Iterable[dict], pandas.core.frame.DataFrame], key: Optional[Union[str, List[str]]] = None, description: Optional[str] = None, recreate_on_schema_change=False)

Shorthand for creating a client, inferring a table schema, creating a table, and writing data to the table. It wraps Client.write_full. Calling write_full multiple times for the same table will create and write to multiple versions, but will not append data to an existing version.

Parameters:
- table_path (str) – The (desired) path to the table in the format of "USERNAME/PROJECT/TABLE". The project must already exist.
- records (list(dict) | pandas.DataFrame) – The full dataset to write, either as a list of records or as a Pandas DataFrame. This function uses beneath.infer_avro to infer a schema for the table based on the records.
- key (str | list(str)) – The fields to use as the table's key. If not set, will default to the dataframe index if records is a Pandas DataFrame, or add a column of incrementing numbers if records is a list.
- description (str) – A description for the table.
- recreate_on_schema_change (bool) – If true, and there's an existing table at table_path with a schema that is incompatible with the inferred schema for records, it will delete the existing table and create a new one instead of throwing an error. Defaults to false.
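A sketch of writing a small dataset (the table path, field names, and values are hypothetical; assumes the beneath package is installed, authenticated, and that the project exists):

```python
import asyncio

# A small dataset to write; write_full infers an Avro schema from these records.
records = [
    {"symbol": "AAPL", "price": 178.2},
    {"symbol": "MSFT", "price": 331.8},
]

async def main():
    import beneath  # requires `pip install beneath` and an authenticated secret

    await beneath.easy.write_full(
        "examples/demo/prices",  # hypothetical table path
        records,
        key="symbol",  # use the symbol field as the table's key
        description="Latest price per symbol",
    )

if __name__ == "__main__":
    asyncio.run(main())
```

Because each call creates a new version rather than appending, this pattern suits periodic full refreshes of a dataset, not incremental writes.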