Technical concepts
==================

This page provides an overview of the technical aspects of Maestro, including explanations of key concepts and components.

Key concepts
------------

Maestro contains generic definitions (base classes) of workflow steps and artefacts composing workflows in graphs:

- `Workflow steps`_ are functional steps in a workflow.
- `Artefacts`_ are any input/output necessary for workflow steps to perform their respective jobs.

The base definition of a workflow step includes listening to/publishing messages through NATS and reading/persisting events in a `Neo4j database`_.

The library exposes the base classes `MessageWatcher <../maestro.workflow_step.html#maestro.workflow_step.message_watcher.MessageWatcher>`_, `FileWatcher <../maestro.workflow_step.html#maestro.workflow_step.file_watcher.FileWatcher>`_ and `BaseArtefactModel <../maestro.html#maestro.models.BaseArtefactModel>`_ for applications to subclass and use.

Architecture
------------

The figure below gives an overview of the architecture used in Maestro:

.. image:: ../../img/architecture.png
   :alt: Maestro architecture
   :align: center

[TODO]: Update figure

See `Terminology`_ for definitions of the terms used in the figure and elsewhere in the source.

Maestro API
-----------

Maestro’s API provides communication with any system (internal or external). It is mostly used for fetching the status of processes being run by Maestro. The API is available at http://localhost:8002/docs (default configuration) when running the service.

Maestro UI
----------

The Maestro UI is a web interface that allows users to monitor the status of running and past jobs, and to perform certain actions such as invalidating workflows/steps/artefacts and reordering steps. The UI is accessible at http://localhost:8002/ (default configuration). See also the :doc:`UI user manual <manual>`. All rendering is done server-side using `Jinja2 <https://jinja.palletsprojects.com/en/3.0.x/>`_ templates.

Neo4j database
--------------

The easiest way to access the Neo4j database is through the Neo4j browser, available at http://localhost:7474/browser/ (default configuration). Use `Cypher <https://neo4j.com/docs/cypher-manual/current/introduction/>`_ queries to interact with the database. Examples:

- Show all nodes in the graph:

  .. code:: cypher

     MATCH (n) RETURN n;

- Delete all nodes and relationships:

  .. code:: cypher

     MATCH (n) DETACH DELETE n;

For migrations, see `neo4j-python-migrations <https://pypi.org/project/neo4j-python-migrations/>`_ for details on naming conventions etc.

Maestro components and concepts
-------------------------------

When using Maestro in an app (see :doc:`getting_started`), your app will rely on Maestro to process and execute workflow operations. The following sections explain a few components that are important to understand before you start implementing pipelines and corresponding operations. See also the `Terminology`_ section for brief definitions of the terms used below.

Workflow steps
~~~~~~~~~~~~~~

[TODO]

Artefacts
~~~~~~~~~

Artefact classes define the input and output components of any operation (aka *workflow step*). Maestro provides a `BaseArtefactModel <../maestro.html#maestro.models.BaseArtefactModel>`_ to define datatypes for the attributes you would like as input/output in a workflow step. The base class model provided by Maestro has the following attributes:

.. csv-table::
   :header: "Attribute", "Type", "Description"

   "``id``", "int", "ID for the artefact (*optional*)"
   "``maestro_id``", "str", "UUID for Maestro internal reference. A value is automatically provided by Maestro, but can optionally be overridden if needed."
   "``labels``", "list[str]", "Automatically set by Maestro."
   "``subjects``", "list[str]", "Used by Maestro to set NATS subscription. Override only in case you need to customize how NATS works."

You can add more values or override the value type of an existing input/output artefact attribute.
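
As a rough illustration of adding an attribute and overriding an attribute type, the sketch below uses a hypothetical stand-in class (``StandInArtefact``, a plain dataclass) in place of ``BaseArtefactModel`` so that it runs without Maestro installed. The stand-in only mirrors the attribute names from the table above; the real base class may validate and populate these fields differently. ``SampleArtefact`` and ``sample_name`` are made-up names for illustration.

.. code:: python

    import uuid
    from dataclasses import dataclass, field
    from typing import Optional


    @dataclass
    class StandInArtefact:
        """Hypothetical stand-in for BaseArtefactModel (NOT Maestro's implementation)."""

        id: Optional[int] = None
        # Assumption: a UUID string is generated when no value is given.
        maestro_id: str = field(default_factory=lambda: str(uuid.uuid4()))
        labels: list[str] = field(default_factory=list)
        subjects: list[str] = field(default_factory=list)


    @dataclass
    class SampleArtefact(StandInArtefact):
        # New attribute added by the subclass.
        sample_name: str = ""
        # Existing attribute redeclared with a different value type.
        id: Optional[str] = None


    artefact = SampleArtefact(id="sample-001", sample_name="demo")
    print(artefact.id, artefact.sample_name)

In a real app you would subclass ``BaseArtefactModel`` directly and declare the extra attributes on the subclass in the same way.
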

Example of an artefact model:

.. code:: python

    from maestro import BaseArtefactModel


    class DemoInput(BaseArtefactModel):
        ...


    class DemoOutput(BaseArtefactModel):
        ...

.. warning::

   Make sure that you always use Maestro's model class (`BaseArtefactModel <../maestro.html#maestro.models.BaseArtefactModel>`_) to define the artefact model in your pipeline/operations or workflow step.

File watcher
~~~~~~~~~~~~

The `FileWatcher <../maestro.workflow_step.html#maestro.workflow_step.file_watcher.FileWatcher>`_ class from Maestro can be used whenever your app needs to pick up one or more files to process. The base class is quite simple and can be extended as per the pipeline's needs. The file watcher will watch for changes in the directory defined in ``path`` and perform operations only on newly added files.

.. warning::

   It is important that the app is up and running when you add files to the directory.

To define a file watcher class in your pipeline you will need the following components (``priority`` is optional):

.. csv-table::
   :header: "Component", "Description"

   "``path``", "Path to the directory where files to be processed are located."
   "``pattern``", "Regex pattern of the filenames you wish to process. With this you can further refine which files you want to process in the path directory."
   "``priority``", "This component is NOT mandatory. Can be integer 1, 2 or 3 to set the priority of a file watcher class; defaults to 1 (lowest/normal)."
   "``async def process``", "The actual operation you wish to perform on the file. **Note:** The method should always be defined as ``async``."
   "``inputs``", "Data model of your input. Use existing default or extend `BaseArtefactModel <../maestro.html#maestro.models.BaseArtefactModel>`_ as needed."
   "``output``", "Data model of your output. Use existing default or extend `BaseArtefactModel <../maestro.html#maestro.models.BaseArtefactModel>`_ as needed."

Here’s an example for a file watcher class.
Note that you will always have to use the `FileWatcher <../maestro.workflow_step.html#maestro.workflow_step.file_watcher.FileWatcher>`_ class from Maestro to define the file watcher class in your app.

.. code:: python

    from maestro import BaseArtefactModel, FileArtefactModel, FileWatcher


    class DemoInput(BaseArtefactModel):
        ...


    class DemoInputAlternative(FileArtefactModel):
        ...


    class DemoOutput(BaseArtefactModel):
        ...


    class DemoFileWatcher(FileWatcher):
        path = "/tmp/test_filewatcher"
        # Regex for filenames ending in ".txt" (the dot is escaped to match literally)
        pattern = r".*\.txt"

        async def process(self, *args, **kwargs):
            output = DemoOutput()
            return [output]

There’s an optional ``filter`` class variable you can override from `FileWatcher <../maestro.workflow_step.html#maestro.workflow_step.file_watcher.FileWatcher>`_. The default filter only watches for newly added files.

Message watcher
~~~~~~~~~~~~~~~

This is probably the component you will use to define most steps in a workflow. The message watcher class in Maestro is designed to watch all subclasses of itself and perform operations when all the inputs for a defined workflow step are found. The implementation of a message watcher subclass looks very similar to that of a file watcher, but has fewer components:

.. csv-table::
   :header: "Component", "Description"

   "Parameters", "Optional parameters that can be passed to the step process/operation"
   "``path``", "Path to the directory where files to be processed are located."
   "``async def process``", "The actual operation you wish to perform on the file. **Note:** The method should always be defined as ``async``."
   "``inputs``", "Data model of your input. Use existing default or extend `BaseArtefactModel <../maestro.html#maestro.models.BaseArtefactModel>`_ as needed."
   "``output``", "Data model of your output. Use existing default or extend `BaseArtefactModel <../maestro.html#maestro.models.BaseArtefactModel>`_ as needed."

Here’s an example of a message watcher class.
Note that you will always have to use the `MessageWatcher <../maestro.workflow_step.html#maestro.workflow_step.message_watcher.MessageWatcher>`_ class from Maestro to define the message watcher class in your app.

.. code:: python

    from maestro import BaseArtefactModel, MessageWatcher


    class DemoInputA(BaseArtefactModel):
        ...


    class DemoInputB(BaseArtefactModel):
        ...


    class DemoOutput(BaseArtefactModel):
        ...


    class DemoStep(MessageWatcher):
        async def process(self, *args, **kwargs):
            # perform any operation here
            # Run some process here to create the output
            output = DemoOutput()
            return [output]

Order job
~~~~~~~~~

To place an order for a `MessageWatcher <../maestro.workflow_step.html#maestro.workflow_step.message_watcher.MessageWatcher>`_, you need to create a Job like this:

.. code:: python

    import asyncio

    from maestro import run_workflow_steps, order_workflow
    from maestro import BaseArtefactModel, FileWatcher, MessageWatcher, Job


    class DemoInputA(BaseArtefactModel):
        other_identifier: str


    class DemoInputB(BaseArtefactModel):
        some_identifier: str


    class DemoOutput(BaseArtefactModel):
        ...


    class DemoFileWatcherA(FileWatcher):
        path = "tmp/test_filewatcher"
        # Regex for filenames ending in ".sample" (the dot is escaped to match literally)
        pattern = r".*\.sample"

        async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
            output = DemoInputA(other_identifier=inputs[0].other_identifier, steps_id="demo steps 1")
            return [output]


    class DemoFileWatcherB(FileWatcher):
        path = "tmp/test_filewatcher"
        pattern = r".*\.txt"

        async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
            output = DemoInputB(some_identifier=inputs[0].some_identifier)
            return [output]


    class DemoStep(MessageWatcher):
        async def process(self, *args, **kwargs):
            # perform any operation here
            # Run some process here to create the output
            output = DemoOutput()
            return [output]


    # The ``await`` calls below must run inside a coroutine (an async context).
    task = await run_workflow_steps()

    job = Job(
        inputs=[("DemoInputA", {"other_identifier": "personA2"}), ("DemoInputB", {"some_identifier": "personB2"})],
        params={"demo_param": "demo"},
        search_metadata=["sample_order_id: 1234"],
        workflow_step="DemoStep",
    )
    workflow = WorkflowModel(
        workflow_id="some_id",
        workflow_name="some_name",
        jobs=[job],
        search_metadata=["project_id: wf_1234"],
    )
    await order_workflows(workflow)

This will place an order on the queue, which will be picked up and processed when the input artefacts are ready (in this case, ``DemoInputA`` with ``other_identifier=personA2`` and ``DemoInputB`` with ``some_identifier=personB2``).

.. note::

   You can pass additional values in the ``params`` dict to identify the input that should be used by the ``process`` method.

   Set the job's ``pending_timeout`` and ``running_timeout`` to appropriate values (in seconds) if you expect a job to take less or more than 24 hours to complete. By default, Maestro allows a job to stay in the pending or running state for a maximum of 86400 seconds (24 hours).

Running workflow steps
~~~~~~~~~~~~~~~~~~~~~~

``run_workflow_steps`` is an ``async`` function that checks all artefact and watcher classes in the file and sets up ``NATS`` and the other async tasks required. It accepts three optional parameters:

.. csv-table::
   :header: "Parameter", "Type (list of)"

   "``message_watcher_classes``", "`MessageWatcher <../maestro.workflow_step.html#maestro.workflow_step.message_watcher.MessageWatcher>`_"
   "``file_watcher_classes``", "`FileWatcher <../maestro.workflow_step.html#maestro.workflow_step.file_watcher.FileWatcher>`_"
   "``artefact_classes``", "`BaseArtefactModel <../maestro.html#maestro.models.BaseArtefactModel>`_"

.. note::

   If any of these parameters is left empty or ``None``, Maestro will set up all available classes of that type.

If you only want to run specific watcher and artefact classes, you can pass them to the function as follows:

.. code:: python

    import asyncio

    from maestro import run_workflow_steps
    from maestro import BaseArtefactModel, FileWatcher, MessageWatcher, Job


    class DemoInputA(BaseArtefactModel):
        ...


    class DemoInputB(BaseArtefactModel):
        ...


    class DemoOutput(BaseArtefactModel):
        ...


    class DemoFileWatcherA(FileWatcher):
        path = "tmp/test_filewatcher"
        pattern = r".*\.sample"

        async def process(self, *args, **kwargs):
            output = DemoInputA(steps_id="demo steps 1")
            return [output]


    class DemoFileWatcherB(FileWatcher):
        path = "tmp/test_filewatcher"
        pattern = r".*\.txt"

        async def process(self, *args, **kwargs):
            output = DemoInputB()
            return [output]


    class DemoStep(MessageWatcher):
        input_types = {DemoInputA, DemoInputB}
        # parameter
        demo_param: str

        async def process(self, *args, **kwargs):
            # perform any operation here
            # Run some process here to create the output
            output = DemoOutput()
            return [output]


    # DemoFileWatcherB is a FileWatcher subclass, so it is passed as a file watcher class.
    # The ``await`` call must run inside a coroutine (an async context).
    task = await run_workflow_steps(file_watcher_classes=[DemoFileWatcherB])

Logging
~~~~~~~

Maestro uses Python’s ``logging`` module, which you can also use in your app. Several configurations are available; see `Logging facility for Python <https://docs.python.org/3/library/logging.html>`_.

Terminology
-----------

*Note:* Terms explained elsewhere in the table are in *italics*.

.. table::

   =============== ====
   Term            Description
   =============== ====
   Artefact        Any class subclassing `BaseArtefactModel <../maestro.html#maestro.models.BaseArtefactModel>`_. An artefact represents either results from a processed *order* or a processed *file watcher*.
   Cypher          Cypher is the query language used to query Neo4j (see `official Neo4j docs <https://neo4j.com/docs/cypher-manual/current/introduction/>`_).
   File watcher    Any class that subclasses `FileWatcher <../maestro.workflow_step.html#maestro.workflow_step.file_watcher.FileWatcher>`_. Represents a workflow step that watches a specific path for changes (default: added files) and triggers the ``FileWatcher.process`` method on any change.
   Inspector       [TODO: WIP] System to inspect logs and *Neo4j* for errors and invalid state.
   JetStream       Built-in persistence system for *NATS* (see `official NATS docs <https://docs.nats.io/nats-concepts/jetstream>`_).
   Job             A job is the result of an *order*, queued in the *Neo4j* database as a node with a *Job* label.
   Message watcher Any class inheriting from `MessageWatcher <../maestro.workflow_step.html#maestro.workflow_step.message_watcher.MessageWatcher>`_. Represents a workflow step that takes input *artefacts* and, typically, produces some output *artefacts*. Only ever instantiated by a *worker*.
   Monitoring      [TODO: WIP] API and user interface for monitoring current and past *jobs*, and (yet to be determined) performing predefined actions such as cancel and restart.
   Neo4j           Neo4j is the graph database where all results are stored. The nodes here are either *artefact* nodes or *workflow step* nodes, connected with a relationship of type ``INPUT`` or ``OUTPUT``, representing a processed *job*.
   Order           An order is a *NATS* message, published to the subject ``workflow_step.<message watcher type>.order``, that requests a specific *message watcher* to do work.
   Worker          A worker is a subprocess responsible for processing a *job* when it's ready.
   Worker queue    The worker queue contains all orders that are ready to start, waiting to be processed by a *worker* as soon as system resources allow.
   Workflow step   Super-class of `MessageWatcher <../maestro.workflow_step.html#maestro.workflow_step.message_watcher.MessageWatcher>`_ and `FileWatcher <../maestro.workflow_step.html#maestro.workflow_step.file_watcher.FileWatcher>`_. Each workflow step is designed to have input and output. The workflow step model has three main meta-components:

                   - Details - tracks id, label, datetime, etc. relevant to the workflow step itself.
                   - Input and Output - Artefact acting as input and output for the workflow step.
                   - Standard error and output log - Standard error/output captured while processing the workflow step.

                   The actual schema can be found in the `API reference <../maestro.html#module-maestro.models>`_.
   =============== ====
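
To make the subject format from the *Order* entry concrete, here is a minimal sketch of how such a subject string could be assembled. The helper ``order_subject`` is hypothetical and not part of Maestro, and it assumes that ``<message watcher type>`` is the class name of the message watcher (for example ``DemoStep``). The check rejects characters that NATS does not allow inside a single subject token.

.. code:: python

    # Characters that cannot appear inside a single NATS subject token:
    # whitespace, the token separator "." and the wildcards "*" and ">".
    _FORBIDDEN = set(" \t\r\n.*>")


    def order_subject(message_watcher_type: str) -> str:
        """Build an order subject (hypothetical helper, not a Maestro function)."""
        if not message_watcher_type or _FORBIDDEN & set(message_watcher_type):
            raise ValueError(f"invalid subject token: {message_watcher_type!r}")
        return f"workflow_step.{message_watcher_type}.order"


    print(order_subject("DemoStep"))  # prints "workflow_step.DemoStep.order"
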