Client
-
class beneath.Client(secret: Optional[str] = None, dry: bool = False, write_delay_ms: int = 1000)
The main class for interacting with Beneath. Data-related features (like defining tables and reading/writing data) are implemented directly on Client, while control-plane features (like creating projects) are isolated in the admin member.
- Parameters
secret (str) – A Beneath secret to use for authentication. If not set, uses the BENEATH_SECRET environment variable. If that is not set either, uses the secret authenticated in the CLI (stored in ~/.beneath).
dry (bool) –
If true, the client will not perform any mutations or writes, but will generally perform reads as usual. This is useful for testing.
The exact behavior differs by operation. Some mutations, such as creating a table, are mocked, while others fail with an exception. Write operations log records to the logger instead of transmitting them to the server. Reads generally work, but raise an exception when called on mocked resources.
write_delay_ms (int) – The maximum amount of time to buffer written records before sending a batch write request over the network. Defaults to 1 second (1000 ms). Writing records in batches reduces the number of requests, which lowers cost (Beneath charges for at least 1 KB per request).
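The documented lookup order for the secret can be modeled as a small helper. This is a sketch, not the client's actual code. resolve_secret is a hypothetical name, and treating an empty string as unset is an assumption. The file format under ~/.beneath is not described here, so the CLI lookup is passed in as a callable.

```python
import os
from typing import Callable, Mapping, Optional


def resolve_secret(
    secret: Optional[str] = None,
    environ: Mapping[str, str] = os.environ,
    read_cli_secret: Callable[[], Optional[str]] = lambda: None,
) -> Optional[str]:
    """Return the first available secret, following the documented precedence."""
    # 1. An explicit `secret` argument takes priority.
    #    (Treating "" as unset is an assumption of this sketch.)
    if secret:
        return secret
    # 2. Otherwise, fall back to the BENEATH_SECRET environment variable.
    env_secret = environ.get("BENEATH_SECRET")
    if env_secret:
        return env_secret
    # 3. Finally, ask the (hypothetical) callable for the CLI-stored secret.
    return read_cli_secret()


# The environment variable wins over the CLI-stored secret.
print(resolve_secret(None, {"BENEATH_SECRET": "env-secret"}, lambda: "cli-secret"))  # prints: env-secret
```

In practice you would simply construct the client and let it do this lookup itself, for example beneath.Client(dry=True) while testing.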
-
async find_table(table_path: str) → beneath.table.Table
Finds an existing table and returns an object that you can use to read from and write to the table.
- Parameters
table_path (str) – The path to the table in the format “USERNAME/PROJECT/TABLE”.
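Table paths always have three slash-separated parts. The hypothetical parse_table_path helper below checks that shape before a path is passed to find_table. It does not cover the library's own rules for valid usernames, project names, and table names, which may be stricter.

```python
from typing import NamedTuple


class TablePath(NamedTuple):
    username: str
    project: str
    table: str


def parse_table_path(path: str) -> TablePath:
    """Split a "USERNAME/PROJECT/TABLE" path into its components (hypothetical helper)."""
    parts = path.split("/")
    # Expect exactly three slash-separated, non-empty components.
    if len(parts) != 3 or not all(parts):
        raise ValueError(f'expected "USERNAME/PROJECT/TABLE", got {path!r}')
    return TablePath(*parts)


print(parse_table_path("alice/weather/readings").project)  # prints: weather
```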
-
async create_table(table_path: str, schema: str, description: Optional[str] = None, meta: Optional[bool] = None, use_index: Optional[bool] = None, use_warehouse: Optional[bool] = None, retention: Optional[datetime.timedelta] = None, log_retention: Optional[datetime.timedelta] = None, index_retention: Optional[datetime.timedelta] = None, warehouse_retention: Optional[datetime.timedelta] = None, schema_kind: str = 'GraphQL', indexes: Optional[str] = None, update_if_exists: Optional[bool] = None) → beneath.table.Table
Creates a table, or updates it if update_if_exists=True, and returns it.
- Parameters
table_path (str) – The (desired) path to the table in the format “USERNAME/PROJECT/TABLE”. The project must already exist. If the table doesn’t exist yet, it is created.
schema (str) – The GraphQL schema for the table. To learn about the schema definition language, see https://about.beneath.dev/docs/reading-writing-data/schema-definition/.
description (str) – The description shown for the table in the web console. If not set, tries to infer a description from the schema.
retention (timedelta) – The amount of time to retain records written to the table. If not set, records will be stored forever.
schema_kind (str) – The parser to use for schema. Currently must be “GraphQL” (the default).
update_if_exists (bool) – If true and the table already exists, the provided info will be used to update the table (only non-breaking schema changes are supported) before returning it.
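The retention parameter takes a datetime.timedelta, and leaving it unset keeps records forever. The sketch below models that rule for a single record. is_retained is a hypothetical helper. Two details are not specified here and are handled server-side: whether a record aged exactly retention is still kept, and when expired records are actually removed.

```python
from datetime import datetime, timedelta
from typing import Optional


def is_retained(written_at: datetime, now: datetime, retention: Optional[timedelta]) -> bool:
    """Model of the retention rule for one record (hypothetical helper)."""
    # No retention set: records are stored forever, as documented for create_table.
    if retention is None:
        return True
    # Treating a record aged exactly `retention` as expired is an assumption.
    return now - written_at < retention


written = datetime(2024, 1, 1)
print(is_retained(written, written + timedelta(days=45), timedelta(days=30)))  # prints: False
```

For example, passing retention=timedelta(days=30) to create_table asks Beneath to keep each record for about 30 days after it is written.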
-
async query_warehouse(query: str, analyze: bool = False, max_bytes_scanned: int = 10000000000, timeout_ms: int = 60000)
Starts a warehouse (OLAP) SQL query and returns a job for tracking its progress.
- Parameters
query (str) – The analytical SQL query to run. To learn about the query language, see https://about.beneath.dev/docs/reading-writing-data/warehouse-queries/.
analyze (bool) – If true, analyzes the query and returns info about referenced tables and expected bytes scanned, but doesn’t actually run the query.
max_bytes_scanned (int) – Sets a limit on the number of bytes the query can scan. If exceeded, the job will fail with an error. Defaults to 10000000000 (10 GB).
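With analyze=True, the client reports the expected bytes scanned without running the query. One workable pattern is to check that estimate against a budget before starting the real query. The sketch assumes the estimate is available as a plain integer, since this page does not describe how the analyzed job exposes it. check_scan_budget, to_gigabytes, and to_gibibytes are hypothetical helpers. The default limit of 10000000000 bytes is 10 GB in decimal units, or about 9.31 GiB.

```python
DEFAULT_MAX_BYTES_SCANNED = 10_000_000_000  # default from the query_warehouse signature


def to_gigabytes(num_bytes: int) -> float:
    """Convert bytes to decimal gigabytes (10**9 bytes)."""
    return num_bytes / 10**9


def to_gibibytes(num_bytes: int) -> float:
    """Convert bytes to binary gibibytes (2**30 bytes)."""
    return num_bytes / 2**30


def check_scan_budget(estimated_bytes: int, max_bytes_scanned: int = DEFAULT_MAX_BYTES_SCANNED) -> None:
    """Raise if an analyzed estimate would exceed the scan limit (hypothetical helper)."""
    # The documented limit fails the job only when it is exceeded, so equality passes.
    if estimated_bytes > max_bytes_scanned:
        raise ValueError(
            f"query would scan {estimated_bytes:,} bytes, "
            f"over the limit of {max_bytes_scanned:,} bytes"
        )


print(to_gigabytes(DEFAULT_MAX_BYTES_SCANNED))  # prints: 10.0
print(round(to_gibibytes(DEFAULT_MAX_BYTES_SCANNED), 2))  # prints: 9.31
```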
-
async start()
Opens the client for writes. Can be called multiple times, but make sure to call stop once for every call to start.
-
async stop()
Closes the client for writes, ensuring buffered writes are flushed. If start was called multiple times, only the last corresponding call to stop triggers a flush.
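The pairing rule for start and stop behaves like a counter. Every start increments it, every stop decrements it, and only the stop that brings it back to zero flushes. The synchronous WriteSession class below is a hypothetical model of that rule, not the client's implementation. Raising an error on an unmatched stop is an assumption.

```python
class WriteSession:
    """Hypothetical model of start/stop pairing using an open counter."""

    def __init__(self) -> None:
        self.open_count = 0
        self.flushes = 0

    def start(self) -> None:
        # Each start() must later be matched by exactly one stop().
        self.open_count += 1

    def stop(self) -> None:
        # Raising here on an unmatched stop() is an assumption of this model.
        if self.open_count == 0:
            raise RuntimeError("stop() called without a matching start()")
        self.open_count -= 1
        # Only the stop() that balances the first start() flushes buffered writes.
        if self.open_count == 0:
            self.flushes += 1
```

With the real client, you can pair each await client.start() with an await client.stop() in a try/finally block. This keeps buffered writes flushed even if an exception interrupts the work in between.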
-
async write(instance: beneath.instance.TableInstance, records: Union[collections.abc.Mapping, Iterable[collections.abc.Mapping]])
Writes one or more records to instance. By default, writes are buffered for up to write_delay_ms milliseconds before being transmitted to the server. See the Client constructor for details.
To enable writes, make sure to call start on the client (and stop before terminating).
- Parameters
instance (TableInstance) – The instance to write to. You can also call instance.write as a convenience wrapper.
records – The records to write. Can be a single record (dict) or a list of records (iterable of dict).
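Buffering with write_delay_ms works roughly like this. Records accumulate until the oldest one has waited write_delay_ms milliseconds, and then the whole batch is sent in one request. In this sketch, TimedWriteBuffer and its send callback are hypothetical. The sketch checks for a flush only when write or maybe_flush is called, whereas a real client more likely flushes from a background timer. The clock is injectable so the behavior can be tested without sleeping.

```python
import time
from collections.abc import Mapping
from typing import Callable, Iterable, List, Optional, Union


class TimedWriteBuffer:
    """Hypothetical model of write buffering controlled by write_delay_ms."""

    def __init__(
        self,
        send: Callable[[List[Mapping]], None],
        write_delay_ms: int = 1000,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.send = send  # stands in for one batch write request
        self.write_delay_ms = write_delay_ms
        self.clock = clock  # returns seconds; injectable for tests
        self.pending: List[Mapping] = []
        self.first_buffered_at: Optional[float] = None

    def write(self, records: Union[Mapping, Iterable[Mapping]]) -> None:
        # Accept a single record or an iterable of records, like Client.write.
        batch = [records] if isinstance(records, Mapping) else list(records)
        if batch and self.first_buffered_at is None:
            self.first_buffered_at = self.clock()
        self.pending.extend(batch)
        self.maybe_flush()

    def maybe_flush(self) -> None:
        # Send the batch once the oldest buffered record has waited long enough.
        if self.first_buffered_at is None:
            return
        waited_ms = (self.clock() - self.first_buffered_at) * 1000
        if waited_ms >= self.write_delay_ms:
            self.flush()

    def flush(self) -> None:
        # Send everything buffered so far (what stopping the client would force).
        if self.pending:
            self.send(self.pending)
        self.pending = []
        self.first_buffered_at = None
```

With write_delay_ms=1000, several write calls made within the same second end up in a single send. This is the reduction in request count that the constructor documentation describes.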
-
async write_full(table_path: str, records: Union[Iterable[dict], pandas.core.frame.DataFrame], key: Optional[Union[str, List[str]]] = None, description: Optional[str] = None, recreate_on_schema_change=False)
Infers a schema, creates a table, and writes a full dataset to Beneath. Each call creates a new primary version for the table, and deletes the old primary version if and when the write completes successfully.
- Parameters
table_path (str) – The (desired) path to the table in the format “USERNAME/PROJECT/TABLE”. The project must already exist.
records (list(dict) | pandas.DataFrame) – The full dataset to write, either as a list of records or as a Pandas DataFrame. This function uses beneath.infer_avro to infer a schema for the table based on the records.
key (str | list(str)) – The fields to use as the table’s key. If not set, defaults to the DataFrame index if records is a Pandas DataFrame, or adds a column of incrementing numbers if records is a list.
description (str) – A description for the table.
recreate_on_schema_change (bool) – If true and an existing table at table_path has a schema that is incompatible with the inferred schema for records, deletes the existing table and creates a new one instead of throwing an error. Defaults to false.
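When key is not set and records is a list, write_full adds a column of incrementing numbers to serve as the key. The sketch below shows that idea. The column name __key, counting from 0, and the error on a name clash are all assumptions for illustration. They are not the names or numbering Beneath actually uses.

```python
from typing import Dict, Iterable, List


def add_incrementing_key(records: Iterable[dict], key_field: str = "__key") -> List[Dict]:
    """Return copies of `records` with an incrementing integer key (hypothetical helper)."""
    keyed = []
    for i, record in enumerate(records):
        if key_field in record:
            raise ValueError(f"record already has a {key_field!r} field")
        # Build a new dict so the caller's records are left unchanged.
        keyed.append({key_field: i, **record})
    return keyed


rows = [{"city": "Oslo"}, {"city": "Lima"}]
print(add_incrementing_key(rows))
# prints: [{'__key': 0, 'city': 'Oslo'}, {'__key': 1, 'city': 'Lima'}]
```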
-
async checkpointer(project_path: str, key_prefix: Optional[str] = None, metatable_name='checkpoints', metatable_create: bool = True, metatable_description='Stores checkpointed state for consumers, pipelines, and more') → beneath.checkpointer.Checkpointer
Returns a checkpointer for the given project. Checkpointers store (small) key-value records useful for maintaining consumer and pipeline state. State is stored in a meta-table in the given project, named “checkpoints” by default (see metatable_name).
- Parameters
project_path (str) – Path to the project in which to store the checkpointer’s state.
key_prefix (str) – If set, any get or set call on the checkpointer will prepend the prefix to the key.
metatable_name (str) – Name of the meta-table in which to save checkpointed data. Defaults to “checkpoints”.
metatable_create (bool) – If true, the checkpointer will create the checkpoints meta-table if it does not already exist. If false, the checkpointer will throw an exception if the meta-table does not already exist. Defaults to True.
metatable_description (str) – An optional description to apply to the checkpoints meta-table. Defaults to a sensible description of checkpointing.
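One reason to set key_prefix is to let several jobs share one checkpoints meta-table without overwriting each other's state. The in-memory PrefixedCheckpointer below is a hypothetical stand-in. It uses a dict in place of the meta-table and exposes synchronous get and set, whose real signatures are not documented here. It also assumes the prefix is joined to the key by plain concatenation, so any separator must be part of the prefix itself.

```python
from typing import Any, Dict, Optional


class PrefixedCheckpointer:
    """Hypothetical in-memory stand-in for a checkpointer with key_prefix."""

    def __init__(self, store: Dict[str, Any], key_prefix: Optional[str] = None) -> None:
        self.store = store  # stands in for the shared checkpoints meta-table
        self.key_prefix = key_prefix or ""

    def _full_key(self, key: str) -> str:
        # Plain concatenation (an assumption); put any separator in the prefix.
        return self.key_prefix + key

    def set(self, key: str, value: Any) -> None:
        self.store[self._full_key(key)] = value

    def get(self, key: str, default: Any = None) -> Any:
        return self.store.get(self._full_key(key), default)


checkpoints = {}
PrefixedCheckpointer(checkpoints, "job-a:").set("cursor", 10)
PrefixedCheckpointer(checkpoints, "job-b:").set("cursor", 20)
print(checkpoints)  # prints: {'job-a:cursor': 10, 'job-b:cursor': 20}
```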
-
async consumer(table_path: str, version: Optional[int] = None, batch_size: int = 1000, subscription_path: Optional[str] = None, checkpointer: Optional[beneath.checkpointer.Checkpointer] = None, metatable_create: bool = True)
Creates a consumer for the given table. Consumers make it easy to replay the history of a table and/or subscribe to new changes.
- Parameters
table_path (str) – Path to the table to subscribe to. By default, the consumer subscribes to the table’s primary version (see version).
version (int) – The instance version of the table to use. If not set, uses the primary instance.
batch_size (int) – Sets the max number of records to load in each network request. Defaults to 1000.
subscription_path (str) – Format “ORGANIZATION/PROJECT/NAME”. If set, the consumer will use a checkpointer to save cursors. That means processing will not restart from scratch if the process ends or crashes (as long as you use the same subscription_path). To reset a subscription, call reset on the consumer.
checkpointer (Checkpointer) – Only applies if subscription_path is set. Provides a specific checkpointer to use for consumer state. If not set, will create one in the subscription’s project.
metatable_create (bool) – Only applies if subscription_path is set and checkpointer is not set. Passed through to client.checkpointer.
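Saving cursors under a subscription path is what lets a consumer resume after a restart. The ResumableConsumer class below is a hypothetical, in-memory model of that behavior. A list stands in for the table, a dict stands in for the checkpointer, and the cursor is advanced as soon as a batch is returned. This page does not specify when the real consumer commits its cursor relative to your processing.

```python
from typing import Any, Dict, List


class ResumableConsumer:
    """Hypothetical model: replays a log in batches, saving a cursor per subscription."""

    def __init__(self, log: List[Any], state: Dict[str, int], subscription: str, batch_size: int = 1000) -> None:
        self.log = log  # stands in for the table instance's records
        self.state = state  # stands in for the checkpointer
        self.subscription = subscription
        self.batch_size = batch_size

    def next_batch(self) -> List[Any]:
        # Resume from the saved cursor, or start from scratch if none exists.
        cursor = self.state.get(self.subscription, 0)
        batch = self.log[cursor : cursor + self.batch_size]
        # Advance and persist the cursor past this batch.
        self.state[self.subscription] = cursor + len(batch)
        return batch

    def reset(self) -> None:
        # Forget the saved cursor so the next read replays from the beginning.
        self.state.pop(self.subscription, None)


state = {}
log = ["r0", "r1", "r2", "r3", "r4"]
first = ResumableConsumer(log, state, "acme/analytics/my-sub", batch_size=2)
print(first.next_batch())  # prints: ['r0', 'r1']
# A new consumer with the same subscription resumes where the first stopped.
second = ResumableConsumer(log, state, "acme/analytics/my-sub", batch_size=2)
print(second.next_batch())  # prints: ['r2', 'r3']
```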