Steps
A repository for automating pipelines and operations. It implements workflow steps and artefacts, and is based on the Maestro library.
Getting started
First, make sure docker containers are running:
From VS Code, you can also run the devcontainer to get a development environment: open the command palette (Shift + Cmd + P on Mac / Ctrl + Shift + P on Windows/Linux) and select Remote-Containers: Reopen in Container. This will spin up all Docker containers and attach VS Code to the steps container.
Updating the Maestro library
Maestro is a Python library, and is installed in the steps
container. To update the library, run the following command:
Note: As long as maestro is pointing to the dev branch, this may break Steps - so don't do this unless you're prepared to deal with the fallout.
Maestro currently points at the dev branch, and updating ensures that the latest changes on dev are available. Once Maestro stabilizes, we will use tags for this.
If you want to use a local version of Maestro during development, you can do so by adding the following line to volumes under the steps service definition in docker-compose.yml:
- <path to maestro root>/src/maestro/:/opt/poetry/.venv/lib/python3.12/site-packages/maestro
Adding new Python dependencies
Similarly, if you want to add a new Python dependency, modify pyproject.toml and run:
Running tests
Unit tests with Pytest
There are many unit tests, including large tests that exercise parts of the pipelines. Some of these require downloading huge reference data files and substantial machine resources. Make sure the Docker daemon has at least 20 GB RAM, multiple CPU cores and 2 GB swap (resource limits can be changed in Docker Desktop).
Specific tests can be run with the -k flag, e.g. pytest -k "basepipe and not cuca".
Framework tests
These tests generate lots of test data suitable for testing/monitoring many workflows.
- Reset the neo4j database and the maestro stream; run this command before running main.py
- Start Steps
- In a new terminal window, run the following command to generate test data; the number of singles and trios can be changed in the command. You can run this command multiple times to add more test data cumulatively. The generated test data is eventually picked up by Steps.
To simulate pending and failed pipelines, use any combination of the following options:
--pending-s-seqdatamaker
The number of single analyses that are pending SeqDataMaker.
One pending single-analysis SeqDataMaker implies:
- 1 sample
- 1 pending SeqDataMaker
- 1 pending single-analysis basepipe
- 1 pending single-analysis annopipe
--pending-s-basepipe
The number of single analyses that are pending basepipe.
One pending single-analysis basepipe implies:
- 1 sample
- 1 successful SeqDataMaker
- 1 pending single-analysis basepipe
- 1 pending single analysis annopipe
--fail-s-basepipe
The number of single analyses which fail basepipe. One failed single-analysis basepipe implies:
- 1 sample
- 1 successful SeqDataMaker
- 1 failed single analysis basepipe
- 1 pending single analysis annopipe
--fail-s-annopipe
The number of single analyses which fail annopipe.
One failed single-analysis annopipe implies:
- 1 sample
- 1 successful SeqDataMaker
- 1 successful single-analysis basepipe
- 1 failed single analysis annopipe
--pending-t-seqdatamaker
The number of trio-analysis members which are pending SeqDataMaker.
--pending-t-basepipe
The number of trio-analysis members which are pending basepipe.
--fail-t-basepipe
The number of trio-analysis members which fail basepipe.
Pending trio-analysis SeqDataMaker runs, pending trio-analysis basepipes and failed trio-analysis basepipes are squashed into complete trios; when needed, normal samples are added to complete a trio that has pending or failed members.
Example 1:
2 pending trio-analysis basepipes and 1 failed trio-analysis basepipe imply:
- 3 samples
- 3 successful SeqDataMaker
- 2 pending trio-analysis basepipes
- 1 failed trio-analysis basepipe
- 1 pending triopipe
- 1 pending trio analysis annopipe
Example 2:
2 pending trio-analysis basepipes and 2 failed trio-analysis basepipes imply:
- 6 samples
- 6 successful SeqDataMaker
- 2 pending trio-analysis basepipes of trio 1
- 1 failed trio-analysis basepipe of trio 1
- 1 pending triopipe of trio 1
- 1 pending trio-analysis annopipe of trio 1
- 1 failed trio-analysis basepipe of trio 2
- 2 successful trio-analysis basepipes of trio 2
- 1 pending triopipe of trio 2
- 1 pending trio-analysis annopipe of trio 2
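The squashing arithmetic in the two examples above can be sketched as follows. This is an illustration only, not the actual logic of generate_test_data.sh, and the helper name is hypothetical:

```python
import math

def implied_trio_counts(pending_t_basepipe: int, fail_t_basepipe: int) -> dict:
    """Sketch of how pending/failed trio members are squashed into whole trios."""
    affected = pending_t_basepipe + fail_t_basepipe
    trios = math.ceil(affected / 3)        # squash affected members into complete trios
    samples = trios * 3                    # every trio consists of 3 samples
    normal_members = samples - affected    # normal members added to complete the trios
    return {"trios": trios, "samples": samples, "normal_members": normal_members}

# Example 1: 2 pending + 1 failed -> one complete trio of 3 samples
print(implied_trio_counts(2, 1))   # {'trios': 1, 'samples': 3, 'normal_members': 0}
# Example 2: 2 pending + 2 failed -> two trios, 6 samples, 2 normal members
print(implied_trio_counts(2, 2))   # {'trios': 2, 'samples': 6, 'normal_members': 2}
```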
--fail-triopipe
The number of trio analyses which fail triopipe.
One failed triopipe implies:
- 3 samples
- 3 successful SeqDataMaker
- 3 successful trio-analysis basepipes
- 1 failed triopipe
- 1 pending trio-analysis annopipe
--fail-t-annopipe
The number of trio analyses which fail annopipe.
One failed trio-analysis annopipe implies:
- 3 samples
- 3 successful SeqDataMaker
- 3 successful trio-analysis basepipes
- 1 successful triopipe
- 1 failed trio-analysis annopipe
Pending and failed samples are guaranteed to exist. If the requested total number of singles and trios (1 trio == 3 samples) isn't reached, the script will generate normal samples to reach the total numbers.
docker compose exec steps /steps/tests/bin/generate_test_data.sh --total-singles 6 --pending-s-seqdatamaker=1 --pending-s-basepipe=1 --fail-s-basepipe=1 --fail-s-annopipe=1 --total-trios 5 --pending-t-seqdatamaker=1 --pending-t-basepipe=2 --fail-t-basepipe=2 --fail-triopipe=1 --fail-t-annopipe=1 --testdir /tmp/maestro
- Open the web UI to check the pipeline status: go to localhost:8000 in your browser.
Accessing database
The easiest way to access the database is through the Neo4j browser at http://localhost:7474/browser/
There, you can use Cypher to query the database. A simple example, deleting all nodes and relationships: `MATCH (n) DETACH DELETE n`
Using Maestro
Steps relies on Maestro to process and execute pipeline operations. We will go through a few important components you will need to understand in order to implement pipelines and their corresponding operations.
Artefacts
Artefact classes are used to define the input and output components of any operation (aka workflow step). Maestro provides BaseArtefactModel to define the data types of the attributes you would like for input/output in a workflow step. The base class model provided by Maestro has the following attributes:

- identifier: mandatory string value to differentiate one artefact from another
- id: integer id for the artefact; this is an optional attribute
- maestro_id: string uuid for Maestro-internal reference; the value is set by Maestro and can be overridden if needed
- labels: list of strings, automatically set by Maestro
- subjects: list of strings, used by Maestro to set up NATS subscriptions. Override only if you need to customize how NATS works
You can of course add more attributes or override the type of an existing input/output artefact attribute. For reference, here's an example of an artefact model.
from maestro import BaseArtefactModel
class DemoInput(BaseArtefactModel):
...
class DemoOutput(BaseArtefactModel):
...
Make sure that you always use the Maestro model class (BaseArtefactModel) to define artefact models in your pipelines/operations or workflow steps.
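To sketch what extending the base model with your own typed attributes looks like, here is a runnable stand-in. The BaseArtefactModel below is a minimal dataclass substitute for illustration only (the real class ships with Maestro and has more attributes), and the FastqInput name and fields are hypothetical:

```python
from dataclasses import dataclass

# Minimal stand-in for maestro.BaseArtefactModel, for illustration only.
@dataclass
class BaseArtefactModel:
    identifier: str

# A pipeline-specific artefact adding its own typed attributes
# (names are hypothetical, not from Steps).
@dataclass
class FastqInput(BaseArtefactModel):
    sample_id: str = ""
    lane: int = 1

artefact = FastqInput(identifier="sample-001", sample_id="S1", lane=2)
print(artefact)
```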
File watcher
Typically, a pipeline has files on which one wants to perform certain operations. To this end, subclass the FileWatcher class from Maestro. The base class is quite simple and can be extended to fit a pipeline's needs. The FileWatcher subclass monitors changes in the directory defined in path and performs a given operation only on newly added files. It's important to add files to the directory while Steps is up and running.
To define a file watcher class in your pipeline, you need five components:

- path: path to the directory where the files to be processed are located
- pattern: regex pattern for the filenames you wish to process; this way you can be selective about which files in the path directory are processed
- async def process: the actual operation you wish to perform on the file. Note that the method should always be defined as async
- inputs: data model of your input; can be left as the default or extended from BaseArtefactModel. New: there is now a FileArtefactModel available specifically for FileWatcher, with extra attributes such as path, size and updated_at
- output: data model of your output; can be left as the default or extended from BaseArtefactModel
Here's a sample file watcher class. Note that you always have to use the FileWatcher class from Maestro to define your file watcher class.
from maestro import BaseArtefactModel, FileArtefactModel, FileWatcher
class DemoInput(BaseArtefactModel):
    ...

class DemoInputAlternative(FileArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...

class DemoFileWatcher(FileWatcher):
    path = "/tmp/test_filewatcher"
    pattern = r".*\.txt"

    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        output = DemoOutput(identifier=inputs[0].identifier)
        return [output]
Note that in process, identifier is made available via BaseArtefactModel. There's an optional filter class variable you can override from FileWatcher; the default filter only watches for newly added files.
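Since pattern is a regular expression rather than a shell glob, filename selection works as sketched below with plain re. This is an illustration only; exactly how Maestro applies the pattern (search vs. fullmatch) is an assumption here:

```python
import re

# FileWatcher-style filename selection, illustrated with the stdlib `re` module.
# Assumption: the pattern is matched against the bare filename with re.search.
pattern = re.compile(r".*\.txt")

filenames = ["sample.txt", "sample.vcf", "README.md"]
matched = [name for name in filenames if pattern.search(name)]
print(matched)  # ['sample.txt']
```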
Message Watcher
This is probably the component you will use to define most steps in the pipeline. Maestro watches all subclasses of MessageWatcher and performs their operation once all the inputs for the defined step are found. The implementation of a MessageWatcher subclass looks very similar to a file watcher, but has fewer components:
- input_types: set of artefact classes (extended from BaseArtefactModel) which will be used as inputs for the operation/process
- parameters: optional parameters that can be passed to the step process/operation
- async def process: the actual operation you wish to perform on the inputs. Note that the method should always be defined as async
- inputs: data model of your input; can be left as the default or extended from BaseArtefactModel
- output: data model of your output; can be left as the default or extended from BaseArtefactModel
Here's a sample message watcher class. Note that you always have to use the MessageWatcher class from Maestro to define your message watcher class.
from maestro import BaseArtefactModel, MessageWatcher
class DemoInputA(BaseArtefactModel):
    ...

class DemoInputB(BaseArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...

class DemoStep(MessageWatcher):
    input_types = {DemoInputA, DemoInputB}

    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        # run some process here to create the output
        output = DemoOutput(identifier=inputs[0].identifier)
        return [output]
Order step job
To place an order for a MessageWatcher, you need to create a Job like this
import asyncio
from maestro import run_workflow_steps, order_workflow
from maestro import BaseArtefactModel, FileWatcher, MessageWatcher, Job, WorkflowModel

class DemoInputA(BaseArtefactModel):
    ...

class DemoInputB(BaseArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...

class DemoFileWatcherA(FileWatcher):
    path = "tmp/test_filewatcher"
    pattern = r".*\.sample"

    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        output = DemoInputA(identifier=inputs[0].identifier, steps_id="demo steps 1", other_identifier="personA2")
        return [output]

class DemoFileWatcherB(FileWatcher):
    path = "tmp/test_filewatcher"
    pattern = r".*\.txt"

    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        output = DemoInputB(identifier=inputs[0].identifier, some_identifier="personB2")
        return [output]

class DemoStep(MessageWatcher):
    input_types = {DemoInputA, DemoInputB}

    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        # run some process here to create the output
        output = DemoOutput(identifier=inputs[0].identifier)
        return [output]

task = await run_workflow_steps()

job = Job(
    inputs=[
        ("DemoInputA", {"identifier": "personA", "other_identifier": "personA2"}),
        ("DemoInputB", {"identifier": "personB", "some_identifier": "personB2"}),
    ],
    params={"demo_param": "demo"},
    search_metadata=["sample_order_id: 1234"],
    workflow_step="DemoStep",
)

workflow = WorkflowModel(
    workflow_name="some_name",
    jobs=[job],
    search_metadata=["project_id: wf_1234"],
)

await order_workflow(workflow)
This will place an order on the queue, which will be picked up and processed when the input artefacts are ready (in this case, DemoInputA with identifier personA and DemoInputB with identifier personB).
Note:
- You can pass additional values in the params dict to identify the input that should be used by the process method.
- The older way of ordering a job directly on the MessageWatcher class is still available and can be used as follows, but it will be deprecated after the stable release.
- Please update the JOB_TIMEOUT env variable to an appropriate value in seconds if you suspect that a job might take more than 24 hours to complete.
job = Job(
    inputs=[
        ("DemoInputA", {"identifier": "personA", "other_identifier": "personA2"}),
        ("DemoInputB", {"identifier": "personB", "some_identifier": "personB2"}),
    ],
    params={"demo_param": "demo"},
    workflow_name="some_name",
    search_metadata=["project_id: wf_1234"],
)

await DemoStep.order(job)
Misc maestro imports
There are several modules made available via maestro. More details for objects not covered in the sections above:

- run_workflow_steps: async method that checks all artefact and watcher classes in the file and sets up NATS and the other async tasks required.
  - Accepts three optional parameters: message_watcher_classes, file_watcher_classes and artefact_classes, which should be lists of MessageWatcher, FileWatcher and BaseArtefactModel subclasses, respectively.
  - If you only want to run specific watcher and artefact classes, you can pass them to the method as follows.

Note: if any of these parameters is left empty or None, maestro will set up all classes of that type.
import asyncio
from maestro import run_workflow_steps
from maestro import BaseArtefactModel, FileWatcher, MessageWatcher, Job

class DemoInputA(BaseArtefactModel):
    ...

class DemoInputB(BaseArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...

class DemoFileWatcherA(FileWatcher):
    path = "tmp/test_filewatcher"
    pattern = r".*\.sample"

    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        output = DemoInputA(identifier=inputs[0].identifier, steps_id="demo steps 1")
        return [output]

class DemoFileWatcherB(FileWatcher):
    path = "tmp/test_filewatcher"
    pattern = r".*\.txt"

    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        output = DemoInputB(identifier=inputs[0].identifier)
        return [output]

class DemoStep(MessageWatcher):
    input_types = {DemoInputA, DemoInputB}

    # parameter
    demo_param: str

    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        # run some process here to create the output
        output = DemoOutput(identifier=inputs[0].identifier)
        return [output]

task = await run_workflow_steps(file_watcher_classes=[DemoFileWatcherB])
Config
You can set several env variables in the .env file that are used by Maestro. For instance, you can change the default job timeout of 24 hours by updating JOB_TIMEOUT in the .env file.
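For example, a .env entry raising the timeout to 48 hours could look like this (the value is illustrative; JOB_TIMEOUT is given in seconds):

```
# 48 hours, in seconds
JOB_TIMEOUT=172800
```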
Logging
Maestro uses Python's logging module, and the same can be used in Steps. There are several configurations one can use with the logging module (see the corresponding Python documentation).
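A minimal setup along these lines could look as follows; the logger name and format are illustrative, not prescribed by Steps or Maestro:

```python
import logging

# Basic configuration: INFO level with timestamped, named log lines.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s: %(message)s",
)

# Module-level logger; "steps.demo" is a hypothetical name.
logger = logging.getLogger("steps.demo")
logger.info("workflow step started")
```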
Frontend
There is also a frontend where you can check the details and status of the workflow steps you are running; it can be accessed at http://0.0.0.0:8000/index/.