Technical concepts#

This page provides an overview of the technical aspects of Maestro, including an explanation of key concepts and components.

Key concepts#

Maestro contains generic definitions (base classes) of workflow steps and artefacts composing workflows in graphs:

  • Workflow steps are functional steps in a workflow.

  • Artefacts are any input/output necessary for workflow steps to perform their respective jobs.

The base definition of a workflow step includes listening to and publishing messages through NATS, and reading and persisting events in a Neo4j database. The library exposes the base classes MessageWatcher, FileWatcher and BaseArtefactModel for applications to subclass and use.

Architecture#

The figure below gives an overview of the architecture used in Maestro:

[Figure: Maestro architecture]

[TODO]: Update figure

See Terminology for definitions of the terms used in the figure and elsewhere in the source.

Maestro API#

Maestro’s API provides communication with any system (internal or external). It is mostly used for fetching the status of processes being run by Maestro.

The API is available at http://localhost:8002/docs (default configuration) when running the service.

Maestro UI#

The Maestro UI is a web interface that allows users to monitor the status of running and past jobs, and perform certain actions such as invalidating workflows/steps/artefacts and reordering steps. The UI is accessible at http://localhost:8002/ (default configuration). See also the UI user manual.

All rendering is done server-side using Jinja2 templates.

Neo4j database#

The easiest way to access the Neo4j database is through the Neo4j browser, available at http://localhost:7474/browser/ (default configuration).

Use Cypher queries to interact with the database. Examples:

  • Show all nodes in the graph:

    MATCH (n) RETURN n;
    
  • Delete all nodes and relationships (this irreversibly empties the database):

    MATCH (n) DETACH DELETE n;
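
The same kind of query can also be run from Python. The following is a minimal sketch using the official neo4j Python driver, not something Maestro itself provides. The Bolt URI, user name and password are placeholders that must be replaced with the values of your own deployment; the Job label and the INPUT/OUTPUT relationship types are taken from the Terminology section. Every query is bounded with LIMIT so that a large graph does not flood the result.

```python
# Read-only Cypher queries, each bounded with LIMIT.
QUERIES = {
    "some_nodes": "MATCH (n) RETURN n LIMIT $limit",
    "jobs": "MATCH (j:Job) RETURN j LIMIT $limit",
    "links": "MATCH (a)-[r:INPUT|OUTPUT]->(b) RETURN a, type(r) AS rel, b LIMIT $limit",
}


def run_query(name, limit=25, uri="bolt://localhost:7687", user="neo4j", password="change-me"):
    """Run one of QUERIES and return the records as plain dicts.

    The URI and credentials are placeholders, not Maestro defaults.
    """
    # Imported here so the module loads even without the driver installed.
    from neo4j import GraphDatabase  # third-party: pip install neo4j

    driver = GraphDatabase.driver(uri, auth=(user, password))
    try:
        with driver.session() as session:
            result = session.run(QUERIES[name], limit=limit)
            return [record.data() for record in result]
    finally:
        driver.close()
```

With a running database, run_query("jobs", limit=10) would return up to ten job nodes as dictionaries.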
    

For migrations, see neo4j-python-migrations for details on naming conventions etc.

Maestro components and concepts#

When using Maestro in an app (see Getting started), your app will rely on Maestro to process and execute workflow operations. In the following we will explain a few components that are important to understand before you start implementing pipelines and corresponding operations.

See also the Terminology section for a brief definition of terms used in the following.

Workflow steps#

[TODO]

Artefacts#

Artefact classes define the input and output components of any operation (also known as a workflow step). Maestro provides a BaseArtefactModel for declaring the data types of the attributes you want as input or output of a workflow step. The base class model provided by Maestro has the following attributes:

| Attribute | Type | Description |
| --- | --- | --- |
| id | int | ID for the artefact (optional) |
| maestro_id | str | UUID for Maestro internal reference. A value is automatically provided by Maestro, but can optionally be overridden if needed. |
| labels | list[str] | Automatically set by Maestro. |
| subjects | list[str] | Used by Maestro to set the NATS subscription. Override only if you need to customize how NATS works. |

You can add more attributes or override the type of an existing attribute on an input/output artefact. Example of an artefact model:

from maestro import BaseArtefactModel

class DemoInput(BaseArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...
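
To make the attribute table concrete without requiring Maestro to be installed, here is a small sketch that uses a plain dataclass as a stand-in for BaseArtefactModel. StandInArtefact is not Maestro code and only mirrors the documented attributes; sample_name and read_count are hypothetical extra attributes. With the real library you would subclass BaseArtefactModel and declare the extra attributes as annotated fields, as the examples later on this page do with other_identifier and some_identifier.

```python
import uuid
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class StandInArtefact:
    """Stand-in mirroring the documented base attributes (not Maestro's implementation)."""
    id: Optional[int] = None
    # A fresh UUID string is generated for every instance unless one is passed in.
    maestro_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    labels: list[str] = field(default_factory=list)
    subjects: list[str] = field(default_factory=list)


@dataclass
class DemoInput(StandInArtefact):
    sample_name: str = ""  # hypothetical extra attribute
    read_count: int = 0    # hypothetical extra attribute


first = DemoInput(sample_name="sample-001", read_count=1200)
second = DemoInput(sample_name="sample-002")
custom = DemoInput(maestro_id="my-own-id")  # overriding the generated value
```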

Warning

Make sure that you always use Maestro’s model class (BaseArtefactModel) to define the artefact model in your pipeline/operations or workflow step.

File watcher#

The FileWatcher class from Maestro can be used whenever your app needs to pick up one or more files to process. The base class is quite simple and can be extended to fit your pipeline's needs. The file watcher watches for changes in the directory defined in path and performs operations only on newly added files.

Warning

It is important that the app is up and running when you add files to the directory.

To define a file watcher class in your pipeline you will need the following components (priority is optional):

| Component | Description |
| --- | --- |
| path | Path to the directory where files to be processed are located. |
| pattern | Regex pattern of the filenames you wish to process. With this you can further refine which files in the path directory are processed. |
| priority | Optional. An integer (1, 2 or 3) that sets the priority of the file watcher class; defaults to 1 (lowest/normal). |
| async def process | The actual operation you wish to perform on the file. Note: the method should always be defined as async. |
| inputs | Data model of your input. Use existing default or extend BaseArtefactModel as needed. |
| output | Data model of your output. Use existing default or extend BaseArtefactModel as needed. |

Here’s an example for a file watcher class. Note that you will always have to use the FileWatcher class from Maestro to define the file watcher class in your app.

from maestro import BaseArtefactModel, FileArtefactModel, FileWatcher

class DemoInput(BaseArtefactModel):
    ...

class DemoInputAlternative(FileArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...

class DemoFileWatcher(FileWatcher):

    path = "/tmp/test_filewatcher"
    pattern = ".*.txt"

    async def process(self, *args, **kwargs):
        output = DemoOutput()
        return [output]

There’s an optional filter class variable you can override from FileWatcher. The default filter specifies to only watch for newly added files.
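
Because pattern is a regular expression and not a shell glob, an unescaped dot matches any character. The sketch below illustrates the difference with Python's re module. How Maestro applies the pattern internally is not described on this page; full matching against the filename is assumed here, and select_files is a hypothetical helper.

```python
import re


def select_files(filenames, pattern):
    """Return the filenames that `pattern` matches in full (assumed matching mode)."""
    compiled = re.compile(pattern)
    return [name for name in filenames if compiled.fullmatch(name)]


candidates = ["report.txt", "report.txt.bak", "notes_txt", "data.sample"]

# ".*.txt": the second dot matches ANY character, so "notes_txt" is accepted too.
loose = select_files(candidates, ".*.txt")

# r".*\.txt": the escaped dot only matches a literal ".".
strict = select_files(candidates, r".*\.txt")
```

If only files with a real .txt extension should be picked up, the escaped form is the safer choice.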

Message watcher#

This is probably the component you will use to define most steps in a workflow. The message watcher class in Maestro is designed to watch all subclasses of itself and perform operations when all the inputs for a defined workflow step are found.

The implementation of message watcher subclasses looks very similar to file watcher, but has fewer components:

| Component | Description |
| --- | --- |
| Parameters | Optional parameters that can be passed to the step process/operation |
| path | Path to the directory where files to be processed are located. |
| async def process | The actual operation you wish to perform on the file. Note: The method should always be defined as async. |
| inputs | Data model of your input. Use existing default or extend BaseArtefactModel as needed. |
| output | Data model of your output. Use existing default or extend BaseArtefactModel as needed. |

Here’s an example of a message watcher class. Note that you will always have to use the MessageWatcher class from Maestro to define the message watcher class in your app.

from maestro import BaseArtefactModel, MessageWatcher

class DemoInputA(BaseArtefactModel):
    ...

class DemoInputB(BaseArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...

class DemoStep(MessageWatcher):
    async def process(self, *args, **kwargs):
        # perform any operation here
        # Run some process here to create the output
        output = DemoOutput()
        return [output]
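
Since process is a coroutine, long blocking work inside it can be handed to a thread so that the event loop stays responsive. The sketch below shows that pattern with a stand-in class instead of a real MessageWatcher. StandInStep and slow_sum are hypothetical, and whether offloading is needed in practice depends on how Maestro schedules its workers, which this page does not specify.

```python
import asyncio
import time


def slow_sum(values):
    """Blocking work, simulated with a short sleep."""
    time.sleep(0.01)
    return sum(values)


class StandInStep:
    """Stand-in for a MessageWatcher subclass (not Maestro code)."""

    async def process(self, inputs, *args, **kwargs):
        # Run the blocking function in a worker thread and await its result.
        total = await asyncio.to_thread(slow_sum, inputs)
        # As in the examples above, outputs are returned as a list.
        return [total]


result = asyncio.run(StandInStep().process([1, 2, 3]))
```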

Order job#

To place an order for a MessageWatcher, you need to create a Job like this:

import asyncio

from maestro import run_workflow_steps, order_workflow
# WorkflowModel is used below; it is assumed here to be importable from maestro
from maestro import BaseArtefactModel, FileWatcher, MessageWatcher, Job, WorkflowModel

class DemoInputA(BaseArtefactModel):
    other_identifier: str

class DemoInputB(BaseArtefactModel):
    some_identifier: str

class DemoOutput(BaseArtefactModel):
    ...

class DemoFileWatcherA(FileWatcher):
    path = "tmp/test_filewatcher"
    pattern = ".*.sample"
    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        output = DemoInputA(other_identifier=inputs[0].other_identifier, steps_id="demo steps 1")
        return [output]

class DemoFileWatcherB(FileWatcher):
    path = "tmp/test_filewatcher"
    pattern = ".*.txt"
    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        output = DemoInputB(some_identifier=inputs[0].some_identifier)
        return [output]

class DemoStep(MessageWatcher):
    async def process(self, *args, **kwargs):
        # perform any operation here
        # Run some process here to create the output
        output = DemoOutput()
        return [output]

# The awaits below must run inside a coroutine, for example one started with asyncio.run()
task = await run_workflow_steps()

job = Job(
    inputs=[("DemoInputA", {"other_identifier": "personA2"}), ("DemoInputB", {"some_identifier": "personB2"})],
    params={"demo_param": "demo"},
    search_metadata=["sample_order_id: 1234"],
    workflow_step="DemoStep",
)

workflow = WorkflowModel(
    workflow_id="some_id",
    workflow_name="some_name",
    jobs=[job],
    search_metadata=["project_id: wf_1234"],
)
await order_workflow(workflow)  # same name as in the import at the top of this example

This will place an order on the queue, which will be picked up and processed when the input artefacts are ready (in this case, DemoInputA with other_identifier=personA2 and DemoInputB with some_identifier=personB2).
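
The readiness rule described above can be pictured with a few lines of plain Python. This is only an illustration of the matching idea, not Maestro's implementation; job_is_ready and the tuple-based representation of artefacts are hypothetical.

```python
def job_is_ready(job_inputs, available_artefacts):
    """Return True when every requested input has a matching artefact.

    job_inputs:          list of (type_name, required_attributes)
    available_artefacts: list of (type_name, attributes)
    """
    def satisfied(type_name, required):
        return any(
            name == type_name
            and all(attrs.get(key) == value for key, value in required.items())
            for name, attrs in available_artefacts
        )

    return all(satisfied(name, required) for name, required in job_inputs)


job_inputs = [
    ("DemoInputA", {"other_identifier": "personA2"}),
    ("DemoInputB", {"some_identifier": "personB2"}),
]

# Only one of the two inputs exists so far: the job has to wait.
available = [("DemoInputA", {"other_identifier": "personA2"})]
ready_before = job_is_ready(job_inputs, available)

# Once the second artefact arrives, the job can be picked up.
available.append(("DemoInputB", {"some_identifier": "personB2"}))
ready_after = job_is_ready(job_inputs, available)
```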

Note

You can pass additional values in the params dict to identify the input that the process method should use.

Set the job's pending_timeout and running_timeout to appropriate values (in seconds) if you expect a job to take less or more than 24 hours to complete. By default, Maestro allows a job to stay in the pending or running state for a maximum of 86400 seconds (24 hours).
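
Timeouts expressed in raw seconds are easy to get wrong, so it can help to derive them from readable durations. The sketch below uses datetime.timedelta for that. to_seconds is a hypothetical helper, and passing the resulting values as pending_timeout and running_timeout keyword arguments to Job is an assumption based on the field names mentioned above.

```python
from datetime import timedelta

DEFAULT_TIMEOUT_SECONDS = 86400  # documented default: 24 hours


def to_seconds(**duration):
    """Convert timedelta keyword arguments (days, hours, minutes, ...) to whole seconds."""
    return int(timedelta(**duration).total_seconds())


# A job that should not wait long for its inputs but may run for several days.
job_timeouts = {
    "pending_timeout": to_seconds(hours=2),
    "running_timeout": to_seconds(days=3),
}
```

Under that assumption, the dictionary could be unpacked into the job definition, for example Job(..., **job_timeouts).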

Running workflow steps#

run_workflow_steps is an async method that checks all artefact and watcher classes in the file and sets up NATS and the other async tasks required. It accepts three optional parameters:

| Parameter | Type (list of) |
| --- | --- |
| message_watcher_classes | MessageWatcher |
| file_watcher_classes | FileWatcher |
| artefact_classes | BaseArtefactModel |

Note

If any of these parameters is left empty or None, Maestro will set up all available classes of that type.

If you only want to run specific watcher and artefact classes, you can pass them to the method as follows:

import asyncio

from maestro import run_workflow_steps
from maestro import BaseArtefactModel, FileWatcher, MessageWatcher, Job

class DemoInputA(BaseArtefactModel):
    ...

class DemoInputB(BaseArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...

class DemoFileWatcherA(FileWatcher):
    path = "tmp/test_filewatcher"
    pattern = ".*.sample"
    async def process(self, *args, **kwargs):
        output = DemoInputA(steps_id="demo steps 1")
        return [output]

class DemoFileWatcherB(FileWatcher):
    path = "tmp/test_filewatcher"
    pattern = ".*.txt"
    async def process(self, *args, **kwargs):
        output = DemoInputB()
        return [output]

class DemoStep(MessageWatcher):
    input_types = {DemoInputA, DemoInputB}
    # parameter
    demo_param: str

    async def process(self, *args, **kwargs):
        # perform any operation here
        # Run some process here to create the output
        output = DemoOutput()
        return [output]

# DemoFileWatcherB is a FileWatcher, so it is passed via file_watcher_classes
task = await run_workflow_steps(file_watcher_classes=[DemoFileWatcherB])
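
A bare await is only valid inside a coroutine, so in a script the call has to be wrapped in an async function and started with asyncio.run. The sketch below shows that entry-point pattern. To keep it runnable without Maestro, run_workflow_steps and DemoFileWatcherB are replaced by stand-ins; the assumption that the real function returns an awaitable task is inferred from the task = await ... line above and is not confirmed here.

```python
import asyncio


class DemoFileWatcherB:
    """Placeholder for the FileWatcher subclass defined in the example above."""


async def run_workflow_steps(file_watcher_classes=None):
    """Stand-in for maestro.run_workflow_steps; it only reports what it was given."""
    async def _watch():
        await asyncio.sleep(0)  # yield control once, as a watcher loop would
        return [cls.__name__ for cls in (file_watcher_classes or [])]

    return asyncio.create_task(_watch())


async def main():
    # All awaits live inside this coroutine.
    task = await run_workflow_steps(file_watcher_classes=[DemoFileWatcherB])
    return await task


started = asyncio.run(main())
```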

Logging#

Maestro uses Python’s logging module, which you can also use in your app. Several configurations are available; see Logging facility for Python.
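
As a starting point, the following sketch configures the root logger with a single handler and format. The logger name my_app.steps is hypothetical, and the idea that Maestro's own records end up in the same handler assumes that its loggers propagate to the root logger, which is the default behaviour of the logging module. The in-memory stream is used only to make the effect of the level visible; passing no stream sends output to standard error.

```python
import io
import logging


def configure_logging(level=logging.INFO, stream=None):
    """Attach one handler to the root logger; stream=None means sys.stderr."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
    )
    root = logging.getLogger()
    root.addHandler(handler)
    root.setLevel(level)
    return handler


buffer = io.StringIO()
configure_logging(logging.INFO, stream=buffer)

logger = logging.getLogger("my_app.steps")  # hypothetical application logger
logger.info("DemoStep started")
logger.debug("filtered out at INFO level")

captured = buffer.getvalue()
```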

Terminology#

Note: Terms explained elsewhere in the table are in italics.

| Term | Description |
| --- | --- |
| Artefact | Any class subclassing BaseArtefactModel. An artefact represents either results from a processed order or a processed file watcher. |
| Cypher | Cypher is the query language used to query Neo4j (see official Neo4j docs). |
| File watcher | Any class that subclasses FileWatcher. Represents a workflow step that watches a specific path for changes (default: added files) and triggers the FileWatcher.process method on any change. |
| Inspector | [TODO: WIP] System to inspect logs and Neo4j for errors and invalid state. |
| JetStream | Built-in persistence system for NATS (see official NATS docs). |
| Job | A job is the result of an order, queued in the Neo4j database as a node with a Job label. |
| Message watcher | Any class inheriting from MessageWatcher. Represents a workflow step that takes input artefacts and, typically, produces some output artefacts. Only ever instantiated by a worker. |
| Monitoring | [TODO: WIP] API and user interface for monitoring current and past jobs and (yet to be determined) performing predefined actions such as cancel and restart. |
| Neo4j | Neo4j is the graph database where all results are stored. The nodes here are either artefact nodes or workflow step nodes, connected with a relationship of type INPUT or OUTPUT, representing a processed job. |
| Order | An order is a NATS message, sent to the subject workflow_step.<message watcher type>.order, that requests a specific message watcher to do work. |
| Worker | A worker is a subprocess responsible for processing a job when it’s ready. |
| Worker queue | The worker queue contains all orders that are ready to start, waiting to be processed by a worker as soon as system resources allow. |
| Workflow step | Super-class of MessageWatcher and FileWatcher. Each workflow step is designed to have input and output. The workflow step model has three main meta-components: (1) Details: tracks id, label, datetime, etc. relevant to the workflow step itself; (2) Input and Output: artefacts acting as input and output for the workflow step; (3) Standard error and output log: standard error/output captured while processing the workflow step. The actual schema can be found in the API reference. |
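
The subject format given for orders, workflow_step.<message watcher type>.order, can be sketched as a small helper. order_subject is hypothetical and not part of Maestro, and using the class name (such as DemoStep) as the message watcher type is an assumption. The validation reflects general NATS subject rules: tokens are separated by dots, and the characters * and > are reserved as wildcards.

```python
def order_subject(message_watcher_type: str) -> str:
    """Build the order subject for one message watcher type."""
    forbidden = set(". *>")  # token separator, whitespace and NATS wildcards
    if not message_watcher_type or forbidden & set(message_watcher_type):
        raise ValueError(f"invalid subject token: {message_watcher_type!r}")
    return f"workflow_step.{message_watcher_type}.order"


subject = order_subject("DemoStep")
```

In NATS, a subscription to workflow_step.*.order would receive the orders for every message watcher type, since * matches exactly one token.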