Topic
The Topic
class is a concrete implementation of TopicBase
that provides a complete message queue system for publishing and consuming messages within Graphite applications. It includes built-in logging, event handling, and condition-based message filtering.
Overview
The Topic
class extends TopicBase
to provide:
- Conditional Publishing: Messages are only published if they meet the topic's condition
- Event Handling: Optional event handlers for publish operations
- Logging Integration: Automatic logging of publish operations
- Builder Pattern: Fluent API for topic configuration
Core Components
Topic Class
A complete topic implementation with publishing logic and event handling.
Fields
Field | Type | Description |
---|---|---|
topic_events |
List[TopicEvent] |
List of all topic events (overrides base) |
publish_event_handler |
Optional[Callable[[PublishToTopicEvent], None]] |
Optional handler called after successful publishing |
Inherits all fields from TopicBase
: name
, condition
, consumption_offsets
Methods
Method | Signature | Description |
---|---|---|
builder |
() -> TopicBuilder |
Class method returning a TopicBuilder instance |
publish_data |
(invoke_context, publisher_name, publisher_type, data, consumed_events) -> PublishToTopicEvent |
Publishes messages if condition is met |
Inherits all methods from TopicBase
: can_consume
, consume
, reset
, restore_topic
, etc.
TopicBuilder
Enhanced builder for Topic
instances with additional configuration options.
Builder Methods
Method | Signature | Description |
---|---|---|
publish_event_handler |
(handler: Callable[[PublishToTopicEvent], None]) -> Self |
Set event handler for publish operations |
Inherits all builder methods from TopicBaseBuilder
: name
, condition
Publishing Logic
Conditional Publishing
The publish_data
method implements condition-based publishing:
def publish_data(
self,
invoke_context: InvokeContext,
publisher_name: str,
publisher_type: str,
data: Messages,
consumed_events: List[ConsumeFromTopicEvent],
) -> PublishToTopicEvent:
"""
Publishes a message's event ID to this topic if it meets the condition.
"""
if self.condition(data):
event = PublishToTopicEvent(
invoke_context=invoke_context,
topic_name=self.name,
publisher_name=publisher_name,
publisher_type=publisher_type,
data=data,
consumed_event_ids=[
consumed_event.event_id for consumed_event in consumed_events
],
offset=len(self.topic_events),
)
self.topic_events.append(event)
if self.publish_event_handler:
self.publish_event_handler(event)
logger.info(
f"[{self.name}] Message published with event_id: {event.event_id}"
)
return event
else:
logger.info(f"[{self.name}] Message NOT published (condition not met)")
return None
Publishing Workflow
- Condition Check: Evaluate if messages meet the topic's condition
- Event Creation: Create
PublishToTopicEvent
with metadata and offset - Event Storage: Add event to topic's event list
- Handler Execution: Call publish event handler if configured
- Logging: Log success or failure with appropriate message
- Return Result: Return event on success,
None
on condition failure
Usage Examples
Basic Topic Creation
from grafi.common.topics.topic import Topic
# Create simple topic
topic = Topic(name="notifications")
# Or use builder pattern
topic = (Topic.builder()
.name("notifications")
.condition(lambda msgs: len(msgs) > 0)
.build())
Topic with Event Handler
def on_message_published(event: PublishToTopicEvent):
print(f"Published message to {event.topic_name}: {event.data}")
topic = (Topic.builder()
.name("alerts")
.condition(lambda msgs: any("error" in msg.content.lower() for msg in msgs))
.publish_event_handler(on_message_published)
.build())
Publishing Messages
from grafi.common.models.invoke_context import InvokeContext
from grafi.common.models.message import Message
# Create context and messages
context = InvokeContext()
messages = [Message(role="user", content="Hello world")]
# Publish to topic
event = topic.publish_data(
invoke_context=context,
publisher_name="my_publisher",
publisher_type="application",
data=messages,
consumed_events=[]
)
if event:
print(f"Published: {event.event_id}")
else:
print("Message did not meet condition")
Message Consumption
# Check for new messages
if topic.can_consume("consumer_1"):
messages = topic.consume("consumer_1")
for message in messages:
print(f"Consumed: {message.data}")
Filtering and Conditions
Custom Conditions
# Only publish error messages
error_topic = (Topic.builder()
.name("errors")
.condition(lambda msgs: any("error" in msg.content.lower() for msg in msgs))
.build())
# Only publish messages from specific roles
admin_topic = (Topic.builder()
.name("admin_messages")
.condition(lambda msgs: any(msg.role == "admin" for msg in msgs))
.build())
# Complex business logic
validated_topic = (Topic.builder()
.name("validated_messages")
.condition(lambda msgs: all(
msg.metadata.get("validated", False) for msg in msgs
))
.build())
Default Behavior
# Accept all messages (default condition)
all_messages_topic = Topic(name="all_messages")
# Equivalent to: condition=lambda _: True
Pre-configured Topics
Agent Input Topic
The module provides a pre-configured topic for agent input:
from grafi.common.topics.topic import agent_input_topic
# Use the predefined agent input topic
event = agent_input_topic.publish_data(
invoke_context=context,
publisher_name="user_interface",
publisher_type="input_handler",
data=user_messages,
consumed_events=[]
)
Logging Integration
Log Messages
The Topic class automatically logs publishing operations:
# Successful publishing
logger.info(f"[{self.name}] Message published with event_id: {event.event_id}")
# Condition not met
logger.info(f"[{self.name}] Message NOT published (condition not met)")
Log Format Examples
INFO: [notifications] Message published with event_id: evt_123456
INFO: [errors] Message NOT published (condition not met)
INFO: [alerts] Message published with event_id: evt_789012
Best Practices
Topic Design
- Meaningful Names: Use descriptive names that indicate the topic's purpose
- Focused Conditions: Keep condition functions simple and focused
- Event Handlers: Use handlers for side effects, not primary logic
- Error Handling: Handle condition evaluation errors gracefully
Performance Optimization
- Efficient Conditions: Optimize condition functions for frequent evaluation
- Handler Performance: Keep event handlers lightweight and fast
- Memory Management: Monitor topic event accumulation
- Batch Processing: Consider batching for high-volume scenarios
Error Handling
def safe_condition(messages: Messages) -> bool:
try:
return any("priority" in msg.metadata for msg in messages)
except (AttributeError, KeyError):
return False
def safe_handler(event: PublishToTopicEvent):
try:
process_event(event)
except Exception as e:
logger.error(f"Error in event handler: {e}")
Testing Strategies
def test_topic_publishing():
# Create test topic
topic = Topic(name="test_topic")
messages = [Message(role="user", content="test")]
# Test successful publishing
event = topic.publish_data(
invoke_context=InvokeContext(),
publisher_name="test",
publisher_type="test",
data=messages,
consumed_events=[]
)
assert event is not None
assert len(topic.topic_events) == 1
def test_condition_filtering():
# Create topic with condition
topic = (Topic.builder()
.name("filtered_topic")
.condition(lambda msgs: len(msgs) > 1)
.build())
# Test with single message (should be filtered)
single_message = [Message(role="user", content="test")]
event = topic.publish_data(
invoke_context=InvokeContext(),
publisher_name="test",
publisher_type="test",
data=single_message,
consumed_events=[]
)
assert event is None
assert len(topic.topic_events) == 0
The Topic
class provides a robust, production-ready implementation of the topic-based messaging pattern with built-in logging, event handling, and flexible configuration options for Graphite applications.