maestro.neo4j package#
Submodules#
maestro.neo4j.invalidate module#
- async maestro.neo4j.invalidate.invalidate_artefacts(neo4j_tx: Neo4jTransaction, maestro_ids: list[Annotated[UUID, UuidVersion(uuid_version=4)]], propagate: bool = False) None [source]#
Invalidate artefacts and optionally propagate invalidation to dependent workflow steps.
- Parameters:
neo4j_tx – Neo4j transaction
maestro_ids – List of artefact maestro IDs to invalidate
propagate – If True, invalidate workflow steps using these artefacts as input
This will: 1. Invalidate the specified artefacts and their directly related nested artefacts 2. Invalidate workflow steps if all their (workflow step) outputs are invalidated 3. Optionally propagate invalidation to dependent workflow steps
- async maestro.neo4j.invalidate.invalidate_workflow_steps(neo4j_tx: Neo4jTransaction, maestro_ids: list[Annotated[UUID, UuidVersion(uuid_version=4)]], propagate: bool = False) None [source]#
Invalidating a workflow step involves invalidatings its outputs and stdout/stderr nodes.
- async maestro.neo4j.invalidate.invalidate_workflows(neo4j_tx: Neo4jTransaction, maestro_ids: list[Annotated[UUID, UuidVersion(uuid_version=4)]], propagate: bool = False) None [source]#
maestro.neo4j.neo4j module#
- class maestro.neo4j.neo4j.Neo4jTransaction(uri: str | None = None, db_name: str | None = None, user: str | None = None, pwd: str | None = None, autocommit: bool = True)[source]#
Bases:
object
- async acquire_lock() None [source]#
Acquire a distributed lock using Neo4j’s built-in locking mechanism.
The n_locked property value itself is not important - the MERGE operation is what provides the distributed locking by acquiring an exclusive lock on the __LockNode. Setting n_locked is just a side effect of needing to do a write operation to get the lock.
- async maestro.neo4j.neo4j.provide_neo4j_transaction() AsyncGenerator[Neo4jTransaction, None] [source]#
- pydantic model maestro.neo4j.neo4j.Query[source]#
Bases:
BaseModel
Show JSON schema
{ "title": "Query", "type": "object", "properties": { "query": { "title": "Query", "type": "string" }, "parameters": { "title": "Parameters", "type": "object" } }, "required": [ "query" ] }
- Fields:
- field parameters: dict [Optional]#
- field query: str [Required]#
- pydantic model maestro.neo4j.neo4j.TransactionWriteQueries[source]#
Bases:
BaseModel
Show JSON schema
{ "title": "TransactionWriteQueries", "type": "object", "properties": { "host": { "title": "Host", "type": "string" }, "server_address": { "title": "Server Address", "type": "string" }, "database_id": { "title": "Database Id", "type": "string" }, "queries": { "items": { "$ref": "#/$defs/Query" }, "title": "Queries", "type": "array" } }, "$defs": { "Query": { "properties": { "query": { "title": "Query", "type": "string" }, "parameters": { "title": "Parameters", "type": "object" } }, "required": [ "query" ], "title": "Query", "type": "object" } }, "required": [ "host", "server_address", "database_id" ] }
- field database_id: str [Required]#
- field host: str [Required]#
- field server_address: str [Required]#
maestro.neo4j.neo4j_migrations module#
maestro.neo4j.neo4j_utils module#
- async maestro.neo4j.neo4j_utils.create_node(neo4j_tx: Neo4jTransaction, maestro_object: MaestroBase) UUID [source]#
Create node(s) in Neo4j from a MaestroBase object. Will create nested nodes and relationships, if any.
- async maestro.neo4j.neo4j_utils.create_relationship(neo4j_tx: Neo4jTransaction, start_maestro_id: UUID, end_maestro_id: UUID, relationship_type: str, attributes: dict | None = None) list[Record] [source]#
Create relationship between two nodes in Neo4j. If attributes are provided, they will be added to the relationship.
- async maestro.neo4j.neo4j_utils.fetch_input(neo4j_tx: Neo4jTransaction, input_class: type[BaseArtefactModel], input_filter: dict) BaseArtefactModel [source]#
- async maestro.neo4j.neo4j_utils.fetch_inputs(neo4j_tx: Neo4jTransaction, inputs: dict[str, tuple[~typing.Annotated[type[~maestro.models.BaseArtefactModel], ~pydantic.functional_serializers.PlainSerializer(func=~maestro.models.BaseArtefactModel.class_serializer, return_type=PydanticUndefined, when_used=always), ~pydantic.functional_validators.PlainValidator(func=~maestro.models.BaseArtefactModel.class_validator, json_schema_input_type=~typing.Any), ~pydantic.json_schema.WithJsonSchema(json_schema={'additionalProperties': True, 'properties': {'invalidated_at': {'anyOf': [{'format': 'date-time', 'type': 'string'}, {'type': 'null'}], 'default': None, 'title': 'Invalidated At'}, 'maestro_id': {'description': 'Unique identifier', 'format': 'uuid4', 'title': 'Maestro Id', 'type': 'string'}, 'additional_labels': {'default': [], 'description': 'Additional labels to be used in Neo4j', 'items': {'type': 'string'}, 'title': 'Additional Labels', 'type': 'array'}}, 'title': 'BaseArtefactModel', 'type': 'object'}, mode=None)], dict]]) dict[str, BaseArtefactModel] [source]#
- async maestro.neo4j.neo4j_utils.fetch_node_by_rel(neo4j_tx: Neo4jTransaction, start_node: type[MaestroBase], params: dict, end_node: type[MaestroBaseType]) MaestroBaseType | None [source]#
- async maestro.neo4j.neo4j_utils.filter_duplicate_jobs(neo4j_tx: Neo4jTransaction, jobs: set[Job]) tuple[set[Job], set[Job]] [source]#
Filter jobs and return those that have no match in the database as well the matching ones from the database.
- async maestro.neo4j.neo4j_utils.get_valid_existing_job(neo4j_tx: Neo4jTransaction, job: Job) Job | None [source]#
Check for, and return, valid existing job matching input job, where “valid” means
The job has the same hash (i.e. same params, inputs, and workflow step)
The job is in a valid state (not cancelled or failed)
The job is not tied to an invalidated workflow_step
- async maestro.neo4j.neo4j_utils.merge_job_node(neo4j_tx: Neo4jTransaction, job: Job) None [source]#
- async maestro.neo4j.neo4j_utils.read_node(neo4j_tx: Neo4jTransaction, model: type[MaestroBaseType], params: dict, where_clauses: list[str] | None = None) MaestroBaseType | None [source]#
Read node as MaestroBase object from Neo4j database - supports nested nodes
- async maestro.neo4j.neo4j_utils.read_nodes(neo4j_tx: Neo4jTransaction, model: type[MaestroBaseType], params: dict, where_clauses: list[str] | None = None) list[MaestroBaseType] [source]#
Read nodes as MaestroBase object from Neo4j database - supports nested nodes
maestro.neo4j.queue module#
- async maestro.neo4j.queue.get_all_jobs(neo4j_tx: Neo4jTransaction) list[Job] [source]#
- async maestro.neo4j.queue.get_pending_jobs(neo4j_tx: Neo4jTransaction) list[Job] [source]#