Skip to content

API: Orchestrators

This section covers the classes responsible for executing agent workflows.

safeagent.orchestrator

SimpleOrchestrator

Minimal DAG runner: each node is a function, edges define dependencies, with audit and lineage tagging.

Source code in src/safeagent/orchestrator.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
class SimpleOrchestrator:
    """Minimal DAG runner: each node is a function, edges define dependencies, with audit and lineage tagging.

    Nodes are registered with ``add_node`` and ordered with ``add_edge``.
    ``run`` calls every registered node exactly once and makes sure that the
    nodes a node depends on (as declared through ``add_edge``) have produced
    their results first.
    """

    def __init__(self):
        # Map node name to function
        self.nodes: Dict[str, Callable[..., Any]] = {}
        # Map node name to list of dependent node names
        self.edges: Dict[str, List[str]] = {}
        # Governance helper used for audit records and lineage tagging
        self.gov = GovernanceManager()

    def add_node(self, name: str, func: Callable[..., Any]):
        """Register a function under the given node name.

        Registering an existing name again replaces its function; edges
        already recorded for that name are kept.
        """
        self.nodes[name] = func
        self.edges.setdefault(name, [])

    def add_edge(self, src: str, dest: str):
        """Specify that 'dest' depends on 'src'.

        Raises:
            ValueError: If either node name has not been registered.
        """
        if src not in self.nodes or dest not in self.nodes:
            raise ValueError(f"Either '{src}' or '{dest}' is not registered as a node.")
        self.edges[src].append(dest)

    def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute all nodes in topological order, audit pipeline start/end, and tag lineage on outputs.

        Nodes are visited in registration order, but before a node runs every
        node it depends on (declared with ``add_edge``) is executed first.
        When the registration order already respects the edges, the execution
        order is exactly the registration order.

        Each node function is called with keyword arguments chosen by
        parameter name, in this priority:

        1. the result of an already executed node with that name,
        2. for a parameter ``node_<name>``, the result of node ``<name>``,
        3. the entry of ``inputs`` with that name.

        Parameters that match none of these are not passed, so the function
        must provide defaults for them.

        Args:
            inputs (Dict[str, Any]): Global inputs (e.g., 'user_input', 'user_id').

        Returns:
            Dict[str, Any]: Mapping of node name to its return value.
        """
        # Kept function-local so this code does not rely on a module-level
        # import; hoisted out of the per-node helper so it runs once per call.
        import inspect

        results: Dict[str, Any] = {}
        visited = set()

        # Invert the edge map: for every node, the nodes it depends on.
        upstream: Dict[str, List[str]] = {name: [] for name in self.nodes}
        for src, dests in self.edges.items():
            if src not in self.nodes:
                continue
            for dest in dests:
                if dest in upstream:
                    upstream[dest].append(src)

        # Audit pipeline start
        self.gov.audit(user_id=inputs.get("user_id", "system"), action="pipeline_start", resource="orchestrator")

        def execute(node: str):
            if node in visited:
                # Already executed, or currently being resolved higher up the
                # call chain (a dependency cycle). In the cycle case the edge
                # is ignored so the run still terminates.
                return results.get(node)
            visited.add(node)

            # Dependencies first, so their results are available for binding.
            for dependency in upstream[node]:
                execute(dependency)

            func = self.nodes[node]
            kwargs = {}
            for name in inspect.signature(func).parameters:
                if name in results:
                    kwargs[name] = results[name]
                elif name.startswith("node_") and name[5:] in results:
                    kwargs[name] = results[name[5:]]
                elif name in inputs:
                    kwargs[name] = inputs[name]
            output = func(**kwargs)
            # Tag lineage on dict outputs
            if isinstance(output, dict):
                output = self.gov.tag_lineage(output, source=node)
            results[node] = output
            return output

        for node in self.nodes:
            execute(node)

        # Audit pipeline end
        self.gov.audit(user_id=inputs.get("user_id", "system"), action="pipeline_end", resource="orchestrator")

        return results

add_edge(src, dest)

Specify that 'dest' depends on 'src'.

Source code in src/safeagent/orchestrator.py
19
20
21
22
23
def add_edge(self, src: str, dest: str):
    """Record that node 'dest' depends on node 'src'.

    The destination name is appended to the list kept for 'src' in
    ``self.edges``.

    Raises:
        ValueError: If 'src' or 'dest' is not a key of ``self.nodes``.
    """
    registered = self.nodes
    if not (src in registered and dest in registered):
        raise ValueError(f"Either '{src}' or '{dest}' is not registered as a node.")
    dependents = self.edges[src]
    dependents.append(dest)

add_node(name, func)

Register a function under the given node name.

Source code in src/safeagent/orchestrator.py
14
15
16
17
def add_node(self, name: str, func: Callable[..., Any]):
    """Store ``func`` as the callable for node ``name``.

    A node seen for the first time also gets an empty dependents list in
    ``self.edges``; an existing list is left untouched.
    """
    if name not in self.edges:
        self.edges[name] = []
    self.nodes[name] = func

run(inputs)

Execute all nodes in topological order, audit pipeline start/end, and tag lineage on outputs.

Parameters:

Name Type Description Default
inputs Dict[str, Any]

Global inputs (e.g., 'user_input', 'user_id').

required

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Mapping of node name to its return value.

Source code in src/safeagent/orchestrator.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
    """
    Execute all nodes in topological order, audit pipeline start/end, and tag lineage on outputs.

    Nodes are visited in registration order, but before a node runs every
    node it depends on (as recorded in ``self.edges``) is executed first.
    When the registration order already respects the edges, the execution
    order is exactly the registration order.

    Each node function is called with keyword arguments chosen by parameter
    name, in this priority:

    1. the result of an already executed node with that name,
    2. for a parameter ``node_<name>``, the result of node ``<name>``,
    3. the entry of ``inputs`` with that name.

    Parameters that match none of these are not passed, so the function must
    provide defaults for them.

    Args:
        inputs (Dict[str, Any]): Global inputs (e.g., 'user_input', 'user_id').

    Returns:
        Dict[str, Any]: Mapping of node name to its return value.
    """
    # Kept function-local so this code does not rely on a module-level
    # import; hoisted out of the per-node helper so it runs once per call.
    import inspect

    results: Dict[str, Any] = {}
    visited = set()

    # Invert the edge map: for every node, the nodes it depends on.
    upstream: Dict[str, List[str]] = {name: [] for name in self.nodes}
    for src, dests in self.edges.items():
        if src not in self.nodes:
            continue
        for dest in dests:
            if dest in upstream:
                upstream[dest].append(src)

    # Audit pipeline start
    self.gov.audit(user_id=inputs.get("user_id", "system"), action="pipeline_start", resource="orchestrator")

    def execute(node: str):
        if node in visited:
            # Already executed, or currently being resolved higher up the
            # call chain (a dependency cycle). In the cycle case the edge is
            # ignored so the run still terminates.
            return results.get(node)
        visited.add(node)

        # Dependencies first, so their results are available for binding.
        for dependency in upstream[node]:
            execute(dependency)

        func = self.nodes[node]
        kwargs = {}
        for name in inspect.signature(func).parameters:
            if name in results:
                kwargs[name] = results[name]
            elif name.startswith("node_") and name[5:] in results:
                kwargs[name] = results[name[5:]]
            elif name in inputs:
                kwargs[name] = inputs[name]
        output = func(**kwargs)
        # Tag lineage on dict outputs
        if isinstance(output, dict):
            output = self.gov.tag_lineage(output, source=node)
        results[node] = output
        return output

    for node in self.nodes:
        execute(node)

    # Audit pipeline end
    self.gov.audit(user_id=inputs.get("user_id", "system"), action="pipeline_end", resource="orchestrator")

    return results

safeagent.stateful_orchestrator

EdgeRegistrationError

Bases: OrchestratorError

Raised during an invalid attempt to register an edge.

Source code in src/safeagent/stateful_orchestrator.py
19
20
21
22
23
class EdgeRegistrationError(OrchestratorError):
    """Signals that an edge could not be registered.

    The exception text is ``"<message>: '<node_name>'"``; the offending node
    name is also kept on the instance as ``node_name``.
    """
    def __init__(self, node_name: str, message: str):
        detail = "{}: '{}'".format(message, node_name)
        super().__init__(detail)
        # Name of the node that made the registration invalid.
        self.node_name = node_name

NodeNotFoundError

Bases: OrchestratorError

Raised when a node name is not found in the graph.

Source code in src/safeagent/stateful_orchestrator.py
13
14
15
16
17
class NodeNotFoundError(OrchestratorError):
    """Signals that a node name does not exist in the graph.

    The missing name is kept on the instance as ``node_name``.
    """
    def __init__(self, node_name: str):
        detail = "Node '{}' not found in the graph.".format(node_name)
        super().__init__(detail)
        # Name that was looked up and not found.
        self.node_name = node_name

OrchestratorError

Bases: Exception

Base exception for all stateful orchestrator errors.

Source code in src/safeagent/stateful_orchestrator.py
 9
10
11
class OrchestratorError(Exception):
    """Root of the stateful orchestrator's exception hierarchy.

    Adds no behavior of its own; it exists so callers can catch every
    orchestrator-specific error with one ``except`` clause.
    """

StateValidationError

Bases: OrchestratorError

Raised when the state does not conform to the defined schema.

Source code in src/safeagent/stateful_orchestrator.py
25
26
27
28
class StateValidationError(OrchestratorError):
    """Signals that a state value or key violates the configured schema."""
    def __init__(self, message: str):
        # Explicit constructor: exactly one required argument, the message.
        super(StateValidationError, self).__init__(message)

StatefulOrchestrator

An orchestrator that manages a central state object, allowing for complex, cyclical, and conditional workflows with integrated governance, human-in-the-loop interrupts, and optional state schema validation.

Source code in src/safeagent/stateful_orchestrator.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
class StatefulOrchestrator:
    """
    An orchestrator that manages a central state object, allowing for complex,
    cyclical, and conditional workflows with integrated governance, human-in-the-loop
    interrupts, and optional state schema validation.

    Each node is a callable that receives the current state dict and returns a
    dict of updates. After a node runs, the edge function registered for it
    (if any) receives the state and returns the name of the next node. Two
    reserved destination names are recognised: ``"__end__"`` finishes the run
    and ``"__interrupt__"`` pauses it.
    """

    def __init__(self, entry_node: str, state_schema: Optional[Dict[str, Type]] = None):
        """
        Initializes the stateful orchestrator.

        Args:
            entry_node (str): The name of the first node to execute in the graph.
            state_schema (Optional[Dict[str, Type]]): An optional schema defining
                expected keys and their Python types in the state object.

        Raises:
            ValueError: If ``entry_node`` is not a non-empty string.
        """
        if not isinstance(entry_node, str) or not entry_node:
            raise ValueError("entry_node must be a non-empty string.")

        # Node name -> callable taking the state and returning a dict of updates.
        self.nodes: Dict[str, Callable[[Dict], Dict]] = {}
        # Source node name -> callable taking the state and returning the next node name.
        self.edges: Dict[str, Callable[[Dict], str]] = {}
        self.entry_node = entry_node
        self.state_schema = state_schema
        self.gov = GovernanceManager()

    def add_node(self, name: str, func: Callable[[Dict], Dict]):
        """Register ``func`` as the node ``name``, replacing any previous registration."""
        self.nodes[name] = func

    def add_edge(self, src: str, dest: str):
        """
        Register an unconditional edge from ``src`` to ``dest``.

        ``dest`` may be a registered node or one of the reserved names
        ``"__end__"`` / ``"__interrupt__"``. A node has at most one outgoing
        edge function; registering another one replaces it.

        Raises:
            EdgeRegistrationError: If ``src`` is not registered, or ``dest`` is
                neither registered nor a reserved name.
        """
        if src not in self.nodes:
            raise EdgeRegistrationError(src, "Source node for edge is not registered")
        if dest not in self.nodes and dest not in ("__end__", "__interrupt__"):
            raise EdgeRegistrationError(dest, "Destination node for edge is not registered")
        self.edges[src] = lambda state: dest

    def add_conditional_edge(self, src: str, path_func: Callable[[Dict], str]):
        """
        Register ``path_func`` as the edge function of ``src``.

        ``path_func`` receives the state and returns the name of the next node.
        The returned name is not checked here; it is checked when the graph runs.

        Raises:
            EdgeRegistrationError: If ``src`` is not registered.
        """
        if src not in self.nodes:
            raise EdgeRegistrationError(src, "Source node for conditional edge is not registered")
        self.edges[src] = path_func

    def _validate_state(self, state: Dict[str, Any], keys_to_check: List[str]):
        """
        Validates a subset of the state against the schema if it exists.

        Does nothing when no (or an empty) schema is configured.

        Raises:
            StateValidationError: If a key to check is missing from the schema,
                or its value in ``state`` is not an instance of the schema type.
        """
        if not self.state_schema:
            return

        for key in keys_to_check:
            if key not in self.state_schema:
                raise StateValidationError("Key '{}' in state is not defined in the schema.".format(key))
            if key in state and not isinstance(state[key], self.state_schema[key]):
                expected_type = self.state_schema[key].__name__
                actual_type = type(state[key]).__name__
                msg = "Type mismatch for key '{}'. Expected '{}', got '{}'.".format(key, expected_type, actual_type)
                raise StateValidationError(msg)

    def run(self, inputs: Dict[str, Any], user_id: str = "system", max_steps: int = 15) -> Tuple[str, Dict[str, Any]]:
        """
        Executes the graph starting from the entry node.

        ``inputs`` is shallow-copied, so the caller's dict is not modified.

        Args:
            inputs: Initial state; validated against the schema when one is set.
            user_id: Identifier recorded with every audit entry.
            max_steps: Maximum number of node executions.

        Returns:
            A tuple containing the final status ('completed', 'paused',
            'max_steps_reached') and the final state of the graph.

        Raises:
            StateValidationError: If the inputs or a node's updates violate the schema.
            NodeNotFoundError: If execution reaches a name that is not a registered node.
        """
        state = inputs.copy()
        self._validate_state(state, list(state.keys()))
        self.gov.audit(user_id, "stateful_run_start", "StatefulOrchestrator", {"initial_keys": list(state.keys())})

        return self._execute_from(self.entry_node, state, user_id, max_steps)

    def resume(self, state: Dict[str, Any], human_input: Dict[str, Any], user_id: str = "system", max_steps: int = 15) -> Tuple[str, Dict[str, Any]]:
        """
        Resumes execution of a paused graph.

        The paused ``state`` is shallow-copied before it is changed, so the
        caller's dict keeps its ``"__next_node__"`` marker and can be resumed
        again if validation of ``human_input`` fails.

        Args:
            state: State returned by a run that ended with status 'paused'.
            human_input: Keys to merge into the state before continuing.
            user_id: Identifier recorded with every audit entry.
            max_steps: Maximum number of node executions, counted from the
                step stored in the paused state.

        Returns:
            The same kind of ``(status, state)`` tuple as ``run``.

        Raises:
            OrchestratorError: If ``state`` has no ``"__next_node__"`` key.
            StateValidationError: If ``human_input`` violates the schema.
        """
        if "__next_node__" not in state:
            raise OrchestratorError("Cannot resume. The provided state is not a valid paused state.")

        resumed_state = state.copy()
        next_node = resumed_state.pop("__next_node__")
        resumed_state.update(human_input)

        self.gov.audit(user_id, "graph_resume", "StatefulOrchestrator", {"resuming_at_node": next_node, "human_input_keys": list(human_input.keys())})
        self._validate_state(resumed_state, list(human_input.keys()))

        return self._execute_from(next_node, resumed_state, user_id, max_steps, start_step=resumed_state.get('__step__', 0))

    def _execute_from(self, start_node: str, state: Dict[str, Any], user_id: str, max_steps: int, start_step: int = 0) -> Tuple[str, Dict[str, Any]]:
        """
        Run nodes from ``start_node`` until the graph ends, pauses, or the
        step budget is used up.

        ``state`` is updated in place and returned as the second tuple item.
        """
        current_node_name = start_node
        step = start_step

        while True:
            # The reserved names are handled before the step budget is checked:
            # they execute no node, so reaching one right after the last
            # permitted step must still end or pause the run.
            if current_node_name == "__end__":
                self.gov.audit(user_id, "graph_end_reached", "StatefulOrchestrator", {"step": step})
                return "completed", state

            if current_node_name == "__interrupt__":
                self.gov.audit(user_id, "graph_interrupt_human_input", "StatefulOrchestrator", {"step": step})
                # .get(): no previous node exists when the interrupt is the
                # very first node reached.
                previous_node = state.get("__previous_node__")
                if previous_node in self.edges:
                    # NOTE(review): this re-evaluates the edge function that
                    # just returned "__interrupt__"; for an edge created with
                    # add_edge(src, "__interrupt__") the stored next node is
                    # again "__interrupt__". Confirm the intended resume target.
                    state["__next_node__"] = self.edges[previous_node](state)
                    state["__step__"] = step
                return "paused", state

            if step >= max_steps:
                self.gov.audit(user_id, "max_steps_reached", "StatefulOrchestrator", {"max_steps": max_steps})
                return "max_steps_reached", state

            if current_node_name not in self.nodes:
                raise NodeNotFoundError(current_node_name)

            self.gov.audit(user_id, "node_start", current_node_name, {"step": step})
            node_func = self.nodes[current_node_name]

            try:
                updates = node_func(state)
                if updates is None:
                    # A node that returns nothing contributes no updates.
                    updates = {}
                self._validate_state(updates, list(updates.keys()))

                for key, value in updates.items():
                    if isinstance(value, dict):
                        # Dict values are tagged and stored whole, even when
                        # they happen to contain a 'value' key of their own.
                        state[key] = self.gov.tag_lineage(value, source=current_node_name)
                    else:
                        # Other values are wrapped for tagging and unwrapped again.
                        tagged_record = self.gov.tag_lineage({'value': value}, source=current_node_name)
                        state[key] = tagged_record.get('value', tagged_record)

                self.gov.audit(user_id, "node_end", current_node_name, {"step": step, "updated_keys": list(updates.keys())})
            except Exception as e:
                self.gov.audit(user_id, "node_error", current_node_name, {"step": step, "error": str(e)})
                raise

            if current_node_name not in self.edges:
                # No outgoing edge: this path of the graph is finished.
                self.gov.audit(user_id, "graph_path_end", "StatefulOrchestrator", {"last_node": current_node_name})
                return "completed", state

            path_func = self.edges[current_node_name]
            state["__previous_node__"] = current_node_name
            next_node_name = path_func(state)

            self.gov.audit(user_id, "conditional_edge_traversed", current_node_name, {"destination": next_node_name})
            current_node_name = next_node_name
            step += 1

__init__(entry_node, state_schema=None)

Initializes the stateful orchestrator.

Parameters:

Name Type Description Default
entry_node str

The name of the first node to execute in the graph.

required
state_schema Optional[Dict[str, Type]]

An optional schema defining expected keys and their Python types in the state object.

None
Source code in src/safeagent/stateful_orchestrator.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def __init__(self, entry_node: str, state_schema: Optional[Dict[str, Type]] = None):
    """
    Set up an empty graph that will start at ``entry_node``.

    Args:
        entry_node (str): Name of the node execution begins with.
        state_schema (Optional[Dict[str, Type]]): Optional mapping of state
            keys to the Python types expected for their values.

    Raises:
        ValueError: If ``entry_node`` is not a non-empty string.
    """
    is_usable_name = isinstance(entry_node, str) and entry_node
    if not is_usable_name:
        raise ValueError("entry_node must be a non-empty string.")

    self.entry_node = entry_node
    self.state_schema = state_schema
    # Node name -> callable taking the state and returning a dict of updates.
    self.nodes: Dict[str, Callable[[Dict], Dict]] = {}
    # Source node name -> callable taking the state and returning the next node name.
    self.edges: Dict[str, Callable[[Dict], str]] = {}
    self.gov = GovernanceManager()

resume(state, human_input, user_id='system', max_steps=15)

Resumes execution of a paused graph.

Source code in src/safeagent/stateful_orchestrator.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def resume(self, state: Dict[str, Any], human_input: Dict[str, Any], user_id: str = "system", max_steps: int = 15) -> Tuple[str, Dict[str, Any]]:
    """
    Resumes execution of a paused graph.

    The paused ``state`` is shallow-copied before it is changed, so the
    caller's dict keeps its ``"__next_node__"`` marker and can be resumed
    again if validation of ``human_input`` fails.

    Args:
        state: Paused state; must contain the key ``"__next_node__"``.
        human_input: Keys to merge into the state before continuing.
        user_id: Identifier recorded with the audit entry.
        max_steps: Step limit passed on to ``_execute_from``; counting starts
            at the ``"__step__"`` value stored in the state (0 if absent).

    Returns:
        The ``(status, state)`` tuple produced by ``_execute_from``.

    Raises:
        OrchestratorError: If ``state`` has no ``"__next_node__"`` key.
    """
    if "__next_node__" not in state:
        raise OrchestratorError("Cannot resume. The provided state is not a valid paused state.")

    # Work on a copy: the caller's paused state stays resumable.
    resumed_state = state.copy()
    next_node = resumed_state.pop("__next_node__")
    resumed_state.update(human_input)

    self.gov.audit(user_id, "graph_resume", "StatefulOrchestrator", {"resuming_at_node": next_node, "human_input_keys": list(human_input.keys())})
    self._validate_state(resumed_state, list(human_input.keys()))

    return self._execute_from(next_node, resumed_state, user_id, max_steps, start_step=resumed_state.get('__step__', 0))

run(inputs, user_id='system', max_steps=15)

Executes the graph starting from the entry node.

Returns:

Type Description
str

A tuple containing the final status ('completed', 'paused', or 'max_steps_reached')

Dict[str, Any]

and the final state of the graph.

Source code in src/safeagent/stateful_orchestrator.py
86
87
88
89
90
91
92
93
94
95
96
97
98
def run(self, inputs: Dict[str, Any], user_id: str = "system", max_steps: int = 15) -> Tuple[str, Dict[str, Any]]:
    """
    Run the graph from the configured entry node.

    A shallow copy of ``inputs`` becomes the initial state; it is validated
    against the schema (when one is set) and an audit entry is written before
    execution starts.

    Returns:
        A tuple of the final status ('completed', 'paused' or
        'max_steps_reached') and the final state of the graph.
    """
    initial_state = inputs.copy()
    initial_keys = list(initial_state.keys())
    self._validate_state(initial_state, initial_keys)

    audit_details = {"initial_keys": list(initial_state.keys())}
    self.gov.audit(user_id, "stateful_run_start", "StatefulOrchestrator", audit_details)

    outcome = self._execute_from(self.entry_node, initial_state, user_id, max_steps)
    return outcome