Skip to content

API: Sinks

This section covers the BaseOutputSink and the built-in sink implementations for handling tool outputs.

safeagent.sinks

BaseOutputSink

Abstract base class for all tool output sinks.

Source code in src/safeagent/sinks.py
 8
 9
10
11
12
13
14
15
16
17
18
class BaseOutputSink:
    """Common interface that every tool output sink implements."""

    def handle(self, tool_name: str, result: Any, run_id: str, **kwargs) -> Dict:
        """
        Consume the result produced by a tool run.

        Concrete sinks override this method; the base version always raises
        ``NotImplementedError``. Implementations are expected to return a
        dictionary of metadata describing the sink operation.
        """
        raise NotImplementedError()

    def __str__(self):
        # The display name defaults to the name of the concrete class.
        return type(self).__name__

handle(tool_name, result, run_id, **kwargs)

Processes the tool's result. Must be implemented by subclasses. Should return a dictionary with metadata about the sink operation.

Source code in src/safeagent/sinks.py
10
11
12
13
14
15
def handle(self, tool_name: str, result: Any, run_id: str, **kwargs) -> Dict:
    """
    Consume the result produced by a tool run.

    Subclasses are required to override this method; this base version
    always raises ``NotImplementedError``. An override should return a
    dictionary of metadata describing the sink operation.
    """
    raise NotImplementedError()

FileOutputSink

Bases: BaseOutputSink

An output sink that saves the tool's result to a local JSON file.

Source code in src/safeagent/sinks.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class FileOutputSink(BaseOutputSink):
    """An output sink that saves the tool's result to a local JSON file.

    Each call to ``handle`` writes ``<tool_name>_<run_id>.json`` inside
    ``base_path``; an existing file with the same name is overwritten.
    """

    def __init__(self, base_path: str = "tool_outputs"):
        """Store the output directory and create it if it does not exist."""
        self.base_path = Path(base_path)
        # Ensure the output directory exists
        self.base_path.mkdir(parents=True, exist_ok=True)

    def handle(self, tool_name: str, result: Any, run_id: str, **kwargs) -> Dict:
        """
        Write the tool result to ``<base_path>/<tool_name>_<run_id>.json``.

        Args:
            tool_name: Name of the tool; stored in the file and used in the filename.
            result: The tool's output. A top-level value that is not a JSON
                primitive, dict or list is stored as ``str(result)``; values
                nested inside a dict or list that JSON cannot encode are
                stringified as well.
            run_id: Identifier of the run; used in the filename.
            **kwargs: Accepted for interface compatibility; unused here.

        Returns:
            ``{"status": "success", "path": <file path>}`` on success, or
            ``{"status": "failure", "error": <message>}`` if serialization or
            writing raised an exception. This method does not raise.
        """
        # Use a combination of tool name and run_id for a unique filename
        filepath = self.base_path / f"{tool_name}_{run_id}.json"

        try:
            # Keep the historical behaviour for top-level values: anything that
            # is not a JSON primitive, dict or list is stored as its str() form.
            serializable_result = result
            if not isinstance(result, (dict, list, str, int, float, bool, type(None))):
                serializable_result = str(result)

            # Serialize BEFORE opening the file so that a serialization error
            # cannot leave a truncated file behind or clobber an earlier output.
            # default=str stringifies nested values JSON cannot encode, which
            # matches how PubSubSink serializes its payload.
            payload = json.dumps(
                {"tool_name": tool_name, "result": serializable_result},
                indent=2,
                default=str,
            )

            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(payload)

            return {"status": "success", "path": str(filepath)}
        except Exception as e:
            # Best-effort sink: report the problem to the caller instead of raising.
            return {"status": "failure", "error": str(e)}

    def __str__(self):
        return f"FileOutputSink(path='{self.base_path}')"

PubSubSink

Bases: BaseOutputSink

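A conceptual (mock) output sink for Google Cloud Pub/Sub: it serializes the message and prints a simulated publish, but does not send anything to a real topic.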

Source code in src/safeagent/sinks.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class PubSubSink(BaseOutputSink):
    """Illustrative sink that mimics publishing to Google Cloud Pub/Sub.

    No real client is created; ``handle`` serializes the message and prints a
    mock publish line instead of contacting any service.
    """

    def __init__(self, project_id: str, topic_id: str):
        """Record the target project and topic and announce the mock mode."""
        self.project_id = project_id
        self.topic_id = topic_id
        # A real implementation would build a publisher here, e.g.:
        #   from google.cloud import pubsub_v1
        #   self.publisher = pubsub_v1.PublisherClient()
        #   self.topic_path = self.publisher.topic_path(project_id, topic_id)
        print("NOTE: PubSubSink is a conceptual example. Using mock implementation.")

    def handle(self, tool_name: str, result: Any, run_id: str, **kwargs) -> Dict:
        """
        Build the message bytes and pretend to publish them.

        Returns a dictionary with ``"status": "success"`` and a mock
        ``"message_id"`` derived from ``run_id``.
        """
        envelope = {
            "tool_name": tool_name,
            "result": result,
            "run_id": run_id
        }
        # Values JSON cannot encode natively are stringified via default=str.
        # The encoded bytes are only built, not sent, in this mock.
        encoded_message = json.dumps(envelope, default=str).encode("utf-8")

        # A real implementation would publish and wait for the server ID:
        #   future = self.publisher.publish(self.topic_path, encoded_message)
        #   message_id = future.result()
        message_id = f"mock_message_id_for_{run_id}"
        print(f"MOCK PUBLISH: Message to Pub/Sub topic '{self.topic_id}' with ID: {message_id}")

        outcome = {"status": "success", "message_id": message_id}
        return outcome

    def __str__(self):
        return f"PubSubSink(topic='{self.topic_id}')"