
Steps

A repository for automation of pipeline and operations. Implements workflow steps and artefacts. Based on the Maestro library.

Getting started

As long as a reasonably recent Docker is installed, the following should work:

docker compose up

From VS Code, you can also run the devcontainer to get a development environment.

Open the command palette (Shift + Command + P (Mac) / Ctrl + Shift + P (Windows/Linux)) and select Remote-Containers: Reopen in Container. This will spin up all Docker containers and attach VS Code to the steps container.

Updating the Maestro library

Maestro is a Python dependency installed in the steps container. To update the library, run the following command:

docker compose exec steps poetry update maestro

Maestro currently points at the dev branch, so updating ensures that the latest changes on dev are available. Once Maestro stabilizes, we should switch to using tags.

Adding new python dependencies

Similarly, to add a new Python dependency, modify pyproject.toml and run:

docker compose exec steps poetry update

Accessing database

The easiest way to access the database is through the browser interface at http://localhost:7474/browser/

There you can use Cypher to query the database. A simple example:

MATCH (n) RETURN n;

To delete all nodes and relationships:

MATCH (n) DETACH DELETE n;

Using Maestro

Steps relies on Maestro to process and execute pipeline operations. Below we go through a few important components you will need to understand to implement pipelines and their corresponding operations.

Artefacts

Artefact classes define the input and output components of any operation (aka workflow step). Maestro provides BaseArtefactModel to define the datatypes of the attributes you want on the input/output of a workflow step. The base model class provided by Maestro has the following attributes:

  • identifier: string, mandatory; differentiates one artefact from another
  • id: integer id for the artefact; this attribute is optional
  • maestro_id: string uuid for Maestro's internal reference; the value is set by Maestro but can be overridden if needed
  • labels: list of strings, set automatically by Maestro
  • subjects: list of strings, used by Maestro to set up NATS subscriptions; override only if you need to customize how NATS works

You can of course add more attributes or override the types of existing input/output artefact attributes. For reference, here's an example of an artefact model.

from maestro import BaseArtefactModel

class DemoInput(BaseArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...

Make sure you always use the Maestro model class (BaseArtefactModel) to define artefact models in your pipelines, operations, and workflow steps.
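As a sketch of what adding extra attributes might look like, here is a standalone illustration. A plain dataclass stand-in replaces the real BaseArtefactModel so the snippet runs on its own; in Steps you would subclass Maestro's BaseArtefactModel instead, and the extra fields (sample_path, batch_number) are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Optional

# Stand-in for maestro.BaseArtefactModel so this sketch runs standalone.
@dataclass
class BaseArtefactModel:
    identifier: str                   # mandatory, distinguishes artefacts
    id: Optional[int] = None          # optional integer id
    maestro_id: Optional[str] = None  # set by Maestro internally
    labels: list = field(default_factory=list)
    subjects: list = field(default_factory=list)

# An artefact extended with pipeline-specific attributes (hypothetical fields).
@dataclass
class SampleInput(BaseArtefactModel):
    sample_path: str = ""
    batch_number: int = 0

artefact = SampleInput(
    identifier="sample-001",
    sample_path="/data/sample-001.csv",
    batch_number=7,
)
```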

File watcher

Typically a pipeline will have files you want to run an operation on; for this you can use the FileWatcher class from Maestro. The base class is quite simple and can be extended to fit the pipeline's needs. The file watcher watches for changes in the directory defined in path and performs its operation only on newly added files. Note that files must be added to the directory while steps is up and running.

To define a file watcher class in your pipeline you need five components:

  1. path: path to the directory containing the files to be processed
  2. pattern: regex pattern for the filenames you wish to process, so you can be selective about which files in the path directory are picked up
  3. async def process: the actual operation you wish to perform on the file. Note that the method must always be defined as async
  4. inputs: the data model of your input; can be left as the default or extended from BaseArtefactModel. New: there is now a FileArtefactModel specifically for FileWatcher, with extra attributes such as path, size, and updated_at
  5. output: the data model of your output; can be left as the default or extended from BaseArtefactModel

Here's a sample file watcher class. Note that you must always subclass Maestro's FileWatcher to define your file watcher.

from maestro import BaseArtefactModel, FileArtefactModel, FileWatcher

class DemoInput(BaseArtefactModel):
    ...

class DemoInputAlternative(FileArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...

class DemoFileWatcher(FileWatcher):

    path = "/tmp/test_filewatcher"
    pattern = r".*\.txt"

    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        output = DemoOutput(identifier=inputs[0].identifier)
        return [output]

Note that in process, identifier is available on the inputs because they extend BaseArtefactModel.

There's an optional filter class variable you can override from FileWatcher; the default filter only watches for newly added files.
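To illustrate how a pattern like the one above selects files, here is a small sketch using the standard re module. This only demonstrates regex filename filtering; Maestro's internal matching may differ in details.

```python
import re

# Same pattern as in the sample watcher: any filename ending in ".txt".
pattern = re.compile(r".*\.txt")

filenames = ["report.txt", "notes.txt", "image.png", "data.csv"]

# Keep only files whose full name matches the pattern, mirroring how
# a FileWatcher would filter newly added files in its path directory.
matched = [name for name in filenames if pattern.fullmatch(name)]
```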

Message Watcher

This is probably the component you will use to define most steps in the pipeline. Maestro watches all subclasses of MessageWatcher and runs their operation once all the inputs for the defined step are found.

The implementation of a message watcher subclass looks very similar to a file watcher, with slightly different components:

  1. input_types: a set of artefact classes (extended from BaseArtefactModel) used as input for the operation/process
  2. parameters: optional parameters that can be passed to the step's process/operation
  3. async def process: the actual operation you wish to perform on the inputs. Note that the method must always be defined as async
  4. inputs: the data model of your input; can be left as the default or extended from BaseArtefactModel
  5. output: the data model of your output; can be left as the default or extended from BaseArtefactModel

Here's a sample message watcher class. Note that you must always subclass Maestro's MessageWatcher to define your message watcher.

from maestro import BaseArtefactModel, MessageWatcher

class DemoInputA(BaseArtefactModel):
    ...

class DemoInputB(BaseArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...

class DemoStep(MessageWatcher):
    input_types = {DemoInputA, DemoInputB}

    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        # perform any operation here
        # Run some process here to create the output
        output = DemoOutput(identifier=inputs[0].identifier)
        return [output]
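The "run once all inputs are found" behaviour can be sketched as a set check: the step becomes ready when an artefact of every type in input_types has arrived. This is an illustration only, not Maestro's actual implementation.

```python
# Stand-in artefact classes; in Steps these would extend BaseArtefactModel.
class DemoInputA: ...
class DemoInputB: ...

input_types = {DemoInputA, DemoInputB}

def is_ready(received):
    """True once every required input type has at least one artefact."""
    return input_types <= {type(artefact) for artefact in received}

partial = [DemoInputA()]                 # only one of the two inputs
complete = [DemoInputA(), DemoInputB()]  # all inputs present
```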

Order step job

To place an order for a MessageWatcher, you need to create a Job like this:

import asyncio

from maestro import run_workflow_steps, order_workflow
from maestro import BaseArtefactModel, FileWatcher, MessageWatcher, Job, WorkflowModel

class DemoInputA(BaseArtefactModel):
    ...

class DemoInputB(BaseArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...

class DemoFileWatcherA(FileWatcher):
    path = "tmp/test_filewatcher"
    pattern = r".*\.sample"
    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        output = DemoInputA(identifier=inputs[0].identifier, steps_id="demo steps 1", other_identifier="personA2")
        return [output]

class DemoFileWatcherB(FileWatcher):
    path = "tmp/test_filewatcher"
    pattern = r".*\.txt"
    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        output = DemoInputB(identifier=inputs[0].identifier, some_identifier="personB2")
        return [output]

class DemoStep(MessageWatcher):
    input_types = {DemoInputA, DemoInputB}

    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        # perform any operation here
        # Run some process here to create the output
        output = DemoOutput(identifier=inputs[0].identifier)
        return [output]

task = await run_workflow_steps()

job = Job(
    inputs=[("DemoInputA", {"identifier": "personA", "other_identifier": "personA2"}), ("DemoInputB", {"identifier": "personB", "some_identifier": "personB2"})],
    params={"demo_param": "demo"},
    search_metadata=["sample_order_id: 1234"],
    workflow_step="DemoStep",
)

workflow = WorkflowModel(
    workflow_id="some_id",
    workflow_name="some_name",
    jobs=[job],
    search_metadata=["project_id: wf_1234"],
)
await order_workflow(workflow)

This will place an order on the queue, which will be picked up and processed when the input artefacts are ready (in this case, DemoInputA with identifier personA and DemoInputB with identifier personB).

Note:

  • you can pass additional values in the params dict to identify which inputs the process method should use.
  • the older way of ordering a job directly on the MessageWatcher class is still available and can be used as follows, but it will be deprecated after the stable release.
  • please update the JOB_TIMEOUT env variable to an appropriate value in seconds if you suspect a job might take more than 24 hours to complete.
job = Job(
    inputs=[("DemoInputA", {"identifier": "personA", "other_identifier": "personA2"}), ("DemoInputB", {"identifier": "personB", "some_identifier": "personB2"})],
    params={"demo_param": "demo"},
    workflow_id="some_id",
    workflow_name="some_name",
    search_metadata=["project_id: wf_1234"],
)
await DemoStep.order(job)
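A Job's inputs are (artefact type name, attributes) pairs, and the order is processed once matching artefacts exist. The matching can be sketched as a lookup by type name and identifier; this is illustrative only, and Maestro's matching may consider more fields.

```python
# Each job input is (artefact type name, attributes to match), as in the Job above.
job_inputs = [
    ("DemoInputA", {"identifier": "personA"}),
    ("DemoInputB", {"identifier": "personB"}),
]

# Artefacts currently available, keyed by (type name, identifier).
available = {
    ("DemoInputA", "personA"),
    ("DemoInputB", "personB"),
}

def job_ready(inputs, store):
    """True when every requested (type, identifier) pair exists in the store."""
    return all((name, attrs["identifier"]) in store for name, attrs in inputs)
```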

Misc maestro imports

Several modules are made available via maestro; below are details for objects not covered in the sections above.

run_workflow_steps

  • an async method that checks all artefact and watcher classes in the file and sets up NATS and the other async tasks required.
  • accepts three optional parameters: message_watcher_classes, file_watcher_classes, and artefact_classes, which should be lists of MessageWatcher, FileWatcher, and BaseArtefactModel subclasses respectively.
  • if you only want to run specific watcher and artefact classes, you can pass them to the method as follows.

Note: if any of these parameters is left empty or None, Maestro will set up all available classes of that type.

import asyncio

from maestro import run_workflow_steps
from maestro import BaseArtefactModel, FileWatcher, MessageWatcher, Job

class DemoInputA(BaseArtefactModel):
    ...

class DemoInputB(BaseArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...

class DemoFileWatcherA(FileWatcher):
    path = "tmp/test_filewatcher"
    pattern = r".*\.sample"
    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        output = DemoInputA(identifier=inputs[0].identifier, steps_id="demo steps 1")
        return [output]

class DemoFileWatcherB(FileWatcher):
    path = "tmp/test_filewatcher"
    pattern = r".*\.txt"
    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        output = DemoInputB(identifier=inputs[0].identifier)
        return [output]

class DemoStep(MessageWatcher):
    input_types = {DemoInputA, DemoInputB}
    # parameter
    demo_param: str

    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        # perform any operation here
        # Run some process here to create the output
        output = DemoOutput(identifier=inputs[0].identifier)
        return [output]

task = await run_workflow_steps(file_watcher_classes=[DemoFileWatcherB])
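The note above (an empty or None parameter means "all classes of that type") can be sketched as a simple fallback; this is illustrative only, not Maestro's code.

```python
def select_classes(requested, registered):
    """Return the requested classes, falling back to every registered class
    when nothing specific was requested (None or an empty list)."""
    return list(requested) if requested else list(registered)

# Hypothetical registry of watcher class names for illustration.
registered_watchers = ["DemoFileWatcherA", "DemoFileWatcherB"]
```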

Config

You can set several env variables in the .env file that are used by Maestro; for instance, you can change the default job timeout of 24 hours by updating JOB_TIMEOUT in the .env file.
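For example, to allow jobs up to 48 hours, the .env entry might look like the following (the variable name comes from the section above; the value is just an illustration, assuming the timeout is given in seconds as noted earlier):

```shell
# Job timeout in seconds (48 hours); the default is 24 hours.
JOB_TIMEOUT=172800
```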

Logging

Maestro uses the Python logging module, and the same can be used in steps implementations. There are several configurations one can use; see the Python logging documentation for details.
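A minimal example of using the standard logging module inside a step (plain stdlib usage, nothing Maestro-specific; the logger name is just an illustration):

```python
import logging

# Configure the root logger once at startup; steps code and libraries
# that use logging will share this configuration.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)

# Per-module logger, named after the component doing the logging.
logger = logging.getLogger("steps.demo")
logger.info("processing artefact %s", "sample-001")
```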

Frontend

There is also a frontend where you can check the details and status of the workflow steps you are running; it can be accessed at http://0.0.0.0:8000/index/