Pipeline
class beneath.Pipeline(action: Optional[beneath.pipeline.base_pipeline.Action] = None, strategy: Optional[beneath.pipeline.base_pipeline.Strategy] = None, version: Optional[int] = None, service_path: Optional[str] = None, service_read_quota: Optional[int] = None, service_write_quota: Optional[int] = None, service_scan_quota: Optional[int] = None, parse_args: bool = False, client: Optional[beneath.client.Client] = None, write_delay_ms: int = 1000, disable_checkpoints: bool = False)

Pipelines are a construct built on top of the Beneath primitives to manage the reading of input tables, the creation of output tables, data generation and derivation logic, and more.
This simple implementation supports four composable operations: generate, read_table, apply, and write_table. It's suitable for generating tables, consuming tables, and one-to-N derivation of one table into another. It's not currently suitable for advanced aggregation or multi-machine parallel processing.

It supports (light) stateful transformations via a key-value-based checkpointer, which is useful for tracking generator and consumer progress between invocations (or crashes).
- Parameters
    - action (Action) – The action to run when calling main or execute.
    - strategy (Strategy) – The processing strategy to apply when action="test" or action="run".
    - version (int) – The version number for output tables. Incrementing the version number causes the pipeline to create new output table instances and replay input tables.
    - service_path (str) – Path of a service to create for the pipeline when action="stage". The service is assigned the correct permissions for reading input tables and writing to output tables and checkpoints, and can be used to create a secret for deploying the pipeline to production.
    - service_read_quota (int) – Read quota for the service staged for the pipeline (see service_path).
    - service_write_quota (int) – Write quota for the service staged for the pipeline (see service_path).
    - service_scan_quota (int) – Scan quota for the service staged for the pipeline (see service_path).
    - parse_args (bool) – If true, the pipeline parses its action (and related options) from command-line arguments, as in the example below.
    - client (Client) – Client to use for the pipeline. If not set, initializes a new client (see its init for details on which secret gets used).
    - write_delay_ms (int) – Passed to the Client initializer if the client arg isn't passed.
    - disable_checkpoints (bool) – If true, the pipeline will not create a checkpointer, and consumers will not save state. Defaults to false.
Example:
    # pipeline.py
    # This pipeline generates a table `ticks` with a record for every minute
    # since 1st Jan 2021.
    # To test locally:
    #   python ./pipeline.py test
    # To prepare for running:
    #   python ./pipeline.py stage USERNAME/PROJECT/ticker
    # To run (after stage):
    #   python ./pipeline.py run USERNAME/PROJECT/ticker
    # To teardown:
    #   python ./pipeline.py teardown USERNAME/PROJECT/ticker

    import asyncio
    import beneath
    from datetime import datetime, timedelta, timezone

    start = datetime(year=2021, month=1, day=1, tzinfo=timezone.utc)

    async def ticker(p: beneath.Pipeline):
        # Resume from the last emitted tick, or from `start` on the first run
        last_tick = await p.checkpoints.get("last", default=start)
        while True:
            now = datetime.now(tz=last_tick.tzinfo)
            next_tick = last_tick + timedelta(minutes=1)
            if next_tick >= now:
                # Caught up to the present: signal idleness, then wait
                yield beneath.PIPELINE_IDLE
                wait = next_tick - now
                await asyncio.sleep(wait.total_seconds())
            yield {"time": next_tick}
            await p.checkpoints.set("last", next_tick)
            last_tick = next_tick

    if __name__ == "__main__":
        p = beneath.Pipeline(parse_args=True)
        p.description = "Pipeline that emits a tick for every minute since 1st Jan 2021"
        ticks = p.generate(ticker)
        p.write_table(
            ticks,
            table_path="ticks",
            schema='''
                type Tick @schema {
                    time: Timestamp! @key
                }
            ''',
        )
        p.main()
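A variation on the construction above, showing the service and quota options set explicitly; the service path and quota values are illustrative, not defaults:

    import beneath

    # Sketch: configure the pipeline for staging with an explicit service
    # and quotas instead of parsing options from the command line.
    # The path and numbers are illustrative.
    p = beneath.Pipeline(
        action=beneath.Action.stage,
        strategy=beneath.Strategy.continuous,
        version=0,
        service_path="USERNAME/PROJECT/ticker",
        service_read_quota=10_000_000_000,
        service_write_quota=10_000_000_000,
        disable_checkpoints=False,
    )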
generate(fn: Callable[[beneath.pipeline.base_pipeline.BasePipeline], AsyncIterator[collections.abc.Mapping]]) → beneath.pipeline.pipeline.Transform

Pipeline step for generating records.

- Parameters
    - fn (async def fn(pipeline)) – An async iterator that generates records. The pipeline is passed as an input arg.
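A generator may also return, which lets the pipeline finish (see Strategy.continuous). A minimal sketch of a finite generator, with illustrative field names:

    # Sketch: a finite generator that emits a fixed batch of records and
    # then returns, letting the pipeline complete (useful with the batch
    # strategy).
    async def generate_numbers(p: beneath.Pipeline):
        for i in range(100):
            yield {"number": i}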
read_table(table_path: str) → beneath.pipeline.pipeline.Transform

Pipeline step for consuming the primary instance of a table.

- Parameters
    - table_path (str) – The table to consume.
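A minimal sketch; the table path below is hypothetical, and p is a beneath.Pipeline:

    # Sketch: consume an existing table's primary instance. The returned
    # Transform can be fed into apply or write_table.
    events = p.read_table("examples/demo/events")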
apply(prev_transform: beneath.pipeline.pipeline.Transform, fn: Callable[[collections.abc.Mapping], Awaitable[Union[collections.abc.Mapping, Iterable[collections.abc.Mapping]]]], max_concurrency: Optional[int] = None) → beneath.pipeline.pipeline.Transform

Pipeline step that transforms records emitted by a previous step.

- Parameters
    - prev_transform (Transform) – The pipeline step to apply on. Can be a generate, read_table, or other apply step.
    - fn (async def fn(record) -> (None | record | [record])) – Function applied to each incoming record. Can return None, one record, or a list of records, which will propagate to the next pipeline step.
    - max_concurrency (int) – The maximum number of concurrent invocations of fn.
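A sketch of a one-to-N transform function, with illustrative field names (it assumes events is a prior read_table or generate step, as in the sketch above):

    # Sketch: fan one input record out into zero or more output records.
    async def split_words(record):
        text = record.get("text")
        if not text:
            return None  # drop records with no text
        return [{"word": w} for w in text.split()]

    words = p.apply(events, split_words, max_concurrency=10)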
write_table(prev_transform: beneath.pipeline.pipeline.Transform, table_path: str, schema: Optional[str] = None, description: Optional[str] = None, retention: Optional[datetime.timedelta] = None, schema_kind: str = 'GraphQL')

Pipeline step that writes incoming records from the previous step to a table.

- Parameters
    - prev_transform (Transform) – The pipeline step whose records to write. Can be a generate, read_table, or apply step.
    - table_path (str) – The table to output to. If schema is provided, the table will be created when running the stage action. If the path doesn't include a username and project name, the pipeline will attempt to find or create the table in its service's project (see service_path in the constructor).
    - schema (str) – A GraphQL schema for creating the output table.
    - description (str) – A description for the table (only applicable if schema is set).
    - retention (timedelta) – The amount of time to retain written data in the table. By default, records are saved forever.
    - schema_kind (str) – The parser to use for schema. Currently must be "GraphQL" (default).
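Putting the steps together: a sketch of a complete derivation pipeline that reads one table, fans records out with apply, and writes the result. The input path, field names, and schema are illustrative:

    import beneath

    async def extract_words(record):
        # One-to-N derivation: one output record per word in the input text
        return [{"word": w} for w in record["text"].split()]

    if __name__ == "__main__":
        p = beneath.Pipeline(parse_args=True)
        p.description = "Derives a table of words from a table of texts"
        texts = p.read_table("examples/demo/texts")  # hypothetical input table
        words = p.apply(texts, extract_words)
        p.write_table(
            words,
            table_path="words",
            schema='''
                type Word @schema {
                    word: String! @key
                }
            ''',
        )
        p.main()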
async execute()

Executes the pipeline in accordance with the action and strategy set on init (called by main).
main()

Executes the pipeline in an asyncio event loop and gracefully handles SIGINT and SIGTERM.
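Instead of parsing the action from the command line, it can be fixed in the constructor. A sketch, reusing the ticker generator from the example above (TICK_SCHEMA is a hypothetical constant holding that example's schema):

    # Sketch: run a fixed action/strategy programmatically instead of
    # passing parse_args=True and reading them from the command line.
    p = beneath.Pipeline(action=beneath.Action.test, strategy=beneath.Strategy.batch)
    ticks = p.generate(ticker)  # ticker as defined in the example above
    p.write_table(ticks, table_path="ticks", schema=TICK_SCHEMA)
    p.main()  # wraps execute() in an event loop with signal handling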
class beneath.Action(value)

Actions that a pipeline can execute.
test = 'test'

Does a dry run of the pipeline, where input tables are read, but no output tables are created. Records output by the pipeline will be logged, but not written.
stage = 'stage'

Creates all the resources the pipeline needs, including output tables, a checkpoints meta-table, and a service with the correct read and write permissions. The service can be used to create a secret for deploying the pipeline to production.
run = 'run'

Runs the pipeline: reads inputs, applies transformations, and writes to output tables. You must execute the "stage" action before running.
teardown = 'teardown'

The reverse of "stage": deletes all resources created for the pipeline.
class beneath.Strategy(value)

Strategies for running a pipeline (only apply when action="run").
continuous = 'continuous'

Attempts to keep the pipeline running forever. It will replay inputs, stay subscribed for changes, and continuously flush outputs. May terminate if a generator returns (see Pipeline.generate for more).
delta = 'delta'

Stops the pipeline when there are no more changes to consume. On the first run, it replays inputs; on subsequent runs, it consumes and processes all changes since the last run, then stops.
batch = 'batch'

Replays all inputs on every run and never consumes changes. It finalizes table instances once it is done, so you must increment the version number on every run.
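Because batch finalizes output instances, each run needs a higher version. A sketch of how a subsequent batch run might be configured:

    # Sketch: bump the version so the pipeline creates fresh output table
    # instances and replays inputs (values are illustrative).
    p = beneath.Pipeline(
        action=beneath.Action.run,
        strategy=beneath.Strategy.batch,
        version=2,  # the previous run used version=1
    )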