# allows us to use Client as a type hint without an import cycle
# see: https://www.stefaanlippens.net/circular-imports-type-hints-python.html
# pylint: disable=wrong-import-position,ungrouped-imports
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from beneath.client import Client
from typing import Iterable, Mapping, Union
import uuid
from beneath import config
from beneath.cursor import Cursor
from beneath.instance import TableInstance
from beneath.schema import Schema
from beneath.utils import TableIdentifier
[docs]class Table:
"""
Represents a data-plane connection to a table.
To find or create a table, see :class:`beneath.Client`.
Use it to get a TableInstance, which you can query, replay, subscribe and write to.
Learn more about tables and instances at https://about.beneath.dev/docs/concepts/tables/.
"""
table_id: uuid.UUID
"""
The table ID
"""
schema: Schema
"""
The table's schema
"""
primary_instance: TableInstance
"""
The current primary table instance.
This is probably the object you will use to write/query the table.
"""
use_log: bool
""" Whether log queries are supported for this table. """
use_index: bool
""" Whether index queries are supported for this table. """
use_warehouse: bool
""" Whether warehouse queries are supported for this table. """
_client: Client
_identifier: TableIdentifier
# INITIALIZATION
def __init__(self):
self.table_id: uuid.UUID = None
self.schema: Schema = None
self.primary_instance: TableInstance = None
self._client: Client = None
self._identifier: TableIdentifier = None
@classmethod
async def _make(cls, client: Client, identifier: TableIdentifier, admin_data=None) -> Table:
table = Table()
table._client = client
table._identifier = identifier
if not admin_data:
# pylint: disable=protected-access
admin_data = await table._load_admin_data()
table.table_id = uuid.UUID(hex=admin_data["tableID"])
table.schema = Schema(admin_data["avroSchema"])
if "primaryTableInstance" in admin_data:
if admin_data["primaryTableInstance"] is not None:
table.primary_instance = TableInstance._make(
client=client,
table=table,
admin_data=admin_data["primaryTableInstance"],
)
table.use_log = admin_data["useLog"]
table.use_index = admin_data["useIndex"]
table.use_warehouse = admin_data["useWarehouse"]
return table
@classmethod
async def _make_dry(
cls,
client: Client,
identifier: TableIdentifier,
avro_schema: str,
) -> Table:
table = Table()
table._client = client
table._identifier = identifier
table.table_id = None
table.schema = Schema(avro_schema)
table.primary_instance = await table.create_instance(version=0, make_primary=True)
table.use_log = True
table.use_index = True
table.use_warehouse = True
return table
async def _load_admin_data(self):
return await self._client.admin.tables.find_by_organization_project_and_name(
organization_name=self._identifier.organization,
project_name=self._identifier.project,
table_name=self._identifier.table,
)
# STATE
def __repr__(self):
return f'<beneath.table.Table("{config.BENEATH_FRONTEND_HOST}/{self._identifier}")>'
# INSTANCES
[docs] async def find_instances(self) -> Iterable[TableInstance]:
"""
Returns a list of all the table's instances.
Learn more about instances at https://about.beneath.dev/docs/concepts/tables/.
"""
# handle if dry
if not self.table_id:
if self.primary_instance:
return [self.primary_instance]
else:
return []
instances = await self._client.admin.tables.find_instances(str(self.table_id))
instances = [
TableInstance._make(client=self._client, table=self, admin_data=i) for i in instances
]
return instances
[docs] async def find_instance(self, version: int):
"""
Finds an instance by version number
Learn more about instances at https://about.beneath.dev/docs/concepts/tables/.
"""
# handle dry case
if not self.table_id:
if self.primary_instance and self.primary_instance.version == version:
return self.primary_instance
raise Exception("can't find instance by version for table created with a dry client")
if self.primary_instance and self.primary_instance.version == version:
return self.primary_instance
admin_data = await self._client.admin.tables.find_instance(
table_id=str(self.table_id),
version=version,
)
instance = TableInstance._make(client=self._client, table=self, admin_data=admin_data)
return instance
[docs] async def create_instance(
self,
version: int = None,
make_primary=None,
update_if_exists=None,
) -> TableInstance:
"""
Creates and returns a new instance for the table.
Learn more about instances at https://about.beneath.dev/docs/concepts/tables/.
Args:
version (int):
The version number to assign to the instance. If not set, will create a new instance
with a higher version number than any previous instance for the table.
make_primary (bool):
Immediately make the new instance the table's primary instance
update_if_exists (bool):
If true and an instance for ``version`` already exists, will update and return the
existing instance.
"""
# handle real and dry cases
if self.table_id:
admin_data = await self._client.admin.tables.create_instance(
table_id=str(self.table_id),
version=version,
make_primary=make_primary,
update_if_exists=update_if_exists,
)
instance = TableInstance._make(client=self._client, table=self, admin_data=admin_data)
else:
instance = TableInstance._make_dry(
client=self._client,
table=self,
version=(0 if version is None else version),
make_primary=make_primary,
)
if make_primary:
self.primary_instance = instance
return instance
# MANAGEMENT
[docs] async def delete(self):
"""
Deletes the table and all its instances and data.
"""
# handle if dry
if not self.table_id:
raise Exception("cannot delete dry table")
await self._client.admin.tables.delete(self.table_id)
# CURSORS
[docs] def restore_cursor(self, replay_cursor: bytes, changes_cursor: bytes):
"""
Restores a cursor previously obtained by querying one of the table's instances.
You must provide the cursor bytes, which can be found as properties of the Cursor object.
"""
return Cursor(
connection=self._client.connection,
schema=self.schema,
replay_cursor=replay_cursor,
changes_cursor=changes_cursor,
)
# WRITING RECORDS
[docs] async def write(self, records: Union[Mapping, Iterable[Mapping]]):
"""
Writes records to the table's primary instance.
This is a convenience wrapper for ``table.primary_instance.write(...)``.
"""
if not self.primary_instance:
raise Exception("cannot write because the table doesn't have a primary instance")
await self.primary_instance.write(records)