Models

In Graphite, various models provide the fundamental data structures that underpin the event-driven workflow. Message represents the content exchanged between users, assistants, and language models, enabling consistent communication and processing. Event captures the various actions and state changes in the system, from workflow initiation to final outputs. Topics define the named channels where events are published and consumed, establishing a structured mechanism for coordinating data flow across the platform. Additional models include Command for encapsulating tool execution logic and InvokeContext for maintaining workflow state across operations.

Message

Message extends OpenAI's chat completion structure, serving as a standardized data structure for both incoming and outgoing content in the event-driven workflow. Each Message instance retains essential metadata such as timestamps, unique identifiers, and optional tool references, facilitating robust and traceable communication between users, assistants, and LLM tools.

Fields

Field          Description
name           An optional name indicating the source or identifier for the message (e.g., function name).
message_id     A unique identifier for the message, defaulting to a generated UUID.
timestamp      The time in nanoseconds when the message was created, allowing strict chronological ordering.
content        The message content: a string, a dictionary, a list of dictionaries, or None.
refusal        The refusal message generated by the model, if applicable.
annotations    Annotations for the message, such as when using web search tools.
audio          Audio response data from the model when audio output modality is requested.
role           Specifies the speaker's role (system, user, assistant, or tool).
tool_call_id   Associates the message with a particular tool invocation if relevant.
tools          An optional list of OpenAI's ChatCompletionToolParam for referencing available tool calls.
function_call  Deprecated field replaced by tool_calls.
tool_calls     The tool calls generated by the model, such as function calls.
is_streaming   Boolean flag indicating if this message is part of a streaming response.

Type Aliases

  • Messages = List[Message] - A list of Message objects
  • MsgsAGen = AsyncGenerator[Messages, None] - An async generator yielding Messages
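
The two aliases can be illustrated with a small self-contained sketch. The `Message` dataclass below is a reduced stand-in for grafi's real class (only `role` and `content`), and `stream_reply` and `collect` are hypothetical helpers, not part of the library. The sketch only shows how an async generator of `Messages` batches is produced and consumed.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator, List, Optional


@dataclass
class Message:
    """Stand-in for grafi's Message, reduced to two fields for illustration."""
    role: str
    content: Optional[str] = None


Messages = List[Message]
MsgsAGen = AsyncGenerator[Messages, None]


async def stream_reply(chunks: List[str]) -> MsgsAGen:
    """Hypothetical producer: yield one single-element batch per text chunk."""
    for chunk in chunks:
        yield [Message(role="assistant", content=chunk)]


async def collect(gen: MsgsAGen) -> Messages:
    """Flatten every yielded batch into one list of messages."""
    collected: Messages = []
    async for batch in gen:
        collected.extend(batch)
    return collected


result = asyncio.run(collect(stream_reply(["The capital ", "is Paris."])))
```

Each `yield` hands the consumer a whole list, so a producer can emit several messages at once or a single streamed chunk, and the consumer's loop is the same either way.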

Usage Example

from grafi.common.models.message import Message

# Creating a user message
user_message = Message(
    role="user",
    content="What is the capital of France?"
)

# Creating an assistant message
assistant_message = Message(
    role="assistant",
    content="The capital of France is Paris."
)

InvokeContext

InvokeContext maintains the workflow state and tracking information across different operations in the system. It provides essential context for conversation management and request tracing.

InvokeContext Fields

Field                 Description
conversation_id       Unique identifier for a conversation between user and assistant.
invoke_id             Unique identifier for each conversation invoke; an invoke can involve multiple agents.
assistant_request_id  Created when an agent receives a request from the user.
user_id               Optional user identifier, defaults to empty string.
kwargs                Optional additional keyword arguments and context for the workflow.

InvokeContext Usage Example

from grafi.common.models.invoke_context import InvokeContext

context = InvokeContext(
    conversation_id="conv_123",
    invoke_id="invoke_456",
    assistant_request_id="req_789",
    user_id="user_001"
)
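
The sketch below shows how the identifiers might relate across two invokes in one conversation. `ContextSketch` is a stand-in dataclass, not grafi's `InvokeContext`, and `next_invoke` is a hypothetical helper. It assumes that a new invoke keeps `conversation_id` and `user_id` but receives fresh `invoke_id` and `assistant_request_id` values.

```python
import uuid
from dataclasses import dataclass, field, replace
from typing import Any, Dict


@dataclass(frozen=True)
class ContextSketch:
    """Stand-in mirroring the fields listed above."""
    conversation_id: str
    invoke_id: str
    assistant_request_id: str
    user_id: str = ""
    kwargs: Dict[str, Any] = field(default_factory=dict)


def next_invoke(previous: ContextSketch) -> ContextSketch:
    """Hypothetical helper: same conversation and user, new invoke and request ids."""
    return replace(
        previous,
        invoke_id=uuid.uuid4().hex,
        assistant_request_id=uuid.uuid4().hex,
    )


first = ContextSketch(
    conversation_id="conv_123",
    invoke_id="invoke_456",
    assistant_request_id="req_789",
    user_id="user_001",
)
second = next_invoke(first)
```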

Command

Command encapsulates tool execution logic using the Command Pattern. It provides a standardized interface for invoking tools with proper input handling and context management.

Command Fields

Field  Description
tool   The Tool instance this command will execute.

Command Methods

Method                         Description
for_tool(tool)                 Class method factory that creates the appropriate command for a tool type.
invoke(context, data)          Synchronously invokes the tool with the provided context and input data.
a_invoke(context, data)        Asynchronously invokes the tool, yielding response messages.
get_tool_input(context, data)  Processes input events and extracts messages for tool consumption.
to_dict()                      Serializes the command to a dictionary representation.

Command Usage Example

from grafi.common.models.command import Command
from grafi.tools.tool import Tool

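# Create a command from an existing Tool instance
# (my_tool is assumed to be defined elsewhere)
command = Command.for_tool(my_tool)

# Invoke the command; invoke_context (an InvokeContext) and
# input_events (the input events) are assumed to exist already
response = command.invoke(invoke_context, input_events)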
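
To make the pattern concrete without depending on grafi, here is a toy version. `EchoTool` and `SimpleCommand` are hypothetical stand-ins whose method names mirror the table above. Their bodies are illustrative assumptions, not the library's implementation.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class EchoTool:
    """Hypothetical tool: upper-cases each input string."""
    name: str = "echo"

    def invoke(self, context: dict, data: List[str]) -> List[str]:
        return [item.upper() for item in data]


@dataclass
class SimpleCommand:
    """Stand-in for Command: holds a tool and forwards invocations to it."""
    tool: EchoTool

    @classmethod
    def for_tool(cls, tool: EchoTool) -> "SimpleCommand":
        # A fuller factory could pick a subclass based on the tool's type.
        return cls(tool=tool)

    def get_tool_input(self, context: dict, data: List[str]) -> List[str]:
        # Input preparation lives in the command rather than in the caller.
        return [item.strip() for item in data]

    def invoke(self, context: dict, data: List[str]) -> List[str]:
        return self.tool.invoke(context, self.get_tool_input(context, data))

    def to_dict(self) -> dict:
        return {"class": type(self).__name__, "tool": self.tool.name}


command = SimpleCommand.for_tool(EchoTool())
response = command.invoke({"invoke_id": "invoke_456"}, ["  hello "])
```

The caller only ever touches the command, so input handling can change per tool type without changing call sites.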

FunctionSpec

FunctionSpec defines the structure for function specifications that can be converted to OpenAI tool parameters.

FunctionSpec Fields

Field        Description
name         The name of the function.
description  Description of what the function does.
parameters   Schema defining the function's parameters.

FunctionSpec Methods

Method            Description
to_openai_tool()  Converts the function spec to OpenAI's ChatCompletionToolParam.

Supporting Classes

  • ParameterSchema: Defines individual parameter types and descriptions
  • ParametersSchema: Defines the overall parameters structure with properties and required fields
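
The conversion can be sketched with plain dataclasses. The three `*Sketch` classes are stand-ins, and their field layout (a `type` and `description` per parameter, `properties` plus `required` overall) is an assumption based on the descriptions above. The output shape, a dictionary with `"type": "function"` and a nested `"function"` object, follows OpenAI's chat completion tool format.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ParameterSchemaSketch:
    """Stand-in for ParameterSchema: one parameter's type and description."""
    type: str
    description: str


@dataclass
class ParametersSchemaSketch:
    """Stand-in for ParametersSchema: all properties plus the required names."""
    properties: Dict[str, ParameterSchemaSketch]
    required: List[str] = field(default_factory=list)
    type: str = "object"


@dataclass
class FunctionSpecSketch:
    """Stand-in for FunctionSpec."""
    name: str
    description: str
    parameters: ParametersSchemaSketch

    def to_openai_tool(self) -> Dict[str, Any]:
        # Build the dictionary shape used for chat completion tools.
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": {
                    "type": self.parameters.type,
                    "properties": {
                        key: {"type": p.type, "description": p.description}
                        for key, p in self.parameters.properties.items()
                    },
                    "required": self.parameters.required,
                },
            },
        }


spec = FunctionSpecSketch(
    name="get_weather",
    description="Look up the current weather for a city.",
    parameters=ParametersSchemaSketch(
        properties={"city": ParameterSchemaSketch("string", "City name")},
        required=["city"],
    ),
)
tool = spec.to_openai_tool()
```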

BaseBuilder

BaseBuilder provides a generic builder pattern implementation for constructing Python models with a fluent interface.

BaseBuilder Methods

Method   Description
build()  Returns the fully configured model instance.

This builder is extended by specific builders like NodeBaseBuilder to provide domain-specific construction methods.
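
A minimal sketch of a generic fluent builder follows. `BuilderSketch`, `NodeBuilderSketch`, and the `Node` dataclass are hypothetical names, and the real BaseBuilder may construct and hold its target differently. The sketch only shows the shape: setter methods return the builder itself, and `build()` returns the configured object.

```python
from dataclasses import dataclass
from typing import Generic, Type, TypeVar

T = TypeVar("T")


@dataclass
class Node:
    """Hypothetical model with default values so it can be created empty."""
    name: str = ""
    retries: int = 0


class BuilderSketch(Generic[T]):
    """Generic base: creates the target object and returns it from build()."""

    def __init__(self, cls: Type[T]) -> None:
        self._obj: T = cls()

    def build(self) -> T:
        return self._obj


class NodeBuilderSketch(BuilderSketch[Node]):
    """Domain-specific builder: each setter returns self to allow chaining."""

    def __init__(self) -> None:
        super().__init__(Node)

    def name(self, name: str) -> "NodeBuilderSketch":
        self._obj.name = name
        return self

    def retries(self, retries: int) -> "NodeBuilderSketch":
        self._obj.retries = retries
        return self


node = NodeBuilderSketch().name("search").retries(3).build()
```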

MCP Connections

The mcp_connections.py module defines comprehensive models for Model Context Protocol (MCP) connections, supporting multiple transport mechanisms for connecting to MCP servers. These TypedDict-based models ensure type safety and provide clear interfaces for different connection types.

Connection Types

The system supports four distinct connection transport types through a union type Connection:

1. Standard I/O Connection

Transport: stdio - Connects via standard input/output streams

Primary Use: Local process communication

Field                   Type                   Description
command                 str                    The executable to run to start the server
args                    list[str]              Command line arguments to pass to the executable
env                     dict[str, str] | None  Environment variables for the spawned process
cwd                     str | Path | None      Working directory for the spawned process
encoding                str                    Text encoding for message communication
encoding_error_handler  EncodingErrorHandler   Error handling strategy for encoding issues
session_kwargs          dict[str, Any] | None  Additional keyword arguments for the ClientSession

2. SSE Connection

Transport: sse - Server-Sent Events for real-time communication

Primary Use: HTTP-based streaming connections

Field                 Type                   Description
url                   str                    URL of the SSE endpoint to connect to
headers               dict[str, Any] | None  HTTP headers to send to the SSE endpoint
timeout               float                  HTTP timeout duration
sse_read_timeout      float                  SSE-specific read timeout
session_kwargs        dict[str, Any] | None  Additional keyword arguments for the ClientSession
httpx_client_factory  Optional[Callable]     Custom factory for creating HTTP async client instances

3. Streamable HTTP Connection

Transport: streamable_http - HTTP with streaming capabilities

Primary Use: Long-lived HTTP connections with streaming support

Field                 Type                   Description
url                   str                    URL of the endpoint to connect to
headers               dict[str, Any] | None  HTTP headers to send to the endpoint
timeout               timedelta              HTTP timeout duration
sse_read_timeout      timedelta              Wait time for new events before disconnecting
terminate_on_close    bool                   Whether to terminate the session on close
session_kwargs        dict[str, Any] | None  Additional keyword arguments for the ClientSession
httpx_client_factory  Optional[Callable]     Custom factory for creating HTTP async client instances

4. WebSocket Connection

Transport: websocket - WebSocket protocol for bidirectional communication

Primary Use: Real-time bidirectional communication

Field           Type                   Description
url             str                    URL of the WebSocket endpoint to connect to
session_kwargs  dict[str, Any] | None  Additional keyword arguments for the ClientSession
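
Because every connection carries a transport value, code that receives a Connection can branch on it. The function below is a hypothetical illustration using plain dictionaries. `describe` is not part of grafi, and a real client would open a session instead of returning a string.

```python
from typing import Any, Dict


def describe(connection: Dict[str, Any]) -> str:
    """Summarize a connection config by branching on its transport value."""
    transport = connection["transport"]
    if transport == "stdio":
        # Local process: the command plus its arguments is what gets spawned.
        parts = [connection["command"], *connection.get("args", [])]
        return "spawn " + " ".join(parts)
    if transport in ("sse", "streamable_http", "websocket"):
        # Network transports are all addressed by a URL.
        return f"connect to {connection['url']} via {transport}"
    raise ValueError(f"unsupported transport: {transport!r}")


local = describe({"transport": "stdio", "command": "python", "args": ["-m", "my_mcp_server"]})
remote = describe({"transport": "websocket", "url": "wss://api.example.com/mcp/ws"})
```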

Supporting Types

EncodingErrorHandler
EncodingErrorHandler = Literal["strict", "ignore", "replace"]

Defines how encoding errors should be handled when communicating with standard I/O-based connections:

  • strict: Raise an exception on encoding errors (default Python behavior)
  • ignore: Ignore encoding errors and skip problematic characters
  • replace: Replace problematic characters with placeholder characters
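
These three names match the error handlers built into Python's own codecs, so their effect can be seen with `bytes.decode` alone. The sketch below uses only the standard library and says nothing about how grafi applies the setting internally.

```python
raw = b"caf\xff"  # 0xff is never a valid byte in UTF-8

# strict: decoding fails with an exception
try:
    raw.decode("utf-8", errors="strict")
    strict_result = "decoded"
except UnicodeDecodeError:
    strict_result = "raised UnicodeDecodeError"

# ignore: the invalid byte is dropped silently
ignored = raw.decode("utf-8", errors="ignore")

# replace: the invalid byte becomes U+FFFD, the Unicode replacement character
replaced = raw.decode("utf-8", errors="replace")
```

With `ignore`, data loss is invisible in the output. `replace` keeps a visible marker where the bad byte was, which is usually easier to debug.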

Usage Examples

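Standard I/O Connection Example

# Import path assumed from the mcp_connections.py module name
from grafi.common.models.mcp_connections import StdioConnection

stdio_config: StdioConnection = {
    "transport": "stdio",
    "command": "python",
    "args": ["-m", "my_mcp_server"],
    "env": {"DEBUG": "1"},
    "cwd": "/path/to/server",
    "encoding": "utf-8",
    "encoding_error_handler": "replace",
    "session_kwargs": None
}

SSE Connection Example

# Import path assumed from the mcp_connections.py module name
from grafi.common.models.mcp_connections import SSEConnection

sse_config: SSEConnection = {
    "transport": "sse",
    "url": "https://api.example.com/mcp/events",
    "headers": {"Authorization": "Bearer token123"},
    "timeout": 30.0,
    "sse_read_timeout": 60.0,
    "session_kwargs": {"trust_env": True},
    "httpx_client_factory": None
}

WebSocket Connection Example

# Import path assumed from the mcp_connections.py module name
from grafi.common.models.mcp_connections import WebsocketConnection

ws_config: WebsocketConnection = {
    "transport": "websocket",
    "url": "wss://api.example.com/mcp/ws",
    "session_kwargs": {"ping_interval": 20}
}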
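
The streamable HTTP transport differs from SSE in that its two timeouts are timedelta values rather than floats. The sketch below declares a local stand-in TypedDict, `StreamableHttpSketch`, so that it runs without grafi installed. The field names follow the table in the Streamable HTTP Connection section. The class name, the example URL, and the chosen durations are assumptions.

```python
from datetime import timedelta
from typing import Any, Dict, Optional, TypedDict


class StreamableHttpSketch(TypedDict, total=False):
    """Local stand-in for the streamable HTTP connection type."""
    transport: str
    url: str
    headers: Optional[Dict[str, Any]]
    timeout: timedelta
    sse_read_timeout: timedelta
    terminate_on_close: bool
    session_kwargs: Optional[Dict[str, Any]]


http_config: StreamableHttpSketch = {
    "transport": "streamable_http",
    "url": "https://api.example.com/mcp",
    "headers": {"Authorization": "Bearer token123"},
    "timeout": timedelta(seconds=30),          # timedelta, not a float
    "sse_read_timeout": timedelta(minutes=5),  # wait time for new events
    "terminate_on_close": True,
    "session_kwargs": None,
}
```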

These connection models provide a flexible and type-safe foundation for integrating with various MCP server implementations, whether they're local processes, HTTP-based services, or real-time communication endpoints.

These models collectively provide a robust foundation for the event-driven architecture, ensuring consistent data handling, proper type safety, and extensible design patterns throughout the Graphite platform.

Additional Models

Default ID Generation

  • default_id: A field generator that creates UUIDs automatically using uuid.uuid4().hex
  • EventId: Type alias for string-based event identifiers
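
The idea behind default_id can be sketched with a standard-library dataclass. `new_id` and `Record` are hypothetical names, and grafi's models may declare the field through a different mechanism. Only the use of `uuid.uuid4().hex` comes from the description above.

```python
import uuid
from dataclasses import dataclass, field


def new_id() -> str:
    """Return a random 32-character hexadecimal string."""
    return uuid.uuid4().hex


@dataclass
class Record:
    """Hypothetical model whose identifier is generated when none is given."""
    record_id: str = field(default_factory=new_id)


first, second = Record(), Record()
explicit = Record(record_id="evt_001")
```

The factory runs once per instance, so each object gets its own identifier unless the caller supplies one.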

EventId Type

EventId is a type alias for str, used for event identifiers.