Containers
The Graphite container system provides a thread-safe singleton-based dependency injection container for managing shared resources throughout the application. It handles the registration and lifecycle of core components like event stores and tracers.
Overview
The container system provides:
- Singleton Pattern: Thread-safe singleton implementation for global access
- Dependency Injection: Centralized registration and retrieval of dependencies
- Event Store Management: Registration and default setup of event storage
- Tracing Integration: Registration and configuration of OpenTelemetry tracing
- Lazy Initialization: On-demand creation of default implementations
- Production Safety: Warnings for development-only components
Core Components
SingletonMeta
A thread-safe metaclass that implements the singleton pattern.
Features
- Thread Safety: Uses threading locks to prevent race conditions
- Instance Management: Maintains a dictionary of singleton instances per class
- Memory Efficiency: Ensures only one instance exists per class type
class SingletonMeta(type):
    """Metaclass that hands out one shared instance per class.

    The first call to a class using this metaclass constructs the instance;
    every later call returns that same object and ignores its arguments.
    """

    # One cached instance per class that uses this metaclass.
    _instances: dict[type, object] = {}
    # Re-entrant on purpose: the lock is shared by every singleton class and is
    # held while the instance is constructed, so a singleton whose __init__
    # instantiates another singleton would deadlock on a plain Lock.
    _lock = threading.RLock()

    def __call__(cls: "SingletonMeta", *args: Any, **kwargs: Any) -> Any:
        """Return the cached instance of ``cls``, creating it on first use.

        Args:
            *args: Positional arguments forwarded to the constructor on the
                first call only.
            **kwargs: Keyword arguments forwarded to the constructor on the
                first call only.

        Returns:
            The single instance registered for ``cls``.
        """
        # Ensure thread-safe singleton creation
        with cls._lock:
            if cls not in cls._instances:
                cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]
Container
The main dependency injection container using singleton pattern.
Properties
| Property | Type | Description |
|---|---|---|
| `event_store` | `EventStore` | Returns the registered event store, or creates the default one |
| `tracer` | `Tracer` | Returns the registered tracer, or creates the default one |
Methods
| Method | Signature | Description |
|---|---|---|
| `register_event_store` | `(event_store: EventStore) -> None` | Register a custom event store implementation |
| `register_tracer` | `(tracer: Tracer) -> None` | Register a custom tracer implementation |
Global Instance
The module exposes a pre-instantiated global `container` instance that can be imported and used anywhere in the application.
Usage Examples
Basic Container Usage
from grafi.common.containers.container import container
# Access the global container instance
event_store = container.event_store
tracer = container.tracer
print(f"Event store type: {type(event_store)}")
print(f"Tracer type: {type(tracer)}")
Custom Event Store Registration
from grafi.common.containers.container import container
from grafi.common.event_stores.event_store_postgres import EventStorePostgres
# Create custom event store
postgres_store = EventStorePostgres(
connection_string="postgresql://user:pass@localhost:5432/events"
)
# Register with container
container.register_event_store(postgres_store)
# Now all access will use the custom store
event_store = container.event_store
assert isinstance(event_store, EventStorePostgres)
Custom Tracer Registration
from grafi.common.containers.container import container
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
# Create custom tracer with Jaeger export
tracer_provider = TracerProvider()
jaeger_exporter = JaegerExporter(
agent_host_name="localhost",
agent_port=6831,
)
span_processor = BatchSpanProcessor(jaeger_exporter)
tracer_provider.add_span_processor(span_processor)
# Set as global tracer provider
trace.set_tracer_provider(tracer_provider)
custom_tracer = trace.get_tracer(__name__)
# Register with container
container.register_tracer(custom_tracer)
# Now all access will use the custom tracer
tracer = container.tracer
Event Store Integration
Default Behavior
# First access creates default in-memory store
event_store = container.event_store
# Logs warning: "Using EventStoreInMemory. This is ONLY suitable for local testing..."
Production Setup
def setup_production_container():
    """Register a PostgreSQL event store built from ``DATABASE_URL``.

    Raises:
        ValueError: If ``DATABASE_URL`` is unset or empty.
    """
    from grafi.common.event_stores.event_store_postgres import EventStorePostgres
    import os

    # The connection string is supplied through the environment.
    db_url = os.getenv("DATABASE_URL")
    if not db_url:
        raise ValueError("DATABASE_URL environment variable required")

    # Build the production store and hand it straight to the container.
    container.register_event_store(EventStorePostgres(connection_string=db_url))
    print("Production event store registered")
# Call during application startup
setup_production_container()
Event Store Validation
def validate_event_store():
    """Fail when the container is still serving the in-memory event store.

    Raises:
        RuntimeError: If ``container.event_store`` is an ``EventStoreInMemory``.
    """
    from grafi.common.event_stores.event_store_in_memory import EventStoreInMemory

    event_store = container.event_store

    # Anything other than the in-memory store is accepted as-is.
    if not isinstance(event_store, EventStoreInMemory):
        print(f"Using production event store: {type(event_store).__name__}")
        return

    raise RuntimeError(
        "Production environment detected with in-memory event store. "
        "Please configure a persistent event store."
    )
Tracing Integration
Default Tracing Setup
# First access creates default tracer with auto-configuration
tracer = container.tracer
# Uses setup_tracing with default parameters:
# - tracing_options=TracingOptions.AUTO
# - collector_endpoint="localhost"
# - collector_port=4317
# - project_name="grafi-trace"
Custom Tracing Configuration
def setup_custom_tracing():
    """Build a tracer from environment settings and register it.

    Reads ``OTEL_EXPORTER_OTLP_ENDPOINT`` (default ``"localhost"``),
    ``OTEL_EXPORTER_OTLP_PORT`` (default ``"4317"``, converted with ``int``)
    and ``SERVICE_NAME`` (default ``"grafi-service"``).
    """
    from grafi.common.instrumentations.tracing import setup_tracing, TracingOptions
    import os

    service_name = os.getenv("SERVICE_NAME", "grafi-service")

    # Create the tracer and register it with the container in one step.
    container.register_tracer(
        setup_tracing(
            tracing_options=TracingOptions.ENABLED,
            collector_endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "localhost"),
            collector_port=int(os.getenv("OTEL_EXPORTER_OTLP_PORT", "4317")),
            project_name=service_name,
        )
    )
    print(f"Custom tracing configured for {service_name}")
setup_custom_tracing()
Distributed Tracing
def create_span_example():
    """Open a ``process_user_request`` span around a call to ``process_request``."""
    with container.tracer.start_as_current_span("process_user_request") as span:
        # Describe the request before doing the work.
        for key, value in (("user.id", "12345"), ("request.type", "get_profile")):
            span.set_attribute(key, value)

        # Simulate processing
        process_request()

        span.set_attribute("response.status", "success")
def process_request():
    """Run ``query_database`` inside a ``database_query`` span.

    The span records the operation, the table, and the length of the result.
    """
    with container.tracer.start_as_current_span("database_query") as span:
        span.set_attribute("db.operation", "SELECT")
        span.set_attribute("db.table", "users")

        # Database operation simulation
        rows = query_database()
        span.set_attribute("db.rows_affected", len(rows))
Application Lifecycle Integration
Startup Configuration
class Application:
    """Example application that configures the container at startup."""

    def __init__(self):
        self.container = container

    def configure_dependencies(self):
        """Set up the event store and tracing, then validate the result."""
        self._setup_event_store()
        self._setup_tracing()
        self._validate_configuration()

    def _setup_event_store(self):
        """Register a PostgreSQL event store when ``ENVIRONMENT`` is ``production``.

        Raises:
            ValueError: In production when ``DATABASE_URL`` is unset or empty.
        """
        import os

        if os.getenv("ENVIRONMENT") != "production":
            # Development - nothing is registered, so the default store is used.
            return

        from grafi.common.event_stores.event_store_postgres import EventStorePostgres

        db_url = os.getenv("DATABASE_URL")
        if not db_url:
            raise ValueError("DATABASE_URL required in production")

        self.container.register_event_store(
            EventStorePostgres(connection_string=db_url)
        )

    def _setup_tracing(self):
        """Register an environment-configured tracer when ``TRACING_ENABLED`` is ``true``."""
        import os
        from grafi.common.instrumentations.tracing import setup_tracing, TracingOptions

        tracing_enabled = os.getenv("TRACING_ENABLED", "false").lower() == "true"
        if not tracing_enabled:
            return

        self.container.register_tracer(
            setup_tracing(
                tracing_options=TracingOptions.ENABLED,
                collector_endpoint=os.getenv("OTEL_ENDPOINT", "localhost"),
                collector_port=int(os.getenv("OTEL_PORT", "4317")),
                project_name=os.getenv("SERVICE_NAME", "grafi-app"),
            )
        )

    def _validate_configuration(self):
        """Read both container properties and print the type of each."""
        # Access properties to trigger initialization
        event_store = self.container.event_store
        tracer = self.container.tracer

        print(f"Event store: {type(event_store).__name__}")
        print(f"Tracer: {type(tracer).__name__}")

    def start(self):
        """Configure the dependencies and report that the application started."""
        self.configure_dependencies()
        print("Application started with configured dependencies")
# Usage
app = Application()
app.start()
Graceful Shutdown
class ApplicationManager:
    """Example manager that releases the container's resources on shutdown."""

    def __init__(self):
        self.container = container

    async def shutdown(self):
        """Gracefully shutdown application resources.

        Calls ``close()`` on the event store when it has one, awaiting the
        result only if it is awaitable, then calls ``force_flush()`` on the
        tracer when it has one.
        """
        import inspect

        print("Shutting down application...")

        # Close event store connections
        event_store = self.container.event_store
        if hasattr(event_store, 'close'):
            close_result = event_store.close()
            # close() may be a plain method or a coroutine function; awaiting a
            # non-awaitable result (e.g. None) would raise TypeError.
            if inspect.isawaitable(close_result):
                await close_result

        # Flush tracer spans
        tracer = self.container.tracer
        # NOTE(review): OpenTelemetry's Tracer interface does not appear to
        # define force_flush (the SDK TracerProvider does), so this branch is
        # probably skipped for a stock tracer - confirm where spans should be
        # flushed.
        if hasattr(tracer, 'force_flush'):
            tracer.force_flush()

        print("Application shutdown complete")
Testing with Containers
Test Container Setup
import pytest
from grafi.common.containers.container import Container
from grafi.common.event_stores.event_store_in_memory import EventStoreInMemory
@pytest.fixture
def test_container():
    """Yield the container configured with in-memory test dependencies.

    NOTE(review): ``Container`` is documented as a singleton, so
    ``Container()`` presumably returns the shared instance rather than a fresh
    one, and the registrations made here would then outlive the test - confirm.
    """
    # Use no-op tracer for tests
    from opentelemetry.trace import NoOpTracer

    configured = Container()

    # Use in-memory event store for tests
    test_store = EventStoreInMemory()
    configured.register_event_store(test_store)
    configured.register_tracer(NoOpTracer())

    yield configured

    # Cleanup if needed
    if hasattr(test_store, 'clear'):
        test_store.clear()
def test_event_store_integration(test_container):
    """Record an event through the fixture's store and read it back."""
    store = test_container.event_store

    # Verify it's the test store
    assert isinstance(store, EventStoreInMemory)

    # Test basic operations
    from grafi.common.events.event import Event

    store.record_event(Event(event_id="test-123"))
    assert store.get_event("test-123").event_id == "test-123"
Mock Container
from unittest.mock import Mock, patch
def test_with_mocked_container():
    """Test using mocked container dependencies.

    The container's ``event_store`` and ``tracer`` are replaced on the
    ``Container`` class for the duration of the ``with`` block, so access
    through the global ``container`` returns the mocks.
    """
    from unittest.mock import MagicMock, PropertyMock

    from grafi.common.containers.container import Container, container

    # Create mock event store
    mock_event_store = Mock()
    mock_event_store.record_event.return_value = None
    mock_event_store.get_events.return_value = []

    # Create mock tracer. MagicMock is required here: a plain Mock raises
    # AttributeError when a magic method such as __enter__ is accessed.
    mock_tracer = MagicMock()
    mock_span = Mock()
    mock_tracer.start_as_current_span.return_value.__enter__.return_value = mock_span

    # Patch container properties on the class. Assumes event_store and tracer
    # are read-only properties, as the Properties table describes; patching
    # them on the instance would fail because there is no setter.
    with patch.object(
        Container, "event_store", new_callable=PropertyMock, return_value=mock_event_store
    ), patch.object(
        Container, "tracer", new_callable=PropertyMock, return_value=mock_tracer
    ):
        # Test code using container
        event_store = container.event_store
        tracer = container.tracer

        # Verify mocks are used
        assert event_store is mock_event_store
        assert tracer is mock_tracer
Thread Safety
Concurrent Access
import threading
import time
from grafi.common.containers.container import container
def worker_function(worker_id: int, results: dict):
    """Store the identities of the container's dependencies under ``worker_id``.

    Args:
        worker_id: Key under which this worker's entry is stored.
        results: Mapping that receives the entry.
    """
    # Both properties are read from the calling thread; the ids are kept so the
    # caller can compare what each thread saw.
    results[worker_id] = {
        'event_store_id': id(container.event_store),
        'tracer_id': id(container.tracer),
    }
def test_thread_safety():
    """Check that ten threads all see the same event store and tracer."""
    results = {}

    # Create multiple threads
    threads = [
        threading.Thread(target=worker_function, args=(i, results))
        for i in range(10)
    ]
    for thread in threads:
        thread.start()

    # Wait for all threads to complete
    for thread in threads:
        thread.join()

    # Verify all threads got the same instances
    recorded = list(results.values())
    event_store_ids = {r['event_store_id'] for r in recorded}
    tracer_ids = {r['tracer_id'] for r in recorded}

    assert len(event_store_ids) == 1, "Event store should be singleton"
    assert len(tracer_ids) == 1, "Tracer should be singleton"
    print("Thread safety test passed")
# Run the test
test_thread_safety()
Best Practices
Container Configuration
- Early Registration: Register dependencies during application startup
- Environment-Based Setup: Use environment variables for configuration
- Validation: Validate container configuration before starting main logic
- Production Safety: Never use in-memory stores in production
Dependency Management
- Single Responsibility: Keep container focused on dependency injection
- Lazy Loading: Let container handle lazy initialization of defaults
- Type Safety: Use proper type hints for registered dependencies
- Error Handling: Handle missing dependencies gracefully
Testing Strategies
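- Test Containers: Register test-specific dependencies on the container for tests (`Container` is a singleton, so `Container()` returns the shared instance rather than a separate one)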
- Mock Dependencies: Mock container dependencies for unit tests
- Integration Tests: Test with real dependencies in integration tests
- Cleanup: Always clean up test resources
Performance Considerations
- Singleton Benefits: Leverage singleton pattern for shared resources
- Thread Safety: Container is thread-safe by design
- Memory Efficiency: Single instances reduce memory overhead
- Initialization Cost: Lazy initialization spreads startup cost
Error Handling
Common Issues
def handle_container_errors():
    """Examples of handling container-related errors.

    If reading ``container.event_store`` raises, the error is printed and an
    ``EventStoreInMemory`` is registered as a fallback.
    """
    try:
        # This might fail if dependencies are not available
        container.event_store
    except Exception as e:
        print(f"Failed to get event store: {e}")

        # Fallback to in-memory store
        from grafi.common.event_stores.event_store_in_memory import EventStoreInMemory

        container.register_event_store(EventStoreInMemory())
def validate_production_setup():
    """Validate that production dependencies are properly configured.

    Does nothing unless ``ENVIRONMENT`` is ``production``.

    Raises:
        RuntimeError: If the event store is an ``EventStoreInMemory`` or has no
            ``connection_pool`` attribute.
    """
    import os
    from grafi.common.event_stores.event_store_in_memory import EventStoreInMemory

    if os.getenv("ENVIRONMENT") != "production":
        return

    event_store = container.event_store
    if isinstance(event_store, EventStoreInMemory):
        raise RuntimeError(
            "Production environment using in-memory event store. "
            "Configure persistent storage."
        )

    # Additional validation
    has_pool = hasattr(event_store, 'connection_pool')
    if not has_pool:
        raise RuntimeError("Event store missing connection pool")
Migration Guide
From Direct Dependencies to Container
# Before: Direct dependency instantiation
# event_store = EventStorePostgres(connection_string)
# tracer = setup_tracing(...)
# After: Using container
from grafi.common.containers.container import container
# Setup once during application startup: build the dependencies the same way
# as before, then hand them to the container. They must be created here, since
# the "Before" lines above are only comments and define no names.
import os

from grafi.common.event_stores.event_store_postgres import EventStorePostgres
from grafi.common.instrumentations.tracing import setup_tracing, TracingOptions

container.register_event_store(
    EventStorePostgres(connection_string=os.environ["DATABASE_URL"])
)
container.register_tracer(
    setup_tracing(
        tracing_options=TracingOptions.AUTO,
        collector_endpoint="localhost",
        collector_port=4317,
        project_name="grafi-trace",
    )
)

# Use throughout application
event_store = container.event_store
tracer = container.tracer
Existing Code Integration
class ExistingService:
    """Example service migrated to take its dependencies from the container."""

    def __init__(self):
        # Old way - direct instantiation
        # self.event_store = EventStoreInMemory()

        # New way - use container
        from grafi.common.containers.container import container

        self.event_store, self.tracer = container.event_store, container.tracer

    def process_data(self, data):
        """Transform ``data`` inside a span and record a matching event.

        Args:
            data: Input passed to ``_transform_data``; its length is recorded
                on the span.

        Returns:
            The value returned by ``_transform_data``.
        """
        # Use tracer from container
        with self.tracer.start_as_current_span("process_data") as span:
            span.set_attribute("data.size", len(data))

            # Process data
            result = self._transform_data(data)

            # Record event using container's event store
            from grafi.common.events.event import Event

            self.event_store.record_event(Event(event_id=f"processed-{result.id}"))

            return result
The container system provides a robust foundation for dependency injection in Graphite applications, ensuring thread-safe access to shared resources while maintaining flexibility for different deployment environments and testing scenarios.