Pipeline

class beneath.Pipeline(action: Optional[beneath.pipeline.base_pipeline.Action] = None, strategy: Optional[beneath.pipeline.base_pipeline.Strategy] = None, version: Optional[int] = None, service_path: Optional[str] = None, service_read_quota: Optional[int] = None, service_write_quota: Optional[int] = None, service_scan_quota: Optional[int] = None, parse_args: bool = False, client: Optional[beneath.client.Client] = None, write_delay_ms: int = 1000, disable_checkpoints: bool = False)[source]

Pipelines are a construct built on top of the Beneath primitives to manage the reading of input tables, creation of output tables, data generation and derivation logic, and more.

This simple implementation supports four composable operations: generate, read_table, apply, and write_table. It’s suitable for generating tables, consuming tables, and one-to-N derivation of records from one table into another. It’s not currently suitable for advanced aggregation or multi-machine parallel processing.

It supports (light) stateful transformations via a key-value-based checkpointer, which is useful for tracking generator and consumer progress between invocations (and across crashes).
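
For instance, a minimal one-to-N derivation pipeline chains read_table, apply, and write_table. The following is a sketch, where the input table path, field names, and schema are hypothetical:

import beneath

async def double(record):
    # Map each input record to one output record (return None to drop it)
    return {"id": record["id"], "double": record["value"] * 2}

if __name__ == "__main__":
    p = beneath.Pipeline(parse_args=True)
    values = p.read_table("examples/demo/values")  # hypothetical input table
    doubled = p.apply(values, double)
    p.write_table(
        doubled,
        table_path="values-doubled",
        schema='''
            type Doubled @schema {
                id: Int! @key
                double: Int!
            }
        ''',
    )
    p.main()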

Parameters
  • action (Action) – The action to run when calling main or execute

  • strategy (Strategy) – The processing strategy to apply when action="test" or action="run"

  • version (int) – The version number for output tables. Incrementing the version number will cause the pipeline to create new output table instances and replay input tables.

  • service_path (str) – Path for a service to create for the pipeline when action="stage". The service will be assigned correct permissions for reading input tables and writing to output tables and checkpoints. The service can be used to create a secret for deploying the pipeline to production.

  • service_read_quota (int) – Read quota for the service staged for the pipeline (see service_path)

  • service_write_quota (int) – Write quota for the service staged for the pipeline (see service_path)

  • service_scan_quota (int) – Scan quota for the service staged for the pipeline (see service_path)

  • parse_args (bool) – If true, the pipeline parses its action (and options such as the service path) from command-line arguments, as shown in the example below. Defaults to false.

  • client (Client) – Client to use for the pipeline. If not set, initializes a new client (see its init for details on which secret gets used).

  • write_delay_ms (int) – Passed to the Client initializer if the client arg isn’t provided

  • disable_checkpoints (bool) – If true, will not create a checkpointer, and consumers will not save state. Defaults to false.

Example:

# pipeline.py

# This pipeline generates a table `ticks` with a record for every minute
# since 1st Jan 2021.

# To test locally:
#     python ./pipeline.py test
# To prepare for running:
#     python ./pipeline.py stage USERNAME/PROJECT/ticker
# To run (after stage):
#     python ./pipeline.py run USERNAME/PROJECT/ticker
# To teardown:
#     python ./pipeline.py teardown USERNAME/PROJECT/ticker

import asyncio
import beneath
from datetime import datetime, timedelta, timezone

start = datetime(year=2021, month=1, day=1, tzinfo=timezone.utc)

async def ticker(p: beneath.Pipeline):
    last_tick = await p.checkpoints.get("last", default=start)
    while True:
        now = datetime.now(tz=last_tick.tzinfo)
        next_tick = last_tick + timedelta(minutes=1)

        if next_tick >= now:
            # Signal that the pipeline has caught up, then sleep until the next tick
            yield beneath.PIPELINE_IDLE
            wait = next_tick - now
            await asyncio.sleep(wait.total_seconds())

        yield {"time": next_tick}
        await p.checkpoints.set("last", next_tick)
        last_tick = next_tick

if __name__ == "__main__":

    p = beneath.Pipeline(parse_args=True)
    p.description = "Pipeline that emits a tick for every minute since 1st Jan 2021"

    ticks = p.generate(ticker)
    p.write_table(
        ticks,
        table_path="ticks",
        schema='''
            type Tick @schema {
                time: Timestamp! @key
            }
        ''',
    )

    p.main()

generate(fn: Callable[[beneath.pipeline.base_pipeline.BasePipeline], AsyncIterator[collections.abc.Mapping]]) → beneath.pipeline.pipeline.Transform[source]

Pipeline step for generating records.

Parameters

fn (async def fn(pipeline)) – An async generator function that yields records. The pipeline is passed as its only argument.
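
A generator may also be finite: when it returns, the pipeline can finish (see Strategy below). As a sketch, a resumable backfill generator, with a hypothetical checkpoint key and record shape:

import beneath

async def backfill(p: beneath.Pipeline):
    # Resume from the checkpoint so progress survives restarts and crashes
    n = await p.checkpoints.get("n", default=0)
    while n < 100:
        yield {"n": n}
        n += 1
        await p.checkpoints.set("n", n)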

read_table(table_path: str) → beneath.pipeline.pipeline.Transform[source]

Pipeline step for consuming the primary instance of a table.

Parameters

table_path (str) – The table to consume
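
For instance (the table path is hypothetical):

sentences = p.read_table("examples/demo/sentences")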

apply(prev_transform: beneath.pipeline.pipeline.Transform, fn: Callable[[collections.abc.Mapping], Awaitable[Union[collections.abc.Mapping, Iterable[collections.abc.Mapping]]]], max_concurrency: Optional[int] = None) → beneath.pipeline.pipeline.Transform[source]

Pipeline step that transforms records emitted by a previous step.

Parameters
  • prev_transform (Transform) – The pipeline step to apply fn to. Can be a generate, read_table, or another apply step.

  • fn (async def fn(record) -> (None | record | [record])) – Function applied to each incoming record. Can return None, one record, or a list of records, which will propagate to the next pipeline step.

  • max_concurrency (int) – The maximum number of concurrent invocations of fn
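
As a sketch, an apply function can drop, map, or fan out records (the field names here are hypothetical):

async def split_words(record):
    text = record.get("text")
    if not text:
        return None  # drop records with no text
    # Fan out: one input record becomes one output record per word
    return [{"word": word} for word in text.split()]

words = p.apply(sentences, split_words)  # sentences is a prior read_table or generate step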

write_table(prev_transform: beneath.pipeline.pipeline.Transform, table_path: str, schema: Optional[str] = None, description: Optional[str] = None, retention: Optional[datetime.timedelta] = None, schema_kind: str = 'GraphQL')[source]

Pipeline step that writes incoming records from the previous step to a table.

Parameters
  • prev_transform (Transform) – The pipeline step whose output records are written. Can be a generate, read_table, or apply step.

  • table_path (str) – The table to output to. If schema is provided, the table will be created when running the stage action. If the path doesn’t include a username and project name, the pipeline will attempt to find or create the table in the project of the pipeline’s service (see service_path in the constructor).

  • schema (str) – A GraphQL schema for creating the output table.

  • description (str) – A description for the table (only applicable if schema is set).

  • retention (timedelta) – The amount of time to retain written data in the table. By default, records are saved forever.

  • schema_kind (str) – The parser to use for schema. Currently must be "GraphQL" (default).
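
For example, an output table with a 30-day retention could be declared as follows (a sketch; the step name and schema are illustrative):

from datetime import timedelta

p.write_table(
    words,
    table_path="words",
    schema='''
        type Word @schema {
            word: String! @key
        }
    ''',
    retention=timedelta(days=30),  # written records are discarded after 30 days
)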

async execute()

Executes the pipeline in accordance with the action and strategy set on init (called by main).

main()

Executes the pipeline in an asyncio event loop and gracefully handles SIGINT and SIGTERM.
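
main is a convenience wrapper; if you manage your own asyncio event loop, you can await execute directly. A sketch (pipeline steps omitted):

import asyncio
import beneath

async def run():
    p = beneath.Pipeline(action=beneath.Action.run, strategy=beneath.Strategy.continuous)
    # ... register generate/read_table/apply/write_table steps here ...
    await p.execute()

asyncio.run(run())

Note that the SIGINT and SIGTERM handling described above belongs to main, so graceful shutdown is your responsibility when calling execute directly.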

class beneath.Action(value)[source]

Actions that a pipeline can execute

test = 'test'

Does a dry run of the pipeline, where input tables are read, but no output tables are created. Records output by the pipeline will be logged, but not written.

stage = 'stage'

Creates all the resources the pipeline needs. These include output tables, a checkpoints meta table, and a service with the correct read and write permissions. The service can be used to create a secret for deploying the pipeline to production.

run = 'run'

Runs the pipeline, reading inputs, applying transformations, and writing to output tables. You must execute the "stage" action before running.

teardown = 'teardown'

The reverse action of "stage". Deletes all resources created for the pipeline.

class beneath.Strategy(value)[source]

Strategies for running a pipeline (only apply when action="run")

continuous = 'continuous'

Attempts to keep the pipeline running forever. Will replay inputs and stay subscribed for changes, and continuously flush outputs. May terminate if a generator returns (see Pipeline.generate for more).

delta = 'delta'

Stops the pipeline when there are no more changes to consume. On the first run, it replays inputs, and on subsequent runs, it will consume and process all changes since the last run, and then stop.

batch = 'batch'

Replays all inputs on every run, and never consumes changes. It will finalize table instances once it is done, so you must increase the version number on every run.
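
Instead of parse_args, the action, strategy, and version can also be fixed in code. For instance, a batch run might look like the following sketch:

import beneath

p = beneath.Pipeline(
    action=beneath.Action.run,
    strategy=beneath.Strategy.batch,
    version=2,  # batch finalizes table instances, so increment this on every run
)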