Node

A Node is a discrete component in a graph-based agent system that operates under an event-driven model. It represents a position within a workflow graph, manages event subscriptions, and designates the topics it publishes to. It delegates invocation to a Command object, following the Command Pattern. Each Node comprises the following elements:

  • Unique Identity
      • Distinguished by a unique node_id, name, and type.
      • The name must be unique within a given workflow.
  • Subscribed Topics
      • Stores the event topics to which the node subscribes, typically originating from upstream publishers.
      • Subscriptions can reference explicit topic names or apply custom subscription strategies.
  • Publish-To Topics
      • Stores the event topics designated for downstream nodes to subscribe to, facilitating event routing.
  • Command for Invoke
      • Encapsulates invoke logic through a Command object.
      • Allows integration of new or specialized commands without modifying the node's existing structure.
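
The delegation described in the list above can be sketched in plain Python. This is a minimal illustration, not the library's implementation: SketchCommand and SketchNode are hypothetical stand-ins, and plain strings take the place of real events and topics.

```python
import uuid
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class SketchCommand:
    """Hypothetical command: wraps the logic a node runs when invoked."""

    handler: Callable[[List[str]], str]

    def invoke(self, inputs: List[str]) -> str:
        return self.handler(inputs)


@dataclass
class SketchNode:
    """Hypothetical node: identity, topic wiring, and a delegated command."""

    name: str
    type: str
    command: SketchCommand
    subscribed_topics: List[str] = field(default_factory=list)
    publish_to: List[str] = field(default_factory=list)
    node_id: str = field(default_factory=lambda: uuid.uuid4().hex)

    def invoke(self, inputs: List[str]) -> str:
        # The node holds no business logic of its own; it forwards to the command.
        return self.command.invoke(inputs)


node = SketchNode(
    name="ShoutNode",
    type="Demo",
    command=SketchCommand(handler=lambda msgs: " ".join(msgs).upper()),
    subscribed_topics=["agent_input"],
    publish_to=["agent_output"],
)
result = node.invoke(["hello", "world"])

# Swapping the command changes behavior without touching the node's structure.
node.command = SketchCommand(handler=lambda msgs: str(len(msgs)))
count = node.invoke(["hello", "world"])
```

Because the node only forwards to its command, new behavior arrives as a new command object rather than as a change to the node class.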

NodeBase

The NodeBase class serves as the abstract foundation for all nodes in the graph-based agent system. It defines the core structure and interface that all node implementations must follow, including:

  • Core Properties: Essential attributes like node_id, name, type, and tool
  • Event Management: Subscription expressions and publish-to topics for event-driven communication
  • Command Integration: Internal command management through the Command Pattern
  • Builder Pattern: Provides a fluent interface for node construction via NodeBaseBuilder

The base class declares the abstract methods invoke and a_invoke, which concrete subclasses must implement to define their specific behavior.
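
As a rough sketch of how an abstract base with a synchronous and an asynchronous entry point behaves, the following uses Python's abc module. SketchNodeBase and EchoNode are hypothetical names, and the signatures are simplified to lists of strings; the real NodeBase takes a context and event objects.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import AsyncGenerator, List


class SketchNodeBase(ABC):
    """Hypothetical stand-in for an abstract node base."""

    def __init__(self, name: str) -> None:
        self.name = name

    @abstractmethod
    def invoke(self, events: List[str]) -> str:
        """Synchronous entry point."""

    @abstractmethod
    def a_invoke(self, events: List[str]) -> AsyncGenerator[str, None]:
        """Asynchronous entry point; yields results as they are produced."""


class EchoNode(SketchNodeBase):
    def invoke(self, events: List[str]) -> str:
        return ",".join(events)

    async def a_invoke(self, events: List[str]) -> AsyncGenerator[str, None]:
        for event in events:
            yield event


async def collect(node: SketchNodeBase, events: List[str]) -> List[str]:
    # Drain the async generator into a list.
    return [item async for item in node.a_invoke(events)]


try:
    SketchNodeBase("abstract")  # fails: abstract methods are not implemented
    instantiated = True
except TypeError:
    instantiated = False

echo = EchoNode("echo")
sync_result = echo.invoke(["a", "b"])
async_result = asyncio.run(collect(echo, ["a", "b"]))
```

Instantiating the base class directly raises TypeError, which is how the abstract methods enforce that every concrete node supplies both entry points.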

Node Implementation

The Node class is a concrete implementation of NodeBase that provides a working node with standard invoke behavior. Unlike the abstract base class, Node actually implements the invoke and a_invoke methods by delegating to an internal Command object.

Key features of the Node class include:

  • Automatic Command Setup: Creates commands automatically from tools during initialization
  • Event-Driven Invocation: Uses can_invoke() to determine readiness based on subscribed topics
  • Event-Based Interface: Consumes ConsumeFromTopicEvent objects and publishes PublishToTopicEvent objects
  • Decorator Support: Includes built-in instrumentation via @record_node_invoke and @record_node_a_invoke

The following table describes each field within the Node class, highlighting its purpose and usage in the workflow:

| Field | Description |
| --- | --- |
| node_id | A unique identifier for the node instance. |
| name | A unique name identifying the node within the workflow. |
| type | Defines the category or type of node, indicating its function. |
| tool | Optional tool that the node uses; automatically creates a command when set. |
| command | Property providing access to the internal command object for invoke logic. |
| oi_span_type | Semantic attribute from OpenInference for tracing purposes. |
| subscribed_expressions | List of DSL-based subscription expressions used by the node. |
| publish_to | List of designated topics the node publishes events to. |
| _subscribed_topics | Internal mapping of subscribed topic names to Topic instances. |
| _command | Private field storing the command object. |

The following table summarizes the methods available in the Node class, highlighting their purpose and intended usage:

| Method | Description |
| --- | --- |
| builder | Class method that returns a NodeBaseBuilder for fluent node construction. |
| model_post_init | Model post-initialization hook that sets up subscribed topics and auto-creates commands from tools during initialization. |
| invoke | Synchronously invokes the node's command with a list of ConsumeFromTopicEvent and returns a PublishToTopicEvent. |
| a_invoke | Asynchronously invokes the node's command, yielding PublishToTopicEvent objects as they become available. |
| can_invoke | Evaluates subscription conditions to determine whether the node is ready to invoke based on available topics. |
| can_invoke_with_topics | Checks if the node can invoke given a specific list of topic names. |
| to_dict | Serializes node attributes to a dictionary, suitable for persistence or transmission. |
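
The readiness check behind can_invoke and can_invoke_with_topics can be pictured as evaluating each subscription expression against the set of topics that currently have data. The sketch below is an illustration under stated assumptions, not the library's DSL: TopicRef, Combined, and is_satisfied are hypothetical names, and it assumes that every expression must hold and that a node with no subscriptions is always ready.

```python
from dataclasses import dataclass
from typing import List, Set, Union


@dataclass(frozen=True)
class TopicRef:
    """Hypothetical leaf expression: satisfied when the topic has data."""

    name: str


@dataclass(frozen=True)
class Combined:
    """Hypothetical composite expression joining two sub-expressions."""

    op: str  # "AND" or "OR"
    left: "Expr"
    right: "Expr"


Expr = Union[TopicRef, Combined]


def is_satisfied(expr: Expr, ready: Set[str]) -> bool:
    if isinstance(expr, TopicRef):
        return expr.name in ready
    left = is_satisfied(expr.left, ready)
    right = is_satisfied(expr.right, ready)
    return (left and right) if expr.op == "AND" else (left or right)


def can_invoke_with_topics(expressions: List[Expr], topic_names: List[str]) -> bool:
    # Assumption: all expressions must hold; all() over an empty list is True,
    # so a node without subscriptions is treated as always ready.
    ready = set(topic_names)
    return all(is_satisfied(expr, ready) for expr in expressions)


expressions: List[Expr] = [
    Combined("OR", TopicRef("search_results"), TopicRef("cache_hits")),
    TopicRef("user_input"),
]
ready_now = can_invoke_with_topics(expressions, ["cache_hits", "user_input"])
not_ready = can_invoke_with_topics(expressions, ["cache_hits"])
no_subscriptions = can_invoke_with_topics([], [])
```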

NodeBaseBuilder

The NodeBaseBuilder class provides a fluent interface for constructing nodes with a builder pattern. It allows for readable and maintainable node configuration through method chaining.

Available builder methods:

| Method | Description |
| --- | --- |
| name | Sets the unique name for the node within the workflow. |
| type | Sets the node type, indicating its function or category. |
| tool | Sets the tool for the node; automatically creates a command from the tool. |
| oi_span_type | Sets the OpenInference span type for tracing purposes. |
| subscribe | Adds subscription expressions to topics or SubExpr objects. |
| publish_to | Adds topics that this node will publish events to. |
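
Method chaining of this kind works because each setter returns the builder itself. The following is a minimal sketch of that mechanism only: SketchNodeBuilder is a hypothetical class that collects settings into a dictionary, whereas the real NodeBaseBuilder produces a node object.

```python
from typing import Any, Dict


class SketchNodeBuilder:
    """Hypothetical fluent builder that accumulates node settings."""

    def __init__(self) -> None:
        self._config: Dict[str, Any] = {"subscribe": [], "publish_to": []}

    def name(self, name: str) -> "SketchNodeBuilder":
        self._config["name"] = name
        return self  # returning self is what makes chaining possible

    def type(self, type_name: str) -> "SketchNodeBuilder":
        self._config["type"] = type_name
        return self

    def subscribe(self, topic: str) -> "SketchNodeBuilder":
        self._config["subscribe"].append(topic)
        return self

    def publish_to(self, topic: str) -> "SketchNodeBuilder":
        self._config["publish_to"].append(topic)
        return self

    def build(self) -> Dict[str, Any]:
        # Assumption for this sketch: a name is the only required setting.
        if "name" not in self._config:
            raise ValueError("a node needs a name")
        return dict(self._config)


built = (
    SketchNodeBuilder()
    .name("ProcessorNode")
    .type("DataProcessor")
    .subscribe("input_topic")
    .publish_to("output_topic")
    .build()
)
```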

Example usage:

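from grafi.nodes.node import Node
from grafi.common.topics.input_topic import InputTopic
from grafi.common.topics.output_topic import OutputTopic

# my_tool, input_topic, and output_topic are placeholders. Define them
# before building the node (for example, a tool instance plus InputTopic
# and OutputTopic instances).
# The surrounding parentheses let the method chain span several lines.
node = (
    Node.builder()
    .name("ProcessorNode")
    .type("DataProcessor")
    .tool(my_tool)
    .subscribe(input_topic)
    .publish_to(output_topic)
    .build()
)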

# Node invoke signature
import asyncio

from grafi.common.models.invoke_context import InvokeContext
from grafi.common.events.topic_events.consume_from_topic_event import ConsumeFromTopicEvent
from grafi.common.events.topic_events.publish_to_topic_event import PublishToTopicEvent

# Synchronous invocation
context = InvokeContext(session_id="session-123")
input_events = [ConsumeFromTopicEvent(...)]  # List of consumed events
output_event: PublishToTopicEvent = node.invoke(context, input_events)

# Asynchronous invocation
# "async for" is only valid inside a coroutine, so wrap the loop in one.
async def consume_outputs() -> None:
    async for output_event in node.a_invoke(context, input_events):
        # Process each PublishToTopicEvent as it's generated
        pass

asyncio.run(consume_outputs())