Skip to content

Workflow Base Classes

Base classes for creating custom workflows.

Classes

Workflow

Abstract base class for workflow implementations.

from mamba_agents import Workflow, WorkflowConfig, WorkflowState

class MyWorkflow(Workflow[None, str, dict]):
    @property
    def name(self) -> str:
        return "my_workflow"

    def _create_initial_state(self, prompt: str) -> WorkflowState[dict]:
        return WorkflowState(context={"prompt": prompt})

    async def _execute(self, prompt, state, deps=None) -> str:
        # Implementation
        return "result"

WorkflowState

Tracks workflow progress and execution history.

WorkflowStep

Represents a single step in workflow execution.

WorkflowResult

Result of workflow execution.

API Reference

Workflow

Workflow(
    config: WorkflowConfig | None = None,
    hooks: WorkflowHooks[StateT, OutputT] | None = None,
)

Bases: ABC

Abstract base class for agentic workflows.

Workflows orchestrate Agent instances through multi-step execution patterns. Subclasses implement specific workflow types (ReAct, Plan-Execute, Reflection).

Example

from mamba_agents import Agent from mamba_agents.workflows import Workflow, WorkflowConfig

Create a custom workflow by extending Workflow

class MyWorkflow(Workflow[None, str, dict]): ... @property ... def name(self) -> str: ... return "my_workflow" ... ... def _create_initial_state(self, prompt: str) -> WorkflowState[dict]: ... return WorkflowState(context={"prompt": prompt}) ... ... async def _execute(self, prompt, state, deps): ... # Implement workflow logic ... return "result"

Initialize workflow.

PARAMETER DESCRIPTION
config

Workflow execution configuration.

TYPE: WorkflowConfig | None DEFAULT: None

hooks

Optional hooks for observability.

TYPE: WorkflowHooks[StateT, OutputT] | None DEFAULT: None

Source code in src/mamba_agents/workflows/base.py
def __init__(
    self,
    config: WorkflowConfig | None = None,
    hooks: WorkflowHooks[StateT, OutputT] | None = None,
) -> None:
    """Initialize workflow.

    Args:
        config: Workflow execution configuration.
        hooks: Optional hooks for observability.
    """
    from mamba_agents.workflows.config import WorkflowConfig
    from mamba_agents.workflows.hooks import WorkflowHooks

    self._config = config or WorkflowConfig()
    self._hooks: WorkflowHooks[StateT, OutputT] = hooks or WorkflowHooks()

name abstractmethod property

name: str

Get the workflow name/type identifier.

RETURNS DESCRIPTION
str

Workflow type identifier.

config property

Get workflow configuration.

hooks property

hooks: WorkflowHooks[StateT, OutputT]

Get workflow hooks.

run async

run(
    prompt: str, deps: DepsT | None = None
) -> WorkflowResult[OutputT, StateT]

Run the workflow.

PARAMETER DESCRIPTION
prompt

Initial user prompt/task.

TYPE: str

deps

Optional dependencies for agent calls.

TYPE: DepsT | None DEFAULT: None

RETURNS DESCRIPTION
WorkflowResult[OutputT, StateT]

WorkflowResult with output and execution details.

Source code in src/mamba_agents/workflows/base.py
async def run(
    self,
    prompt: str,
    deps: DepsT | None = None,
) -> WorkflowResult[OutputT, StateT]:
    """Run the workflow.

    Args:
        prompt: Initial user prompt/task.
        deps: Optional dependencies for agent calls.

    Returns:
        WorkflowResult with output and execution details.
    """
    start_time = datetime.now(UTC)
    state = self._create_initial_state(prompt)

    try:
        # Trigger workflow start hook
        if self._config.enable_hooks:
            await self._hooks.trigger_workflow_start(state)

        # Execute workflow with timeout
        if self._config.timeout_seconds:
            output = await asyncio.wait_for(
                self._execute(prompt, state, deps),
                timeout=self._config.timeout_seconds,
            )
        else:
            output = await self._execute(prompt, state, deps)

        state.is_complete = True
        duration = (datetime.now(UTC) - start_time).total_seconds()

        result: WorkflowResult[OutputT, StateT] = WorkflowResult.ok(
            output=output,
            state=state,
            duration=duration,
            reason="completed",
        )

        # Trigger workflow complete hook
        if self._config.enable_hooks:
            await self._hooks.trigger_workflow_complete(result)

        return result

    except TimeoutError:
        duration = (datetime.now(UTC) - start_time).total_seconds()
        state.is_failed = True
        return WorkflowResult.fail(
            error=f"Workflow exceeded timeout of {self._config.timeout_seconds}s",
            state=state,
            duration=duration,
            reason="timeout",
        )

    except Exception as e:
        duration = (datetime.now(UTC) - start_time).total_seconds()
        state.is_failed = True

        # Trigger error hook
        if self._config.enable_hooks:
            await self._hooks.trigger_workflow_error(state, e)

        return WorkflowResult.fail(
            error=str(e),
            state=state,
            duration=duration,
            reason="error",
        )

run_sync

run_sync(
    prompt: str, deps: DepsT | None = None
) -> WorkflowResult[OutputT, StateT]

Run the workflow synchronously.

PARAMETER DESCRIPTION
prompt

Initial user prompt/task.

TYPE: str

deps

Optional dependencies for agent calls.

TYPE: DepsT | None DEFAULT: None

RETURNS DESCRIPTION
WorkflowResult[OutputT, StateT]

WorkflowResult with output and execution details.

