Stream-Guard-Kafka 🛡️


Real-time fraud detection system for financial transactions, built on a hybrid Kappa-style architecture.

🎯 Objective

Simulate a high-performance data engineering pipeline in which banking transactions are ingested, analyzed, and stored in real time. The project demonstrates the use of Declarative Stream Processing (ksqlDB) for temporal window rules and Imperative Processing (Python) for complex business logic and persistence.

๐Ÿ—๏ธ Architecture

The system uses a hybrid pattern to maximize performance:

┌─────────────┐      ┌─────────────┐      ┌─────────────┐
│   Producer  │─────▶│    Kafka    │◀────▶│   ksqlDB    │
│   (Faker)   │      │   Broker    │      │   Server    │
└─────────────┘      └──────┬──────┘      └──────┬──────┘
                            │                     │
                            │ Topic: fraud_alerts │
                            ▼                     ▼
                     ┌─────────────┐      ┌─────────────┐
                     │  Consumer   │      │  Kafka-UI   │
                     │  (Python)   │      │ (Monitor)   │
                     └──────┬──────┘      └─────────────┘
                            │
                            ▼
                     ┌─────────────┐
                     │ PostgreSQL  │
                     │  Database   │
                     └─────────────┘

Data Flow

  1. Ingestion: The Producer Service generates synthetic transactions (JSON) and publishes to the transactions topic.

  2. Stream Processing (ksqlDB): ksqlDB consumes the raw stream and applies:

    • Immediate filters (e.g., High Amount)
    • Windowed aggregations (tumbling windows) to detect bot-like behavior (High Velocity)
    • Publication of anomalies only to the fraud_alerts topic
  3. Business Logic (Python): The Consumer Service listens to the alerts topic, enriches the data if necessary, and decides the final action.

  4. Storage (PostgreSQL): Persistent storage for auditing and analytical dashboards.
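As a sketch of step 1, the producer side can be reduced to building a JSON payload and publishing it with kafka-python. The field names, the `ACC-…` account format, and the `publish` helper below are illustrative assumptions, not the repo's actual schema:

```python
import json
import random
import uuid
from datetime import datetime, timezone


def make_transaction(account_id: str) -> dict:
    """Build a synthetic transaction (illustrative fields, not the repo's exact contract)."""
    return {
        "transaction_id": str(uuid.uuid4()),
        "account_id": account_id,
        "amount": round(random.uniform(1.0, 5000.0), 2),
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }


def publish(tx: dict) -> None:
    """Send one transaction to the `transactions` topic.

    Requires `pip install kafka-python` and a broker on localhost:9092.
    """
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers="localhost:9092",
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    producer.send("transactions", make_transaction("ACC-0001") if tx is None else tx)
    producer.flush()
```

In the repo this role is played by the Producer Service under `src/producer/`, which uses Faker for realistic values.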

🚀 Tech Stack

  • Language: Python 3.10+ (Static Typing via pydantic)
  • Messaging: Apache Kafka 3.6 (KRaft Mode - No ZooKeeper)
  • Stream Processing: ksqlDB (Confluent)
  • Database: PostgreSQL 15
  • Infrastructure: Docker & Docker Compose
  • Main Libraries:
    • kafka-python: Pure-Python Kafka client for producers and consumers
    • faker: Realistic data generation
    • sqlalchemy: ORM for persistence
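The data contract lives in `src/models/` as pydantic schemas. As a dependency-free sketch of the same idea, here is a stdlib dataclass that validates on construction; the field names are assumptions, and in the real project pydantic's `BaseModel` performs this validation:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Transaction:
    """Illustrative stand-in for the repo's pydantic transaction schema."""

    transaction_id: str
    account_id: str
    amount: float

    def __post_init__(self) -> None:
        # Reject malformed events before they reach business logic,
        # mirroring what pydantic validation does automatically.
        if not self.account_id:
            raise ValueError("account_id is required")
        if self.amount <= 0:
            raise ValueError("amount must be positive")
```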

🔍 Detection Strategy (Hybrid)

The logic was divided to maximize performance and demonstrate proper use of each tool:

| Fraud Type | Business Rule | Responsible Technology | Why? |
|---|---|---|---|
| High Amount | Transactions > R$ 3,000.00 | ksqlDB | Simple filtering (WHERE) is instantaneous in SQL |
| High Velocity | > 3 transactions from the same account in 1 minute | ksqlDB | Window aggregation (WINDOW TUMBLING) is native and performant in ksqlDB, avoiding complex state management in Python |
| Blacklist / Context | Specific business logic | Python | Allows external queries and complex conditional logic before saving |
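The Python row of the table (blacklist / contextual checks) might look like the following consumer-side sketch. The blacklist contents, decision labels, and alert field names are hypothetical; only the R$ 3,000.00 threshold comes from the rule above:

```python
# Hypothetical blacklist; in practice this could be an external lookup.
BLACKLISTED_ACCOUNTS = {"ACC-6666", "ACC-9999"}

HIGH_AMOUNT_THRESHOLD = 3000.00  # matches the High Amount rule


def decide_action(alert: dict) -> str:
    """Business logic applied to events consumed from the fraud_alerts topic."""
    if alert.get("account_id") in BLACKLISTED_ACCOUNTS:
        return "BLOCK"          # hard stop for known-bad accounts
    if alert.get("amount", 0.0) > HIGH_AMOUNT_THRESHOLD:
        return "MANUAL_REVIEW"  # high-amount alerts go to a human
    return "MONITOR"            # velocity alerts default to monitoring
```

This kind of conditional logic is awkward to express declaratively, which is why the project keeps it on the Python side.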

⚙️ Configuration and Execution

Prerequisites

  • Docker 20.10+ & Docker Compose
  • Python 3.10+
  • Git

Step by Step

  1. Clone the repository:
git clone https://github.com/your-username/stream-guard-kafka.git
cd stream-guard-kafka
  2. Create a virtual environment:
python3 -m venv venv
source venv/bin/activate  # Linux/Mac
# or .\venv\Scripts\activate  # Windows
  3. Install dependencies:
pip install -r requirements.txt
  4. Start the infrastructure & background pipeline:
# Starts Docker, configures ksqlDB, and runs Python consumers/producers in background.
# You will be prompted at the end to view live logs.
./start.sh
  5. Stop the infrastructure & clean data:
# Stops Python background processes and Docker containers.
# You will be prompted if you want to wipe PostgreSQL/Kafka data volumes.
./stop.sh
  6. Access the services:
    • Kafka-UI: http://localhost:8080 (Topic Monitoring)
    • PostgreSQL: localhost:5432 (User: streamguard / Pass: streamguard_2024)

๐Ÿ“ Project Structure

stream-guard-kafka/
โ”œโ”€โ”€ src/
โ”‚   โ”œโ”€โ”€ consumer/        # Python Consumer logic
โ”‚   โ”œโ”€โ”€ producer/        # Data generator script (Faker)
โ”‚   โ”œโ”€โ”€ models/          # Pydantic Schemas (Data Contract)
โ”‚   โ”œโ”€โ”€ database/        # Postgres connection
โ”‚   โ””โ”€โ”€ config/          # Centralized settings
โ”œโ”€โ”€ ksqldb/
โ”‚   โ””โ”€โ”€ queries.sql      # Stream and Table creation scripts
โ”œโ”€โ”€ docker/
โ”‚   โ””โ”€โ”€ init-db.sql      # Initial Postgres schema
โ”œโ”€โ”€ tests/               # Unit tests
โ”œโ”€โ”€ docs/                # Documentation
โ”œโ”€โ”€ docker-compose.yaml  # Infrastructure (Kafka, ksqlDB, Postgres)
โ”œโ”€โ”€ requirements.txt
โ””โ”€โ”€ README.md

🧪 Quick Start

1. Generate Fake Transactions

# Run the interactive generator (Menu for Batch, Stream, Velocity)
python generate_transactions.py

2. Monitor with Kafka-UI

Access http://localhost:8080 to view:

  • Topics and messages
  • Consumer groups
  • Throughput metrics

3. Query with ksqlDB

# Access ksqlDB CLI
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

# Inside CLI
SHOW TOPICS;
SELECT * FROM transactions EMIT CHANGES;
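For reference, the two declarative rules from the detection table could be expressed roughly as follows. Stream and column names here are assumptions; the repo's real definitions live in ksqldb/queries.sql:

```sql
-- Rule 1: High Amount — immediate filter on the raw stream.
CREATE STREAM high_amount_alerts AS
  SELECT transaction_id, account_id, amount
  FROM transactions_stream
  WHERE amount > 3000.00
  EMIT CHANGES;

-- Rule 2: High Velocity — tumbling-window aggregation per account.
CREATE TABLE velocity_alerts AS
  SELECT account_id, COUNT(*) AS tx_count
  FROM transactions_stream
  WINDOW TUMBLING (SIZE 1 MINUTE)
  GROUP BY account_id
  HAVING COUNT(*) > 3
  EMIT CHANGES;
```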

📊 Monitoring and Useful Commands

Kafka Commands

# List topics
docker exec -it stream-guard-kafka kafka-topics.sh \
  --bootstrap-server localhost:9092 --list

# Consume messages
docker exec -it stream-guard-kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic transactions \
  --from-beginning

PostgreSQL Queries

-- Total transactions
SELECT COUNT(*) FROM transactions;

-- Fraud rate
SELECT 
  COUNT(*) as total,
  SUM(CASE WHEN is_fraud THEN 1 ELSE 0 END) as frauds,
  ROUND(SUM(CASE WHEN is_fraud THEN 1 ELSE 0 END)::numeric / COUNT(*)::numeric * 100, 2) as fraud_rate
FROM transactions;

-- Suspicious accounts
SELECT * FROM account_risk_profile
WHERE fraud_rate > 50
ORDER BY fraud_rate DESC;

🔮 Roadmap

  • Docker Infrastructure (Kafka KRaft + Postgres)
  • Python Producer (Faker)
  • Fake Transaction Generator with realistic distributions
  • Kafka Producer with batch and streaming modes
  • ksqlDB Queries implementation (Streams & Tables)
  • Python Consumer for persistence
  • Dashboard in Streamlit/PowerBI
  • CI/CD with GitHub Actions
  • Migration to Serverless (Upstash + Cloud Run)

📚 Documentation

🎓 Learning Objectives

This project demonstrates:

  • ✅ Kappa Architecture for real-time processing
  • ✅ Hybrid Processing: Declarative (ksqlDB) + Imperative (Python)
  • ✅ Stream Processing with windowing and aggregations
  • ✅ Event-Driven Architecture with Kafka
  • ✅ Data Modeling with Pydantic schemas
  • ✅ Clean Code principles (SOLID, type hints)
  • ✅ Infrastructure as Code with Docker Compose

๐Ÿ“ License

MIT License
