Source code for beneath.instance
# Allows us to use Table as a type hint without an import cycle
# pylint: disable=wrong-import-position,ungrouped-imports
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from beneath.client import Client
from beneath.table import Table
from collections.abc import Mapping
from typing import Iterable, Union
import uuid
from beneath import config
from beneath.cursor import Cursor
[docs]class TableInstance:
"""
Represents an instance of a Table, i.e. a specific version that you can query/subscribe/write
to. Learn more about instances at https://about.beneath.dev/docs/concepts/tables/.
"""
# INITIALIZATION
def __init__(self):
self.table: Table = None
""" The table that this is an instance of """
self.instance_id: uuid.UUID = None
""" The table instance ID """
self.is_final: bool = None
""" True if the instance has been made final and is closed for further writes """
self.is_primary: bool = None
""" True if the instance is the primary instance for the table """
self.version: int = None
""" The instance's version number """
self._client: Client = None
@classmethod
def _make(cls, client: Client, table: Table, admin_data: dict) -> TableInstance:
instance = TableInstance()
instance._client = client
instance.table = table
instance._set_admin_data(admin_data)
return instance
@classmethod
def _make_dry(
cls,
client: Client,
table: Table,
version: int,
make_primary=False,
) -> TableInstance:
instance = TableInstance()
instance._client = client
instance.table = table
instance.instance_id = None
instance.version = version
instance.is_primary = make_primary
return instance
def _set_admin_data(self, admin_data: dict):
self.instance_id = uuid.UUID(hex=admin_data["tableInstanceID"])
self.version = admin_data["version"]
self.is_final = admin_data["madeFinalOn"] is not None
self.is_primary = admin_data["madePrimaryOn"] is not None
# STATE
def __repr__(self):
url = f"{config.BENEATH_FRONTEND_HOST}/{self.table._identifier}/{self.version}"
return f'<beneath.table.TableInstance("{url}")>'
# CONTROL PLANE
[docs] async def update(self, make_primary=None, make_final=None):
""" Updates the instance """
# handle real and dry cases
if self.instance_id:
if make_final:
await self._client.force_flush()
admin_data = await self._client.admin.tables.update_instance(
instance_id=str(self.instance_id),
make_primary=make_primary,
make_final=make_final,
)
self._set_admin_data(admin_data)
else:
self.is_final = self.is_final or make_final
self.is_primary = self.is_primary or make_primary
if make_primary:
self.table.primary_instance = self
[docs] async def delete(self):
""" Deletes the instance """
if self.instance_id:
await self._client.admin.tables.delete_instance(instance_id=str(self.instance_id))
if self.table.primary_instance == self:
self.table.primary_instance = None
# READING RECORDS
[docs] async def query_log(self, peek: bool = False) -> Cursor:
"""
Queries the table's log, returning a cursor for replaying every record
written to the instance or for subscribing to new changes records in the table.
Args:
peek (bool):
If true, returns a cursor for the most recent records and
lets you page through the log in reverse order.
"""
# handle dry case
if not self.instance_id:
raise Exception("cannot query a dry instance")
resp = await self._client.connection.query_log(instance_id=self.instance_id, peek=peek)
assert len(resp.replay_cursors) <= 1 and len(resp.change_cursors) <= 1
replay = resp.replay_cursors[0] if len(resp.replay_cursors) > 0 else None
changes = resp.change_cursors[0] if len(resp.change_cursors) > 0 else None
return Cursor(
connection=self._client.connection,
schema=self.table.schema,
replay_cursor=replay,
changes_cursor=changes,
)
# pylint: disable=redefined-builtin
[docs] async def query_index(self, filter: str = None) -> Cursor:
"""
Queries a sorted index of the records written to the table. The index contains
the newest record for each record key (see the table's schema for the key). Returns
a cursor for paging through the index.
Args:
filter (str):
A filter to apply to the index. Filters allow you to quickly
find specific record(s) in the index based on the record key.
For details on the filter syntax,
see https://about.beneath.dev/docs/reading-writing-data/index-filters/.
"""
# handle dry case
if not self.instance_id:
raise Exception("cannot query a dry instance")
resp = await self._client.connection.query_index(
instance_id=self.instance_id, filter=filter
)
assert len(resp.replay_cursors) <= 1 and len(resp.change_cursors) <= 1
replay = resp.replay_cursors[0] if len(resp.replay_cursors) > 0 else None
changes = resp.change_cursors[0] if len(resp.change_cursors) > 0 else None
return Cursor(
connection=self._client.connection,
schema=self.table.schema,
replay_cursor=replay,
changes_cursor=changes,
)
# WRITING RECORDS
[docs] async def write(self, records: Union[Mapping, Iterable[Mapping]]):
""" Convenience wrapper for ``Client.write`` """
await self._client.write(self, records)