Source code for beneath.job
# allows us to use Client as a type hint without an import cycle
# see: https://www.stefaanlippens.net/circular-imports-type-hints-python.html
# pylint: disable=wrong-import-position,ungrouped-imports
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from beneath.client import Client
import asyncio
from enum import Enum
from typing import List
import uuid
from beneath.cursor import Cursor
from beneath.proto import gateway_pb2
from beneath.schema import Schema


class JobStatus(Enum):
    """ Represents the status of a Job """

    pending = "pending"  #: Query is waiting to be scheduled
    running = "running"  #: Query is running
    done = "done"  #: Query has finished running


POLL_FREQUENCY = 1.0


class Job:
    """
    Job represents a warehouse (OLAP) SQL query job. Unlike index queries, warehouse queries
    take seconds or minutes to execute, so we use a Job to follow their progress until we can
    fetch the results.
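
    A minimal usage sketch (an illustration only; it assumes the client exposes an async
    ``query_warehouse`` method that returns a ``Job``; check your client version for the
    exact entry point)::

        job = await client.query_warehouse("select count(*) from some_table")
        print(job.status)                # e.g. JobStatus.running
        cursor = await job.get_cursor()  # polls until the job is done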
"""
client: Client
job_id: bytes
""" The unique job identifier """
schema: Schema
""" The schema of the query result """
status: JobStatus
""" The current status of the job """
referenced_instance_ids: List[uuid.UUID]
""" The IDs of the table instances referenced in the query """
bytes_scanned: int
""" The number of bytes scanned by the query """
result_size_bytes: int
""" The number of bytes in the query result (not always accurate) """
result_size_records: int
""" The number of records in the query result (not always accurate) """
_job_data: gateway_pb2.WarehouseJob

    # INITIALIZATION

    def __init__(self, client: Client, job_id: bytes, job_data=None):
        self.client = client
        self.job_id = job_id
        self.schema = None
        self.status = None
        self.referenced_instance_ids = None
        self.bytes_scanned = None
        self.result_size_bytes = None
        self.result_size_records = None
        self._set_job_data(job_data)

    async def poll(self):
        """ Polls for updates to the job. Raises an exception if the job errored. """
        self._check_is_not_dry()
        resp = await self.client.connection.poll_warehouse_job(self.job_id)
        self._set_job_data(resp.job)
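
    # Example of polling a job by hand (illustrative sketch only; get_cursor() below runs
    # this loop for you). It assumes `job` was obtained from a warehouse query call:
    #
    #   while job.status != JobStatus.done:
    #       await asyncio.sleep(POLL_FREQUENCY)
    #       await job.poll()  # raises if the query failed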

    def _set_job_data(self, job_data):
        self._job_data = job_data
        if not job_data:
            return
        if job_data.error:
            raise Exception(f"warehouse query error: {job_data.error}")
        if not self.schema and job_data.result_avro_schema:
            self.schema = Schema(job_data.result_avro_schema)
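        # translate the protobuf job status into the local JobStatus enum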
        if job_data.status == gateway_pb2.WarehouseJob.PENDING:
            self.status = JobStatus.pending
        elif job_data.status == gateway_pb2.WarehouseJob.RUNNING:
            self.status = JobStatus.running
        elif job_data.status == gateway_pb2.WarehouseJob.DONE:
            self.status = JobStatus.done
        else:
            raise Exception(f"unknown job status in job: {str(job_data)}")
        if job_data.referenced_instance_ids:
            self.referenced_instance_ids = [
                uuid.UUID(bytes=uid) for uid in job_data.referenced_instance_ids
            ]
        self.bytes_scanned = job_data.bytes_scanned
        self.result_size_bytes = job_data.result_size_bytes
        self.result_size_records = job_data.result_size_records
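
    # A job without a job_id is a "dry run" (e.g. the query was only analyzed, not
    # executed), so there is nothing to poll or read.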
    def _check_is_not_dry(self):
        if not self.job_id:
            raise Exception("cannot poll dry run job")

    # READ

    async def get_cursor(self) -> Cursor:
        """ Returns a cursor for paging through the query results """
        self._check_is_not_dry()
        # poll until the job has completed
        while self.status != JobStatus.done:
            if self._job_data:  # don't sleep if we haven't done the first fetch yet
                await asyncio.sleep(POLL_FREQUENCY)
            await self.poll()
        # the job completed without error (poll raises job errors)
        cursor_bytes = self._job_data.replay_cursors[0] if self._job_data.replay_cursors else None
        return Cursor(
            connection=self.client.connection,
            schema=self.schema,
            replay_cursor=cursor_bytes,
            changes_cursor=None,
        )
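

# Illustrative sketch of consuming a job's results. It assumes Cursor exposes an async
# ``read_next(limit=...)`` method that returns the next batch of records; check the
# Cursor class for the exact read API in your version.
#
#   cursor = await job.get_cursor()
#   batch = await cursor.read_next(limit=100)
#   for record in batch:
#       print(record)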