Source code in src/mamba_agents/workflows/base.py
def run_sync(
    self,
    prompt: str,
    deps: DepsT | None = None,
) -> WorkflowResult[OutputT, StateT]:
    """Run the workflow synchronously.

    Args:
        prompt: Initial user prompt/task.
        deps: Optional dependencies for agent calls.

    Returns:
        WorkflowResult with output and execution details.
    """
    return asyncio.run(self.run(prompt, deps))

WorkflowState dataclass

WorkflowState(
    current_step: int = 1,
    total_steps: int = 0,
    iteration_count: int = 0,
    is_complete: bool = False,
    is_failed: bool = False,
    steps: list[WorkflowStep[Any]] = list(),
    context: StateT | None = None,
    metadata: dict[str, Any] = dict(),
)

Current state of workflow execution.

Tracks progress, decisions, and execution history.

ATTRIBUTE DESCRIPTION
current_step

Current step number (1-indexed).

TYPE: int

total_steps

Total steps executed so far.

TYPE: int

iteration_count

Current iteration count within workflow.

TYPE: int

is_complete

Whether workflow has completed.

TYPE: bool

is_failed

Whether workflow failed.

TYPE: bool

steps

List of all executed steps.

TYPE: list[WorkflowStep[Any]]

context

Workflow-specific state data.

TYPE: StateT | None

metadata

Additional workflow metadata.

TYPE: dict[str, Any]

add_step

add_step(step: WorkflowStep[Any]) -> None

Add a completed step to history.

Source code in src/mamba_agents/workflows/base.py
def add_step(self, step: WorkflowStep[Any]) -> None:
    """Add a completed step to history."""
    self.steps.append(step)
    self.total_steps = len(self.steps)
    self.current_step = self.total_steps + 1

get_latest_step

get_latest_step() -> WorkflowStep[Any] | None

Get the most recent step.

Source code in src/mamba_agents/workflows/base.py
def get_latest_step(self) -> WorkflowStep[Any] | None:
    """Get the most recent step."""
    return self.steps[-1] if self.steps else None

get_step

get_step(step_number: int) -> WorkflowStep[Any] | None

Get step by number (1-indexed).

Source code in src/mamba_agents/workflows/base.py
def get_step(self, step_number: int) -> WorkflowStep[Any] | None:
    """Get step by number (1-indexed)."""
    idx = step_number - 1
    return self.steps[idx] if 0 <= idx < len(self.steps) else None

WorkflowStep dataclass

WorkflowStep(
    step_number: int,
    step_type: str,
    description: str,
    input_data: Any,
    output_data: OutputT | None = None,
    agent_result: Any = None,
    error: str | None = None,
    started_at: datetime = (lambda: now(UTC))(),
    completed_at: datetime | None = None,
    metadata: dict[str, Any] = dict(),
)

A single step in workflow execution.

ATTRIBUTE DESCRIPTION
step_number

Sequential step number (1-indexed).

TYPE: int

step_type

Type identifier (e.g., "agent_call", "decision", "reflection").

TYPE: str

description

Human-readable step description.

TYPE: str

input_data

Input provided to this step.

TYPE: Any

output_data

Output produced by this step.

TYPE: OutputT | None

agent_result

AgentResult if step invoked an agent.

TYPE: Any

error

Error message if step failed.

TYPE: str | None

started_at

Step start timestamp.

TYPE: datetime

completed_at

Step completion timestamp.

TYPE: datetime | None

metadata

Additional step-specific metadata.

TYPE: dict[str, Any]

duration_seconds property

duration_seconds: float | None

Calculate step duration in seconds.

success property

success: bool

Check if step completed successfully.

WorkflowResult dataclass

WorkflowResult(
    success: bool,
    output: OutputT | None = None,
    state: WorkflowState[StateT] | None = None,
    error: str | None = None,
    total_steps: int = 0,
    total_iterations: int = 0,
    duration_seconds: float = 0.0,
    termination_reason: str = "unknown",
)

Result of workflow execution.

ATTRIBUTE DESCRIPTION
success

Whether workflow completed successfully.

TYPE: bool

output

Final workflow output.

TYPE: OutputT | None

state

Final workflow state.

TYPE: WorkflowState[StateT] | None

error

Error message if workflow failed.

TYPE: str | None

total_steps

Total steps executed.

TYPE: int

total_iterations

Total iterations across all steps.

TYPE: int

duration_seconds

Total execution time.

TYPE: float

termination_reason

Why the workflow stopped.

TYPE: str

ok classmethod

ok(
    output: OutputT,
    state: WorkflowState[StateT],
    duration: float,
    reason: str = "completed",
) -> WorkflowResult[OutputT, StateT]

Create successful result.

Source code in src/mamba_agents/workflows/base.py
@classmethod
def ok(
    cls,
    output: OutputT,
    state: WorkflowState[StateT],
    duration: float,
    reason: str = "completed",
) -> WorkflowResult[OutputT, StateT]:
    """Create successful result."""
    return cls(
        success=True,
        output=output,
        state=state,
        total_steps=state.total_steps,
        total_iterations=state.iteration_count,
        duration_seconds=duration,
        termination_reason=reason,
    )

fail classmethod

fail(
    error: str,
    state: WorkflowState[StateT] | None = None,
    duration: float = 0.0,
    reason: str = "error",
) -> WorkflowResult[Any, StateT]

Create failed result.

Source code in src/mamba_agents/workflows/base.py
@classmethod
def fail(
    cls,
    error: str,
    state: WorkflowState[StateT] | None = None,
    duration: float = 0.0,
    reason: str = "error",
) -> WorkflowResult[Any, StateT]:
    """Create failed result."""
    return cls(
        success=False,
        error=error,
        state=state,
        total_steps=state.total_steps if state else 0,
        total_iterations=state.iteration_count if state else 0,
        duration_seconds=duration,
        termination_reason=reason,
    )