Node
A Node is a discrete component in a graph-based agent system that operates under an event-driven model. Its primary role is to represent its position within a workflow graph, manage event subscriptions, and designate topics for publishing. In addition, it delegates invoke to a Command object, adhering to the Command Pattern. Each Node comprises the following elements:
- Unique Identity
- Distinguished by a unique node_id, name, and type.
- The name must be unique within a given workflow.
- Subscribed Topics
- Stores the event topics to which the node subscribes, typically originating from upstream publishers.
- Subscriptions can reference explicit topic names or apply custom subscription strategies.
- Publish-To Topics
- Stores the event topics designated for downstream nodes to subscribe to, facilitating event routing.
- Command for Invoke
- Encapsulates invoke logic through a Command object.
- Allows integration of new or specialized commands without modifying the node's existing structure.
NodeBase
The NodeBase
class serves as the abstract foundation for all nodes in the graph-based agent system. It defines the core structure and interface that all node implementations must follow, including:
- Core Properties: Essential attributes like
node_id
,name
,type
, andtool
- Event Management: Subscription expressions and publish-to topics for event-driven communication
- Command Integration: Internal command management through the Command Pattern
- Builder Pattern: Provides a fluent interface for node construction via
NodeBaseBuilder
The base class defines an abstract method invoke
that must be implemented by concrete subclasses to define their specific behavior.
Node Implementation
The Node
class is a concrete implementation of NodeBase
that provides a working node with standard invoke behavior. Unlike the abstract base class, Node
actually implements the invoke
method by delegating to an internal Command
object.
Key features of the Node class include:
- Automatic Command Setup: Creates commands automatically from tools during initialization
- Event-Driven Invocation: Uses
can_invoke()
to determine readiness based on subscribed topics - Event-Based Interface: Consumes
ConsumeFromTopicEvent
objects and publishesPublishToTopicEvent
objects - Decorator Support: Includes built-in instrumentation via
@record_node_invoke
The following table describes each field within the Node class, highlighting its purpose and usage in the workflow:
Field | Description |
---|---|
node_id |
A unique identifier for the node instance. |
name |
A unique name identifying the node within the workflow. |
type |
Defines the category or type of node, indicating its function. |
tool |
Optional tool that the node uses; automatically creates a command when set. |
command |
Property providing access to the internal command object for invoke logic. |
oi_span_type |
Semantic attribute from OpenInference for tracing purposes. |
subscribed_expressions |
List of DSL-based subscription expressions used by the node. |
publish_to |
List of designated topics the node publishes events to. |
_subscribed_topics |
Internal mapping of subscribed topic names to Topic instances. |
_command |
Private field storing the command object. |
The following table summarizes the methods available in the Node class, highlighting their purpose and intended usage:
Method | Description |
---|---|
builder |
Class method that returns a NodeBaseBuilder for fluent node construction. |
model_post_init |
Model post-initialization hook that sets up subscribed topics and auto-creates commands from tools during initialization. |
invoke |
Asynchronously invokes the node's command, yielding PublishToTopicEvent objects as they become available. |
can_invoke |
Evaluates subscription conditions to determine whether the node is ready to invoke based on available topics. |
can_invoke_with_topics |
Checks if the node can invoke given a specific list of topic names. |
to_dict |
Serializes node attributes to a dictionary, suitable for persistence or transmission. |
NodeBaseBuilder
The NodeBaseBuilder
class provides a fluent interface for constructing nodes with a builder pattern. It allows for readable and maintainable node configuration through method chaining.
Available builder methods:
Method | Description |
---|---|
name |
Sets the unique name for the node within the workflow. |
type |
Sets the node type, indicating its function or category. |
tool |
Sets the tool for the node; automatically creates a command from the tool. |
oi_span_type |
Sets the OpenInference span type for tracing purposes. |
subscribe |
Adds subscription expressions to topics or SubExpr objects. |
publish_to |
Adds topics that this node will publish events to. |
Example usage:
from grafi.nodes.node import Node
from grafi.topics.input_topic import InputTopic
from grafi.topics.output_topic import OutputTopic
node = Node.builder()
.name("ProcessorNode")
.type("DataProcessor")
.tool(my_tool)
.subscribe(input_topic)
.publish_to(output_topic)
.build()
# Node invoke signature
from grafi.models.invoke_context import InvokeContext
from grafi.common.events.topic_events.consume_from_topic_event import ConsumeFromTopicEvent
from grafi.common.events.topic_events.publish_to_topic_event import PublishToTopicEvent
# Asynchronous invocation
context = InvokeContext(session_id="session-123")
input_datas = [ConsumeFromTopicEvent(...)] # List of consumed events
async for output_event in node.invoke(context, input_datas):
# Process each PublishToTopicEvent as it's generated
pass