maestro.workflow_step package

Submodules

maestro.workflow_step.abstract_workflow_step module

class maestro.workflow_step.abstract_workflow_step.AbstractWorkflowStep

Bases: ABC

abstract classmethod labels() → list[str]
abstract async process(*args: Any, **kwargs: Any) → Sequence[BaseArtefactModel]
abstract classmethod subjects() → set[str]
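
The three abstract members above form the contract that every step must satisfy. The following is a minimal sketch of a concrete step, not the library's own code: AbstractStepSketch, ArtefactSketch, and UppercaseStep are hypothetical stand-ins so the snippet runs without maestro installed, and the label and subject values are assumptions chosen for illustration. A real step would subclass AbstractWorkflowStep and return BaseArtefactModel instances.

```python
import asyncio
from abc import ABC, abstractmethod
from collections.abc import Sequence
from dataclasses import dataclass
from typing import Any


@dataclass
class ArtefactSketch:
    """Hypothetical stand-in for BaseArtefactModel."""

    text: str


class AbstractStepSketch(ABC):
    """Stand-in mirroring the three abstract members documented above."""

    @classmethod
    @abstractmethod
    def labels(cls) -> list[str]: ...

    @abstractmethod
    async def process(self, *args: Any, **kwargs: Any) -> Sequence[ArtefactSketch]: ...

    @classmethod
    @abstractmethod
    def subjects(cls) -> set[str]: ...


class UppercaseStep(AbstractStepSketch):
    """Concrete step: turns each positional input into an upper-cased artefact."""

    @classmethod
    def labels(cls) -> list[str]:
        return ["example"]  # assumed value, for illustration only

    async def process(self, *args: Any, **kwargs: Any) -> Sequence[ArtefactSketch]:
        return [ArtefactSketch(text=str(arg).upper()) for arg in args]

    @classmethod
    def subjects(cls) -> set[str]:
        return {"order.workflow_step.UppercaseStep"}  # assumed subject name


# process() is a coroutine, so it has to be awaited or driven by an event loop.
artefacts = asyncio.run(UppercaseStep().process("hello"))
```

Because all three members are abstract, instantiating the base class directly raises TypeError; only subclasses that implement labels, process, and subjects can be created.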

maestro.workflow_step.file_watcher module

class maestro.workflow_step.file_watcher.AddedFilesFilter(*, ignore_dirs: Sequence[str] | None = None, ignore_entity_patterns: Sequence[str] | None = None, ignore_paths: Sequence[Path | str] | None = None)

Bases: DefaultFilter

A watchfiles filter that passes only added files.

class maestro.workflow_step.file_watcher.FileWatcher

Bases: WorkflowStepBase

Monitor specific paths for changes.

Subclass this class with a path and a process method to create a watcher.

Class variables:

path: Path to watch
pattern: Regex pattern to match against the absolute path of filenames
filter: Filter to apply to watchfiles (default: AddedFilesFilter, which watches only for added files)

Wrapper around watchfiles.awatch. See documentation for more details: https://watchfiles.helpmanual.io/
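
To make the role of the pattern class variable concrete, here is a small sketch of how a regex could be applied to the absolute path of a candidate file. The helper path_matches is hypothetical and not part of maestro. The use of re.search rather than re.match or re.fullmatch, and treating a pattern of None as "accept everything", are both assumptions; the source only states that the pattern is matched against the absolute path.

```python
from __future__ import annotations

import re
from pathlib import Path


def path_matches(pattern: str | None, path: Path) -> bool:
    """Return True when no pattern is set or the regex matches the absolute path.

    Hypothetical helper: the matching semantics are assumed, not confirmed.
    """
    if pattern is None:
        return True
    # Match against the absolute path string, as the class variable description says.
    return re.search(pattern, str(path.absolute())) is not None


# Example pattern: accept only CSV files named scan_<digits>.csv.
scan_pattern = r"scan_\d+\.csv$"
accepted = path_matches(scan_pattern, Path("/data/incoming/scan_001.csv"))
rejected = path_matches(scan_pattern, Path("/data/incoming/notes.txt"))
```

Anchoring the pattern with $ keeps editor backup files such as scan_001.csv~ from matching under this sketch.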

filter: ClassVar[BaseFilter] = AddedFilesFilter(_ignore_dirs={'.git', '.svn', 'node_modules', '.tox', '.hypothesis', '.venv', '__pycache__', '.mypy_cache', '.idea', '.pytest_cache', '.hg'}, _ignore_entity_regexes=(re.compile('\\.py[cod]$'), re.compile('\\.___jb_...___$'), re.compile('\\.sw.$'), re.compile('~$'), re.compile('^\\.\\#'), re.compile('^\\.DS_Store$'), re.compile('^flycheck_')), _ignore_paths=())
classmethod labels() → list[str]
path: ClassVar[str | Path]
pattern: ClassVar[str | None] = None
priority: ClassVar[JobPriorityEnum | None] = 1
async classmethod run() → None
classmethod workflow_id_and_name(path: Path) → tuple[Annotated[UUID, UuidVersion(uuid_version=4)], str]

This can be overridden to return a specific workflow_id and workflow_name, possibly derived from the path or file contents.
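
As a sketch of what such an override might compute, the standalone function below returns a fresh version 4 UUID together with a name taken from the file stem. It is written as a plain function so that it runs without maestro; in a real watcher the same body would live in a classmethod on the FileWatcher subclass. Using the file stem as the workflow name is an assumption made for illustration.

```python
import uuid
from pathlib import Path


def workflow_id_and_name(path: Path) -> tuple[uuid.UUID, str]:
    """Return a random version 4 UUID and a name derived from the file name."""
    # uuid4() yields a version 4 UUID, matching the annotated return type.
    workflow_id = uuid.uuid4()
    # Assumed naming rule: the file name without its extension.
    workflow_name = path.stem
    return workflow_id, workflow_name


workflow_id, workflow_name = workflow_id_and_name(Path("/data/incoming/batch_42.csv"))
```

Note that uuid.uuid5, the usual way to derive a deterministic ID from a string, produces version 5 UUIDs, so an ID derived from the path that way would likely not satisfy the version 4 constraint in the signature.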

maestro.workflow_step.message_watcher module

class maestro.workflow_step.message_watcher.MessageWatcher

Bases: WorkflowStepBase

Base class for watching NATS messages on “order.workflow_step.<type>” and “artefact.<artefact type>.*”, where <artefact type> stands for each of the step's input types.

When an order is ready to be processed, a worker calls the process method with the inputs as arguments.
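
The subject scheme described above can be illustrated with a short sketch. Both helpers are hypothetical and not part of maestro: watcher_subjects builds the set of subjects for a given step type and list of input artefact types, and subject_matches implements the NATS rule that a * wildcard matches exactly one dot-separated token. The step and artefact names used are invented for the example.

```python
def watcher_subjects(step_type: str, input_types: list[str]) -> set[str]:
    """Build the subjects a watcher would listen on, following the scheme above."""
    subjects = {f"order.workflow_step.{step_type}"}
    subjects.update(f"artefact.{artefact_type}.*" for artefact_type in input_types)
    return subjects


def subject_matches(pattern: str, subject: str) -> bool:
    """NATS-style match where '*' stands for exactly one token.

    The '>' wildcard is deliberately left out because the scheme above does not use it.
    """
    pattern_tokens = pattern.split(".")
    subject_tokens = subject.split(".")
    if len(pattern_tokens) != len(subject_tokens):
        return False
    return all(p == "*" or p == s for p, s in zip(pattern_tokens, subject_tokens))


# Hypothetical step type and input artefact type.
subjects = watcher_subjects("ResizeStep", ["ImageArtefact"])
single_token = subject_matches("artefact.ImageArtefact.*", "artefact.ImageArtefact.abc123")
two_tokens = subject_matches("artefact.ImageArtefact.*", "artefact.ImageArtefact.abc.123")
```

Because * covers a single token only, a trailing segment that itself contains a dot would not be delivered on the artefact subject.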

classmethod labels() → list[str]

maestro.workflow_step.workflow_step module

class maestro.workflow_step.workflow_step.WorkflowStepBase

Bases: AbstractWorkflowStep

concurrency_limit: ClassVar[int | None] = None
async halt() → None

Temporarily halt the workflow step.

Returns when the job is no longer halted, i.e. once it has been either resumed or cancelled via the UI.
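
The "return when resumed or cancelled" behaviour can be sketched with two asyncio events. This is an illustration of the waiting pattern only: HaltSketch and its resumed and cancelled events are hypothetical, and how maestro actually receives the resume or cancel signal from the UI is not stated in the source. Unlike the documented method, which returns None, the sketch returns the name of the event that fired so the outcome can be inspected.

```python
import asyncio


class HaltSketch:
    """Hypothetical stand-in showing how a halt could block until a signal arrives."""

    def __init__(self) -> None:
        self.resumed = asyncio.Event()
        self.cancelled = asyncio.Event()

    async def halt(self) -> str:
        # Wait on both signals and return as soon as either one is set.
        waiters = {
            asyncio.create_task(self.resumed.wait(), name="resumed"),
            asyncio.create_task(self.cancelled.wait(), name="cancelled"),
        }
        done, pending = await asyncio.wait(waiters, return_when=asyncio.FIRST_COMPLETED)
        for task in pending:
            task.cancel()  # the other signal is no longer of interest
        return next(iter(done)).get_name()


async def demo() -> str:
    job = HaltSketch()
    # Simulate a UI action that resumes the job shortly after it halts.
    asyncio.get_running_loop().call_later(0.01, job.resumed.set)
    return await job.halt()


outcome = asyncio.run(demo())
```

A step's process method would await halt() at the point where it needs outside confirmation and continue once the call returns.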

classmethod labels() → list[str]
async publish(workflow_step: WorkflowStepModelWithArtefacts) → None
classmethod subjects() → set[str]