Steps

A repository for automation of pipelines and operations. It implements workflow steps and artefacts, and is based on the Maestro library.

Getting started

First, make sure docker containers are running:

docker compose up

From VS Code, you can also run the devcontainer to get a development environment.

Open the command palette (Shift + Command + P on Mac / Ctrl + Shift + P on Windows/Linux) and select Remote-Containers: Reopen in Container. This will spin up all Docker containers and attach VS Code to the steps container.

Updating the Maestro library

Maestro is a Python library, and is installed in the steps container. To update the library, run the following command:

docker compose exec steps poetry update maestro

Note: As long as maestro is pointing to the dev branch, this may break Steps - so don't do this unless you're prepared to deal with the fallout.

Maestro is currently pointing at the dev branch, and updating will ensure that the latest changes on dev are available. Once Maestro stabilizes, we will use tags for this.
If, during development, you want to use a local version of Maestro, you can do so by adding the following line to volumes under the steps service definition in docker-compose.yml:
- <path to maestro root>/src/maestro/:/opt/poetry/.venv/lib/python3.12/site-packages/maestro
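For context, this is roughly where the line goes in docker-compose.yml (the surrounding entries are illustrative; only the bind mount of the local Maestro checkout matters):

services:
  steps:
    # ...existing configuration for the steps service...
    volumes:
      # mount the local Maestro checkout over the installed package
      - <path to maestro root>/src/maestro/:/opt/poetry/.venv/lib/python3.12/site-packages/maestro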

Adding new Python dependencies

Similarly, if you want to add a new Python dependency, modify pyproject.toml and run:

docker compose exec steps poetry update
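For reference, a new dependency typically goes under Poetry's dependency table in pyproject.toml. This sketch assumes the classic [tool.poetry.dependencies] layout; the package and version constraints are purely illustrative:

[tool.poetry.dependencies]
python = "^3.12"
# illustrative new dependency; replace with the package and constraint you need
httpx = "^0.27"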

Running tests

Unit tests with Pytest

There are many unit tests, including large tests that exercise parts of the pipelines. Some of these require downloading large reference data files and need substantial machine resources. Make sure the Docker daemon has at least 20 GB of RAM, several CPU cores and 2 GB of swap. (Resource limits can be changed in Docker Desktop.)

docker compose exec steps pytest

Specific tests can be run with the -k flag, e.g. pytest -k "basepipe and not cuca".

Framework tests

These tests generate lots of test data suitable for testing/monitoring many workflows.

  1. Reset the Neo4j database and the Maestro stream. Run this command before running main.py:
    docker compose exec steps python /steps/ops/dev/reset_all.py
    
  2. Start Steps:
    docker compose exec -e MOCK_VCPIPE=true steps python /steps/src/main.py
    
  3. In a new terminal window, run the following command to generate test data; the number of singles and trios can be changed in the command. You can run this command multiple times to add more test data cumulatively. The generated test data is eventually picked up by Steps.
    docker compose exec steps /steps/tests/bin/generate_test_data.sh --total-singles 5 --total-trios 1 --testdir /tmp/maestro
    

To simulate pending and failed pipelines, use any combination of the following options:

--pending-s-seqdatamaker
The number of single analyses which are pending SeqDataMaker. One pending single-analysis SeqDataMaker implies:
  • 1 sample
  • 1 pending SeqDataMaker
  • 1 pending single-analysis basepipe
  • 1 pending single-analysis annopipe

--pending-s-basepipe
The number of single analyses which are pending basepipe. One pending single-analysis basepipe implies:
  • 1 sample
  • 1 successful SeqDataMaker
  • 1 pending single-analysis basepipe
  • 1 pending single-analysis annopipe

--fail-s-basepipe
The number of single analyses which fail basepipe. One failed single-analysis basepipe implies:
  • 1 sample
  • 1 successful SeqDataMaker
  • 1 failed single-analysis basepipe
  • 1 pending single-analysis annopipe

--fail-s-annopipe
The number of single analyses which fail annopipe. One failed single-analysis annopipe implies:
  • 1 sample
  • 1 successful SeqDataMaker
  • 1 successful single-analysis basepipe
  • 1 failed single-analysis annopipe

--pending-t-seqdatamaker
The number of trio-analysis members which are pending SeqDataMaker.

--pending-t-basepipe
The number of trio-analysis members which are pending basepipe.

--fail-t-basepipe
The number of trio-analysis members which fail basepipe.

Pending trio-analysis SeqDataMakers, pending trio-analysis basepipes and failed trio-analysis basepipes are squashed into complete trios; when needed, normal samples are added to complete a trio with pending or failed members.

Example 1:
2 pending trio-analysis basepipes and 1 failed trio-analysis basepipe imply:
  • 3 samples
  • 3 successful SeqDataMaker
  • 2 pending trio-analysis basepipes
  • 1 failed trio-analysis basepipe
  • 1 pending triopipe
  • 1 pending trio-analysis annopipe

Example 2:
2 pending trio-analysis basepipes and 2 failed trio-analysis basepipes imply:
  • 6 samples
  • 6 successful SeqDataMaker
  • 2 pending trio-analysis basepipes of trio 1
  • 1 failed trio-analysis basepipe of trio 1
  • 1 pending triopipe of trio 1
  • 1 pending trio-analysis annopipe of trio 1
  • 1 failed trio-analysis basepipe of trio 2
  • 2 successful trio-analysis basepipes of trio 2
  • 1 pending triopipe of trio 2
  • 1 pending trio-analysis annopipe of trio 2

--fail-triopipe
The number of trio analyses which fail triopipe. One failed triopipe implies:
  • 3 samples
  • 3 successful SeqDataMaker
  • 3 successful trio-analysis basepipes
  • 1 failed triopipe
  • 1 pending trio-analysis annopipe

--fail-t-annopipe
The number of trio analyses which fail annopipe. One failed trio-analysis annopipe implies:
  • 3 samples
  • 3 successful SeqDataMaker
  • 3 successful trio-analysis basepipes
  • 1 successful triopipe
  • 1 failed trio-analysis annopipe

Pending and failed samples are guaranteed to exist. If the requested total number of singles and trios (1 trio == 3 samples) isn't reached, the script generates additional normal samples to reach the requested totals.

docker compose exec steps /steps/tests/bin/generate_test_data.sh --total-singles 6 --pending-s-seqdatamaker=1 --pending-s-basepipe=1 --fail-s-basepipe=1 --fail-s-annopipe=1 --total-trios 5 --pending-t-seqdatamaker=1 --pending-t-basepipe=2 --fail-t-basepipe=2 --fail-triopipe=1 --fail-t-annopipe=1 --testdir /tmp/maestro

  4. Open the web UI to check the pipeline status
    Go to http://localhost:8000 in your browser.

Accessing database

The easiest way to access the database is via http://localhost:7474/browser/

There, you can use Cypher to query the database. Simple example:

MATCH (n) RETURN n;

Delete all nodes and relationships:

MATCH (n) DETACH DELETE n;

Using Maestro

Steps relies on Maestro to process and execute pipeline operations. Below we go through a few important components you need to understand to implement pipelines and their corresponding operations.

Artefacts

Artefact classes are used to define the input and output components of any operation (aka workflow step). Maestro provides BaseArtefactModel to define the data types of the attributes you want for input/output in a workflow step. The base class provided by Maestro has the following attributes:

  • identifier: string, mandatory value to differentiate one artefact from another
  • id: integer id for the artefact; this is an optional attribute
  • maestro_id: string UUID for Maestro's internal reference; the value is set by Maestro but can be overridden if needed
  • labels: list of strings, automatically set by Maestro
  • subjects: list of strings, used by Maestro to set up NATS subscriptions. Override only if you need to customize how NATS works

You can of course add more attributes or override existing input/output artefact value types. For reference, here's an example of an artefact model.

from maestro import BaseArtefactModel

class DemoInput(BaseArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...

Make sure that you always use Maestro's model class (BaseArtefactModel) to define artefact models in your pipeline/operations or workflow steps.
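For instance, here is a sketch of an artefact that adds its own attributes on top of the base model. The field names are purely illustrative, and it assumes BaseArtefactModel accepts pydantic-style field declarations:

from maestro import BaseArtefactModel

class SequencingRunArtefact(BaseArtefactModel):
    # extra attributes on top of the base ones (identifier, id, maestro_id, ...)
    run_id: str
    flowcell: str | None = None

# identifier is mandatory on the base model; the extra fields are set like any other attribute
artefact = SequencingRunArtefact(identifier="sample-001", run_id="run-42")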

File watcher

Typically, a pipeline has some files one wants to perform certain operations on. To this end, one subclasses the FileWatcher class from Maestro. The base class is quite simple and can be extended as per the pipeline's needs. The FileWatcher subclass essentially monitors changes in the directory defined in path and performs a given operation only on newly added files. Note that files must be added to the directory while Steps is up and running.

To define a file watcher class in your pipeline, you need five components:

  1. path: path to the directory where the files to be processed are located
  2. pattern: regex pattern for the filenames you wish to process; this way you can be selective about which files in the path directory are processed
  3. async def process: the actual operation you wish to perform on the file. Note that the method must always be defined as async
  4. inputs: data model of your input; can be left as the default or extended from BaseArtefactModel. New: there is now a FileArtefactModel specifically for FileWatcher, with extra attributes such as path, size and updated_at
  5. output: data model of your output; can be left as the default or extended from BaseArtefactModel

Here's a sample file watcher class. Note that you always have to subclass FileWatcher from Maestro to define your file watcher class.

from maestro import BaseArtefactModel, FileArtefactModel, FileWatcher

class DemoInput(BaseArtefactModel):
    ...

class DemoInputAlternative(FileArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...

class DemoFileWatcher(FileWatcher):

    path = "/tmp/test_filewatcher"
    pattern = ".*.txt"

    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        output = DemoOutput(identifier=inputs[0].identifier)
        return [output]

Note that in process, identifier is made available via BaseArtefactModel.

There is an optional filter class variable you can override in FileWatcher; the default filter only watches for newly added files.

Message Watcher

This is probably the component you will use to define most steps in the pipeline. The MessageWatcher class in Maestro is designed so that Maestro watches all its subclasses and performs the operation when all the inputs for the defined step are found.

The implementation of a message watcher subclass looks very similar to a file watcher but has fewer components:

  1. input_types: set of artefact classes (extended from BaseArtefactModel) which will be used as input for the operation/process
  2. parameters: optional parameters that can be passed to the step process/operation
  3. async def process: the actual operation you wish to perform on the inputs. Note that the method must always be defined as async
  4. inputs: data model of your input; can be left as the default or extended from BaseArtefactModel
  5. output: data model of your output; can be left as the default or extended from BaseArtefactModel

Here's a sample message watcher class. Note that you always have to subclass MessageWatcher from Maestro to define your message watcher class.

from maestro import BaseArtefactModel, MessageWatcher

class DemoInputA(BaseArtefactModel):
    ...

class DemoInputB(BaseArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...

class DemoStep(MessageWatcher):
    input_types = {DemoInputA, DemoInputB}

    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        # perform any operation here
        # Run some process here to create the output
        output = DemoOutput(identifier=inputs[0].identifier)
        return [output]

Order step job

To place an order for a MessageWatcher, you need to create a Job like this:

import asyncio

from maestro import run_workflow_steps, order_workflows
from maestro import BaseArtefactModel, FileWatcher, MessageWatcher, Job, WorkflowModel

class DemoInputA(BaseArtefactModel):
    ...

class DemoInputB(BaseArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...

class DemoFileWatcherA(FileWatcher):
    path = "tmp/test_filewatcher"
    pattern = ".*.sample"
    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        output = DemoInputA(identifier=inputs[0].identifier, steps_id="demo steps 1", other_identifier="personA2")
        return [output]

class DemoFileWatcherB(FileWatcher):
    path = "tmp/test_filewatcher"
    pattern = ".*.txt"
    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        output = DemoInputB(identifier=inputs[0].identifier, some_identifier="personB2")
        return [output]

class DemoStep(MessageWatcher):
    input_types = {DemoInputA, DemoInputB}

    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        # perform any operation here
        # Run some process here to create the output
        output = DemoOutput(identifier=inputs[0].identifier)
        return [output]

task = await run_workflow_steps()

job = Job(
    inputs=[("DemoInputA", {"identifier": "personA", "other_identifier": "personA2"}), ("DemoInputB", {"identifier": "personB", "some_identifier": "personB2"})],
    params={"demo_param": "demo"},
    search_metadata=["sample_order_id: 1234"],
    workflow_step="DemoStep",
)

workflow = WorkflowModel(
    workflow_name="some_name",
    jobs=[job],
    search_metadata=["project_id: wf_1234"],
)
await order_workflows(workflow)

This will place an order on the queue, which will be picked up and processed when the input artefacts are ready (in this case, DemoInputA with identifier personA and DemoInputB with identifier personB).

Note:

  • you can pass additional values in the params dict to identify the input that should be used by the process method.
  • the older way of ordering a job directly on the MessageWatcher class is still available and can be used as follows, but it will be deprecated after the stable release.
  • please update the JOB_TIMEOUT env variable to an appropriate value in seconds if you suspect a job might take more than 24 hours to complete.
job = Job(
    inputs=[("DemoInputA", {"identifier": "personA", "other_identifier": "personA2"}), ("DemoInputB", {"identifier": "personB", "some_identifier": "personB2"})],
    params={"demo_param": "demo"},
    workflow_name="some_name",
    search_metadata=["project_id: wf_1234"],
)
await DemoStep.order(job)

Misc maestro imports

There are several modules made available via maestro; below are more details on objects not covered in the sections above.

run_workflow_steps

  • async method that checks all artefact and watcher classes in the file and sets up NATS and the other async tasks required.
  • Accepts three optional parameters: message_watcher_classes, file_watcher_classes and artefact_classes, which should be lists of MessageWatcher, FileWatcher and BaseArtefactModel classes respectively.
  • If you only want to run specific watcher and artefact classes, you can pass them to the method as follows.

Note: if any of these parameters is left empty or None, Maestro will set up all the classes of that type.

import asyncio

from maestro import run_workflow_steps
from maestro import BaseArtefactModel, FileWatcher, MessageWatcher, Job

class DemoInputA(BaseArtefactModel):
    ...

class DemoInputB(BaseArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...

class DemoFileWatcherA(FileWatcher):
    path = "tmp/test_filewatcher"
    pattern = ".*.sample"
    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        output = DemoInputA(identifier=inputs[0].identifier, steps_id="demo steps 1")
        return [output]

class DemoFileWatcherB(FileWatcher):
    path = "tmp/test_filewatcher"
    pattern = ".*.txt"
    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        output = DemoInputB(identifier=inputs[0].identifier)
        return [output]

class DemoStep(MessageWatcher):
    input_types = {DemoInputA, DemoInputB}
    # parameter
    demo_param: str

    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        # perform any operation here
        # Run some process here to create the output
        output = DemoOutput(identifier=inputs[0].identifier)
        return [output]

task = await run_workflow_steps(file_watcher_classes=[DemoFileWatcherB])

Config

You can set several env variables used by Maestro in the .env file; for instance, you can change the default job timeout of 24 hours by updating JOB_TIMEOUT in the .env file.
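For example, a .env entry raising the timeout could look like this (the value is illustrative):

# default job timeout is 24 hours; this raises it to 48 hours (in seconds)
JOB_TIMEOUT=172800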

Logging

Maestro uses Python's logging module, and the same can be used in Steps. There are several configurations one can use in the logging module (see the corresponding Python documentation).
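A minimal sketch of configuring and using logging inside a Steps module (the format string and messages are illustrative):

import logging

# basic configuration; in practice this is typically done once at startup
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)

logger = logging.getLogger(__name__)
logger.info("Processing artefact %s", "sample-001")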

Frontend

There is also a frontend where you can check the details and status of the workflow steps you are running; it can be accessed at http://0.0.0.0:8000/index/.