Topic Event Cache
The TopicEventCache is an in-memory caching system that provides Kafka-like semantics for managing topic events, consumer offsets, and reliable message processing in Graphite workflows. It acts as a miniature message broker within each topic, supporting concurrent producers and consumers with per-consumer offset management.
Overview
The TopicEventCache implements:
- Event Storage: Contiguous log of topic events with offset-based indexing
- Consumer Tracking: Per-consumer offset management (consumed and committed)
- Async Operations: Full async/await support with condition variables
- Reliable Processing: Separate consumed/committed offsets prevent duplicate processing
- Backpressure: Built-in flow control and timeout handling
Architecture
Core Components
import asyncio
from collections import defaultdict
from typing import Dict, List

class TopicEventCache:
    def __init__(self, name: str = ""):
        self.name: str = name
        self._records: List[TopicEvent] = []  # contiguous log
        # Per-consumer cursors
        self._consumed: Dict[str, int] = defaultdict(int)           # next offset to read
        self._committed: Dict[str, int] = defaultdict(lambda: -1)   # last committed offset
        # For asynchronous operations
        self._cond: asyncio.Condition = asyncio.Condition()
Offset Management
The cache maintains two types of offsets per consumer:
- Consumed Offset: Tracks the next message to fetch (advanced immediately on fetch)
- Committed Offset: Tracks messages that have been fully processed (advanced after processing)
This dual-offset system prevents duplicate message processing in concurrent environments.
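A minimal sketch of how the two cursors diverge during normal operation, using the put/fetch/commit_to methods documented below (make_event is a hypothetical helper; the internal _consumed/_committed dicts are inspected the same way the test section below does):

cache = TopicEventCache("offsets_demo")
for i in range(3):
    cache.put(make_event(i))  # hypothetical helper producing events at offsets 0..2

cache.fetch("c1")                    # consumed advances to 3 immediately
assert cache._consumed["c1"] == 3
assert cache._committed["c1"] == -1  # nothing committed yet

cache.commit_to("c1", 2)             # after processing, commit the batch
assert cache._committed["c1"] == 2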
Core Methods
Event Storage
put(event: TopicEvent) → TopicEvent
Synchronously append an event to the log.
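The synchronous body is not reproduced in this document; a minimal sketch, mirroring a_put below but without waking async waiters, would be:

def put(self, event: TopicEvent) -> TopicEvent:
    # Append to the log; unlike a_put, this cannot notify consumers
    # blocked in a_fetch, so prefer a_put in async contexts
    self._records.append(event)
    return event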
async a_put(event: TopicEvent) → TopicEvent
Asynchronously append an event and notify waiting consumers.
async def a_put(self, event: TopicEvent) -> TopicEvent:
    async with self._cond:
        self._records.append(event)
        self._cond.notify_all()  # wake waiting consumers
    return event
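Appending and notifying happen while holding the condition's lock, so a consumer blocked in a_fetch cannot miss an event: it is either woken by notify_all or sees the longer log the next time it checks can_consume.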
Event Consumption
can_consume(consumer_id: str) → bool
Check if a consumer has unread messages.
def can_consume(self, consumer_id: str) -> bool:
    self._ensure_consumer(consumer_id)
    # Can consume if there are records beyond the consumed offset
    return self._consumed[consumer_id] < len(self._records)
fetch(consumer_id: str, offset: Optional[int] = None) → List[TopicEvent]
Synchronously fetch unread events and advance consumed offset.
def fetch(self, consumer_id: str, offset: Optional[int] = None) -> List[TopicEvent]:
    """
    Fetch records newer than the consumer's consumed offset.
    Immediately advances the consumed offset to prevent duplicate fetches.
    """
    self._ensure_consumer(consumer_id)
    if self.can_consume(consumer_id):
        start = self._consumed[consumer_id]
        if offset is not None:
            end = min(len(self._records), offset + 1)
            batch = self._records[start:end]
        else:
            batch = self._records[start:]
        # Advance consumed offset immediately to prevent duplicate fetches
        self._consumed[consumer_id] += len(batch)
        return batch
    return []
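The optional offset argument caps the batch at that offset inclusive, instead of returning everything available. A quick illustration (make_event is again a hypothetical helper):

cache = TopicEventCache("bounded_fetch")
for i in range(5):
    cache.put(make_event(i))  # events at offsets 0..4

first = cache.fetch("c1", offset=2)  # records 0..2 only
assert len(first) == 3
rest = cache.fetch("c1")             # remaining records 3..4
assert len(rest) == 2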
async a_fetch(consumer_id: str, offset: Optional[int] = None, timeout: Optional[float] = None) → List[TopicEvent]
Asynchronously fetch events with blocking and timeout support.
async def a_fetch(
    self,
    consumer_id: str,
    offset: Optional[int] = None,
    timeout: Optional[float] = None,
) -> List[TopicEvent]:
    """
    Await fresh records newer than the consumer's consumed offset.
    Immediately advances the consumed offset to prevent duplicate fetches.
    """
    self._ensure_consumer(consumer_id)
    async with self._cond:
        # Wait for data to become available
        while not self.can_consume(consumer_id):
            if timeout is None:
                await self._cond.wait()
            else:
                try:
                    await asyncio.wait_for(self._cond.wait(), timeout)
                except asyncio.TimeoutError:
                    return []
        start = self._consumed[consumer_id]
        if offset is not None:
            end = min(len(self._records), offset + 1)
            batch = self._records[start:end]
        else:
            batch = self._records[start:]
        # Advance consumed offset immediately
        self._consumed[consumer_id] += len(batch)
        return batch
Offset Commitment
commit_to(consumer_id: str, offset: int) → int
Synchronously commit processed messages up to the specified offset.
def commit_to(self, consumer_id: str, offset: int) -> int:
    """
    Mark everything up to and including `offset` as processed/durable
    for this consumer.
    """
    self._ensure_consumer(consumer_id)
    # Only advance if offset is greater than the current committed offset
    if offset > self._committed[consumer_id]:
        self._committed[consumer_id] = offset
    return self._committed[consumer_id]
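Because the committed offset only moves forward, a stale or duplicate commit is a harmless no-op, which makes commit retries safe:

cache = TopicEventCache("commit_demo")
assert cache.commit_to("c1", 5) == 5
assert cache.commit_to("c1", 3) == 5  # lower offset is ignored
assert cache.commit_to("c1", 7) == 7  # higher offset advances the cursor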
async a_commit_to(consumer_id: str, offset: int) → None
Asynchronously commit processed messages.
async def a_commit_to(self, consumer_id: str, offset: int) -> None:
    """Commit all offsets up to and including the specified offset."""
    async with self._cond:
        self._ensure_consumer(consumer_id)
        if offset > self._committed[consumer_id]:
            self._committed[consumer_id] = offset
Usage Patterns
Basic Producer-Consumer
# Producer
cache = TopicEventCache("my_topic")
event = PublishToTopicEvent(...)
cache.put(event)

# Consumer
consumer_id = "consumer_1"
if cache.can_consume(consumer_id):
    events = cache.fetch(consumer_id)
    # Process events
    for event in events:
        process_event(event)
    # Commit after successful processing
    if events:
        last_offset = events[-1].offset
        cache.commit_to(consumer_id, last_offset)
Async Producer-Consumer
async def producer(cache: TopicEventCache):
    for i in range(10):
        event = create_event(i)
        await cache.a_put(event)
        await asyncio.sleep(0.1)

async def consumer(cache: TopicEventCache):
    consumer_id = "async_consumer"
    while True:
        # Fetch with timeout
        events = await cache.a_fetch(consumer_id, timeout=1.0)
        if not events:
            break  # Timeout occurred
        # Process events
        for event in events:
            await process_event_async(event)
        # Commit after processing
        last_offset = events[-1].offset
        await cache.a_commit_to(consumer_id, last_offset)

async def main():
    # Producer and consumer must share the same cache instance
    cache = TopicEventCache("async_topic")
    await asyncio.gather(producer(cache), consumer(cache))
Multiple Consumers
cache = TopicEventCache("shared_topic")
# Each consumer tracks its own offsets
consumers = ["consumer_1", "consumer_2", "consumer_3"]
for consumer_id in consumers:
if cache.can_consume(consumer_id):
events = cache.fetch(consumer_id)
# Each consumer gets its own view of unprocessed events
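Because cursors are per-consumer, the cache fans out like a log rather than load-balancing like a work queue: every consumer receives every event. A minimal sketch (make_event is a hypothetical helper):

cache = TopicEventCache("fanout_demo")
cache.put(make_event(0))
cache.put(make_event(1))

assert len(cache.fetch("consumer_1")) == 2
assert len(cache.fetch("consumer_2")) == 2  # same events, independent cursor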
Best Practices
Offset Management
- Immediate Consumption Tracking: The consumed offset is advanced immediately on fetch to prevent duplicate fetches
- Commit After Processing: Only commit offsets after successful event processing
- Batch Commits: Commit the highest offset in a batch for efficiency
Error Handling
async def robust_consumer():
    cache = TopicEventCache("robust_topic")
    consumer_id = "robust_consumer"
    try:
        events = await cache.a_fetch(consumer_id, timeout=5.0)
        if not events:
            # No events; note that a_fetch returns [] on timeout
            # rather than raising asyncio.TimeoutError
            return
        processed_events = []
        for event in events:
            try:
                await process_event(event)
                processed_events.append(event)
            except Exception as e:
                logger.error(f"Failed to process event {event.offset}: {e}")
                # Decide whether to skip or retry
                break
        # Only commit successfully processed events
        if processed_events:
            last_offset = processed_events[-1].offset
            await cache.a_commit_to(consumer_id, last_offset)
    except Exception as e:
        logger.error(f"Consumer error: {e}")
Performance Optimization
- Batch Processing: Process multiple events in batches when possible
- Timeout Management: Use appropriate timeouts to prevent indefinite blocking
- Memory Management: Monitor cache size for long-running topics
- Consumer Cleanup: Reset caches whose consumers are no longer needed (see the sketch below)
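A sketch of the cleanup idea, assuming reset() clears the log and all cursors as the test below exercises (MAX_CACHED_EVENTS is a hypothetical threshold):

cache = TopicEventCache("long_running_topic")
# ... produce and consume over a long-lived workflow ...
if len(cache._records) > MAX_CACHED_EVENTS:
    # Once every consumer has committed through the end of the log,
    # drop the records and all consumer cursors in one step
    cache.reset()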
Testing
async def test_cache_behavior():
    cache = TopicEventCache("test_topic")

    # Test basic put/fetch
    event = create_test_event()
    await cache.a_put(event)

    consumer_id = "test_consumer"
    assert cache.can_consume(consumer_id)
    events = await cache.a_fetch(consumer_id)
    assert len(events) == 1
    assert events[0] == event

    # Test duplicate fetch prevention; use a timeout, since a_fetch
    # without one would block until new data arrives
    events2 = await cache.a_fetch(consumer_id, timeout=0.1)
    assert len(events2) == 0  # Empty due to the advanced consumed offset

    # Test commit and reset
    await cache.a_commit_to(consumer_id, 0)
    cache.reset()
    assert len(cache._records) == 0
    assert cache._consumed[consumer_id] == 0
    assert cache._committed[consumer_id] == -1
Integration with Topics
The TopicEventCache is used internally by all TopicBase implementations:
class TopicBase(BaseModel):
    event_cache: TopicEventCache = Field(default_factory=TopicEventCache)

    def can_consume(self, consumer_name: str) -> bool:
        return self.event_cache.can_consume(consumer_name)

    async def a_consume(self, consumer_name: str, timeout: Optional[float] = None) -> List[TopicEvent]:
        return await self.event_cache.a_fetch(consumer_name, timeout=timeout)

    async def a_commit(self, consumer_name: str, offset: int) -> None:
        await self.event_cache.a_commit_to(consumer_name, offset)
This provides a consistent, reliable messaging substrate for all topic types in Graphite workflows, ensuring proper event ordering, delivery guarantees, and offset management across the entire system.