maestro.neo4j package#

Submodules#

maestro.neo4j.invalidate module#

async maestro.neo4j.invalidate.invalidate_artefacts(neo4j_tx: Neo4jTransaction, maestro_ids: list[Annotated[UUID, UuidVersion(uuid_version=4)]], propagate: bool = False) None[source]#

Invalidate artefacts and optionally propagate invalidation to dependent workflow steps.

Parameters:
  • neo4j_tx – Neo4j transaction

  • maestro_ids – List of artefact maestro IDs to invalidate

  • propagate – If True, invalidate workflow steps using these artefacts as input

This will: 1. Invalidate the specified artefacts and their directly related nested artefacts 2. Invalidate workflow steps if all their (workflow step) outputs are invalidated 3. Optionally propagate invalidation to dependent workflow steps

async maestro.neo4j.invalidate.invalidate_workflow_steps(neo4j_tx: Neo4jTransaction, maestro_ids: list[Annotated[UUID, UuidVersion(uuid_version=4)]], propagate: bool = False) None[source]#

Invalidating a workflow step involves invalidatings its outputs and stdout/stderr nodes.

async maestro.neo4j.invalidate.invalidate_workflows(neo4j_tx: Neo4jTransaction, maestro_ids: list[Annotated[UUID, UuidVersion(uuid_version=4)]], propagate: bool = False) None[source]#

maestro.neo4j.neo4j module#

class maestro.neo4j.neo4j.Neo4jTransaction(uri: str | None = None, db_name: str | None = None, user: str | None = None, pwd: str | None = None, autocommit: bool = True)[source]#

Bases: object

async acquire_lock() None[source]#

Acquire a distributed lock using Neo4j’s built-in locking mechanism.

The n_locked property value itself is not important - the MERGE operation is what provides the distributed locking by acquiring an exclusive lock on the __LockNode. Setting n_locked is just a side effect of needing to do a write operation to get the lock.

async commit() None[source]#
async query(query: str, parameters: BaseModel | dict | None = None) list[Record][source]#
async rollback() None[source]#
maestro.neo4j.neo4j.literal_params_map(m: dict) str[source]#
async maestro.neo4j.neo4j.provide_neo4j_transaction() AsyncGenerator[Neo4jTransaction, None][source]#
maestro.neo4j.neo4j.serialize_params(obj: BaseModel | dict | None) dict[str, Any][source]#
pydantic model maestro.neo4j.neo4j.Query[source]#

Bases: BaseModel

Show JSON schema
{
   "title": "Query",
   "type": "object",
   "properties": {
      "query": {
         "title": "Query",
         "type": "string"
      },
      "parameters": {
         "title": "Parameters",
         "type": "object"
      }
   },
   "required": [
      "query"
   ]
}

Fields:
field parameters: dict [Optional]#
field query: str [Required]#
pydantic model maestro.neo4j.neo4j.TransactionWriteQueries[source]#

Bases: BaseModel

Show JSON schema
{
   "title": "TransactionWriteQueries",
   "type": "object",
   "properties": {
      "host": {
         "title": "Host",
         "type": "string"
      },
      "server_address": {
         "title": "Server Address",
         "type": "string"
      },
      "database_id": {
         "title": "Database Id",
         "type": "string"
      },
      "queries": {
         "items": {
            "$ref": "#/$defs/Query"
         },
         "title": "Queries",
         "type": "array"
      }
   },
   "$defs": {
      "Query": {
         "properties": {
            "query": {
               "title": "Query",
               "type": "string"
            },
            "parameters": {
               "title": "Parameters",
               "type": "object"
            }
         },
         "required": [
            "query"
         ],
         "title": "Query",
         "type": "object"
      }
   },
   "required": [
      "host",
      "server_address",
      "database_id"
   ]
}

Fields:
field database_id: str [Required]#
field host: str [Required]#
field queries: list[Query] [Optional]#
field server_address: str [Required]#

maestro.neo4j.neo4j_migrations module#

maestro.neo4j.neo4j_migrations.run_migrations(neo4j_uri: str, neo4j_user: str, neo4j_password: str) None[source]#

