Client

class beneath.Client(secret: Optional[str] = None, dry: bool = False, write_delay_ms: int = 1000)

The main class for interacting with Beneath. Data-related features (like defining tables and reading/writing data) are implemented directly on Client, while control-plane features (like creating projects) are isolated in the admin member.

Parameters
  • secret (str) – A Beneath secret to use for authentication. If not set, the client uses the BENEATH_SECRET environment variable. If that is not set either, it uses the secret authenticated in the CLI (stored in ~/.beneath).

  • dry (bool) –

    If true, the client performs no mutations or writes, but generally performs reads as usual. This is useful for testing.

    The exact behavior depends on the operation. Some mutations, such as creating a table, are mocked, while others fail with an exception. Write operations log records to the logger instead of transmitting them to the server. Reads generally work, but raise an exception when called on mocked resources.

  • write_delay_ms (int) – The maximum amount of time, in milliseconds, to buffer written records before sending a batch write request over the network. Defaults to 1000 ms (1 second). Writing records in batches reduces the number of requests, which lowers cost (Beneath charges for at least 1 KB per request).
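The lookup order described for secret can be pictured with the sketch below. It is not Beneath's implementation: resolve_secret is a hypothetical helper, and its cli_secret argument stands in for whatever the CLI stored in ~/.beneath (the storage format is not described here).

```python
import os
from typing import Mapping, Optional


def resolve_secret(
    secret: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
    cli_secret: Optional[str] = None,
) -> Optional[str]:
    """Mirror the documented precedence: argument, BENEATH_SECRET, then CLI."""
    env = os.environ if env is None else env
    if secret:
        return secret  # 1. an explicitly passed secret wins
    if env.get("BENEATH_SECRET"):
        return env["BENEATH_SECRET"]  # 2. then the environment variable
    # 3. hypothetical stand-in for the secret the CLI stored in ~/.beneath
    return cli_secret
```

Passing env explicitly keeps the helper easy to exercise without touching the real process environment.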

async find_table(table_path: str) → beneath.table.Table

Finds an existing table and returns an object that you can use to read from and write to the table.

Parameters

  • table_path (str) – The path to the table, in the format “USERNAME/PROJECT/TABLE”.
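To make the expected path format concrete, here is a small sketch that checks a path before it is handed to find_table. split_table_path and TablePath are hypothetical names introduced for illustration. The client may well perform its own validation.

```python
from typing import NamedTuple


class TablePath(NamedTuple):
    username: str
    project: str
    table: str


def split_table_path(table_path: str) -> TablePath:
    """Split "USERNAME/PROJECT/TABLE" into its three parts (hypothetical helper)."""
    parts = table_path.strip("/").split("/")
    # Exactly three non-empty segments are expected
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected USERNAME/PROJECT/TABLE, got {table_path!r}")
    return TablePath(*parts)
```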

async create_table(table_path: str, schema: str, description: Optional[str] = None, meta: Optional[bool] = None, use_index: Optional[bool] = None, use_warehouse: Optional[bool] = None, retention: Optional[datetime.timedelta] = None, log_retention: Optional[datetime.timedelta] = None, index_retention: Optional[datetime.timedelta] = None, warehouse_retention: Optional[datetime.timedelta] = None, schema_kind: str = 'GraphQL', indexes: Optional[str] = None, update_if_exists: Optional[bool] = None) → beneath.table.Table

Creates a table and returns it. If update_if_exists=True and the table already exists, updates it instead.

Parameters
  • table_path (str) – The (desired) path to the table in the format of “USERNAME/PROJECT/TABLE”. The project must already exist. If the table doesn’t exist yet, it creates it.

  • schema (str) – The GraphQL schema for the table. To learn about the schema definition language, see https://about.beneath.dev/docs/reading-writing-data/schema-definition/.

  • description (str) – The description shown for the table in the web console. If not set, tries to infer a description from the schema.

  • retention (timedelta) – The amount of time to retain records written to the table. If not set, records will be stored forever.

  • schema_kind (str) – The parser to use for schema. Currently must be “GraphQL” (default).

  • update_if_exists (bool) – If true and the table already exists, the provided info will be used to update the table (only supports non-breaking schema changes) before returning it.
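A minimal sketch of a create-or-update call, using only the parameters documented above. The schema string is illustrative, and its directives are an assumption, so check the schema definition guide for the exact syntax. ensure_clicks_table and the table contents are hypothetical.

```python
import asyncio
from datetime import timedelta

# ASSUMPTION: illustrative schema only; verify the directive names against
# the schema definition guide before using it.
SCHEMA = """
type Click @schema {
  user_id: Int! @key
  time: Timestamp! @key
  label: String
}
"""


async def ensure_clicks_table(client, table_path: str):
    """Create the table, or update it in place if it already exists."""
    return await client.create_table(
        table_path=table_path,
        schema=SCHEMA,
        description="Click events (example)",
        retention=timedelta(days=30),  # omit to store records forever
        update_if_exists=True,  # only non-breaking schema changes are supported
    )


# Usage, given a real client (requires the beneath package and a secret):
#   asyncio.run(ensure_clicks_table(beneath.Client(), "USERNAME/PROJECT/clicks"))
```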

async query_warehouse(query: str, analyze: bool = False, max_bytes_scanned: int = 10000000000, timeout_ms: int = 60000)

Starts a warehouse (OLAP) SQL query and returns a job for tracking its progress.

Parameters
  • query (str) – The analytical SQL query to run. To learn about the query language, see https://about.beneath.dev/docs/reading-writing-data/warehouse-queries/.

  • analyze (bool) – If true, analyzes the query and returns info about referenced tables and expected bytes scanned, but doesn’t actually run the query.

  • max_bytes_scanned (int) – Sets a limit on the number of bytes the query can scan. If exceeded, the job will fail with an error.
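One way to combine analyze and max_bytes_scanned is to inspect a query before running it with a tighter scan limit. The sketch below uses only the documented parameters. estimate_then_run is a hypothetical helper, and it does not assume anything about the methods of the returned job objects.

```python
async def estimate_then_run(client, query: str, max_bytes: int = 1_000_000_000):
    """Analyze a query first, then start it with a scan limit."""
    # analyze=True returns info about the query without actually running it
    analysis = await client.query_warehouse(query, analyze=True)
    # Start the real job; it fails with an error if the limit is exceeded
    job = await client.query_warehouse(query, max_bytes_scanned=max_bytes)
    return analysis, job
```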

async start()

Opens the client for writes. Can be called multiple times, but make sure to call stop correspondingly.

async stop()

Closes the client for writes, ensuring buffered writes are flushed. If start was called multiple times, only the last corresponding call to stop triggers a flush.
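Because every start needs a matching stop, it can help to wrap the pair in an async context manager. The following is a sketch, not part of the library: open_for_writes is a hypothetical helper that relies only on the start and stop methods documented above.

```python
import contextlib


@contextlib.asynccontextmanager
async def open_for_writes(client):
    """Pair every start() with a stop(), even if the body raises."""
    await client.start()
    try:
        yield client
    finally:
        # With nested use, only the outermost stop() triggers the flush
        await client.stop()
```

Used as `async with open_for_writes(client): ...`, the helper can be nested safely, since start can be called multiple times.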

async write(instance: beneath.instance.TableInstance, records: Union[collections.abc.Mapping, Iterable[collections.abc.Mapping]])

Writes one or more records to instance. By default, writes are buffered for up to write_delay_ms milliseconds before being transmitted to the server. See the Client constructor for details.

To enable writes, make sure to call start on the client (and stop before terminating).

Parameters
  • instance (TableInstance) – The instance to write to. You can also call instance.write as a convenience wrapper.

  • records – The records to write. Can be a single record (dict) or a list of records (iterable of dict).

async force_flush()

Forces the client to flush buffered writes without stopping.
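The sketch below shows how start, write, force_flush, and stop might fit together when a caller wants each chunk sent before moving on, for example before recording progress elsewhere. write_in_chunks and chunk_size are hypothetical. The instance argument is assumed to be a TableInstance obtained elsewhere.

```python
async def write_in_chunks(client, instance, records, chunk_size: int = 500):
    """Write records chunk by chunk, flushing after each chunk."""
    records = list(records)
    await client.start()  # writes are only enabled between start and stop
    try:
        for i in range(0, len(records), chunk_size):
            await client.write(instance, records[i : i + chunk_size])
            # Send buffered records now instead of waiting up to write_delay_ms
            await client.force_flush()
    finally:
        await client.stop()  # flushes anything still buffered
    return len(records)
```

Flushing after every chunk trades some of the batching benefit for earlier delivery. Skip the force_flush call when the default buffering is acceptable.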

async write_full(table_path: str, records: Union[Iterable[dict], pandas.core.frame.DataFrame], key: Optional[Union[str, List[str]]] = None, description: Optional[str] = None, recreate_on_schema_change=False)

Infers a schema, creates a table, and writes a full dataset to Beneath. Each call creates a new primary version for the table, and deletes the old primary version if and when the write completes successfully.

Parameters
  • table_path (str) – The (desired) path to the table in the format of “USERNAME/PROJECT/TABLE”. The project must already exist.

  • records (list(dict) | pandas.DataFrame) – The full dataset to write, either as a list of records or as a Pandas DataFrame. This function uses beneath.infer_avro to infer a schema for the table based on the records.

  • key (str | list(str)) – The fields to use as the table’s key. If not set, will default to the dataframe index if records is a Pandas DataFrame, or add a column of incrementing numbers if records is a list.

  • description (str) – A description for the table.

  • recreate_on_schema_change (bool) – If true, and there’s an existing table at table_path with a schema that is incompatible with the inferred schema for records, it will delete the existing table and create a new one instead of throwing an error. Defaults to false.
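As a sketch of a full-dataset write from a plain list of records, using only the documented parameters: publish_snapshot and the sample rows are made up for illustration.

```python
async def publish_snapshot(client, table_path: str):
    """Replace the table's contents with a small, made-up dataset."""
    records = [
        {"sku": "A1", "stock": 3},
        {"sku": "B2", "stock": 0},
    ]
    await client.write_full(
        table_path,
        records,
        key="sku",  # without a key, a column of incrementing numbers is added
        description="Inventory snapshot (example)",
        recreate_on_schema_change=False,  # raise on an incompatible schema
    )
    return len(records)
```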

async checkpointer(project_path: str, key_prefix: Optional[str] = None, metatable_name='checkpoints', metatable_create: bool = True, metatable_description='Stores checkpointed state for consumers, pipelines, and more') → beneath.checkpointer.Checkpointer

Returns a checkpointer for the given project. Checkpointers store (small) key-value records useful for maintaining consumer and pipeline state. State is stored in a meta-table called “checkpoints” in the given project.

Parameters
  • project_path (str) – Path to the project in which to store the checkpointer’s state

  • key_prefix (str) – If set, any get or set call on the checkpointer will prepend the prefix to the key.

  • metatable_name (str) – Name of the meta table in which to save checkpointed data

  • metatable_create (bool) – If true, the checkpointer will create the checkpoints meta-table if it does not already exist. If false, the checkpointer will throw an exception if the meta-table does not already exist. Defaults to True.

  • metatable_description (str) – An optional description to apply to the checkpoints meta-table. Defaults to a sensible description of checkpointing.
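A sketch of keeping a small piece of state in a checkpointer. It assumes that get(key) and set(key, value) are awaitable and that get returns None for a missing key. Only the existence of get and set calls is stated above, so treat those signatures as assumptions. bump_counter is a hypothetical helper.

```python
async def bump_counter(client, project_path: str, key: str = "runs"):
    """Increment a counter stored in the project's checkpoints meta-table."""
    checkpointer = await client.checkpointer(project_path, key_prefix="example:")
    # ASSUMPTION: get/set are coroutines and a missing key yields None
    previous = await checkpointer.get(key)
    current = (previous or 0) + 1
    await checkpointer.set(key, current)  # stored under "example:" + key
    return current
```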

async consumer(table_path: str, version: Optional[int] = None, batch_size: int = 1000, subscription_path: Optional[str] = None, checkpointer: Optional[beneath.checkpointer.Checkpointer] = None, metatable_create: bool = True)

Creates a consumer for the given table. Consumers make it easy to replay the history of a table and/or subscribe to new changes.

Parameters
  • table_path (str) – Path to the table to subscribe to. Unless version is set, the consumer subscribes to the table’s primary version.

  • version (int) – The instance version of the table to use. If not set, uses the primary instance.

  • batch_size (int) – Sets the max number of records to load in each network request. Defaults to 1000.

  • subscription_path (str) – Format “ORGANIZATION/PROJECT/NAME”. If set, the consumer will use a checkpointer to save cursors. That means processing will not restart from scratch if the process ends or crashes (as long as you use the same subscription name). To reset a subscription, call reset on the consumer.

  • checkpointer (Checkpointer) – Only applies if subscription_path is set. Provides a specific checkpointer to use for consumer state. If not set, will create one in the subscription’s project.

  • metatable_create (bool) – Only applies if subscription_path is set and checkpointer is not set. Passed through to client.checkpointer.
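The sketch below creates a consumer with a named subscription so that cursors are checkpointed between runs. It uses only the parameters documented above and deliberately stops at construction, since the consumer's own methods (other than reset) are not described here. open_consumer is a hypothetical helper.

```python
from typing import Optional


async def open_consumer(client, table_path: str, subscription_path: Optional[str] = None):
    """Create a consumer; pass subscription_path to persist cursors across runs."""
    return await client.consumer(
        table_path,
        batch_size=500,  # max records loaded per network request
        # Format "ORGANIZATION/PROJECT/NAME"; reuse the same name to resume
        subscription_path=subscription_path,
    )
```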