A lightweight, production-ready framework for building stateful multi-agent systems with Redis persistence and concurrent processing.
A proof-of-concept implementation of a stateful multi-agent system using LangGraph, LangChain, and Redis for persistent state management.
- Python: 3.8+
- Redis: 6.0+ for state management and message brokering
- LangChain: For building and chaining language model applications
- LangGraph: For orchestrating multi-agent workflows
- OpenAI API: For language model capabilities
langchain>=0.1.0
langgraph>=0.1.0
redis>=4.5.0
openai>=1.0.0
rich>=13.0.0
python-dotenv>=1.0.0
pytest>=7.0.0 # For testing
- Poetry: For dependency management
- Pytest: For unit and integration testing
- Black & isort: For code formatting
- Mypy: For static type checking
- Docker: For containerization (optional)
- CPU: 4 cores (8+ recommended for production)
- RAM: 8GB (16GB+ recommended for concurrent operations)
- Storage: 1GB free disk space (SSD recommended)
- Network: Stable internet connection for API calls
- Operating Systems:
  - Ubuntu 20.04 LTS / 22.04 LTS
  - macOS 12+ (Intel/Apple Silicon)
  - Windows 10/11 (WSL2 recommended for better performance)
- Redis Server:
  - Version: 6.0 or higher
  - Memory: At least 1GB allocated
  - Persistence: AOF (Append Only File) recommended for data durability
- Python: 3.8 - 3.11 (3.11 recommended)
- pip: Latest stable version
- Virtual Environment: venv, conda, or pyenv
- Stateful Agents: Maintain conversation context and task state across interactions
- Distributed Processing: Handle multiple concurrent operations with Redis-backed state management
- Modular Architecture: Easy to extend with new agent types and workflows
- Persistence: All agent states and conversation history are persisted in Redis
- Install Redis
  # macOS
  brew install redis
  # Ubuntu/Debian
  sudo apt update && sudo apt install -y redis-server
  # Windows (using WSL2 recommended)
  # Follow: https://redis.io/docs/install/install-redis/install-redis-on-windows/
- Set up Python environment
  # Clone the repository
  git clone https://github.com/yourusername/stateful_multiagent_poc.git
  cd stateful_multiagent_poc
  # Create and activate virtual environment
  python -m venv venv
  source venv/bin/activate  # or `venv\Scripts\activate` on Windows
  # Install dependencies
  pip install -r requirements.txt
  # Set your OpenAI API key
  echo "OPENAI_API_KEY=your-api-key-here" > .env
- Run the example
  # Start Redis server (in a new terminal)
  redis-server &
  # Run the example
  python examples/basic_usage.py
from storage.redis_memory_clean import RedisMemory
# Initialize with default settings
memory = RedisMemory()
# Save state
state = {
    'conversation': [{'role': 'user', 'content': 'Hello, world!'}],
    'context': {'user_id': 123}
}
memory.save('session_123', state)
# Retrieve state
loaded_state = memory.load('session_123')
print(loaded_state)
- Agent System
  - Stateful agents with persistent memory
  - Concurrent operation support
  - Automatic state recovery
- Storage Layer
  - Redis for state persistence
  - Optimized for high-frequency reads/writes
  - Thread-safe operations with proper locking
- Orchestration
  - LangGraph-based workflow management
  - Task distribution and coordination
  - Error handling and retries
graph TD
A[Client Request] --> B[Orchestrator]
B --> C[Agent 1]
B --> D[Agent 2]
B --> E[Agent N]
C --> F[(Redis State)]
D --> F
E --> F
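The request flow in the diagram can be sketched in a few lines. This is only an illustration under stated assumptions: `InMemoryStateStore`, `EchoAgent`, and `Orchestrator` are hypothetical names, and a dictionary stands in for Redis so the example runs without a server. The project's own orchestration is LangGraph-based and is not shown here.

```python
import asyncio
import json


class InMemoryStateStore:
    """Hypothetical stand-in for the Redis state store (JSON strings keyed by session)."""

    def __init__(self):
        self._data = {}

    def load(self, session_id):
        raw = self._data.get(session_id)
        return json.loads(raw) if raw is not None else {}

    def save(self, session_id, state):
        self._data[session_id] = json.dumps(state)


class EchoAgent:
    """Hypothetical agent: reports the task it handled under its own name."""

    def __init__(self, name):
        self.name = name

    async def process(self, task, state):
        return {self.name: f"handled: {task}"}


class Orchestrator:
    """Hypothetical orchestrator: fans a task out to agents and persists the result."""

    def __init__(self, agents, store):
        self.agents = agents
        self.store = store

    async def handle(self, session_id, task):
        state = self.store.load(session_id)
        # Run every agent concurrently on the same task.
        results = await asyncio.gather(
            *(agent.process(task, state) for agent in self.agents)
        )
        # Merge the results and write the shared state once.
        for result in results:
            state.update(result)
        self.store.save(session_id, state)
        return state


store = InMemoryStateStore()
orchestrator = Orchestrator([EchoAgent("agent_1"), EchoAgent("agent_2")], store)
final_state = asyncio.run(orchestrator.handle("session_123", "summarize"))
print(final_state)
```

In the diagram each agent writes to Redis itself. The sketch merges results in the orchestrator instead, purely to keep the example short and free of write conflicts.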
- Initialization
  - Load environment configuration
  - Initialize Redis connection
  - Set up agent registry
- Request Processing
  - Parse input task
  - Create or load session state
  - Route to appropriate agent(s)
- State Management
  - Automatic state persistence
  - Concurrent access control
  - State versioning and history
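To make "create or load session state" and "state versioning and history" concrete, here is a minimal sketch. The `VersionedState` class is hypothetical and keeps snapshots in memory; it does not describe how the project stores versions in Redis.

```python
import copy


class VersionedState:
    """Hypothetical store that keeps every saved snapshot of a session."""

    def __init__(self):
        self._history = {}  # session_id -> list of snapshots, oldest first

    def load(self, session_id):
        """Create-or-load: an unknown session starts from an empty state."""
        versions = self._history.get(session_id)
        return copy.deepcopy(versions[-1]) if versions else {}

    def save(self, session_id, state):
        """Append a snapshot and return its 1-based version number."""
        versions = self._history.setdefault(session_id, [])
        versions.append(copy.deepcopy(state))
        return len(versions)

    def version(self, session_id, number):
        """Return a copy of the snapshot saved as version `number`."""
        return copy.deepcopy(self._history[session_id][number - 1])


states = VersionedState()
state = states.load("session_123")  # new session, so this is {}
state["task"] = "Analyze this document"
v1 = states.save("session_123", state)
state["status"] = "completed"
v2 = states.save("session_123", state)
print(v1, v2, states.version("session_123", 1))
```

Snapshots are deep-copied on the way in and out, so later changes to a loaded state cannot silently alter the recorded history.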
Before proceeding with the setup, ensure you have the following installed:
- Development Tools:
- API Keys:
  - OpenAI API key
  - (Optional) Redis Cloud account for managed Redis
- System Dependencies:
  # Ubuntu/Debian
  sudo apt-get update
  sudo apt-get install -y build-essential python3-dev
  # macOS
  brew install python redis
  # Windows (Using WSL2 recommended)
  # Install WSL2 and Ubuntu from Microsoft Store
- Python 3.8 or higher
- Redis server (latest stable version recommended)
- OpenAI API key
- Git (for version control)
- Clone the repository:
  git clone <repository-url>
  cd stateful_multiagent_poc
- Set up Python environment:
  # Create and activate virtual environment
  python -m venv venv
  # Windows
  .\venv\Scripts\activate
  # Unix/macOS
  # source venv/bin/activate
- Install dependencies:
  pip install -r requirements.txt
- Configure environment variables: Create a .env file in the project root:
  OPENAI_API_KEY=your-api-key-here
  REDIS_HOST=localhost
  REDIS_PORT=6379
- Start Redis server:
  # Windows
  redis-server
  # Linux/macOS
  sudo service redis-server start
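Once the variables above are set, application code needs to read them. The following is one possible way to do that, not the project's actual loader: `Settings` and `load_settings` are hypothetical names, and the defaults mirror the values shown in the `.env` example.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    openai_api_key: str
    redis_host: str
    redis_port: int


def load_settings(env=None):
    """Build Settings from an environment mapping (defaults to os.environ)."""
    env = os.environ if env is None else env
    api_key = env.get("OPENAI_API_KEY", "")
    if not api_key:
        raise RuntimeError("OPENAI_API_KEY is not set")
    return Settings(
        openai_api_key=api_key,
        redis_host=env.get("REDIS_HOST", "localhost"),
        redis_port=int(env.get("REDIS_PORT", "6379")),
    )


# A plain dict is passed here so the example does not depend on the real environment.
settings = load_settings({"OPENAI_API_KEY": "your-api-key-here"})
print(settings.redis_host, settings.redis_port)
```

With python-dotenv installed, calling `load_dotenv()` before `load_settings()` copies the entries of `.env` into `os.environ`.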
- Clone the repository:
  git clone <repository-url>
  cd stateful_multiagent_poc
- Create and activate a virtual environment:
  python -m venv venv
  .\venv\Scripts\activate  # On Windows
  source venv/bin/activate  # On Unix/macOS
- Install dependencies:
  pip install -r requirements.txt
- Set your OpenAI API key:
  # On Windows
  set OPENAI_API_KEY=your-api-key-here
  # On Unix/macOS
  export OPENAI_API_KEY=your-api-key-here
- Ensure Redis server is running:
  # On Windows
  redis-server
  # On Linux/macOS
  sudo service redis-server start
Run the main application:
python main.py "Your task description here"
- --redis-host: Redis server host (default: localhost)
- --redis-port: Redis server port (default: 6379)
- --session-id: Optional session ID for state persistence
Example with custom Redis configuration:
python main.py "Analyze this document" --redis-host localhost --redis-port 6379 --session-id my-session-123
stateful_multiagent_poc/
├── agents/ # Agent implementations
├── orchestrator/ # Orchestration logic
├── storage/ # Storage backends (Redis)
├── tests/ # Test suite
├── main.py # Main application entry point
└── requirements.txt # Project dependencies
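The command-line options listed for `main.py` could be parsed with the standard `argparse` module. The sketch below is an assumption about how such a parser might look, not a copy of the project's `main.py`; the `build_parser` helper is hypothetical, while the flag names and defaults come from the option list.

```python
import argparse


def build_parser():
    """Hypothetical parser mirroring the documented main.py options."""
    parser = argparse.ArgumentParser(prog="main.py")
    parser.add_argument("task", help="Task description")
    parser.add_argument("--redis-host", default="localhost", help="Redis server host")
    parser.add_argument("--redis-port", type=int, default=6379, help="Redis server port")
    parser.add_argument(
        "--session-id", default=None, help="Optional session ID for state persistence"
    )
    return parser


# Parse an explicit argument list instead of sys.argv so the example is repeatable.
args = build_parser().parse_args(
    ["Analyze this document", "--session-id", "my-session-123"]
)
print(args.task, args.redis_host, args.redis_port, args.session_id)
```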
- Unit Tests
  pytest tests/ -v
  - Verifies individual components in isolation
  - Validates state management operations
  - Checks concurrent access patterns
- Integration Tests
  pytest tests/test_integration.py -v
  - Tests end-to-end workflows
  - Validates Redis persistence
  - Verifies agent interactions
✅ [PASS] test_concurrent_access (0.5s)
✓ Verifies concurrent state updates
✓ Validates lock acquisition/release
✓ Ensures data consistency
📊 Test Summary:
- Total: 15
- Passed: 15
- Failed: 0
- Coverage: 92%
- Detailed logs in the logs/ directory
- Debug information for troubleshooting
- Performance metrics and timing
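A concurrent-access test of the kind shown in the sample output might look like the sketch below. It is illustrative only: `LockedCounterStore` is a hypothetical in-memory stand-in guarded by `threading.Lock`, whereas the real tests exercise Redis-backed state.

```python
import threading


class LockedCounterStore:
    """Hypothetical in-memory store whose updates are guarded by a lock."""

    def __init__(self):
        self._state = {"count": 0}
        self._lock = threading.Lock()

    def increment(self):
        # Read-modify-write under the lock so that no update is lost.
        with self._lock:
            current = self._state["count"]
            self._state["count"] = current + 1

    def count(self):
        with self._lock:
            return self._state["count"]


def test_concurrent_access():
    store = LockedCounterStore()

    def worker():
        for _ in range(1000):
            store.increment()

    threads = [threading.Thread(target=worker) for _ in range(8)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    # Every increment from every thread must be reflected in the final state.
    assert store.count() == 8 * 1000


test_concurrent_access()
```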
- Create Agent Class
  from agents.base_agent import BaseAgent

  class MyAgent(BaseAgent):
      def __init__(self, config):
          super().__init__(config)

      async def process(self, input_data):
          # Your agent logic here
          return {"result": "success"}
- Register Agent
  # In orchestrator/__init__.py
  from agents.my_agent import MyAgent

  AGENT_REGISTRY = {
      'my_agent': MyAgent,
      # ... other agents
  }
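After an agent is registered, something has to look it up by name and run it. The sketch below shows one way this could work. The `BaseAgent` here is a stand-in, because the real `agents.base_agent.BaseAgent` interface is not shown, and `create_agent` is a hypothetical helper.

```python
import asyncio


class BaseAgent:
    """Stand-in for agents.base_agent.BaseAgent (the real interface may differ)."""

    def __init__(self, config):
        self.config = config

    async def process(self, input_data):
        raise NotImplementedError


class MyAgent(BaseAgent):
    async def process(self, input_data):
        return {"result": "success", "input": input_data}


AGENT_REGISTRY = {"my_agent": MyAgent}


def create_agent(name, config=None):
    """Instantiate a registered agent, failing loudly on unknown names."""
    try:
        agent_cls = AGENT_REGISTRY[name]
    except KeyError:
        raise ValueError(f"Unknown agent: {name!r}") from None
    return agent_cls(config or {})


agent = create_agent("my_agent")
output = asyncio.run(agent.process("Analyze this document"))
print(output)
```

Because `process` is a coroutine, it must be awaited or driven with `asyncio.run` as above.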
- Enable Debug Logging
  LOG_LEVEL=DEBUG python main.py
- Inspect Redis
  redis-cli
  > KEYS *
  > GET <key>
- Performance Profiling
  python -m cProfile -o profile.stats main.py
- Request latency
- Memory usage
- Redis connection status
- Error rates
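Two of the metrics above, request latency and error rate, can be collected with a small context manager. This is a sketch under the assumption that requests are handled in-process. `RequestMetrics` is a hypothetical helper and is not part of the project.

```python
import time
from contextlib import contextmanager


class RequestMetrics:
    """Hypothetical collector for per-request latency and error counts."""

    def __init__(self):
        self.latencies = []  # seconds per request
        self.errors = 0

    @contextmanager
    def track(self):
        start = time.perf_counter()
        try:
            yield
        except Exception:
            # Count the failure, then let the caller see the original error.
            self.errors += 1
            raise
        finally:
            self.latencies.append(time.perf_counter() - start)

    @property
    def total(self):
        return len(self.latencies)

    @property
    def error_rate(self):
        return self.errors / self.total if self.total else 0.0

    @property
    def mean_latency(self):
        return sum(self.latencies) / self.total if self.total else 0.0


metrics = RequestMetrics()
with metrics.track():
    pass  # a successful request
try:
    with metrics.track():
        raise ValueError("simulated failure")
except ValueError:
    pass
print(metrics.total, metrics.error_rate)
```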
# View recent errors
grep ERROR logs/app.log
# Monitor in real-time
tail -f logs/app.log | grep -iE 'warning|error'
# examples/chat_app.py
from storage.redis_memory_clean import RedisMemory
class ChatAgent:
    def __init__(self):
        self.memory = RedisMemory()

    def process_message(self, session_id, message):
        # Load conversation history; fall back to an empty state in case
        # load() returns None for a new session
        state = self.memory.load(session_id) or {}
        # Update conversation
        state.setdefault('messages', []).append(message)
        # Save updated state
        self.memory.save(session_id, state)
        return state

# Usage
agent = ChatAgent()
agent.process_message('user1', {'role': 'user', 'content': 'Hello!'})
# examples/task_processor.py
from concurrent.futures import ThreadPoolExecutor
from storage.redis_memory_clean import RedisMemory
class TaskProcessor:
    def __init__(self):
        self.memory = RedisMemory()

    def process_task(self, task_id, data):
        # Hold the per-task lock for the whole read-modify-write cycle
        with self.memory.lock(f'task_{task_id}'):
            # Fall back to an empty state in case load() returns None
            state = self.memory.load(task_id) or {}
            # Process task here
            state['status'] = 'completed'
            state['result'] = f"Processed {data}"
            self.memory.save(task_id, state)
            return state

# Process tasks in parallel
def process_tasks():
    processor = TaskProcessor()
    with ThreadPoolExecutor() as executor:
        tasks = [('task1', 'data1'), ('task2', 'data2')]
        results = list(executor.map(lambda x: processor.process_task(*x), tasks))
    return results
- Redis Connection Failed
  # Check if Redis is running
  redis-cli ping
  # Should return: PONG
  # If not running, start Redis
  redis-server --daemonize yes
- Missing Dependencies
  # Ensure all dependencies are installed
  pip install -r requirements.txt
- API Key Not Found
  # Verify .env file exists and contains:
  # OPENAI_API_KEY=your-api-key-here
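The "API Key Not Found" check can also be scripted. The sketch below parses a `.env` file by hand and reports which required keys are missing. `read_env_file` and `missing_keys` are hypothetical helpers, and the parser only handles plain `KEY=VALUE` lines, which is an assumption about the file's format.

```python
import os
import tempfile
from pathlib import Path


def read_env_file(path):
    """Parse simple KEY=VALUE lines, skipping blanks and # comments."""
    values = {}
    for line in Path(path).read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip().strip('"').strip("'")
    return values


def missing_keys(path, required=("OPENAI_API_KEY",)):
    """Return required keys that are absent or empty (all of them if the file is missing)."""
    if not Path(path).is_file():
        return list(required)
    values = read_env_file(path)
    return [key for key in required if not values.get(key)]


# Demonstrate against a temporary directory rather than the real project root.
with tempfile.TemporaryDirectory() as tmp:
    env_path = os.path.join(tmp, ".env")
    before = missing_keys(env_path)
    Path(env_path).write_text("OPENAI_API_KEY=your-api-key-here\n")
    after = missing_keys(env_path)
print(before, after)
```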
We welcome contributions! Here's how to get started:
- Fork the repository
- Create a feature branch: git checkout -b feature/your-feature
- Commit your changes: git commit -m 'Add some feature'
- Push to the branch: git push origin feature/your-feature
- Open a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.