maestro.neo4j.neo4j_utils module#

exception maestro.neo4j.neo4j_utils.MultipleNodesFoundError[source]#

Bases: Exception

async maestro.neo4j.neo4j_utils.create_node(neo4j_tx: Neo4jTransaction, maestro_object: MaestroBase) UUID[source]#

Create node(s) in Neo4j from a MaestroBase object. Will create nested nodes and relationships, if any.

async maestro.neo4j.neo4j_utils.create_relationship(neo4j_tx: Neo4jTransaction, start_maestro_id: UUID, end_maestro_id: UUID, relationship_type: str, attributes: dict | None = None) list[Record][source]#

Create relationship between two nodes in Neo4j. If attributes are provided, they will be added to the relationship.

async maestro.neo4j.neo4j_utils.fetch_input(neo4j_tx: Neo4jTransaction, input_class: type[BaseArtefactModel], input_filter: dict) BaseArtefactModel[source]#
async maestro.neo4j.neo4j_utils.fetch_inputs(neo4j_tx: Neo4jTransaction, inputs: dict[str, tuple[~typing.Annotated[type[~maestro.models.BaseArtefactModel], ~pydantic.functional_serializers.PlainSerializer(func=~maestro.models.BaseArtefactModel.class_serializer, return_type=PydanticUndefined, when_used=always), ~pydantic.functional_validators.PlainValidator(func=~maestro.models.BaseArtefactModel.class_validator, json_schema_input_type=~typing.Any), ~pydantic.json_schema.WithJsonSchema(json_schema={'additionalProperties': True, 'properties': {'invalidated_at': {'anyOf': [{'format': 'date-time', 'type': 'string'}, {'type': 'null'}], 'default': None, 'title': 'Invalidated At'}, 'maestro_id': {'description': 'Unique identifier', 'format': 'uuid4', 'title': 'Maestro Id', 'type': 'string'}, 'additional_labels': {'default': [], 'description': 'Additional labels to be used in Neo4j', 'items': {'type': 'string'}, 'title': 'Additional Labels', 'type': 'array'}}, 'title': 'BaseArtefactModel', 'type': 'object'}, mode=None)], dict]]) dict[str, BaseArtefactModel][source]#
async maestro.neo4j.neo4j_utils.fetch_node_by_rel(neo4j_tx: Neo4jTransaction, start_node: type[MaestroBase], params: dict, end_node: type[MaestroBaseType]) MaestroBaseType | None[source]#
async maestro.neo4j.neo4j_utils.filter_duplicate_jobs(neo4j_tx: Neo4jTransaction, jobs: set[Job]) tuple[set[Job], set[Job]][source]#

Filter jobs and return those that have no match in the database as well the matching ones from the database.

async maestro.neo4j.neo4j_utils.get_valid_existing_job(neo4j_tx: Neo4jTransaction, job: Job) Job | None[source]#

Check for, and return, valid existing job matching input job, where “valid” means

  1. The job has the same hash (i.e. same params, inputs, and workflow step)

  2. The job is in a valid state (not cancelled or failed)

  3. The job is not tied to an invalidated workflow_step

async maestro.neo4j.neo4j_utils.merge_job_node(neo4j_tx: Neo4jTransaction, job: Job) None[source]#
async maestro.neo4j.neo4j_utils.read_node(neo4j_tx: Neo4jTransaction, model: type[MaestroBaseType], params: dict, where_clauses: list[str] | None = None) MaestroBaseType | None[source]#

Read node as MaestroBase object from Neo4j database - supports nested nodes

async maestro.neo4j.neo4j_utils.read_nodes(neo4j_tx: Neo4jTransaction, model: type[MaestroBaseType], params: dict, where_clauses: list[str] | None = None) list[MaestroBaseType][source]#

Read nodes as MaestroBase object from Neo4j database - supports nested nodes

maestro.neo4j.queue module#

async maestro.neo4j.queue.get_all_jobs(neo4j_tx: Neo4jTransaction) list[Job][source]#
async maestro.neo4j.queue.get_pending_jobs(neo4j_tx: Neo4jTransaction) list[Job][source]#