Technical concepts
==================

This page provides an overview of the technical aspects of Maestro, including an explanation of key concepts and components.

Key concepts
------------

Maestro contains generic definitions (base classes) of the workflow steps and artefacts that make up workflows, which are represented as graphs:

-  `Workflow steps`_ are functional steps in a workflow.
-  `Artefacts`_ are any input/output necessary for workflow steps to perform their respective jobs.

The base definition of a workflow step includes listening to and publishing messages through NATS, and reading and persisting events in a `Neo4j database`_. The library exposes the base classes `MessageWatcher <../maestro.workflow_step.html#maestro.workflow_step.message_watcher.MessageWatcher>`_, `FileWatcher <../maestro.workflow_step.html#maestro.workflow_step.file_watcher.FileWatcher>`_ and `BaseArtefactModel <../maestro.html#maestro.models.BaseArtefactModel>`_ for applications to subclass and use.

Architecture
------------

The figure below gives an overview of the architecture used in Maestro:

.. image:: ../../img/architecture.png
   :alt: Maestro architecture
   :align: center

[TODO]: Update figure

See `Terminology`_ for definitions of the terms used in the figure and elsewhere in this documentation.

Maestro API
-----------

Maestro’s API provides communication with any system, internal or external. It is mostly used to fetch the status of processes run by Maestro.

The API is available at http://localhost:8002/docs (default configuration) when running the service.

Maestro UI
----------

The Maestro UI is a web interface that allows users to monitor the status of running and past jobs, and perform certain actions such as invalidating workflows/steps/artefacts and reordering steps. The UI is accessible at http://localhost:8002/ (default configuration). See also the :doc:`UI user manual <manual>`.

All rendering is done server-side using `Jinja2 <https://jinja.palletsprojects.com/en/3.0.x/>`_ templates.

Neo4j database
--------------

The easiest way to access the Neo4j database is through the Neo4j browser, available at http://localhost:7474/browser/ (default configuration).

Use `Cypher <https://neo4j.com/docs/cypher-manual/current/introduction/>`_ queries to interact with the database. Examples:

-  Show all nodes in the graph: 

   .. code:: cypher

      MATCH (n) RETURN n;

-  Delete all nodes and relationships:

   .. code:: cypher

      MATCH (n) DETACH DELETE n;

For migrations, see `neo4j-python-migrations <https://pypi.org/project/neo4j-python-migrations/>`_ for details on naming conventions etc.

Maestro components and concepts
-------------------------------

When using Maestro in an app (see :doc:`getting_started`), your app will rely on Maestro to process and execute workflow operations. In the following we will explain a few components that are important to understand before you start implementing pipelines and corresponding operations.

See also the `Terminology`_ section for a brief definition of terms used in the following.

Workflow steps
~~~~~~~~~~~~~~

[TODO]

Artefacts 
~~~~~~~~~

Artefact classes define the inputs and outputs of an operation (also known as a *workflow step*). Maestro provides `BaseArtefactModel <../maestro.html#maestro.models.BaseArtefactModel>`_, which you subclass to declare the attributes, and their data types, that a workflow step takes as input or produces as output. The base model provided by Maestro has the following attributes:

.. csv-table::
    :header: "Attribute", "Type", "Description"

    "``id``",         "int",          "ID for the artefact (*optional*)."
    "``maestro_id``", "str",          "UUID for Maestro's internal reference. Maestro provides a value automatically, but it can optionally be overridden if needed."
    "``labels``",     "list[str]",    "Automatically set by Maestro."
    "``subjects``",   "list[str]",    "Used by Maestro to set up NATS subscriptions. Override only if you need to customize how NATS is used."

You can add more attributes or override the type of an existing input/output artefact attribute. Example of artefact models:

.. code:: python

   from maestro import BaseArtefactModel

   class DemoInput(BaseArtefactModel):
       ...

   class DemoOutput(BaseArtefactModel):
       ...
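
To show what extending the base model gives you, the sketch below mirrors the attribute table above with a standard-library dataclass, so it runs without Maestro installed. ``ArtefactStandIn`` is a hypothetical stand-in, not Maestro's ``BaseArtefactModel``, and the attributes ``sample_id`` and ``read_count`` are made up for the example; in your app, subclass ``BaseArtefactModel`` as shown above. The sketch needs Python 3.10 or later for ``kw_only``.

.. code:: python

   import uuid
   from dataclasses import dataclass, field


   @dataclass(kw_only=True)
   class ArtefactStandIn:
       """Hypothetical stand-in mirroring the attribute table of BaseArtefactModel."""

       id: int | None = None  # optional ID for the artefact
       # Generated automatically, but can be overridden when the artefact is created
       maestro_id: str = field(default_factory=lambda: str(uuid.uuid4()))
       labels: list[str] = field(default_factory=list)  # set by Maestro in the real model
       subjects: list[str] = field(default_factory=list)  # used for NATS in the real model


   @dataclass(kw_only=True)
   class DemoInput(ArtefactStandIn):
       # App-specific attributes added on top of the base ones (hypothetical names)
       sample_id: str
       read_count: int = 0


   if __name__ == "__main__":
       artefact = DemoInput(sample_id="S1", read_count=42)
       print(artefact.sample_id, artefact.read_count)  # S1 42
       print(DemoInput(sample_id="S2", maestro_id="my-own-id").maestro_id)  # my-own-id

Each instance gets its own generated ``maestro_id`` unless one is passed explicitly, as described in the table; how Maestro fills in ``labels`` and ``subjects`` is not modelled here.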

.. warning::

   Always subclass Maestro's model class (`BaseArtefactModel <../maestro.html#maestro.models.BaseArtefactModel>`_) when defining the artefact models in your pipelines, operations or workflow steps.

File watcher
~~~~~~~~~~~~

The `FileWatcher <../maestro.workflow_step.html#maestro.workflow_step.file_watcher.FileWatcher>`_ class from Maestro can be used whenever your app needs to pick up one or more files to process. The base class is quite simple and can be extended to suit your pipeline's needs. A file watcher watches the directory defined in ``path`` for changes and performs operations only on newly added files.

.. warning::

   The app must be up and running when you add files to the directory; only files added while the file watcher is running are picked up.
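
The behaviour described above (only files added after watching starts, filtered by ``pattern``) can be sketched with a simple polling snapshot of the directory using the standard library. This is not Maestro's implementation; ``list_files`` and ``new_matching_files`` are hypothetical helpers, and applying ``pattern`` with ``re.match`` is an assumption.

.. code:: python

   import os
   import re
   import tempfile


   def list_files(path: str) -> set[str]:
       """Return the names of regular files currently in `path`."""
       return {
           name
           for name in os.listdir(path)
           if os.path.isfile(os.path.join(path, name))
       }


   def new_matching_files(path: str, pattern: str, seen: set[str]) -> list[str]:
       """Return files added since `seen` was taken whose names match `pattern`.

       `seen` is updated in place, so each file is reported only once.
       """
       added = list_files(path) - seen
       seen |= added
       regex = re.compile(pattern)
       return sorted(name for name in added if regex.match(name))


   if __name__ == "__main__":
       with tempfile.TemporaryDirectory() as watch_dir:
           # A file that exists before watching starts is never reported
           open(os.path.join(watch_dir, "old.txt"), "w").close()
           seen = list_files(watch_dir)

           # Files added "while the app is running"
           for name in ("new.txt", "notes.csv"):
               open(os.path.join(watch_dir, name), "w").close()

           print(new_matching_files(watch_dir, r".*\.txt", seen))  # ['new.txt']
           print(new_matching_files(watch_dir, r".*\.txt", seen))  # []

In this sketch, files present when the first snapshot is taken (``old.txt``) are never reported, which mirrors the warning above.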

To define a file watcher class in your pipeline, you need the following components (all mandatory except ``priority``):

.. csv-table::
    :header: "Component", "Description"

    "``path``",               "Path to the directory where files to be processed are located."
    "``pattern``",            "Regex pattern of the filenames you wish to process. With this you can further refine which files you want to process in the path directory."
    "``priority``",           "Optional. Integer 1, 2 or 3 that sets the priority of the file watcher class; defaults to 1 (lowest/normal)."
    "``async def process``",  "The actual operation you wish to perform on the file. **Note:** The method should always be defined as ``async``."
    "``inputs``",             "Data model of your input. Use existing default or extend `BaseArtefactModel <../maestro.html#maestro.models.BaseArtefactModel>`_ as needed."
    "``output``",             "Data model of your output. Use existing default or extend `BaseArtefactModel <../maestro.html#maestro.models.BaseArtefactModel>`_ as needed."

Here’s an example of a file watcher class. Your file watcher must always subclass Maestro's `FileWatcher <../maestro.workflow_step.html#maestro.workflow_step.file_watcher.FileWatcher>`_ class.

.. code:: python

   from maestro import BaseArtefactModel, FileArtefactModel, FileWatcher

   class DemoInput(BaseArtefactModel):
       ...

   class DemoInputAlternative(FileArtefactModel):
       ...

   class DemoOutput(BaseArtefactModel):
       ...

   class DemoFileWatcher(FileWatcher):

       path = "/tmp/test_filewatcher"
       pattern = r".*\.txt"  # "\." matches a literal dot

       async def process(self, *args, **kwargs):
           output = DemoOutput()
           return [output]
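
Because ``pattern`` is a regular expression, an unescaped ``.`` matches any character, not just a literal dot, so a pattern such as ``.*.txt`` also accepts a name like ``sample1_txt``. The sketch below compares three patterns on a few example filenames. It assumes the pattern is applied with ``re.match`` (anchored at the start of the name only), which is an assumption about how ``FileWatcher`` uses it; ``matching`` is a hypothetical helper.

.. code:: python

   import re

   # Example filenames a watched directory might contain
   FILENAMES = ["sample1.txt", "sample1_txt", "sample1.txt.bak", "sample1.csv"]


   def matching(pattern: str, names: list[str]) -> list[str]:
       """Names accepted when `pattern` is applied with re.match (an assumption)."""
       return [name for name in names if re.match(pattern, name)]


   if __name__ == "__main__":
       # '.' matches ANY character
       print(matching(".*.txt", FILENAMES))     # ['sample1.txt', 'sample1_txt', 'sample1.txt.bak']
       # '\.' matches a literal dot
       print(matching(r".*\.txt", FILENAMES))   # ['sample1.txt', 'sample1.txt.bak']
       # '$' also rejects trailing suffixes
       print(matching(r".*\.txt$", FILENAMES))  # ['sample1.txt']

Using a raw string (``r"..."``) keeps the backslash intact, so ``\.`` reaches the regex engine unchanged.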

You can optionally override the ``filter`` class variable of `FileWatcher <../maestro.workflow_step.html#maestro.workflow_step.file_watcher.FileWatcher>`_. The default filter only watches for newly added files.

Message watcher
~~~~~~~~~~~~~~~

This is probably the component you will use to define most steps in a workflow. The message watcher class in Maestro is designed to keep track of all its subclasses and to run a step's operation once all the inputs for that workflow step are found.

Message watcher subclasses are implemented much like file watchers, but have fewer components:

.. csv-table::
    :header: "Component", "Description"

    "Parameters",             "Optional parameters that can be passed to the step's process/operation."
    "``async def process``",  "The actual operation you wish to perform on the input artefacts. **Note:** The method should always be defined as ``async``."
    "``inputs``",             "Data model of your input. Use the existing default or extend `BaseArtefactModel <../maestro.html#maestro.models.BaseArtefactModel>`_ as needed."
    "``output``",             "Data model of your output. Use the existing default or extend `BaseArtefactModel <../maestro.html#maestro.models.BaseArtefactModel>`_ as needed."

Here’s an example of a message watcher class. Your message watcher must always subclass Maestro's `MessageWatcher <../maestro.workflow_step.html#maestro.workflow_step.message_watcher.MessageWatcher>`_ class.

.. code:: python

   from maestro import BaseArtefactModel, MessageWatcher

   class DemoInputA(BaseArtefactModel):
       ...

   class DemoInputB(BaseArtefactModel):
       ...

   class DemoOutput(BaseArtefactModel):
       ...

   class DemoStep(MessageWatcher):
       # Artefact types this step needs as input
       input_types = {DemoInputA, DemoInputB}

       async def process(self, *args, **kwargs):
           # Run the step's operation here to create the output
           output = DemoOutput()
           return [output]

Order job
~~~~~~~~~

To place an order for a `MessageWatcher <../maestro.workflow_step.html#maestro.workflow_step.message_watcher.MessageWatcher>`_, create a ``Job`` and submit it as part of a workflow, like this:

.. code:: python

   import asyncio

   from maestro import run_workflow_steps, order_workflow
   from maestro import BaseArtefactModel, FileWatcher, MessageWatcher, Job
   from maestro.models import WorkflowModel  # assumed to live next to BaseArtefactModel

   class DemoInputA(BaseArtefactModel):
       other_identifier: str

   class DemoInputB(BaseArtefactModel):
       some_identifier: str

   class DemoOutput(BaseArtefactModel):
       ...

   class DemoFileWatcherA(FileWatcher):
       path = "/tmp/test_filewatcher"
       pattern = r".*\.sample"

       async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
           output = DemoInputA(other_identifier=inputs[0].other_identifier, steps_id="demo steps 1")
           return [output]

   class DemoFileWatcherB(FileWatcher):
       path = "/tmp/test_filewatcher"
       pattern = r".*\.txt"

       async def process(self, inputs: list[BaseArtefactModel], *args, **kwargs):
           output = DemoInputB(some_identifier=inputs[0].some_identifier)
           return [output]

   class DemoStep(MessageWatcher):
       input_types = {DemoInputA, DemoInputB}
       # Parameter, passed through the job's params
       demo_param: str

       async def process(self, *args, **kwargs):
           # Run the step's operation here to create the output
           output = DemoOutput()
           return [output]

   async def main():
       # Set up NATS and the watchers defined above
       task = await run_workflow_steps()

       job = Job(
           inputs=[("DemoInputA", {"other_identifier": "personA2"}), ("DemoInputB", {"some_identifier": "personB2"})],
           params={"demo_param": "demo"},
           search_metadata=["sample_order_id: 1234"],
           workflow_step="DemoStep",
       )

       workflow = WorkflowModel(
           workflow_id="some_id",
           workflow_name="some_name",
           jobs=[job],
           search_metadata=["project_id: wf_1234"],
       )
       await order_workflow(workflow)

       # Keep the event loop running so the watchers can process the order
       await asyncio.Event().wait()

   asyncio.run(main())

This will place an order on the queue, which will be picked up and processed when the input artefacts are ready (in this case, ``DemoInputA`` with ``other_identifier=personA2`` and ``DemoInputB`` with ``some_identifier=personB2``).
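
The readiness condition described above can be sketched in plain Python. The sketch assumes that an artefact satisfies a job input when its type name matches and every key in the attribute dict has an equal value. ``StoredArtefact``, ``find_input`` and ``resolve_inputs`` are hypothetical helpers, not Maestro API; Maestro does this matching itself, and the sketch only models the idea.

.. code:: python

   from dataclasses import dataclass, field


   @dataclass
   class StoredArtefact:
       """Hypothetical stand-in for an artefact that has already been produced."""

       type_name: str
       attributes: dict = field(default_factory=dict)


   def find_input(spec, artefacts):
       """Return the first artefact matching a (type name, attribute filter) spec, or None."""
       type_name, wanted = spec
       for artefact in artefacts:
           if artefact.type_name == type_name and all(
               artefact.attributes.get(key) == value for key, value in wanted.items()
           ):
               return artefact
       return None


   def resolve_inputs(specs, artefacts):
       """Return one artefact per spec once all are available, otherwise None (keep waiting)."""
       found = [find_input(spec, artefacts) for spec in specs]
       return None if any(artefact is None for artefact in found) else found


   if __name__ == "__main__":
       # Same inputs as the Job above
       job_inputs = [
           ("DemoInputA", {"other_identifier": "personA2"}),
           ("DemoInputB", {"some_identifier": "personB2"}),
       ]
       store = [StoredArtefact("DemoInputA", {"other_identifier": "personA2"})]
       print(resolve_inputs(job_inputs, store))  # None: DemoInputB is not there yet

       store.append(StoredArtefact("DemoInputB", {"some_identifier": "personB2"}))
       ready = resolve_inputs(job_inputs, store)
       print([artefact.type_name for artefact in ready])  # ['DemoInputA', 'DemoInputB']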

.. note::

   You can pass additional values in the ``params`` dict to identify the input to be used by the ``process`` method.

   Update the job's ``pending_timeout`` and ``running_timeout`` to an appropriate value (in seconds) if you expect a job to take less or more than 24 hours to complete. By default, Maestro allows a job to stay in the pending or running state for a maximum of 86400 seconds (24 hours).
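
The timeouts are expressed in seconds. The sketch below shows the arithmetic and the documented rule (a job may stay pending or running for at most the configured number of seconds) using two hypothetical helpers, ``timeout_seconds`` and ``has_timed_out``. They are not part of Maestro, and passing the result to ``Job`` as ``running_timeout`` is shown only as a commented assumption.

.. code:: python

   from datetime import datetime, timedelta, timezone

   # Documented default: 24 hours, expressed in seconds
   DEFAULT_TIMEOUT_SECONDS = 24 * 60 * 60  # 86400


   def timeout_seconds(hours: float) -> int:
       """Convert an expected maximum duration in hours to whole seconds."""
       return int(hours * 60 * 60)


   def has_timed_out(entered_state_at: datetime, timeout: int, now: datetime) -> bool:
       """True if a job has stayed in its current state (pending or running) too long."""
       return now - entered_state_at > timedelta(seconds=timeout)


   if __name__ == "__main__":
       # A job expected to run for up to 48 hours
       running_timeout = timeout_seconds(48)
       print(running_timeout)  # 172800
       # Assumed usage: Job(..., running_timeout=running_timeout)

       started = datetime(2024, 1, 1, tzinfo=timezone.utc)
       print(has_timed_out(started, DEFAULT_TIMEOUT_SECONDS, started + timedelta(hours=25)))  # True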

Running workflow steps
~~~~~~~~~~~~~~~~~~~~~~

``run_workflow_steps`` is an ``async`` function that checks all artefact and watcher classes in the file and sets up ``NATS`` and the other required async tasks. It accepts three optional parameters:

.. csv-table::
    :header: "Parameter", "Type (list of)"

    "``message_watcher_classes``",   "`MessageWatcher <../maestro.workflow_step.html#maestro.workflow_step.message_watcher.MessageWatcher>`_"
    "``file_watcher_classes``",      "`FileWatcher <../maestro.workflow_step.html#maestro.workflow_step.file_watcher.FileWatcher>`_"
    "``artefact_classes``",          "`BaseArtefactModel <../maestro.html#maestro.models.BaseArtefactModel>`_"

.. note::
   
   If a parameter is left empty or ``None``, Maestro sets up all classes of that kind (for example, all message watcher classes if ``message_watcher_classes`` is not given).
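
As an illustration of the "empty or ``None`` means all classes" rule, the sketch below collects the subclasses of a base class with Python's ``type.__subclasses__()``. ``MessageWatcherStandIn``, ``all_subclasses``, ``resolve_classes`` and the ``Step*`` classes are hypothetical; whether Maestro discovers classes this way, and whether it includes indirect subclasses as this sketch does, is an assumption.

.. code:: python

   class MessageWatcherStandIn:
       """Hypothetical stand-in for Maestro's MessageWatcher base class."""


   class StepA(MessageWatcherStandIn):
       pass


   class StepB(MessageWatcherStandIn):
       pass


   class StepC(StepB):  # indirect subclass
       pass


   def all_subclasses(base: type) -> list[type]:
       """Collect direct and indirect subclasses of `base`, depth-first."""
       found = []
       for sub in base.__subclasses__():
           found.append(sub)
           found.extend(all_subclasses(sub))
       return found


   def resolve_classes(selected, base: type) -> list[type]:
       """Return `selected` if given; if it is None or empty, every subclass of `base`."""
       return list(selected) if selected else all_subclasses(base)


   if __name__ == "__main__":
       print([cls.__name__ for cls in resolve_classes(None, MessageWatcherStandIn)])
       print([cls.__name__ for cls in resolve_classes([StepA], MessageWatcherStandIn)])  # ['StepA']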

If you only want to run specific watcher and artefact classes, pass them to the function as follows:

.. code:: python

   import asyncio

   from maestro import run_workflow_steps
   from maestro import BaseArtefactModel, FileWatcher, MessageWatcher

   class DemoInputA(BaseArtefactModel):
       ...

   class DemoInputB(BaseArtefactModel):
       ...

   class DemoOutput(BaseArtefactModel):
       ...

   class DemoFileWatcherA(FileWatcher):
       path = "/tmp/test_filewatcher"
       pattern = r".*\.sample"

       async def process(self, *args, **kwargs):
           output = DemoInputA(steps_id="demo steps 1")
           return [output]

   class DemoFileWatcherB(FileWatcher):
       path = "/tmp/test_filewatcher"
       pattern = r".*\.txt"

       async def process(self, *args, **kwargs):
           output = DemoInputB()
           return [output]

   class DemoStep(MessageWatcher):
       input_types = {DemoInputA, DemoInputB}
       # Parameter
       demo_param: str

       async def process(self, *args, **kwargs):
           # Run the step's operation here to create the output
           output = DemoOutput()
           return [output]

   async def main():
       # Only DemoStep and DemoFileWatcherB are set up; artefact_classes is
       # left out, so all artefact classes are used
       task = await run_workflow_steps(
           message_watcher_classes=[DemoStep],
           file_watcher_classes=[DemoFileWatcherB],
       )
       # Keep the event loop running so the watchers can do their work
       await asyncio.Event().wait()

   asyncio.run(main())

Logging
~~~~~~~

Maestro uses Python’s standard ``logging`` module, which you can also use in your app. For the available configuration options, see `Logging facility for Python <https://docs.python.org/3/library/logging.html>`_.
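
A minimal sketch of configuring ``logging`` at the start-up of your app. The logger name ``my_app.steps`` is hypothetical, and the assumption that Maestro's loggers are named under ``maestro`` (the usual ``logging.getLogger(__name__)`` convention) is not confirmed by this page.

.. code:: python

   import logging

   # Configure handlers and format once, at application start-up
   logging.basicConfig(
       level=logging.INFO,
       format="%(asctime)s %(levelname)s %(name)s: %(message)s",
   )

   # Logger for your own app code (hypothetical name)
   logger = logging.getLogger("my_app.steps")

   # Assumption: Maestro's loggers are named under the "maestro" package,
   # so this makes them less verbose than your own code
   logging.getLogger("maestro").setLevel(logging.WARNING)

   if __name__ == "__main__":
       # Pass arguments separately so formatting only happens if the record is emitted
       logger.info("Processing %s", "sample1.txt")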

Terminology
-----------

*Note:* Terms explained elsewhere in the table are in *italics*.

.. table::

   ===============   ====
   Term              Description
   ===============   ====
   Artefact          Any class subclassing `BaseArtefactModel <../maestro.html#maestro.models.BaseArtefactModel>`_. An artefact represents either results from a processed *order* or a processed *file watcher*.
   Cypher            Cypher is the query language used to query Neo4j (see `official Neo4j docs <https://neo4j.com/docs/cypher-manual/current/introduction/>`_).
   File watcher      Any class that subclasses `FileWatcher <../maestro.workflow_step.html#maestro.workflow_step.file_watcher.FileWatcher>`_. Represents a workflow step that watches a specific path for changes (default: added files) and triggers the ``FileWatcher.process`` method on each change.
   Inspector         [TODO: WIP] System to inspect logs and *Neo4j* for errors and invalid state.
   JetStream         Built-in persistence system for *NATS* (see `official NATS docs <https://docs.nats.io/nats-concepts/jetstream>`_).
   Job               A job is the result of an *order*, queued in the *Neo4j* database as a node with the ``Job`` label.
   Message watcher   Any class inheriting from `MessageWatcher <../maestro.workflow_step.html#maestro.workflow_step.message_watcher.MessageWatcher>`_. Represents a workflow step that takes input *artefacts* and, typically, produces some output *artefacts*. Only ever instantiated by a *worker*.
   Monitoring        [TODO: WIP] API and user interface for monitoring current and past *jobs*, and (yet to be determined) perform predefined actions such as cancel and restart.
   Neo4j             Neo4j is the graph database where all results are stored. The nodes here are either *artefact* nodes or *workflow step* nodes, connected with a relationship of type ``INPUT`` or ``OUTPUT``, representing a processed *job*.
   Order             An order is a *NATS* message, published to the subject ``workflow_step.<message watcher type>.order``, that requests a specific *message watcher* to do work.
   Worker            A worker is a subprocess responsible for processing a *job* when it's ready.
   Worker queue      The worker queue contains all orders that are ready to start, waiting to be processed by a *worker* as soon as system resource situation allows it.
   Workflow step     Super-class of `MessageWatcher <../maestro.workflow_step.html#maestro.workflow_step.message_watcher.MessageWatcher>`_ and `FileWatcher <../maestro.workflow_step.html#maestro.workflow_step.file_watcher.FileWatcher>`_. Each workflow step is designed to have input and output. The workflow step model has three main meta-components:

                     - Details - tracks id, label, datetime, etc. relevant to the workflow step itself.
                     - Input and Output - Artefact acting as input and output for the workflow step.
                     - Standard error and output log - Standard error/output captured while processing the workflow step.

                     The actual schema can be found in the `API reference <../maestro.html#module-maestro.models>`_.
   ===============   ====
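
The *Order* entry above names the subject an order is published to. NATS subscriptions can use the wildcards ``*`` (exactly one token) and ``>`` (one or more trailing tokens), so a subscription to ``workflow_step.*.order`` would receive orders for every message watcher. The pure-Python sketch below re-implements these matching rules for illustration only (the NATS server does the real matching); ``order_subject`` and ``subject_matches`` are hypothetical helpers, and using the class name as the message watcher type is an assumption.

.. code:: python

   def order_subject(step_name: str) -> str:
       """Order subject for a message watcher, following the scheme above."""
       return f"workflow_step.{step_name}.order"


   def subject_matches(pattern: str, subject: str) -> bool:
       """NATS-style matching: '*' = one token, '>' = one or more trailing tokens."""
       pattern_tokens = pattern.split(".")
       subject_tokens = subject.split(".")
       for i, token in enumerate(pattern_tokens):
           if token == ">":
               # '>' must be the last token and needs at least one token to match
               return i == len(pattern_tokens) - 1 and len(subject_tokens) > i
           if i >= len(subject_tokens):
               return False
           if token != "*" and token != subject_tokens[i]:
               return False
       return len(pattern_tokens) == len(subject_tokens)


   if __name__ == "__main__":
       subject = order_subject("DemoStep")
       print(subject)  # workflow_step.DemoStep.order
       print(subject_matches("workflow_step.*.order", subject))  # True
       print(subject_matches("workflow_step.*", subject))  # False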