Invoke Decorators
Graphite provides a comprehensive set of decorators for recording execution events, adding distributed tracing, and exposing functions to LLMs. These decorators automatically handle event logging, error tracking, and observability without requiring manual instrumentation.
Overview
The decorators are categorized into two main types:
- Recording Decorators - Automatically record events, add tracing spans, and handle errors
- Function Decorators - Expose functions to LLMs with automatic schema generation
LLM Function Decorator
@llm_function
The @llm_function decorator exposes methods to Large Language Models (LLMs) by automatically generating function specifications.
Location: grafi.common.decorators.llm_function
Purpose:
- Extracts function metadata (name, docstring, parameters, type hints)
- Constructs a `FunctionSpec` object with JSON Schema-compatible parameter descriptions
- Stores the specification as a `_function_spec` attribute on the decorated function
Usage:
from grafi.common.decorators.llm_function import llm_function
@llm_function
def calculate_sum(x: int, y: int, precision: float = 0.1) -> float:
"""
Calculate the sum of two numbers with optional precision.
Args:
x (int): The first number to add.
y (int): The second number to add.
precision (float, optional): Precision level. Defaults to 0.1.
Returns:
float: The sum of x and y.
"""
return float(x + y)
Features:
- Automatically maps Python types to JSON Schema types
- Extracts parameter descriptions from docstrings
- Marks parameters without defaults as required
- Supports type hints for comprehensive schema generation
Type Mapping:
| Python Type | JSON Schema Type |
| --- | --- |
| `str` | `"string"` |
| `int` | `"integer"` |
| `float` | `"number"` |
| `bool` | `"boolean"` |
| `list` | `"array"` |
| `dict` | `"object"` |
| Other | `"string"` (default) |
Recording Decorators
Recording decorators provide automatic event logging, distributed tracing, and error handling for different component types. They come in synchronous and asynchronous variants.
Assistant Decorators
@record_assistant_invoke
Location: grafi.common.decorators.record_assistant_invoke
Records synchronous assistant invocations with event logging and tracing.
Features:
- Records `AssistantInvokeEvent` before execution
- Creates distributed tracing spans with OpenInference attributes
- Records `AssistantRespondEvent` on success or `AssistantFailedEvent` on error
- Captures input/output data as JSON
Usage:
from grafi.common.decorators.record_assistant_invoke import record_assistant_invoke
from grafi.common.events.topic_events.publish_to_topic_event import PublishToTopicEvent
from grafi.common.events.topic_events.consume_from_topic_event import ConsumeFromTopicEvent
from typing import List
@record_assistant_invoke
def invoke(self, input_event: PublishToTopicEvent) -> List[ConsumeFromTopicEvent]:
    # Assistant implementation
    return self.workflow.invoke(input_event)
@record_assistant_a_invoke
Location: grafi.common.decorators.record_assistant_a_invoke
Records asynchronous assistant invocations that return async generators.
Features:
- Handles async generator responses (`MsgsAGen`)
- Aggregates streaming content for final result tracking
- Records events for both streaming and non-streaming responses
- Preserves async generator behavior while adding observability
Usage:
from grafi.common.decorators.record_assistant_a_invoke import record_assistant_a_invoke
from grafi.common.events.topic_events.publish_to_topic_event import PublishToTopicEvent
from grafi.common.events.topic_events.consume_from_topic_event import ConsumeFromTopicEvent
from typing import AsyncGenerator
@record_assistant_a_invoke
async def a_invoke(self, input_event: PublishToTopicEvent) -> AsyncGenerator[ConsumeFromTopicEvent, None]:
    # Async assistant implementation
    async for output in self.workflow.a_invoke(input_event):
        yield output
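From the caller's side, the decorated method is consumed like any other async generator; the decorator records the invoke event before execution and the respond or failed event once the stream completes. A minimal sketch, assuming an already-constructed assistant instance and input_event (both hypothetical names, not provided by the decorator):

import asyncio

async def main() -> None:
    # `assistant` and `input_event` are assumed to exist already.
    async for output_event in assistant.a_invoke(input_event):
        print(output_event.event_id)

asyncio.run(main())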
Node Decorators
@record_node_invoke
Location: grafi.common.decorators.record_node_invoke
Records synchronous node invocations in event-driven workflows.
Features:
- Records `NodeInvokeEvent` with subscription and publication topic information
- Tracks subscribed topics and publish destinations
- Captures `ConsumeFromTopicEvent` input data
- Records success/failure events with comprehensive context
Usage:
from grafi.common.decorators.record_node_invoke import record_node_invoke
from grafi.common.models.invoke_context import InvokeContext
from grafi.common.events.topic_events.consume_from_topic_event import ConsumeFromTopicEvent
from grafi.common.events.topic_events.publish_to_topic_event import PublishToTopicEvent
from typing import List
@record_node_invoke
def invoke(self, invoke_context: InvokeContext,
           node_input: List[ConsumeFromTopicEvent]) -> PublishToTopicEvent:
    # Node processing logic - execute command and return PublishToTopicEvent
    response = self.command.invoke(invoke_context, node_input)
    return PublishToTopicEvent(
        publisher_name=self.name,
        publisher_type=self.type,
        invoke_context=invoke_context,
        consumed_event_ids=[event.event_id for event in node_input],
        data=response
    )
@record_node_a_invoke
Location: grafi.common.decorators.record_node_a_invoke
Records asynchronous node invocations that return async generators.
Features:
- Handles async generator node responses
- Aggregates streaming content from multiple topic events
- Maintains topic subscription and publication tracking
- Records comprehensive node execution metadata
Usage:
from grafi.common.decorators.record_node_a_invoke import record_node_a_invoke
from grafi.common.models.invoke_context import InvokeContext
from grafi.common.events.topic_events.consume_from_topic_event import ConsumeFromTopicEvent
from grafi.common.events.topic_events.publish_to_topic_event import PublishToTopicEvent
from typing import List, AsyncGenerator
@record_node_a_invoke
async def a_invoke(self, invoke_context: InvokeContext,
                   node_input: List[ConsumeFromTopicEvent]) -> AsyncGenerator[PublishToTopicEvent, None]:
    # Async node processing - execute command and yield PublishToTopicEvents
    async for messages in self.command.a_invoke(invoke_context, node_input):
        yield PublishToTopicEvent(
            publisher_name=self.name,
            publisher_type=self.type,
            invoke_context=invoke_context,
            consumed_event_ids=[event.event_id for event in node_input],
            data=messages
        )
Tool Decorators
@record_tool_invoke
Location: grafi.common.decorators.record_tool_invoke
Records synchronous tool invocations with event logging and tracing.
Features:
- Records `ToolInvokeEvent` before execution
- Creates tool-specific tracing spans
- Records `ToolRespondEvent` on success or `ToolFailedEvent` on error
- Captures tool execution context and results
Usage:
from grafi.common.decorators.record_tool_invoke import record_tool_invoke
from grafi.common.models.invoke_context import InvokeContext
@record_tool_invoke
def invoke(self, invoke_context: InvokeContext, input_data: Messages) -> Messages:
    # Tool execution logic producing the tool's output Messages
    return tool_results
@record_tool_a_invoke
Location: grafi.common.decorators.record_tool_a_invoke
Records asynchronous tool invocations that return async generators.
Features:
- Handles async generator tool responses
- Processes streaming tool outputs
- Aggregates content for comprehensive result tracking
- Maintains tool execution observability
Usage:
from grafi.common.decorators.record_tool_a_invoke import record_tool_a_invoke
from grafi.common.models.invoke_context import InvokeContext
@record_tool_a_invoke
async def a_invoke(self, invoke_context: InvokeContext, input_data: Messages) -> MsgsAGen:
    # Async tool execution yielding Messages batches
    yield tool_results
Workflow Decorators
@record_workflow_invoke
Location: grafi.common.decorators.record_workflow_invoke
Records synchronous workflow invocations with comprehensive event logging.
Features:
- Records `WorkflowInvokeEvent` before execution
- Creates workflow-level tracing spans
- Records `WorkflowRespondEvent` on success or `WorkflowFailedEvent` on error
- Tracks workflow execution context and results
Usage:
from grafi.common.decorators.record_workflow_invoke import record_workflow_invoke
from grafi.common.events.topic_events.publish_to_topic_event import PublishToTopicEvent
from grafi.common.events.topic_events.consume_from_topic_event import ConsumeFromTopicEvent
from typing import List
@record_workflow_invoke
def invoke(self, input_event: PublishToTopicEvent) -> List[ConsumeFromTopicEvent]:
    # Workflow orchestration logic
    # Initialize workflow, execute nodes, return consumed events
    return output_events
@record_workflow_a_invoke
Location: grafi.common.decorators.record_workflow_a_invoke
Records asynchronous workflow invocations that return async generators.
Features:
- Handles async generator workflow responses
- Processes streaming workflow outputs
- Aggregates content for comprehensive result tracking
- Maintains workflow execution observability
Usage:
from grafi.common.decorators.record_workflow_a_invoke import record_workflow_a_invoke
from grafi.common.events.topic_events.publish_to_topic_event import PublishToTopicEvent
from grafi.common.events.topic_events.consume_from_topic_event import ConsumeFromTopicEvent
from typing import AsyncGenerator
@record_workflow_a_invoke
async def a_invoke(self, input_event: PublishToTopicEvent) -> AsyncGenerator[ConsumeFromTopicEvent, None]:
    # Async workflow orchestration
    # Initialize workflow, execute nodes asynchronously, yield consumed events
    async for output_event in self._execute_workflow(input_event):
        yield output_event
Common Features
Event Recording
All recording decorators automatically do the following (a simplified sketch of the pattern appears after the list):
- Record invocation events before execution begins
- Record success events with output data upon completion
- Record failure events with error details when exceptions occur
- Capture timing and execution context information
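The following is a simplified, framework-agnostic sketch of that record-before/record-after pattern. The record_event helper is a stand-in for the framework's event store, not grafi's actual API:

import functools
from typing import Any, Callable

def record_event(kind: str, **payload: Any) -> None:
    # Stand-in for the framework's event store; grafi's actual API differs.
    print(f"[{kind}] {payload}")

def record_invoke(func: Callable[..., Any]) -> Callable[..., Any]:
    # Simplified illustration of the record-before/record-after pattern.
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        record_event("invoke", inputs={"args": args, "kwargs": kwargs})
        try:
            result = func(*args, **kwargs)
        except Exception as exc:
            record_event("failed", error=str(exc))
            raise  # re-raise so callers still see the original exception
        record_event("respond", output=result)
        return result
    return wrapper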
Distributed Tracing
The decorators integrate with OpenInference tracing standards (see the sketch after this list):
- Create spans with appropriate names (`{component_name}.invoke` or `{component_name}.run`)
- Set OpenInference span attributes for component identification
- Capture input/output data as span attributes
- Record error information in span attributes on failure
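The tracing integration follows standard OpenTelemetry usage with OpenInference attributes layered on top. A rough sketch, assuming a tracer provider is configured elsewhere; the openinference.span.kind key appears in the attribute table below, while the kind value and the other attribute names shown here are illustrative:

from opentelemetry import trace

tracer = trace.get_tracer(__name__)

def run_with_span(component_name: str, component_id: str) -> None:
    # Span name follows the "{component_name}.invoke" convention described above.
    with tracer.start_as_current_span(f"{component_name}.invoke") as span:
        span.set_attribute("openinference.span.kind", "AGENT")  # illustrative kind value
        span.set_attribute("assistant_id", component_id)        # component identification
        span.set_attribute("name", component_name)
        ...  # component logic runs inside the span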
Streaming Support
Async decorators handle streaming responses (see the sketch after this list):
- Preserve async generator behavior for streaming outputs
- Aggregate streaming content for final result tracking
- Distinguish between streaming and non-streaming messages
- Maintain observability without affecting streaming performance
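Conceptually, an async decorator wraps the original generator in another generator that yields every item unchanged while keeping a running aggregate for the final event. A simplified, framework-agnostic sketch of that pattern (the print call stands in for event recording):

import asyncio
import functools
from typing import Any, AsyncGenerator, Callable

def record_streaming(func: Callable[..., AsyncGenerator[Any, None]]) -> Callable[..., AsyncGenerator[Any, None]]:
    @functools.wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> AsyncGenerator[Any, None]:
        collected = []
        async for item in func(*args, **kwargs):
            collected.append(item)  # aggregate for the final respond event
            yield item              # preserve streaming behavior for the caller
        print(f"respond event with {len(collected)} aggregated items")  # stand-in for recording
    return wrapper

@record_streaming
async def numbers() -> AsyncGenerator[int, None]:
    for i in range(3):
        yield i

async def main() -> None:
    async for n in numbers():
        print(n)

asyncio.run(main())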
Error Handling
All decorators provide comprehensive error handling (see the sketch after this list):
- Capture and record exception details in events
- Add error information to tracing spans
- Re-raise exceptions to preserve original error behavior
- Maintain error context for debugging and monitoring
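The failure path always records before re-raising, so callers still see the original exception while the event store and tracing span capture the error. A simplified sketch of the span side of that pattern; the error attribute name matches the table below, and the rest is illustrative:

import functools
from typing import Any, Callable

from opentelemetry import trace

tracer = trace.get_tracer(__name__)

def record_failures(func: Callable[..., Any]) -> Callable[..., Any]:
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        with tracer.start_as_current_span(func.__name__) as span:
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                span.set_attribute("error", str(exc))  # mirrors the error span attribute
                span.record_exception(exc)
                # a failed event would be recorded here (event-store call omitted)
                raise  # preserve the original exception for the caller
    return wrapper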
Span Attributes
The decorators set standardized span attributes:
| Attribute Type | Examples |
| --- | --- |
| Component ID | `assistant_id`, `node_id`, `tool_id`, `workflow_id` |
| Component Metadata | `name`, `type`, `model` (for assistants) |
| Execution Context | All fields from `InvokeContext` |
| Data | `input`, `output` (serialized as JSON) |
| Topics (Nodes) | `subscribed_topics`, `publish_to_topics` |
| OpenInference | `openinference.span.kind` |
| Errors | `error` (error message when exceptions occur) |
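Because input and output are stored as JSON strings, large or non-JSON-serializable payloads deserve some care (see Best Practices below). A small sketch of the kind of defensive serialization a recording decorator might apply; the helper name is hypothetical:

import json
from datetime import datetime
from typing import Any

def to_span_json(value: Any) -> str:
    # Hypothetical helper: serialize data for an input/output span attribute.
    try:
        return json.dumps(value)
    except TypeError:
        # Fall back to string coercion for objects that are not JSON-serializable.
        return json.dumps(value, default=str)

print(to_span_json({"x": 1}))                  # '{"x": 1}'
print(to_span_json({"when": datetime.now()}))  # datetime coerced to a string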
Best Practices
- Choose the Right Decorator: Use synchronous decorators for blocking operations and asynchronous decorators for streaming or async operations.
- Consistent Application: Apply decorators consistently across similar component types for uniform observability.
- Function Specification: Use `@llm_function` on methods that should be available to LLMs, ensuring comprehensive docstrings and type hints.
- Performance Considerations: Recording decorators add minimal overhead, but consider the impact of JSON serialization for large data structures.
- Error Context: Leverage the automatic error recording for debugging and monitoring failed invocations.
- Streaming Behavior: For async decorators, ensure your async generators yield complete message batches for proper aggregation.