Assistant
The Assistant serves as the primary interface between users and the underlying workflow system. It processes user input, manages workflows, and coordinates interactions between users and workflow components. Assistants use language models to process input and generate responses through structured workflows.
Architecture Overview
Assistants in Graphite follow a two-tier architecture:
- AssistantBase - Abstract base class defining the core interface and properties
- Assistant - Concrete implementation that handles workflow execution and message processing
AssistantBase Class
The AssistantBase
class provides an abstract foundation for all assistants, defining essential properties and methods that must be implemented by concrete assistant classes.
Class Configuration
from grafi.assistants.assistant_base import AssistantBase
from grafi.common.events.topic_events.publish_to_topic_event import PublishToTopicEvent
from grafi.common.events.topic_events.consume_from_topic_event import ConsumeFromTopicEvent
from typing import List, AsyncGenerator
class MyAssistant(AssistantBase):
def _construct_workflow(self):
# Implementation required
pass
def invoke(self, input_event: PublishToTopicEvent) -> List[ConsumeFromTopicEvent]:
# Implementation required
pass
async def a_invoke(self, input_event: PublishToTopicEvent) -> AsyncGenerator[ConsumeFromTopicEvent, None]:
# Implementation required
pass
AssistantBase Fields
Field | Type | Default | Description |
---|---|---|---|
assistant_id |
str |
default_id |
Unique identifier for the assistant instance |
name |
str |
"Assistant" |
Human-readable name for the assistant |
type |
str |
"assistant" |
Category or type specification for the assistant |
oi_span_type |
OpenInferenceSpanKindValues |
AGENT |
OpenInference span type for distributed tracing |
workflow |
Workflow |
Workflow() |
Associated workflow instance managed by the assistant |
Required Methods
Subclasses must implement these abstract methods:
Method | Signature | Description |
---|---|---|
_construct_workflow |
() -> AssistantBase |
Constructs and configures the assistant's workflow |
invoke |
(PublishToTopicEvent) -> List[ConsumeFromTopicEvent] |
Synchronous event processing |
a_invoke |
(PublishToTopicEvent) -> AsyncGenerator[ConsumeFromTopicEvent, None] |
Asynchronous event processing with streaming support |
Lifecycle
The AssistantBase
automatically calls _construct_workflow()
during initialization via model_post_init()
, ensuring the workflow is properly configured before the assistant is used.
Assistant Class
The concrete Assistant
class extends AssistantBase
and provides a complete implementation for workflow-based message processing. It includes automatic event recording and tracing through decorators.
Implementation Features
- Automatic Event Recording: Uses
@record_assistant_invoke
and@record_assistant_a_invoke
decorators - Workflow Delegation: Delegates all processing to the configured workflow
- Manifest Generation: Supports generating configuration manifests
- Serialization: Provides dictionary serialization capabilities
Assistant Methods
Method | Signature | Description |
---|---|---|
invoke |
(PublishToTopicEvent) -> List[ConsumeFromTopicEvent] |
Processes input events synchronously through the workflow |
a_invoke |
(PublishToTopicEvent) -> AsyncGenerator[ConsumeFromTopicEvent, None] |
Processes input events asynchronously with streaming support |
to_dict |
() -> dict[str, Any] |
Serializes the assistant's workflow configuration |
generate_manifest |
(output_dir: str = ".") -> str |
Generates a JSON manifest file for the assistant |
Method Details
invoke()
Synchronously processes input events through the configured workflow.
@record_assistant_invoke
def invoke(self, input_event: PublishToTopicEvent) -> List[ConsumeFromTopicEvent]:
events = self.workflow.invoke(input_event)
return events
Parameters:
input_event
: APublishToTopicEvent
containing the data to be processed and context information
Returns: List of ConsumeFromTopicEvent
objects representing the workflow output
Raises: ValueError
if required configuration (e.g., API keys) is missing
a_invoke()
Asynchronously processes input events with support for streaming responses.
@record_assistant_a_invoke
async def a_invoke(self, input_event: PublishToTopicEvent) -> AsyncGenerator[ConsumeFromTopicEvent, None]:
async for output in self.workflow.a_invoke(input_event):
yield output
Parameters:
input_event
: APublishToTopicEvent
containing the data to be processed and context information
Returns: Async generator yielding ConsumeFromTopicEvent
objects
Use Cases: Streaming responses, real-time processing, concurrent operations
generate_manifest()
Creates a JSON manifest file containing the assistant's configuration.
def generate_manifest(self, output_dir: str = ".") -> str:
manifest_dict = self.to_dict()
output_path = os.path.join(output_dir, f"{self.name}_manifest.json")
# Writes JSON file
return output_path
Parameters:
output_dir
: Directory for the manifest file (default: current directory)
Returns: Path to the generated manifest file
AssistantBaseBuilder Class
The AssistantBaseBuilder
provides a fluent interface for constructing assistant instances with proper configuration.
Builder Methods
Method | Parameters | Description |
---|---|---|
oi_span_type |
OpenInferenceSpanKindValues |
Sets the OpenInference span type for tracing |
name |
str |
Sets the assistant's name |
type |
str |
Sets the assistant's type category |
event_store |
EventStore |
Registers an event store for event recording |
Usage Example
from grafi.assistants.assistant_base import AssistantBaseBuilder
from grafi.common.event_stores.in_memory_event_store import InMemoryEventStore
from openinference.semconv.trace import OpenInferenceSpanKindValues
builder = AssistantBaseBuilder(MyAssistant)
assistant = (builder
.name("Customer Support Assistant")
.type("support")
.oi_span_type(OpenInferenceSpanKindValues.AGENT)
.event_store(InMemoryEventStore())
.build())