Technical concepts#
This page provides an overview of the technical aspects of Maestro, including explanations of key concepts and components.
Key concepts#
Maestro contains generic definitions (base classes) of workflow steps and artefacts composing workflows in graphs:
Workflow steps are functional steps in a workflow.
Artefacts are any input/output necessary for workflow steps to perform their respective jobs.
The base definition of a workflow step includes listening to and publishing messages through NATS, and reading and persisting events in a Neo4j database. The library exposes the base classes MessageWatcher, FileWatcher and BaseArtefactModel for applications to subclass and use.
Architecture#
The figure below gives an overview of the architecture used in Maestro:

[TODO]: Update figure
See Terminology for definitions of the terms used in the figure and elsewhere in the source.
Maestro API#
Maestro’s API provides communication with any system (internal or external). It is mostly used for fetching the status of processes being run by Maestro.
The API is available at http://localhost:8002/docs (default configuration) when running the service.
Maestro UI#
The Maestro UI is a web interface that allows users to monitor the status of running and past jobs, and perform certain actions such as invalidating workflows/steps/artefacts and reordering steps. The UI is accessible at http://localhost:8002/ (default configuration). See also the UI user manual.
All rendering is done server-side using Jinja2 templates.
Neo4j database#
The easiest way to access the Neo4j database is through the Neo4j browser, available at http://localhost:7474/browser/ (default configuration).
Use Cypher queries to interact with the database. Examples:
Show all nodes in the graph:
MATCH (n) RETURN n;
Delete all nodes and relationships:
MATCH (n) DETACH DELETE n;
For migrations, see neo4j-python-migrations for details on naming conventions etc.
Maestro components and concepts#
When using Maestro in an app (see Getting started), your app will rely on Maestro to process and execute workflow operations. In the following we will explain a few components that are important to understand before you start implementing pipelines and corresponding operations.
See also the Terminology section for a brief definition of terms used in the following.
Workflow steps#
[TODO]
Artefacts#
Artefact classes are used to define input and output components of any operation (aka workflow step). Maestro provides a BaseArtefactModel to define datatypes for attributes you would like for input/output in a workflow step. The base class model provided by Maestro has the following attributes:
| Attribute | Type | Description |
|---|---|---|
| | int | ID for the artefact (optional) |
| | str | UUID for Maestro internal reference. A value is automatically provided by Maestro, but can optionally be overridden if needed. |
| | list[str] | Automatically set by Maestro. |
| | list[str] | Used by Maestro to set the NATS subscription. Override only if you need to customize how NATS works. |
You can add more values or override the value type of an existing input/output artefact. Example of an artefact model:
from maestro import BaseArtefactModel

class DemoInput(BaseArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...
Warning
Make sure that you always use Maestro’s model class (BaseArtefactModel) to define the artefact model in your pipeline/operations or workflow step.
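The "generated automatically, but can be overridden" behaviour described for the UUID attribute can be pictured with a default factory. The following is a minimal sketch using a standard-library dataclass as a stand-in; it is not Maestro's BaseArtefactModel, and the names ArtefactSketch and internal_uuid are hypothetical.

```python
import uuid
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ArtefactSketch:
    """Stand-in for an artefact model; NOT Maestro's BaseArtefactModel."""

    # Optional numeric ID, mirroring the "int" row of the attribute table.
    id: Optional[int] = None
    # Hypothetical field name: a UUID string is generated unless the caller supplies one.
    internal_uuid: str = field(default_factory=lambda: str(uuid.uuid4()))


auto = ArtefactSketch()
manual = ArtefactSketch(internal_uuid="my-own-reference")

print(auto.internal_uuid)    # a fresh random UUID on every run
print(manual.internal_uuid)  # my-own-reference
```

Using a factory rather than a fixed default means every instance gets its own value, which is what an internal reference needs.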
File watcher#
The FileWatcher class from Maestro can be used whenever your app needs to pick up one or more files to process. The base class is quite simple and can be extended to suit your pipeline's needs. The file watcher watches for changes in the directory defined in path and performs operations only on newly added files.
Warning
It is important that the app is up and running when you add files to the directory.
To define a file watcher class in your pipeline you will need five components:
| Component | Description |
|---|---|
| | Path to the directory where files to be processed are located. |
| | Regex pattern of the filenames you wish to process. With this you can further refine which files in the path directory are processed. |
| | This component is NOT mandatory. Can be the integer 1, 2 or 3 to set the priority of a file watcher class; defaults to 1 (lowest/normal). |
| | The actual operation you wish to perform on the file. Note: The method should always be defined as |
| | Data model of your input. Use the existing default or extend BaseArtefactModel as needed. |
| | Data model of your output. Use the existing default or extend BaseArtefactModel as needed. |
Here’s an example for a file watcher class. Note that you will always have to use the FileWatcher class from Maestro to define the file watcher class in your app.
from maestro import BaseArtefactModel, FileArtefactModel, FileWatcher

class DemoInput(BaseArtefactModel):
    ...

class DemoInputAlternative(FileArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...

class DemoFileWatcher(FileWatcher):
    path = "/tmp/test_filewatcher"
    pattern = ".*.txt"

    async def process(self, *args, **kwargs):
        output = DemoOutput()
        return [output]
There’s an optional filter class variable you can override from FileWatcher. The default filter specifies to only watch for newly added files.
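Because the filename pattern is a regular expression, an unescaped dot matches any character, so a pattern such as `.*.txt` accepts more names than only those ending in the `.txt` extension. The sketch below illustrates the difference with Python's `re` module. It assumes the pattern is compared against the whole filename (`re.fullmatch`); how Maestro applies the pattern internally is not stated on this page, and `matching_files` is a hypothetical helper.

```python
import re


def matching_files(filenames, pattern):
    """Return the filenames fully matched by a regex pattern (hypothetical helper)."""
    compiled = re.compile(pattern)
    return [name for name in filenames if compiled.fullmatch(name)]


names = ["report.txt", "report.sample", "notes_txt", "a.txt.bak"]

# Unescaped dot: any single character may precede "txt".
loose = matching_files(names, ".*.txt")
# Escaped dot: a literal "." must precede "txt".
strict = matching_files(names, r".*\.txt")

print(loose)   # ['report.txt', 'notes_txt']
print(strict)  # ['report.txt']
```

If only files with a given extension should be processed, escaping the dot (and writing the pattern as a raw string) is the safer choice.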
Message watcher#
This is probably the component you will use to define most steps in a workflow. The message watcher class in Maestro is designed to watch all subclasses of itself and perform operations when all the inputs for a defined workflow step are found.
The implementation of message watcher subclasses looks very similar to file watcher, but has fewer components:
| Component | Description |
|---|---|
| Parameters | Optional parameters that can be passed to the step process/operation |
| | Path to the directory where files to be processed are located. |
| | The actual operation you wish to perform on the file. Note: The method should always be defined as |
| | Data model of your input. Use existing default or extend BaseArtefactModel as needed. |
| | Data model of your output. Use existing default or extend BaseArtefactModel as needed. |
Here’s an example of a message watcher class. Note that you will always have to use the MessageWatcher class from Maestro to define the message watcher class in your app.
from maestro import BaseArtefactModel, MessageWatcher

class DemoInputA(BaseArtefactModel):
    ...

class DemoInputB(BaseArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...

class DemoStep(MessageWatcher):
    async def process(self, *args, **kwargs):
        # perform any operation here
        # Run some process here to create the output
        output = DemoOutput()
        return [output]
Order job#
To place an order for a MessageWatcher, you need to create a Job like this:
import asyncio

# WorkflowModel is assumed to be importable from maestro in the same way as Job
from maestro import BaseArtefactModel, FileWatcher, MessageWatcher, Job, WorkflowModel
from maestro import run_workflow_steps, order_workflow

class DemoInputA(BaseArtefactModel):
    other_identifier: str

class DemoInputB(BaseArtefactModel):
    some_identifier: str

class DemoOutput(BaseArtefactModel):
    ...

class DemoFileWatcherA(FileWatcher):
    path = "tmp/test_filewatcher"
    pattern = ".*.sample"

    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        output = DemoInputA(other_identifier=inputs[0].other_identifier, steps_id="demo steps 1")
        return [output]

class DemoFileWatcherB(FileWatcher):
    path = "tmp/test_filewatcher"
    pattern = ".*.txt"

    async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
        output = DemoInputB(some_identifier=inputs[0].some_identifier)
        return [output]

class DemoStep(MessageWatcher):
    async def process(self, *args, **kwargs):
        # perform any operation here
        # Run some process here to create the output
        output = DemoOutput()
        return [output]

async def main():
    # Set up NATS and the watcher tasks before placing the order
    task = await run_workflow_steps()
    # One job for DemoStep, waiting for two specific input artefacts
    job = Job(
        inputs=[("DemoInputA", {"other_identifier": "personA2"}), ("DemoInputB", {"some_identifier": "personB2"})],
        params={"demo_param": "demo"},
        search_metadata=["sample_order_id: 1234"],
        workflow_step="DemoStep",
    )
    workflow = WorkflowModel(
        workflow_id="some_id",
        workflow_name="some_name",
        jobs=[job],
        search_metadata=["project_id: wf_1234"],
    )
    await order_workflow(workflow)
    # Keep the event loop alive here for as long as the watchers should run

asyncio.run(main())
This will place an order on the queue, which will be picked up and processed when the input artefacts are ready (in this case, DemoInputA with other_identifier=personA2 and DemoInputB with some_identifier=personB2).
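The idea that a job only starts once every requested input artefact is available can be sketched as a simple matching step. This is an illustration in plain Python, not Maestro's implementation: ArtefactRecord, find_match and job_is_ready are hypothetical names, and the input specifications reuse the (type name, attribute filter) shape from the Job example above.

```python
from dataclasses import dataclass, field


@dataclass
class ArtefactRecord:
    """Hypothetical stand-in for a stored artefact: a type name plus its attributes."""

    type_name: str
    attributes: dict = field(default_factory=dict)


def find_match(spec, artefacts):
    """Return the first artefact of the requested type whose attributes satisfy the filter."""
    type_name, wanted = spec
    for artefact in artefacts:
        if artefact.type_name != type_name:
            continue
        if all(artefact.attributes.get(key) == value for key, value in wanted.items()):
            return artefact
    return None


def job_is_ready(input_specs, artefacts):
    """A job is ready only when every input specification has a matching artefact."""
    return all(find_match(spec, artefacts) is not None for spec in input_specs)


specs = [
    ("DemoInputA", {"other_identifier": "personA2"}),
    ("DemoInputB", {"some_identifier": "personB2"}),
]
available = [ArtefactRecord("DemoInputA", {"other_identifier": "personA2"})]
print(job_is_ready(specs, available))  # False: DemoInputB is still missing

available.append(ArtefactRecord("DemoInputB", {"some_identifier": "personB2"}))
print(job_is_ready(specs, available))  # True: both inputs are present
```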
Note
You can pass additional values in the params dict to identify the input that the process method should use.
Set the job's pending_timeout and running_timeout to appropriate values (in seconds) if you expect a job to take considerably less or more than 24 hours to complete. By default, Maestro allows a job to stay in the pending or running state for a maximum of 86400 seconds (24 hours).
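The effect of these timeouts can be illustrated with a small calculation. The sketch assumes a timeout is measured from the moment a job entered its current state; has_timed_out and DEFAULT_TIMEOUT_SECONDS are illustrative names and not part of Maestro.

```python
from datetime import datetime, timedelta, timezone

# 24 hours, the default stated in the note above.
DEFAULT_TIMEOUT_SECONDS = 86400


def has_timed_out(entered_state_at, now, timeout_seconds=DEFAULT_TIMEOUT_SECONDS):
    """Return True if more than timeout_seconds have passed since the state was entered."""
    return (now - entered_state_at) > timedelta(seconds=timeout_seconds)


start = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)

print(has_timed_out(start, start + timedelta(hours=23)))  # False
print(has_timed_out(start, start + timedelta(hours=25)))  # True
# A one-hour timeout flags the same job after two hours.
print(has_timed_out(start, start + timedelta(hours=2), timeout_seconds=3600))  # True
```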
Running workflow steps#
run_workflow_steps is an async method that checks all artefact and watcher classes in the file and sets up NATS and the other async tasks required. It accepts three optional parameters:
| Parameter | Type (list of) |
|---|---|
| | |
| | |
| | |
Note
If any of these parameters is left empty or None, Maestro will set up all classes of that kind.
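This fallback can be pictured as "use what was passed, otherwise discover every subclass". The sketch below shows the idea in plain Python; WatcherBase, StepA, StepB and resolve_classes are hypothetical stand-ins, and Maestro's own discovery logic may differ.

```python
class WatcherBase:
    """Stand-in base class; not Maestro's MessageWatcher or FileWatcher."""


class StepA(WatcherBase):
    pass


class StepB(WatcherBase):
    pass


def resolve_classes(requested, base):
    """Return the requested classes, or every direct subclass of base if none are given."""
    if not requested:  # covers both None and an empty list
        return list(base.__subclasses__())
    return list(requested)


print([cls.__name__ for cls in resolve_classes(None, WatcherBase)])     # ['StepA', 'StepB']
print([cls.__name__ for cls in resolve_classes([StepB], WatcherBase)])  # ['StepB']
```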
If you only want to run specific watcher and artefact classes, you can pass them to the method as follows:
import asyncio

from maestro import run_workflow_steps
from maestro import BaseArtefactModel, FileWatcher, MessageWatcher, Job

class DemoInputA(BaseArtefactModel):
    ...

class DemoInputB(BaseArtefactModel):
    ...

class DemoOutput(BaseArtefactModel):
    ...

class DemoFileWatcherA(FileWatcher):
    path = "tmp/test_filewatcher"
    pattern = ".*.sample"

    async def process(self, *args, **kwargs):
        output = DemoInputA(steps_id="demo steps 1")
        return [output]

class DemoFileWatcherB(FileWatcher):
    path = "tmp/test_filewatcher"
    pattern = ".*.txt"

    async def process(self, *args, **kwargs):
        output = DemoInputB()
        return [output]

class DemoStep(MessageWatcher):
    input_types = {DemoInputA, DemoInputB}
    # parameter
    demo_param: str

    async def process(self, *args, **kwargs):
        # perform any operation here
        # Run some process here to create the output
        output = DemoOutput()
        return [output]

async def main():
    # Restrict the message watchers to DemoStep; parameters left out fall back to all classes
    task = await run_workflow_steps(message_watcher_classes=[DemoStep])
    # Keep the event loop alive here for as long as the watchers should run

asyncio.run(main())
Logging#
Maestro uses Python’s logging module, and you can use the same module in your app. Several configurations are available; see Logging facility for Python.
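As a starting point, an app can configure the standard logging module once at startup and then create its own named loggers. This is a generic sketch of the Python standard library, not a Maestro-specific configuration; the logger name my_app.pipeline is a placeholder.

```python
import logging

# Configure the root logger once, early in the application's startup.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)

# Placeholder logger name; use your own module or package name.
logger = logging.getLogger("my_app.pipeline")
logger.setLevel(logging.INFO)

logger.info("Pipeline starting")
logger.debug("This message is below the INFO threshold and is not emitted")
```

Because loggers are hierarchical, raising or lowering the level on a parent logger such as my_app affects all loggers beneath it unless they set their own level.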
Terminology#
Note: Terms explained elsewhere in the table are in italics.
Term | Description |
---|---|
Artefact | Any class subclassing BaseArtefactModel. An artefact represents either results from a processed order or a processed file watcher. |
Cypher | Cypher is the query language used to query Neo4j (see official Neo4j docs). |
File watcher | Any class that subclasses FileWatcher. Represents a workflow step to watch a specific path for changes (default: added files), and trigger the FileWatcher.process method on any change. |
Inspector | [TODO: WIP] System to inspect logs and Neo4j for errors and invalid state. |
JetStream | Built-in persistence system for NATS (see official NATS docs). |
Job | A job is the result of an order, queued in the Neo4j db as a node with a Job label. |
Message watcher | Any class inheriting from MessageWatcher. Represents a workflow step that takes input artefacts and, typically, produces some output artefacts. Only ever instantiated by a worker. |
Monitoring | [TODO: WIP] API and user interface for monitoring current and past jobs, and (yet to be determined) performing predefined actions such as cancel and restart. |
Neo4j | Neo4j is the graph database where all results are stored. The nodes here are either artefact nodes or workflow step nodes, connected with a relationship of type |
Order | An order is a NATS message to request a specific message watcher to do work to the subject |
Worker | A worker is a subprocess responsible for processing a job when it’s ready. |
Worker queue | The worker queue contains all orders that are ready to start, waiting to be processed by a worker as soon as system resources allow it. |
Workflow step | Super-class of MessageWatcher and FileWatcher. Each workflow step is designed to have input and output. The workflow step model has three main meta-components: The actual schema can be found in the API reference. |