Workflow Components
This document details the internal components that power the event-driven workflow's asynchronous execution model. These components work together to provide efficient concurrent processing, proper termination detection, and streaming output capabilities.
AsyncNodeTracker
The AsyncNodeTracker is a critical component that manages the state of active nodes in an asynchronous workflow. It provides coordination between concurrent node executions and enables proper workflow termination detection.
Purpose
The AsyncNodeTracker serves several key purposes:
- Activity Tracking: Monitors which nodes are currently processing events
- Idle Detection: Determines when the entire workflow has no active nodes
- Progress Monitoring: Tracks processing cycles to detect workflow progress
- Synchronization: Provides coordination primitives for concurrent operations
Architecture
import asyncio
from collections import defaultdict
from typing import Dict

class AsyncNodeTracker:
    def __init__(self) -> None:
        self._active: set[str] = set()  # Currently active nodes
        self._processing_count: Dict[str, int] = defaultdict(int)  # Processing cycles per node
        self._cond = asyncio.Condition()  # Coordination primitive
        self._idle_event = asyncio.Event()  # Signals when workflow is idle
Key Methods
Method | Description |
---|---|
enter(node_name: str) | Marks a node as active when it begins processing |
leave(node_name: str) | Marks a node as inactive when it completes processing |
is_idle() | Returns True if no nodes are currently active |
wait_idle_event() | Async wait until the workflow becomes idle |
get_activity_count() | Returns total processing cycles across all nodes |
reset() | Resets the tracker to initial state |
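To make the method table concrete, here is a minimal sketch of how such a tracker could be built from the fields shown under Architecture. The class name SketchNodeTracker is hypothetical, and the method bodies are assumptions about one plausible implementation, not the library's actual code. In particular, treating a fresh tracker as idle is an assumption.

```python
import asyncio
from collections import defaultdict
from typing import Dict, Set, Tuple


class SketchNodeTracker:
    """Illustrative stand-in for AsyncNodeTracker (not the library's code)."""

    def __init__(self) -> None:
        self._active: Set[str] = set()
        self._processing_count: Dict[str, int] = defaultdict(int)
        self._cond = asyncio.Condition()
        self._idle_event = asyncio.Event()
        self._idle_event.set()  # Assumption: a fresh tracker counts as idle

    async def enter(self, node_name: str) -> None:
        async with self._cond:
            self._idle_event.clear()
            self._active.add(node_name)
            self._processing_count[node_name] += 1

    async def leave(self, node_name: str) -> None:
        async with self._cond:
            self._active.discard(node_name)
            if not self._active:
                # Last active node has left: wake everyone waiting for idle
                self._idle_event.set()
                self._cond.notify_all()

    def is_idle(self) -> bool:
        return not self._active

    async def wait_idle_event(self) -> None:
        await self._idle_event.wait()

    def get_activity_count(self) -> int:
        return sum(self._processing_count.values())

    def reset(self) -> None:
        self._active.clear()
        self._processing_count.clear()
        self._idle_event.set()


async def demo() -> Tuple[bool, bool, int]:
    tracker = SketchNodeTracker()
    await tracker.enter("a")
    await tracker.enter("b")
    await tracker.leave("a")
    still_busy = not tracker.is_idle()  # "b" is still active
    await tracker.leave("b")
    return still_busy, tracker.is_idle(), tracker.get_activity_count()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Note that the activity count in this sketch only ever grows, which is what lets a caller compare two readings to see whether any node started work in between.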
Usage Example
tracker = AsyncNodeTracker()
# Node entering active state
await tracker.enter("node1")
assert not tracker.is_idle()
# Node completing processing
await tracker.leave("node1")
assert tracker.is_idle()
# Wait for workflow to become idle
await tracker.wait_idle_event()
Workflow Integration
The AsyncNodeTracker integrates with the workflow execution in several ways:
- Node Lifecycle: Each node signals when it enters/leaves active state
- Termination Detection: Output listeners use idle state to determine completion
- Progress Monitoring: Activity count helps detect stalled workflows
graph TD
    A[Node Receives Events] --> B["tracker.enter(node_name)"]
    B --> C[Node Processing]
    C --> D["tracker.leave(node_name)"]
    D --> E{All Nodes Idle?}
    E -->|Yes| F[Signal Workflow Complete]
    E -->|No| G[Continue Processing]
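The enter/leave lifecycle in the diagram only supports termination detection if leave is called on every exit path, including failures. One way to guarantee that is an async context manager, sketched below. StubTracker, tracked, and run_node are hypothetical names used for illustration. The stub exposes only the three tracker methods the sketch needs.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Set


class StubTracker:
    """Hypothetical stand-in exposing only enter/leave/is_idle."""

    def __init__(self) -> None:
        self.active: Set[str] = set()

    async def enter(self, node_name: str) -> None:
        self.active.add(node_name)

    async def leave(self, node_name: str) -> None:
        self.active.discard(node_name)

    def is_idle(self) -> bool:
        return not self.active


@asynccontextmanager
async def tracked(tracker: StubTracker, node_name: str) -> AsyncIterator[None]:
    await tracker.enter(node_name)
    try:
        yield
    finally:
        # Runs on normal return, on exceptions, and on cancellation
        await tracker.leave(node_name)


async def run_node(tracker: StubTracker, node_name: str, fail: bool = False) -> str:
    async with tracked(tracker, node_name):
        await asyncio.sleep(0)  # Placeholder for real node work
        if fail:
            raise RuntimeError(f"{node_name} failed")
        return f"{node_name} done"
```

With this shape, a node that raises still leaves the tracker idle, so output listeners waiting on the idle state are not blocked by a crashed node.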
AsyncOutputQueue
The AsyncOutputQueue manages the collection and streaming of output events from multiple topics in an asynchronous workflow. It coordinates between output topic listeners and provides a unified async iterator interface for consuming events.
Purpose
The AsyncOutputQueue addresses several challenges:
- Multi-Topic Monitoring: Efficiently monitors multiple output topics concurrently
- Stream Coordination: Provides a single stream from multiple sources
- Termination Detection: Integrates with AsyncNodeTracker for proper completion
- Type Safety: Provides proper type hints for async iteration
Architecture
class AsyncOutputQueue:
    def __init__(
        self,
        output_topics: List[TopicBase],
        consumer_name: str,
        tracker: AsyncNodeTracker,
    ):
        self.output_topics = output_topics
        self.consumer_name = consumer_name
        self.tracker = tracker
        self.queue: asyncio.Queue[TopicEvent] = asyncio.Queue()
        self.listener_tasks: List[asyncio.Task] = []
Key Features
Feature | Description |
---|---|
Async Iteration | Implements __aiter__ and __anext__ for async for loop support |
Concurrent Listeners | Spawns separate listener tasks for each output topic |
Idle Detection | Monitors workflow idle state to determine when to stop iteration |
Graceful Shutdown | Provides methods to cleanly stop all listener tasks |
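The fan-in idea behind these features (one listener task per source, one shared queue, one async iterator) can be sketched in isolation. MergedStream below is a hypothetical class, and it deliberately swaps the termination mechanism: each source signals its end with a None sentinel, rather than relying on the tracker's idle state as AsyncOutputQueue does.

```python
import asyncio
from typing import List, Optional


class MergedStream:
    """Hypothetical fan-in: several source queues, one async iterator."""

    def __init__(self, sources: List["asyncio.Queue[Optional[str]]"]) -> None:
        self._sources = sources
        self._out: "asyncio.Queue[Optional[str]]" = asyncio.Queue()
        self._tasks: List["asyncio.Task[None]"] = []
        self._open = len(sources)  # Sources that have not sent their sentinel yet

    async def start_listeners(self) -> None:
        self._tasks = [asyncio.create_task(self._listen(s)) for s in self._sources]

    async def stop_listeners(self) -> None:
        for task in self._tasks:
            task.cancel()
        # Wait for cancellations to settle without raising CancelledError here
        await asyncio.gather(*self._tasks, return_exceptions=True)

    async def _listen(self, source: "asyncio.Queue[Optional[str]]") -> None:
        while True:
            item = await source.get()
            await self._out.put(item)
            if item is None:  # None marks the end of this source
                return

    def __aiter__(self) -> "MergedStream":
        return self

    async def __anext__(self) -> str:
        while self._open:
            item = await self._out.get()
            if item is not None:
                return item
            self._open -= 1  # One source finished
        raise StopAsyncIteration
```

A consumer would call start_listeners, iterate with async for, and call stop_listeners in a finally block. Items from different sources may interleave in any order.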
Usage Example
# Create output queue with topics and tracker
output_queue = AsyncOutputQueue(output_topics, "consumer_name", tracker)
# Start listening to all topics
await output_queue.start_listeners()
try:
    # Consume events as they arrive
    async for event in output_queue:
        # Process event
        yield event.data
finally:
    # Clean shutdown
    await output_queue.stop_listeners()
Output Listener Logic
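Each output topic gets its own listener. A listener exits only when the workflow is idle, the activity count has not changed since the previous idle check, and the topic has no unconsumed data: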
async def _output_listener(self, topic: TopicBase):
    last_activity_count = 0
    while True:
        # Wait for either new data or idle state
        topic_task = asyncio.create_task(topic.a_consume(self.consumer_name))
        idle_event_waiter = asyncio.create_task(self.tracker.wait_idle_event())
        done, pending = await asyncio.wait(
            {topic_task, idle_event_waiter},
            return_when=asyncio.FIRST_COMPLETED,
        )
        # Cancel the task that lost the race so it is not leaked into the next iteration
        for task in pending:
            task.cancel()
        # Process new events
        if topic_task in done:
            output_events = topic_task.result()
            for event in output_events:
                await self.queue.put(event)
        # Check for completion
        if idle_event_waiter in done and self.tracker.is_idle():
            current_activity = self.tracker.get_activity_count()
            # No new activity and no data = done
            if current_activity == last_activity_count and not topic.can_consume(self.consumer_name):
                break
            last_activity_count = current_activity
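The core of the listener is a race between two awaitables using asyncio.wait with FIRST_COMPLETED. The sketch below isolates that pattern with a plain queue and event. get_or_signal is a hypothetical helper, not part of the library.

```python
import asyncio
from typing import Optional


async def get_or_signal(
    queue: "asyncio.Queue[str]", signal: asyncio.Event
) -> Optional[str]:
    """Return the next queue item, or None if `signal` is set first."""
    get_task = asyncio.create_task(queue.get())
    signal_task = asyncio.create_task(signal.wait())
    done, pending = await asyncio.wait(
        {get_task, signal_task},
        return_when=asyncio.FIRST_COMPLETED,
    )
    # Cancel the loser and wait for the cancellation to finish
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    # Both tasks can finish in the same loop step, so prefer data when present
    return get_task.result() if get_task in done else None
```

Checking for data before checking the signal matters: both tasks may land in done together, and returning None in that case would drop an item that was already taken off the queue.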
Iteration and Termination
The AsyncOutputQueue implements async iteration with proper idle detection:
async def __anext__(self) -> TopicEvent:
    while True:
        queue_task = asyncio.create_task(self.queue.get())
        idle_task = asyncio.create_task(self.tracker._idle_event.wait())
        done, pending = await asyncio.wait(
            {queue_task, idle_task},
            return_when=asyncio.FIRST_COMPLETED,
        )
        # Data available - return it
        if queue_task in done:
            idle_task.cancel()
            return queue_task.result()
        # Workflow idle - check if we should stop
        queue_task.cancel()
        await asyncio.sleep(0)  # Allow downstream activation
        if self.tracker.is_idle() and self.queue.empty():
            raise StopAsyncIteration
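The `await asyncio.sleep(0)` before the final idle check is easy to overlook. A task created with asyncio.create_task does not start running until the current coroutine yields, so an idle check made immediately after upstream work finishes can miss downstream work that is scheduled but has not started. The sketch below demonstrates this with a plain set standing in for the tracker's active nodes. All names in it are hypothetical.

```python
import asyncio
from typing import List, Set


async def demo() -> List[bool]:
    active: Set[str] = set()  # Stand-in for the tracker's active-node set

    async def downstream() -> None:
        active.add("downstream")  # Becomes active as soon as it gets to run
        await asyncio.sleep(0.01)  # Placeholder for real work
        active.discard("downstream")

    # Upstream has just finished and scheduled downstream work
    task = asyncio.create_task(downstream())
    idle_before = not active  # Downstream has not run yet
    await asyncio.sleep(0)  # Yield once to the event loop
    idle_after = not active  # Downstream has now marked itself active
    await task
    return [idle_before, idle_after, not active]


if __name__ == "__main__":
    print(asyncio.run(demo()))  # [True, False, True]
```

A single yield only lets already-scheduled tasks take one step, so this guards against the most immediate race rather than every possible delay in downstream activation.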
Integration with EventDrivenWorkflow
These components work together in the EventDrivenWorkflow's async execution:
async def a_invoke(self, input_event: PublishToTopicEvent) -> AsyncGenerator[ConsumeFromTopicEvent, None]:
    # Initialize workflow
    await self.a_init_workflow(invoke_context, input)
    # Start node processing tasks
    node_processing_tasks = [
        asyncio.create_task(self._invoke_node(invoke_context, node))
        for node in self.nodes.values()
    ]
    # Create output queue for streaming results
    output_topics = [
        t for t in self._topics.values()
        if t.type in (AGENT_OUTPUT_TOPIC_TYPE, IN_WORKFLOW_OUTPUT_TOPIC_TYPE)
    ]
    output_queue = AsyncOutputQueue(output_topics, self.name, self._tracker)
    await output_queue.start_listeners()
    try:
        # Stream results as they arrive
        async for event in output_queue:
            yield event.data
    finally:
        await output_queue.stop_listeners()
Coordination Flow
graph TD
A[Workflow Start] --> B[Initialize AsyncNodeTracker]
B --> C[Start Node Tasks]
C --> D[Create AsyncOutputQueue]
D --> E[Start Output Listeners]
E --> F{Events Available?}
F -->|Yes| G[Yield Event Data]
G --> F
F -->|No & Idle| H[Check Termination]
H -->|Activity Changed| F
H -->|No Activity| I[Stop Iteration]
I --> J[Cleanup Tasks]
Benefits
The combination of AsyncNodeTracker and AsyncOutputQueue provides:
- Efficient Concurrency: Multiple nodes and topics processed in parallel
- Proper Termination: Detects when workflow is truly complete
- Stream Processing: Events streamed as available, not batched
- Resource Management: Clean startup and shutdown of async tasks
- Type Safety: Full type hints for better IDE support and error detection
Best Practices
When working with these components:
- Always Clean Up: Use try/finally blocks to ensure listener tasks are stopped
- Monitor Activity: Use activity counts to detect stalled workflows
- Handle Cancellation: Nodes should handle CancelledError gracefully
- Coordinate Properly: Ensure nodes signal enter/leave at appropriate times
- Test Thoroughly: These components handle complex async coordination
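As one possible reading of the "Monitor Activity" advice, a watchdog can sample the activity count while the workflow is busy and flag a stall when the count stops changing. watch_for_stall and its parameters are hypothetical, and the thresholds are heuristics: a single long-running node also leaves the count unchanged, so a real deployment would need limits tuned to its slowest node.

```python
import asyncio
from typing import Callable


async def watch_for_stall(
    get_activity_count: Callable[[], int],
    is_idle: Callable[[], bool],
    interval: float = 0.05,
    max_quiet_checks: int = 3,
) -> bool:
    """Return True if the count stays flat for `max_quiet_checks` samples
    while nodes are still active, or False once the workflow goes idle."""
    last = get_activity_count()
    quiet = 0
    while not is_idle():
        await asyncio.sleep(interval)
        current = get_activity_count()
        # Count consecutive samples with no new processing cycles
        quiet = quiet + 1 if current == last else 0
        if quiet >= max_quiet_checks:
            return True
        last = current
    return False
```

With the tracker from this document, the two callables would be `tracker.get_activity_count` and `tracker.is_idle`.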
These workflow components form the foundation of Graphite's robust asynchronous execution model, enabling scalable and reliable event-driven processing.