Models
In Graphite, various models provide the fundamental data structures that underpin the event-driven workflow. Message represents the content exchanged between users, assistants, and language models, enabling consistent communication and processing. Event captures the various actions and state changes in the system, from workflow initiation to final outputs. Topics define the named channels where events are published and consumed, establishing a structured mechanism for coordinating data flow across the platform. Additional models include Command for encapsulating tool execution logic and InvokeContext for maintaining workflow state across operations.
Message
Message extends OpenAI's chat completion structure, serving as a standardized data structure for both incoming and outgoing content in the event-driven workflow. Each Message instance retains essential metadata such as timestamps, unique identifiers, and optional tool references, facilitating robust and traceable communication between users, assistants, and LLM tools.
Fields
Field | Description |
---|---|
name | An optional name indicating the source or identifier for the message (e.g., function name). |
message_id | A unique identifier for the message, defaulting to a generated UUID. |
timestamp | The time in nanoseconds when the message was created, allowing strict chronological ordering. |
content | The message content: a string, a dictionary, a list of dictionaries, or None. |
refusal | The refusal message generated by the model, if applicable. |
annotations | Annotations for the message, such as when using web search tools. |
audio | Audio response data from the model when audio output modality is requested. |
role | Specifies the speaker's role (system, user, assistant, or tool). |
tool_call_id | Associates the message with a particular tool invocation if relevant. |
tools | An optional list of OpenAI's ChatCompletionToolParam for referencing available tool calls. |
function_call | Deprecated field replaced by tool_calls. |
tool_calls | The tool calls generated by the model, such as function calls. |
is_streaming | Boolean flag indicating if this message is part of a streaming response. |
Type Aliases
- Messages = List[Message]: A list of Message objects
- MsgsAGen = AsyncGenerator[Messages, None]: An async generator yielding Messages
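As a rough illustration of how the two aliases fit together, the sketch below uses a stand-in Message dataclass (an assumption for this example, not Graphite's actual class) and a hypothetical stream_replies producer that yields one batch of messages at a time.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator, List, Optional


@dataclass
class Message:
    """Stand-in for Graphite's Message; only the fields used here."""
    role: str
    content: Optional[str] = None


Messages = List[Message]
MsgsAGen = AsyncGenerator[Messages, None]


async def stream_replies(chunks: List[str]) -> MsgsAGen:
    """Hypothetical producer: yields one single-message batch per chunk."""
    for chunk in chunks:
        yield [Message(role="assistant", content=chunk)]


async def collect(gen: MsgsAGen) -> Messages:
    """Flatten every yielded batch into one list."""
    collected: Messages = []
    async for batch in gen:
        collected.extend(batch)
    return collected


result = asyncio.run(collect(stream_replies(["The capital ", "is Paris."])))
```

Yielding lists rather than single messages lets a producer emit several messages per step without changing the consumer loop.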
Usage Example
from grafi.common.models.message import Message
# Creating a user message
user_message = Message(
role="user",
content="What is the capital of France?"
)
# Creating an assistant message
assistant_message = Message(
role="assistant",
content="The capital of France is Paris."
)
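The generated message_id and nanosecond timestamp can be pictured with a small stand-in. The sketch below is not Graphite's Message class; SketchMessage and its defaults are assumptions that mirror the field table, with the ID format borrowed from the uuid.uuid4().hex generator described under Additional Models.

```python
import time
import uuid
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SketchMessage:
    """Illustrative stand-in, not Graphite's Message class."""
    role: str
    content: Optional[str] = None
    # Assumed defaults: a generated UUID hex string and a nanosecond timestamp.
    message_id: str = field(default_factory=lambda: uuid.uuid4().hex)
    timestamp: int = field(default_factory=time.time_ns)


first = SketchMessage(role="user", content="What is the capital of France?")
second = SketchMessage(role="assistant", content="The capital of France is Paris.")

# Sorting by timestamp orders messages chronologically
# (messages with equal timestamps keep their input order).
ordered = sorted([second, first], key=lambda m: m.timestamp)
```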
InvokeContext
InvokeContext maintains the workflow state and tracking information across different operations in the system. It provides essential context for conversation management and request tracing.
InvokeContext Fields
Field | Description |
---|---|
conversation_id | Unique identifier for a conversation between user and assistant. |
invoke_id | Unique identifier for each conversation invoke; an invoke can involve multiple agents. |
assistant_request_id | Created when an agent receives a request from the user. |
user_id | Optional user identifier, defaults to empty string. |
kwargs | Optional additional keyword arguments and context for the workflow. |
InvokeContext Usage Example
from grafi.common.models.invoke_context import InvokeContext
context = InvokeContext(
conversation_id="conv_123",
invoke_id="invoke_456",
assistant_request_id="req_789",
user_id="user_001"
)
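Since one invoke can involve multiple agents, it may help to picture how the identifiers relate. The following sketch uses a stand-in dataclass and a hypothetical new_request helper (neither is part of Graphite) to show a second request that shares the conversation and invoke IDs but gets its own assistant_request_id.

```python
import uuid
from dataclasses import dataclass, field, replace
from typing import Any, Dict


@dataclass(frozen=True)
class SketchInvokeContext:
    """Stand-in with the fields from the table above; not Graphite's class."""
    conversation_id: str
    invoke_id: str
    assistant_request_id: str
    user_id: str = ""
    kwargs: Dict[str, Any] = field(default_factory=dict)


def new_request(ctx: SketchInvokeContext) -> SketchInvokeContext:
    """Hypothetical helper: same conversation and invoke, fresh request ID."""
    return replace(ctx, assistant_request_id=uuid.uuid4().hex)


base = SketchInvokeContext("conv_123", "invoke_456", "req_789", user_id="user_001")
follow_up = new_request(base)
```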
Command
Command encapsulates tool execution logic using the Command Pattern. It provides a standardized interface for invoking tools with proper input handling and context management.
Command Fields
Field | Description |
---|---|
tool | The Tool instance this command will execute. |
Command Methods
Method | Description |
---|---|
for_tool(tool) | Class method factory to create the appropriate command for a tool type. |
invoke(context, data) | Synchronously invokes the tool with the provided context and input data. |
a_invoke(context, data) | Asynchronously invokes the tool, yielding response messages. |
get_tool_input(context, data) | Processes input events and extracts messages for tool consumption. |
to_dict() | Serializes the command to a dictionary representation. |
Command Usage Example
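from grafi.common.models.command import Command
from grafi.tools.tool import Tool
# my_tool is assumed to be an existing Tool instance;
# invoke_context and input_events are supplied by the running workflow.
# Create command from tool
command = Command.for_tool(my_tool)
# Invoke the command
response = command.invoke(invoke_context, input_events)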
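To make the Command Pattern concrete, here is a minimal, self-contained sketch. SketchTool, SketchCommand, and the registry lookup inside for_tool are all assumptions made for illustration; Graphite's actual Command may select and invoke tools differently.

```python
from typing import Any, Dict, List, Type


class SketchTool:
    """Stand-in tool: upper-cases each input string."""

    def run(self, data: List[str]) -> List[str]:
        return [item.upper() for item in data]


class SketchCommand:
    """Wraps a tool behind a uniform invoke() interface."""

    # Hypothetical registry: tool type -> specialised command class.
    registry: Dict[type, Type["SketchCommand"]] = {}

    def __init__(self, tool: Any) -> None:
        self.tool = tool

    @classmethod
    def for_tool(cls, tool: Any) -> "SketchCommand":
        # Use a specialised command if one is registered for the tool's
        # type (or a parent type); otherwise fall back to this class.
        for tool_type in type(tool).__mro__:
            if tool_type in cls.registry:
                return cls.registry[tool_type](tool)
        return cls(tool)

    def get_tool_input(self, context: Dict[str, str], data: List[str]) -> List[str]:
        # Placeholder for input extraction: here, just drop empty strings.
        return [item for item in data if item]

    def invoke(self, context: Dict[str, str], data: List[str]) -> List[str]:
        return self.tool.run(self.get_tool_input(context, data))

    def to_dict(self) -> Dict[str, str]:
        return {"class": type(self).__name__, "tool": type(self.tool).__name__}


command = SketchCommand.for_tool(SketchTool())
response = command.invoke({"invoke_id": "invoke_456"}, ["hello", "", "world"])
```

The caller only ever sees invoke(), so input preparation can change per tool type without touching the code that schedules commands.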
FunctionSpec
FunctionSpec defines the structure for function specifications that can be converted to OpenAI tool parameters.
FunctionSpec Fields
Field | Description |
---|---|
name | The name of the function. |
description | Description of what the function does. |
parameters | Schema defining the function's parameters. |
FunctionSpec Methods
Method | Description |
---|---|
to_openai_tool() | Converts the function spec to OpenAI's ChatCompletionToolParam. |
Supporting Classes
- ParameterSchema: Defines individual parameter types and descriptions
- ParametersSchema: Defines the overall parameters structure with properties and required fields
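The conversion can be sketched with plain dataclasses. The Sketch* classes below are stand-ins with assumed field names (type, description, properties, required); only the output shape, a dictionary with "type": "function" and a nested "function" entry, follows OpenAI's ChatCompletionToolParam.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class SketchParameterSchema:
    type: str
    description: str


@dataclass
class SketchParametersSchema:
    properties: Dict[str, SketchParameterSchema]
    required: List[str] = field(default_factory=list)
    type: str = "object"


@dataclass
class SketchFunctionSpec:
    name: str
    description: str
    parameters: SketchParametersSchema

    def to_openai_tool(self) -> Dict[str, Any]:
        """Return a dict in the shape of OpenAI's ChatCompletionToolParam."""
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": {
                    "type": self.parameters.type,
                    "properties": {
                        key: {"type": p.type, "description": p.description}
                        for key, p in self.parameters.properties.items()
                    },
                    "required": self.parameters.required,
                },
            },
        }


spec = SketchFunctionSpec(
    name="get_weather",  # hypothetical function name
    description="Look up the current weather for a city.",
    parameters=SketchParametersSchema(
        properties={"city": SketchParameterSchema("string", "City name")},
        required=["city"],
    ),
)
tool_param = spec.to_openai_tool()
```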
BaseBuilder
BaseBuilder provides a generic builder pattern implementation for constructing Python models with a fluent interface.
BaseBuilder Methods
Method | Description |
---|---|
build() | Returns the fully configured model instance. |
This builder is extended by specific builders like NodeBaseBuilder to provide domain-specific construction methods.
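A fluent builder of this kind might look like the sketch below. SketchBuilder, SketchNode, and SketchNodeBuilder are hypothetical names, and collecting keyword arguments until build() is only one possible design; the source does not say how BaseBuilder stores its state.

```python
from dataclasses import dataclass
from typing import Any, Dict, Generic, Type, TypeVar

T = TypeVar("T")


class SketchBuilder(Generic[T]):
    """Generic builder: collects keyword arguments, then constructs the model."""

    def __init__(self, model_cls: Type[T]) -> None:
        self._model_cls = model_cls
        self._kwargs: Dict[str, Any] = {}

    def build(self) -> T:
        return self._model_cls(**self._kwargs)


@dataclass
class SketchNode:
    name: str = ""
    kind: str = "node"


class SketchNodeBuilder(SketchBuilder[SketchNode]):
    """Domain-specific builder; each setter returns self so calls chain."""

    def __init__(self) -> None:
        super().__init__(SketchNode)

    def name(self, name: str) -> "SketchNodeBuilder":
        self._kwargs["name"] = name
        return self

    def kind(self, kind: str) -> "SketchNodeBuilder":
        self._kwargs["kind"] = kind
        return self


node = SketchNodeBuilder().name("planner").kind("llm").build()
```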
MCP Connections
The mcp_connections.py module defines models for Model Context Protocol (MCP) connections, supporting multiple transport mechanisms for connecting to MCP servers. These TypedDict-based models ensure type safety and provide clear interfaces for different connection types.
Connection Types
The system supports four distinct connection transport types through a union type Connection:
1. Standard I/O Connection
Transport: stdio - Connects via standard input/output streams
Primary Use: Local process communication
Field | Type | Description |
---|---|---|
command | str | The executable to run to start the server |
args | list[str] | Command line arguments to pass to the executable |
env | dict[str, str] \| None | Environment variables for the spawned process |
cwd | str \| Path \| None | Working directory for the spawned process |
encoding | str | Text encoding for message communication |
encoding_error_handler | EncodingErrorHandler | Error handling strategy for encoding issues |
session_kwargs | dict[str, Any] \| None | Additional keyword arguments for the ClientSession |
2. SSE Connection
Transport: sse - Server-Sent Events for real-time communication
Primary Use: HTTP-based streaming connections
Field | Type | Description |
---|---|---|
url | str | URL of the SSE endpoint to connect to |
headers | dict[str, Any] \| None | HTTP headers to send to the SSE endpoint |
timeout | float | HTTP timeout duration |
sse_read_timeout | float | SSE-specific read timeout |
session_kwargs | dict[str, Any] \| None | Additional keyword arguments for the ClientSession |
httpx_client_factory | Optional[Callable] | Custom factory for creating HTTP async client instances |
3. Streamable HTTP Connection
Transport: streamable_http - HTTP with streaming capabilities
Primary Use: Long-lived HTTP connections with streaming support
Field | Type | Description |
---|---|---|
url | str | URL of the endpoint to connect to |
headers | dict[str, Any] \| None | HTTP headers to send to the endpoint |
timeout | timedelta | HTTP timeout duration |
sse_read_timeout | timedelta | Wait time for new events before disconnecting |
terminate_on_close | bool | Whether to terminate the session on close |
session_kwargs | dict[str, Any] \| None | Additional keyword arguments for the ClientSession |
httpx_client_factory | Optional[Callable] | Custom factory for creating HTTP async client instances |
4. WebSocket Connection
Transport: websocket - WebSocket protocol for bidirectional communication
Primary Use: Real-time bidirectional communication
Field | Type | Description |
---|---|---|
url | str | URL of the WebSocket endpoint to connect to |
session_kwargs | dict[str, Any] \| None | Additional keyword arguments for the ClientSession |
Supporting Types
EncodingErrorHandler
Defines how encoding errors should be handled when communicating with standard I/O-based connections:
- strict: Raise an exception on encoding errors (default Python behavior)
- ignore: Ignore encoding errors and skip problematic characters
- replace: Replace problematic characters with placeholder characters
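These three values correspond to Python's standard codec error handlers, so their effect can be shown with bytes.decode alone. In the sketch below, the Literal alias is an assumption that mirrors the three values listed above, and decode_line is a hypothetical helper rather than part of the module.

```python
from typing import Literal

# Assumed definition, mirroring the three values listed above.
EncodingErrorHandler = Literal["strict", "ignore", "replace"]


def decode_line(raw: bytes, handler: EncodingErrorHandler = "strict") -> str:
    """Hypothetical helper: decode server output with the chosen handler."""
    return raw.decode("utf-8", errors=handler)


# "café" with its final byte cut off, leaving an incomplete UTF-8 sequence.
raw = "café".encode("utf-8")[:-1]

ignored = decode_line(raw, "ignore")    # invalid byte is dropped
replaced = decode_line(raw, "replace")  # invalid byte becomes U+FFFD

try:
    decode_line(raw, "strict")
    strict_failed = False
except UnicodeDecodeError:
    strict_failed = True
```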
Usage Examples
Standard I/O Connection Example
stdio_config: StdioConnection = {
"transport": "stdio",
"command": "python",
"args": ["-m", "my_mcp_server"],
"env": {"DEBUG": "1"},
"cwd": "/path/to/server",
"encoding": "utf-8",
"encoding_error_handler": "replace",
"session_kwargs": None
}
SSE Connection Example
sse_config: SSEConnection = {
"transport": "sse",
"url": "https://api.example.com/mcp/events",
"headers": {"Authorization": "Bearer token123"},
"timeout": 30.0,
"sse_read_timeout": 60.0,
"session_kwargs": {"trust_env": True},
"httpx_client_factory": None
}
WebSocket Connection Example
ws_config: WebsocketConnection = {
"transport": "websocket",
"url": "wss://api.example.com/mcp/ws",
"session_kwargs": {"ping_interval": 20}
}
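A streamable HTTP configuration would follow the same pattern, with timedelta values for the two timeouts. The sketch below defines its own local TypedDict from the field table because the exported class name is not given here; SketchStreamableHttpConnection and the example URL are assumptions.

```python
from datetime import timedelta
from typing import Any, Dict, Literal, Optional, TypedDict


class SketchStreamableHttpConnection(TypedDict, total=False):
    """Local stand-in built from the field table; not the module's own type."""
    transport: Literal["streamable_http"]
    url: str
    headers: Optional[Dict[str, Any]]
    timeout: timedelta
    sse_read_timeout: timedelta
    terminate_on_close: bool
    session_kwargs: Optional[Dict[str, Any]]


http_config: SketchStreamableHttpConnection = {
    "transport": "streamable_http",
    "url": "https://api.example.com/mcp",
    "headers": {"Authorization": "Bearer token123"},
    "timeout": timedelta(seconds=30),
    "sse_read_timeout": timedelta(minutes=5),
    "terminate_on_close": True,
    "session_kwargs": None,
}
```

Unlike the SSE configuration, which takes timeouts as floats, this transport expects timedelta objects for both timeout fields.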
These connection models provide a flexible and type-safe foundation for integrating with various MCP server implementations, whether they're local processes, HTTP-based services, or real-time communication endpoints.
These models collectively provide a robust foundation for the event-driven architecture, ensuring consistent data handling, proper type safety, and extensible design patterns throughout the Graphite platform.
Additional Models
Default ID Generation
- default_id: A field generator that creates UUIDs automatically using uuid.uuid4().hex
- EventId: Type alias for string-based event identifiers
EventId Type
EventId is a string type.