Maestro technical documentation

This page provides an overview of the technical aspects of Maestro, including explanations of key concepts and components.

Key concepts

Maestro contains generic definitions (base classes) of workflow steps and artefacts composing workflows in graphs:

  • Workflow steps are functional steps in a workflow.
  • Artefacts are any input/output necessary for workflow steps to perform their respective jobs.

The base definition of a workflow step includes listening to and publishing messages through NATS, and reading and persisting events in a database (Neo4j). The library exposes the base classes MessageWatcher, FileWatcher and Artefact for applications to subclass and use.
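
To make the relationship between workflow steps and artefacts concrete, the sketch below models a tiny workflow graph with plain dataclasses. The names ArtefactNode, StepNode and downstream_steps are hypothetical stand-ins and not Maestro classes; Maestro's own base classes additionally handle the NATS messaging and Neo4j persistence described above.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class ArtefactNode:
    """Hypothetical stand-in for an artefact: any input or output of a step."""
    kind: str
    identifier: str


@dataclass
class StepNode:
    """Hypothetical stand-in for a workflow step and its artefacts."""
    name: str
    inputs: list[ArtefactNode] = field(default_factory=list)
    outputs: list[ArtefactNode] = field(default_factory=list)


def downstream_steps(artefact: ArtefactNode, steps: list[StepNode]) -> list[str]:
    """Return the names of the steps that consume the given artefact."""
    return [step.name for step in steps if artefact in step.inputs]


raw = ArtefactNode("RawFile", "sample-1")
parsed = ArtefactNode("ParsedData", "sample-1")
report = ArtefactNode("Report", "sample-1")

# Two steps chained through the "parsed" artefact.
steps = [
    StepNode("Parse", inputs=[raw], outputs=[parsed]),
    StepNode("Summarise", inputs=[parsed], outputs=[report]),
]

print(downstream_steps(parsed, steps))  # ['Summarise']
```

The output of one step becoming the input of another is what turns a set of steps into a graph.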

Architecture

The figure below gives an overview of the architecture used in Maestro:

[Figure: Maestro architecture]

[TODO: Update figure]

See terminology for definitions of the terms used in the figure and elsewhere in the source.

Maestro API

Maestro's API lets any system (internal or external) communicate with Maestro. It is mostly used for fetching the status of processes being run by Maestro.

The API is available at http://localhost:8002/docs (default configuration) when running the service.

Maestro UI

The Maestro UI is a web interface that allows users to monitor the status of running and past jobs, and perform certain actions such as invalidating workflows/steps/artefacts and rerunning steps. The UI is accessible at http://localhost:8002/ (default configuration). See also the user manual.

Neo4j database

The easiest way to access the Neo4j database is through the Neo4j browser, available at http://localhost:7474/browser/ (default configuration).

Use Cypher queries to interact with the database. Examples:

  • Show all nodes in the graph:
    MATCH (n) RETURN n;
    
  • Delete all nodes and relationships:
    MATCH (n) DETACH DELETE n;
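
Cypher queries can also be run from Python. The sketch below assumes the official neo4j Python driver (a third-party package, recent 5.x release) is installed; the Bolt URI, the credentials and the helper name run_cypher are placeholder assumptions for illustration and must be adapted to your deployment. The Job label is taken from the terminology section of this page.

```python
# Cypher statements kept as constants so they can be reused and reviewed.
SHOW_ALL_NODES = "MATCH (n) RETURN n"
SHOW_JOBS = "MATCH (j:Job) RETURN j LIMIT $limit"


def run_cypher(query: str, uri: str = "bolt://localhost:7687",
               user: str = "neo4j", password: str = "password", **params):
    """Run one Cypher query and return the result records as a list.

    The URI and credentials are placeholder assumptions, not Maestro defaults.
    """
    # Imported lazily so this module loads even without the driver installed.
    from neo4j import GraphDatabase

    with GraphDatabase.driver(uri, auth=(user, password)) as driver:
        records, _summary, _keys = driver.execute_query(query, parameters_=params)
        return list(records)


# Example call (requires a running Neo4j instance):
# jobs = run_cypher(SHOW_JOBS, limit=10)
```

Passing values such as limit as query parameters, rather than formatting them into the query string, avoids quoting problems and injection risks.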
    

For database migrations, see neo4j-python-migrations for details on naming conventions and related topics.

Maestro components and concepts

When using Maestro in an app (see setup), your app will rely on Maestro to process and execute workflow operations. In the following we will explain a few components that are important to understand before you start implementing pipelines and corresponding operations.

See also the terminology section for a brief definition of terms used in the following.

Workflow steps

[TODO]

Artefacts

Artefact classes define the input and output of an operation (also known as a workflow step). Maestro provides BaseArtefactModel for declaring the attributes, and their data types, that a workflow step takes as input or produces as output. The base model provided by Maestro has the following attributes:

Attribute | Type | Description
identifier | str | Identifier to differentiate one artefact from another (mandatory)
id | int | ID for the artefact (optional)
maestro_id | str | UUID for Maestro internal reference. A value is automatically provided by Maestro, but can optionally be overridden if needed.
labels | list[str] | Automatically set by Maestro
subjects | list[str] | Used by Maestro to set the NATS subscription. Override only if you need to customize how NATS works.

You can add more attributes or override the type of an existing attribute in an input/output artefact. Example of an artefact model:

from maestro import BaseArtefactModel

class DemoInput(BaseArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...

Note

Make sure that you always use Maestro's model class (BaseArtefactModel) to define the artefact model in your pipeline/operations or workflow step.

File watcher

The FileWatcher class from Maestro can be used whenever your app needs to pick up one or more files to process. The base class is quite simple and can be extended to fit your pipeline's needs. The file watcher watches for changes in the directory defined in path and performs operations only on newly added files.

Warning

It is important that the app is up and running when you add files to the directory.

To define a file watcher class in your pipeline you will need five components:

Component | Description
path | Path to the directory where files to be processed are located
pattern | Regex pattern of the filenames you wish to process. With this you can further refine which files you want to process in the path directory.
async def process | The actual operation you wish to perform on the file. Note: The method should always be defined as async.
inputs | Data model of your input. Use existing default or extend from BaseArtefactModel as needed.
output | Data model of your output. Use existing default or extend from BaseArtefactModel as needed.

Here's an example for a file watcher class. Note that you will always have to use the FileWatcher class from Maestro to define the file watcher class in your app.

from maestro import BaseArtefactModel, FileArtefactModel, FileWatcher

class DemoInput(BaseArtefactModel):
    ...

class DemoInputAlternative(FileArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...

class DemoFileWatcher(FileWatcher):

    path = "/tmp/test_filewatcher"
    pattern = r".*\.txt"  # regex: the escaped dot matches a literal "."

    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        output = DemoOutput(identifier=inputs[0].identifier)
        return [output]

Note: In this process, identifier is made available via BaseArtefactModel.

FileWatcher also has an optional filter class variable that you can override. The default filter watches only for newly added files.
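
Because pattern is a regular expression, an unescaped dot matches any character, which can select more files than intended. The sketch below compares a loose and a strict pattern using Python's re module. It assumes that the whole file name must match (re.fullmatch); how Maestro applies the pattern internally is not documented here, so treat the matching mode as an assumption.

```python
import re

file_names = ["report.txt", "report_txt", "notes.txt.bak", "data.sample"]

loose = re.compile(".*.txt")      # unescaped dot matches any character
strict = re.compile(r".*\.txt")   # escaped dot matches a literal "."

# fullmatch requires the whole file name to match the pattern.
loose_hits = [name for name in file_names if loose.fullmatch(name)]
strict_hits = [name for name in file_names if strict.fullmatch(name)]

print(loose_hits)   # ['report.txt', 'report_txt']
print(strict_hits)  # ['report.txt']
```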

Message watcher

This is probably the component you will use to define most steps in a workflow. The message watcher class in Maestro is designed to watch all subclasses of itself and perform operations when all the inputs for a defined workflow step are found.

The implementation of a message watcher subclass looks very similar to that of a file watcher, but uses a slightly different set of components:

Component | Description
input_types | Set of artefacts (extended from BaseArtefactModel) which will be used as input for the operation/process
Parameters | Optional parameters that can be passed to the step process/operation
async def process | The actual operation you wish to perform on the inputs. Note: The method should always be defined as async.
inputs | Data model of your input. Use existing default or extend from BaseArtefactModel as needed.
output | Data model of your output. Use existing default or extend from BaseArtefactModel as needed.

Here's an example of a message watcher class. Note that you will always have to use the MessageWatcher class from Maestro to define the message watcher class in your app.

from maestro import BaseArtefactModel, MessageWatcher

class DemoInputA(BaseArtefactModel):
    ...

class DemoInputB(BaseArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...

class DemoStep(MessageWatcher):
    input_types = {DemoInputA, DemoInputB}

    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        # perform any operation here
        # Run some process here to create the output
        output = DemoOutput(identifier=inputs[0].identifier)
        return [output]
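
The idea that a step runs only when all of its inputs are found can be sketched as a set comparison. This illustrates the concept with stand-in classes and is not Maestro's implementation; inputs_ready is a hypothetical helper, and the real matching may also take identifiers into account.

```python
class DemoInputA:
    """Stand-in artefact type (not a Maestro class)."""


class DemoInputB:
    """Stand-in artefact type (not a Maestro class)."""


def inputs_ready(input_types: set[type], available: list[object]) -> bool:
    """Return True when at least one artefact of every required type is available."""
    available_types = {type(artefact) for artefact in available}
    return input_types <= available_types  # subset test


required = {DemoInputA, DemoInputB}

print(inputs_ready(required, [DemoInputA()]))                # False
print(inputs_ready(required, [DemoInputA(), DemoInputB()]))  # True
```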

Order job

To place an order for a MessageWatcher, you need to create a Job like this:

import asyncio

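# WorkflowModel is used below; it is assumed to be importable from the maestro package.
from maestro import run_workflow_steps, order_workflow
from maestro import BaseArtefactModel, FileWatcher, MessageWatcher, Job, WorkflowModel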

class DemoInputA(BaseArtefactModel):
    ...

class DemoInputB(BaseArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...

class DemoFileWatcherA(FileWatcher):
    path = "tmp/test_filewatcher"
    pattern = r".*\.sample"
    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        output = DemoInputA(identifier=inputs[0].identifier, steps_id="demo steps 1", other_identifier="personA2")
        return [output]

class DemoFileWatcherB(FileWatcher):
    path = "tmp/test_filewatcher"
    pattern = r".*\.txt"
    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        output = DemoInputB(identifier=inputs[0].identifier, some_identifier="personB2")
        return [output]

class DemoStep(MessageWatcher):
    input_types = {DemoInputA, DemoInputB}

    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        # perform any operation here
        # Run some process here to create the output
        output = DemoOutput(identifier=inputs[0].identifier)
        return [output]

# await is only valid inside a coroutine, so the calls are wrapped in main().
async def main():
    # Start the watchers before placing the order.
    task = await run_workflow_steps()

    job = Job(
        inputs=[
            ("DemoInputA", {"identifier": "personA", "other_identifier": "personA2"}),
            ("DemoInputB", {"identifier": "personB", "some_identifier": "personB2"}),
        ],
        params={"demo_param": "demo"},
        search_metadata=["sample_order_id: 1234"],
        workflow_step="DemoStep",
    )

    workflow = WorkflowModel(
        workflow_id="some_id",
        workflow_name="some_name",
        jobs=[job],
        search_metadata=["project_id: wf_1234"],
    )
    # Call the function under the name it is imported with above.
    await order_workflow(workflow)

asyncio.run(main())

This will place an order on the queue, which will be picked up and processed when the input artefacts are ready (in this case, DemoInputA with identifier personA and DemoInputB with identifier personB).

Notes:

  • You can pass additional values in the params dict to identify the input to use in the process method.
  • If a job might take more than 24 hours to complete, set the JOB_TIMEOUT environment variable to an appropriate value in seconds.
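
As a worked example of the JOB_TIMEOUT note, the sketch below sets the variable to 48 hours expressed in seconds. It assumes that Maestro reads the variable when it starts, so the value has to be in place before the workflow steps are run; in most deployments you would set it in the shell or the service configuration rather than in Python.

```python
import os

# 48 hours expressed in seconds (JOB_TIMEOUT is given in seconds).
FORTY_EIGHT_HOURS = 48 * 60 * 60

# Environment variable values must be strings.
# Assumption: Maestro reads JOB_TIMEOUT at start-up, so set it before starting.
os.environ["JOB_TIMEOUT"] = str(FORTY_EIGHT_HOURS)

print(os.environ["JOB_TIMEOUT"])  # 172800
```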

Running workflow steps

run_workflow_steps is an async method that checks all artefact and watcher classes in the file and sets up NATS and the other async tasks required. It accepts three optional parameters:

Parameter | Type (list of)
message_watcher_classes | MessageWatcher
file_watcher_classes | FileWatcher
artefact_classes | BaseArtefactModel

Note: If any of these parameters is left empty or None, Maestro sets up all available classes of that type.
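
The fallback described in the note can be pictured as "use the requested classes, otherwise every known subclass". The sketch below shows one way such a fallback could work, using Python's built-in __subclasses__ method. MessageWatcherSketch, all_subclasses and select_classes are hypothetical names, and Maestro's actual discovery mechanism may differ.

```python
class MessageWatcherSketch:
    """Stand-in base class; NOT Maestro's MessageWatcher."""


class StepA(MessageWatcherSketch):
    pass


class StepB(StepA):
    pass


def all_subclasses(cls: type) -> set[type]:
    """Recursively collect the direct and indirect subclasses of cls."""
    found: set[type] = set()
    for sub in cls.__subclasses__():
        found.add(sub)
        found |= all_subclasses(sub)
    return found


def select_classes(requested, base: type) -> list[type]:
    """Use the requested classes, or every subclass of base when empty or None."""
    if requested:
        return list(requested)
    return sorted(all_subclasses(base), key=lambda c: c.__name__)


print([c.__name__ for c in select_classes(None, MessageWatcherSketch)])     # ['StepA', 'StepB']
print([c.__name__ for c in select_classes([StepB], MessageWatcherSketch)])  # ['StepB']
```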

If you only want to run specific watcher and artefact classes, you can pass them to the method as follows:

import asyncio

from maestro import run_workflow_steps
from maestro import BaseArtefactModel, FileWatcher, MessageWatcher, Job

class DemoInputA(BaseArtefactModel):
    ...

class DemoInputB(BaseArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...

class DemoFileWatcherA(FileWatcher):
    path = "tmp/test_filewatcher"
    pattern = r".*\.sample"
    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        output = DemoInputA(identifier=inputs[0].identifier, steps_id="demo steps 1")
        return [output]

class DemoFileWatcherB(FileWatcher):
    path = "tmp/test_filewatcher"
    pattern = r".*\.txt"
    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        output = DemoInputB(identifier=inputs[0].identifier)
        return [output]

class DemoStep(MessageWatcher):
    input_types = {DemoInputA, DemoInputB}
    # parameter
    demo_param: str

    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        # perform any operation here
        # Run some process here to create the output
        output = DemoOutput(identifier=inputs[0].identifier)
        return [output]

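# await is only valid inside a coroutine, so the call is wrapped in main().
async def main():
    # DemoFileWatcherB is a FileWatcher, so it is passed as file_watcher_classes.
    task = await run_workflow_steps(file_watcher_classes=[DemoFileWatcherB])

asyncio.run(main())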

Logging

Maestro uses Python's logging module, and you can use the same module in your app. Several configurations are available; see Logging facility for Python.

Terminology

Note: Terms explained elsewhere in the table are in italics.

Term | Description
Artefact | Any class subclassing BaseArtefact. An artefact represents either results from a processed order or a processed file watcher.
Cypher | Cypher is the query language used to query Neo4j (see official docs).
File watcher | Any class that subclasses FileWatcher. Represents a workflow step that watches a specific path for changes (default: added files) and triggers the FileWatcher.process method on any change.
Inspector | [TODO: WIP] System to inspect logs and Neo4j for errors and invalid state.
JetStream | Built-in persistence system for NATS (see official docs).
Job | A job is the result of an order, queued in the Neo4j database as a node with the label "Job".
Message watcher | Any class inheriting from MessageWatcher. Represents a workflow step that takes input artefacts and, typically, produces some output artefacts. Only ever instantiated by a worker.
Monitoring | [TODO: WIP] API and user interface for monitoring current and past jobs, and (yet to be determined) performing predefined actions such as cancel and restart.
NATS | Messaging system (see official docs).
Neo4j | Neo4j is the graph database where all results are stored. The nodes here are either artefact nodes or workflow step nodes, connected with a relationship of type INPUT or OUTPUT, representing a processed job.
Order | An order is a NATS message, published to the subject workflow_step.<message watcher type>.order, that requests a specific message watcher to do work.
Worker | A worker is a subprocess responsible for processing a job when it's ready.
Worker pool | The worker pool is the set of workers instantiated on start-up.
Worker queue | The worker queue contains all orders that are ready to start, waiting to be picked up by a worker as soon as it's ready.
Workflow step | Super-class of MessageWatcher and FileWatcher. Each workflow step is designed to have input and output. The workflow step model has three main meta-components:

1. Details - tracks id, label, datetime, etc. relevant to the workflow step itself
2. Input and Output - Artefact acting as input and output for the workflow step
3. Standard error and output log - Standard error/output captured while processing the workflow step.

The actual schema can be found in the source code.
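
The subject format given for orders in the table above can be illustrated with a small helper. order_subject is a hypothetical function, and using the class name "DemoStep" as the message watcher type is an assumption; check the source code for the exact value Maestro uses.

```python
def order_subject(watcher_type: str) -> str:
    """Build the NATS subject for orders addressed to one message watcher type."""
    return f"workflow_step.{watcher_type}.order"


print(order_subject("DemoStep"))  # workflow_step.DemoStep.order
```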