Real-Time Data Pipeline Architecture: Streaming Analytics at Scale
Design and implement high-throughput real-time data pipelines using Apache Kafka, Apache Flink, and modern data lake architecture for streaming analytics.
The Data Streaming Revolution
In today's digital economy, the ability to process and analyze data in real time isn't just an advantage; it's a necessity. Whether it's fraud detection, personalized recommendations, or operational monitoring, businesses need to act on data as it flows through their systems.
This guide walks you through building a complete real-time data pipeline architecture that can handle millions of events per second, deliver sub-second analytics, and scale horizontally as demand grows.
Why Real-Time Data Processing Matters
Business Impact
- Instant Decision Making: React to events as they happen
- Enhanced User Experience: Real-time personalization and recommendations
- Fraud Prevention: Detect and stop fraudulent activity immediately
- Operational Excellence: Monitor and optimize systems in real-time
Technical Benefits
- Low Latency: Process events in milliseconds
- High Throughput: Handle millions of events per second
- Fault Tolerance: Built-in resilience and automatic recovery
- Scalability: Linear scaling with demand
Architecture Overview
Our real-time data pipeline architecture consists of four layers: an ingestion layer built on Apache Kafka with Schema Registry and Kafka Connect, a stream-processing layer powered by Apache Flink and ksqlDB, a storage layer based on Apache Iceberg tables in the data lake, and a serving layer that surfaces live analytics through a real-time dashboard. The sections below implement each layer in turn.
Complete Data Pipeline Implementation
Apache Kafka Setup and Configuration
# docker-compose.yml for Kafka Cluster
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.4.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
volumes:
- zookeeper-data:/var/lib/zookeeper/data
- zookeeper-logs:/var/lib/zookeeper/log
kafka-1:
image: confluentinc/cp-kafka:7.4.0
hostname: kafka-1
container_name: kafka-1
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
KAFKA_LOG_RETENTION_HOURS: 168
KAFKA_LOG_SEGMENT_BYTES: 1073741824
KAFKA_NUM_PARTITIONS: 12
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
volumes:
- kafka-1-data:/var/lib/kafka/data
kafka-2:
image: confluentinc/cp-kafka:7.4.0
hostname: kafka-2
container_name: kafka-2
depends_on:
- zookeeper
ports:
- "9093:9093"
- "9102:9102"
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:29093,PLAINTEXT_HOST://localhost:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_JMX_PORT: 9102
KAFKA_JMX_HOSTNAME: localhost
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
volumes:
- kafka-2-data:/var/lib/kafka/data
kafka-3:
image: confluentinc/cp-kafka:7.4.0
hostname: kafka-3
container_name: kafka-3
depends_on:
- zookeeper
ports:
- "9094:9094"
- "9103:9103"
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:29094,PLAINTEXT_HOST://localhost:9094
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_JMX_PORT: 9103
KAFKA_JMX_HOSTNAME: localhost
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
volumes:
- kafka-3-data:/var/lib/kafka/data
schema-registry:
image: confluentinc/cp-schema-registry:7.4.0
hostname: schema-registry
container_name: schema-registry
depends_on:
- kafka-1
- kafka-2
- kafka-3
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka-1:29092,kafka-2:29093,kafka-3:29094'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
kafka-connect:
image: confluentinc/cp-kafka-connect:7.4.0
hostname: connect
container_name: connect
depends_on:
- kafka-1
- kafka-2
- kafka-3
- schema-registry
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: 'kafka-1:29092,kafka-2:29093,kafka-3:29094'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
volumes:
- ./connectors:/usr/share/confluent-hub-components
ksqldb-server:
image: confluentinc/cp-ksqldb-server:7.4.0
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- kafka-1
- kafka-2
- kafka-3
- schema-registry
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
KSQL_BOOTSTRAP_SERVERS: "kafka-1:29092,kafka-2:29093,kafka-3:29094"
KSQL_HOST_NAME: ksqldb-server
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
KSQL_KSQL_CONNECT_URL: "http://connect:8083"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 3
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
volumes:
zookeeper-data:
zookeeper-logs:
kafka-1-data:
kafka-2-data:
kafka-3-data:
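Note that `KAFKA_AUTO_CREATE_TOPICS_ENABLE` is set to `'false'`, so the pipeline's topics must be created explicitly before the producers start. Here is a minimal sketch using the confluent-kafka AdminClient; the topic names match the producers below, while the partition and replication counts are assumptions aligned with the broker defaults above.

```python
# admin/create_topics.py -- one-off helper to create the pipeline's core topics
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094'})

topics = [
    NewTopic('user-events', num_partitions=12, replication_factor=3),
    NewTopic('transactions', num_partitions=12, replication_factor=3),
    NewTopic('system-metrics', num_partitions=12, replication_factor=3),
]

# create_topics() is asynchronous and returns a dict of topic -> future
for topic, future in admin.create_topics(topics).items():
    try:
        future.result()
        print(f"Created topic: {topic}")
    except Exception as e:
        print(f"Topic {topic} not created (it may already exist): {e}")
```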
Data Producer Implementation
# producers/event_producer.py
import json
import time
import random
from datetime import datetime, timezone
from typing import Dict, Any, List
from confluent_kafka import Producer
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
from confluent_kafka.avro.serializer import SerializerError
import logging
from dataclasses import dataclass, asdict
from threading import Thread
import uuid
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class UserEvent:
"""User interaction event"""
event_id: str
user_id: str
session_id: str
event_type: str
timestamp: str
properties: Dict[str, Any]
user_agent: str
ip_address: str
location: Dict[str, str]
@dataclass
class TransactionEvent:
"""Financial transaction event"""
transaction_id: str
user_id: str
amount: float
currency: str
merchant_id: str
merchant_category: str
timestamp: str
status: str
payment_method: str
location: Dict[str, str]
risk_score: float
@dataclass
class SystemMetric:
"""System performance metric"""
metric_id: str
service_name: str
metric_name: str
value: float
unit: str
timestamp: str
tags: Dict[str, str]
host: str
class EventProducer:
def __init__(self, bootstrap_servers: str, schema_registry_url: str):
# Kafka producer configuration
self.producer_config = {
'bootstrap.servers': bootstrap_servers,
'client.id': f'event-producer-{uuid.uuid4()}',
'acks': 'all', # Wait for all replicas
'retries': 10,
'retry.backoff.ms': 1000,
'batch.size': 16384,
'linger.ms': 5, # Small batching for low latency
'compression.type': 'snappy',
'max.in.flight.requests.per.connection': 5,
'enable.idempotence': True,
'request.timeout.ms': 30000,
}
# Avro producer for schema-enabled topics
self.avro_producer_config = {
**self.producer_config,
'schema.registry.url': schema_registry_url
}
# Initialize producers
self.producer = Producer(self.producer_config)
# Avro schemas
self.user_event_schema = """
{
"type": "record",
"name": "UserEvent",
"fields": [
{"name": "event_id", "type": "string"},
{"name": "user_id", "type": "string"},
{"name": "session_id", "type": "string"},
{"name": "event_type", "type": "string"},
{"name": "timestamp", "type": "string"},
{"name": "properties", "type": {"type": "map", "values": "string"}},
{"name": "user_agent", "type": "string"},
{"name": "ip_address", "type": "string"},
{"name": "location", "type": {"type": "map", "values": "string"}}
]
}
"""
self.transaction_schema = """
{
"type": "record",
"name": "TransactionEvent",
"fields": [
{"name": "transaction_id", "type": "string"},
{"name": "user_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string"},
{"name": "merchant_id", "type": "string"},
{"name": "merchant_category", "type": "string"},
{"name": "timestamp", "type": "string"},
{"name": "status", "type": "string"},
{"name": "payment_method", "type": "string"},
{"name": "location", "type": {"type": "map", "values": "string"}},
{"name": "risk_score", "type": "double"}
]
}
"""
# Parse the schema strings once; AvroProducer expects loaded Avro schema objects
self.user_event_schema = avro.loads(self.user_event_schema)
self.transaction_schema = avro.loads(self.transaction_schema)
# Initialize Avro producer
self.avro_producer = AvroProducer(
self.avro_producer_config,
default_key_schema=None,
default_value_schema=None
)
self.running = False
def delivery_callback(self, err, msg):
"""Callback for delivery reports"""
if err:
logger.error(f'Message delivery failed: {err}')
else:
logger.debug(f'Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}')
def produce_user_event(self, event: UserEvent, topic: str = 'user-events'):
"""Produce user interaction events"""
try:
# Convert event to dict and serialize properties
event_dict = asdict(event)
event_dict['properties'] = {k: str(v) for k, v in event.properties.items()}
self.avro_producer.produce(
topic=topic,
key=event.user_id,
value=event_dict,
value_schema=self.user_event_schema,
callback=self.delivery_callback
)
except SerializerError as e:
logger.error(f"Serialization error: {e}")
except Exception as e:
logger.error(f"Error producing user event: {e}")
def produce_transaction_event(self, event: TransactionEvent, topic: str = 'transactions'):
"""Produce financial transaction events"""
try:
event_dict = asdict(event)
event_dict['location'] = {k: str(v) for k, v in event.location.items()}
self.avro_producer.produce(
topic=topic,
key=event.user_id,
value=event_dict,
value_schema=self.transaction_schema,
callback=self.delivery_callback
)
except SerializerError as e:
logger.error(f"Serialization error: {e}")
except Exception as e:
logger.error(f"Error producing transaction event: {e}")
def produce_system_metric(self, metric: SystemMetric, topic: str = 'system-metrics'):
"""Produce system performance metrics"""
try:
metric_dict = asdict(metric)
# Use JSON serialization for metrics (simpler schema)
self.producer.produce(
topic=topic,
key=metric.service_name,
value=json.dumps(metric_dict),
callback=self.delivery_callback
)
except Exception as e:
logger.error(f"Error producing system metric: {e}")
def generate_sample_user_events(self, count: int = 1000):
"""Generate sample user interaction events"""
event_types = ['page_view', 'click', 'purchase', 'login', 'logout', 'search']
user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
]
locations = [
{'country': 'US', 'state': 'CA', 'city': 'San Francisco'},
{'country': 'US', 'state': 'NY', 'city': 'New York'},
{'country': 'UK', 'state': 'England', 'city': 'London'},
{'country': 'DE', 'state': 'Bavaria', 'city': 'Munich'}
]
for i in range(count):
event = UserEvent(
event_id=str(uuid.uuid4()),
user_id=f"user_{random.randint(1, 10000)}",
session_id=str(uuid.uuid4()),
event_type=random.choice(event_types),
timestamp=datetime.now(timezone.utc).isoformat(),
properties={
'page_url': f'/page/{random.randint(1, 100)}',
'referrer': 'https://google.com',
'product_id': str(random.randint(1000, 9999)),
'category': random.choice(['electronics', 'clothing', 'books', 'home'])
},
user_agent=random.choice(user_agents),
ip_address=f"{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}",
location=random.choice(locations)
)
self.produce_user_event(event)
# Add some delay to simulate realistic event timing
if i % 100 == 0:
time.sleep(0.1)
def generate_sample_transactions(self, count: int = 500):
"""Generate sample financial transactions"""
currencies = ['USD', 'EUR', 'GBP', 'JPY']
statuses = ['completed', 'pending', 'failed', 'cancelled']
payment_methods = ['credit_card', 'debit_card', 'paypal', 'bank_transfer']
merchant_categories = ['grocery', 'restaurant', 'gas_station', 'retail', 'online']
for i in range(count):
transaction = TransactionEvent(
transaction_id=str(uuid.uuid4()),
user_id=f"user_{random.randint(1, 10000)}",
amount=round(random.uniform(10.0, 1000.0), 2),
currency=random.choice(currencies),
merchant_id=f"merchant_{random.randint(1, 1000)}",
merchant_category=random.choice(merchant_categories),
timestamp=datetime.now(timezone.utc).isoformat(),
status=random.choice(statuses),
payment_method=random.choice(payment_methods),
location={
'country': random.choice(['US', 'UK', 'DE', 'FR']),
'city': random.choice(['New York', 'London', 'Berlin', 'Paris'])
},
risk_score=round(random.uniform(0.0, 1.0), 3)
)
self.produce_transaction_event(transaction)
if i % 50 == 0:
time.sleep(0.1)
def generate_sample_metrics(self, count: int = 200):
"""Generate sample system performance metrics"""
services = ['web-server', 'api-server', 'database', 'cache', 'search-engine']
metrics = ['cpu_usage', 'memory_usage', 'disk_usage', 'network_io', 'response_time']
hosts = ['host-001', 'host-002', 'host-003', 'host-004', 'host-005']
for i in range(count):
service = random.choice(services)
metric_name = random.choice(metrics)
# Generate realistic values based on metric type
if metric_name in ['cpu_usage', 'memory_usage', 'disk_usage']:
value = random.uniform(0, 100)
unit = 'percent'
elif metric_name == 'network_io':
value = random.uniform(0, 1000)
unit = 'mbps'
else: # response_time
value = random.uniform(50, 2000)
unit = 'milliseconds'
metric = SystemMetric(
metric_id=str(uuid.uuid4()),
service_name=service,
metric_name=metric_name,
value=round(value, 2),
unit=unit,
timestamp=datetime.now(timezone.utc).isoformat(),
tags={
'environment': 'production',
'region': random.choice(['us-west-2', 'eu-west-1', 'ap-southeast-1']),
'datacenter': random.choice(['dc1', 'dc2', 'dc3'])
},
host=random.choice(hosts)
)
self.produce_system_metric(metric)
if i % 25 == 0:
time.sleep(0.1)
def start_continuous_generation(self):
"""Start continuous event generation in background threads"""
self.running = True
def user_events_thread():
while self.running:
self.generate_sample_user_events(100)
time.sleep(5)
def transactions_thread():
while self.running:
self.generate_sample_transactions(50)
time.sleep(10)
def metrics_thread():
while self.running:
self.generate_sample_metrics(20)
time.sleep(2)
# Start background threads
Thread(target=user_events_thread, daemon=True).start()
Thread(target=transactions_thread, daemon=True).start()
Thread(target=metrics_thread, daemon=True).start()
logger.info("Started continuous event generation")
def stop(self):
"""Stop event generation and flush producers"""
self.running = False
self.producer.flush(timeout=30)
self.avro_producer.flush(timeout=30)
logger.info("Stopped event generation and flushed producers")
if __name__ == "__main__":
# Initialize producer
producer = EventProducer(
bootstrap_servers='localhost:9092,localhost:9093,localhost:9094',
schema_registry_url='http://localhost:8081'
)
try:
# Generate initial sample data
logger.info("Generating sample data...")
producer.generate_sample_user_events(1000)
producer.generate_sample_transactions(500)
producer.generate_sample_metrics(200)
# Start continuous generation
producer.start_continuous_generation()
# Keep running
while True:
time.sleep(60)
logger.info("Producer is running...")
except KeyboardInterrupt:
logger.info("Shutting down producer...")
producer.stop()
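Before wiring up stream processing, it helps to confirm that events are actually landing in Kafka. Below is a quick, hedged smoke test that tails a few JSON messages from the `system-metrics` topic; the group id and message count are arbitrary choices.

```python
# producers/verify_ingest.py -- read a handful of system metrics back from Kafka
import json
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
    'group.id': 'ingest-smoke-test',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['system-metrics'])

received = 0
while received < 5:
    msg = consumer.poll(timeout=5.0)
    if msg is None or msg.error():
        continue
    metric = json.loads(msg.value().decode('utf-8'))
    print(metric['service_name'], metric['metric_name'], metric['value'], metric['unit'])
    received += 1

consumer.close()
```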
Apache Flink Stream Processing
# stream_processors/flink_processor.py
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import MapFunction, FilterFunction, WindowFunction
from pyflink.datastream.window import TumblingProcessingTimeWindows
from pyflink.common.time import Time
import json
import logging
from datetime import datetime, timezone
from typing import Dict, Any, List
import statistics
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class EventDeserializer(MapFunction):
"""Deserialize JSON events from Kafka"""
def map(self, value: str) -> Dict[str, Any]:
try:
return json.loads(value)
except json.JSONDecodeError as e:
logger.error(f"Failed to parse JSON: {e}")
return {}
class FraudDetectionFilter(FilterFunction):
"""Real-time fraud detection for transactions"""
def __init__(self):
self.fraud_patterns = {
'high_amount_threshold': 5000.0,
'high_risk_score_threshold': 0.8,
'suspicious_countries': ['XX', 'YY'], # Example country codes
'velocity_threshold': 3  # Max transactions per minute (not enforced by this stateless filter; see the keyed-state sketch below)
}
def filter(self, transaction: Dict[str, Any]) -> bool:
"""Return True if transaction is potentially fraudulent"""
try:
amount = float(transaction.get('amount', 0))
risk_score = float(transaction.get('risk_score', 0))
country = transaction.get('location', {}).get('country', '')
# High amount check
if amount > self.fraud_patterns['high_amount_threshold']:
return True
# High risk score check
if risk_score > self.fraud_patterns['high_risk_score_threshold']:
return True
# Suspicious country check
if country in self.fraud_patterns['suspicious_countries']:
return True
return False
except (ValueError, KeyError) as e:
logger.error(f"Error in fraud detection: {e}")
return False
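# --- Hedged sketch, not part of the original pipeline: the 'velocity_threshold'
# --- above is never enforced, because a stateless FilterFunction cannot remember
# --- how many transactions a user has made. One way to apply it is a
# --- KeyedProcessFunction with per-user ValueState; the names and wiring below
# --- are illustrative assumptions (json, Types, datetime are imported at the top).
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

class VelocityFraudDetector(KeyedProcessFunction):
    """Flag users exceeding max_per_minute transactions in a rolling one-minute window"""
    def __init__(self, max_per_minute: int = 3):
        self.max_per_minute = max_per_minute
        self.window_state = None

    def open(self, runtime_context: RuntimeContext):
        # Per-user (window_start_ms, count) tuple kept in keyed state
        self.window_state = runtime_context.get_state(
            ValueStateDescriptor("tx-window", Types.PICKLED_BYTE_ARRAY()))

    def process_element(self, transaction, ctx):
        now = ctx.timer_service().current_processing_time()
        window_start, count = self.window_state.value() or (now, 0)
        if now - window_start > 60_000:  # start a new one-minute window
            window_start, count = now, 0
        count += 1
        self.window_state.update((window_start, count))
        if count > self.max_per_minute:
            yield json.dumps({
                'alert_type': 'velocity_fraud',
                'user_id': transaction.get('user_id'),
                'transactions_in_window': count,
                'timestamp': datetime.now(timezone.utc).isoformat()
            })

# Usage sketch: parsed_transactions \
#     .key_by(lambda t: t.get('user_id', 'unknown')) \
#     .process(VelocityFraudDetector(), output_type=Types.STRING())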
class TransactionAggregator(WindowFunction):
"""Aggregate transactions within time windows"""
def apply(self, key: str, window, inputs, collector):
"""Aggregate transaction data for the window"""
transactions = list(inputs)
if not transactions:
return
# Calculate aggregations
total_amount = sum(float(t.get('amount', 0)) for t in transactions)
avg_amount = total_amount / len(transactions)
transaction_count = len(transactions)
unique_users = len(set(t.get('user_id') for t in transactions))
# Calculate risk statistics
risk_scores = [float(t.get('risk_score', 0)) for t in transactions]
avg_risk_score = statistics.mean(risk_scores) if risk_scores else 0
max_risk_score = max(risk_scores) if risk_scores else 0
# Group by merchant category
category_stats = {}
for transaction in transactions:
category = transaction.get('merchant_category', 'unknown')
if category not in category_stats:
category_stats[category] = {'count': 0, 'total_amount': 0}
category_stats[category]['count'] += 1
category_stats[category]['total_amount'] += float(transaction.get('amount', 0))
# Create aggregated result
result = {
'window_start': window.start,
'window_end': window.end,
'total_amount': round(total_amount, 2),
'avg_amount': round(avg_amount, 2),
'transaction_count': transaction_count,
'unique_users': unique_users,
'avg_risk_score': round(avg_risk_score, 3),
'max_risk_score': round(max_risk_score, 3),
'category_breakdown': category_stats,
'timestamp': datetime.now(timezone.utc).isoformat()
}
collector.collect(json.dumps(result))
class UserActivityAggregator(WindowFunction):
"""Aggregate user activity within time windows"""
def apply(self, key: str, window, inputs, collector):
"""Aggregate user activity for the window"""
events = list(inputs)
if not events:
return
# Calculate event type distribution
event_type_counts = {}
for event in events:
event_type = event.get('event_type', 'unknown')
event_type_counts[event_type] = event_type_counts.get(event_type, 0) + 1
# Calculate unique sessions and users
unique_sessions = len(set(e.get('session_id') for e in events))
unique_users = len(set(e.get('user_id') for e in events))
# Location analysis
locations = [e.get('location', {}) for e in events]
countries = [l.get('country') for l in locations if l.get('country')]
country_counts = {}
for country in countries:
country_counts[country] = country_counts.get(country, 0) + 1
result = {
'window_start': window.start,
'window_end': window.end,
'total_events': len(events),
'unique_sessions': unique_sessions,
'unique_users': unique_users,
'event_type_distribution': event_type_counts,
'top_countries': dict(sorted(country_counts.items(), key=lambda x: x[1], reverse=True)[:5]),
'timestamp': datetime.now(timezone.utc).isoformat()
}
collector.collect(json.dumps(result))
class SystemMetricsProcessor:
"""Process system metrics for real-time monitoring"""
def __init__(self, env: StreamExecutionEnvironment):
self.env = env
def setup_metric_processing(self):
"""Setup metric processing pipeline"""
# Kafka consumer for system metrics
kafka_consumer = FlinkKafkaConsumer(
topics='system-metrics',
deserialization_schema=SimpleStringSchema(),
properties={
'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
'group.id': 'metrics-processor'
}
)
# Create data stream
metrics_stream = self.env.add_source(kafka_consumer)
# Parse JSON metrics
parsed_metrics = metrics_stream.map(EventDeserializer(), output_type=Types.PICKLED_BYTE_ARRAY())
# Filter for high-severity metrics
critical_metrics = parsed_metrics.filter(self.is_critical_metric)
# Aggregate metrics by service
service_aggregates = (parsed_metrics
.key_by(lambda x: x.get('service_name', 'unknown'))
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.apply(MetricAggregator()))
# Send alerts for critical metrics
critical_alerts = critical_metrics.map(self.create_alert, output_type=Types.STRING())
# Send aggregated metrics to output topics
self.send_to_kafka(service_aggregates, 'service-metrics-aggregated')
self.send_to_kafka(critical_alerts, 'metric-alerts')
def is_critical_metric(self, metric: Dict[str, Any]) -> bool:
"""Check if metric indicates a critical condition"""
metric_name = metric.get('metric_name', '')
value = float(metric.get('value', 0))
# Define thresholds for different metrics
thresholds = {
'cpu_usage': 90.0,
'memory_usage': 85.0,
'disk_usage': 90.0,
'response_time': 5000.0 # milliseconds
}
threshold = thresholds.get(metric_name)
return threshold is not None and value > threshold
def create_alert(self, metric: Dict[str, Any]) -> str:
"""Create alert message for critical metrics"""
alert = {
'alert_type': 'metric_threshold_exceeded',
'service': metric.get('service_name'),
'metric': metric.get('metric_name'),
'value': metric.get('value'),
'threshold_exceeded': True,
'severity': 'critical',
'timestamp': datetime.now(timezone.utc).isoformat(),
'host': metric.get('host'),
'tags': metric.get('tags', {})
}
return json.dumps(alert)
def send_to_kafka(self, stream, topic: str):
"""Send processed data to Kafka topic"""
kafka_producer = FlinkKafkaProducer(
topic=topic,
serialization_schema=SimpleStringSchema(),
producer_config={
'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
'acks': 'all',
'retries': 3
}
)
stream.add_sink(kafka_producer)
class MetricAggregator(WindowFunction):
"""Aggregate system metrics within time windows"""
def apply(self, key: str, window, inputs, collector):
"""Aggregate metrics for a service within the window"""
metrics = list(inputs)
if not metrics:
return
service_name = key
metric_summaries = {}
# Group metrics by metric name
for metric in metrics:
metric_name = metric.get('metric_name', 'unknown')
value = float(metric.get('value', 0))
if metric_name not in metric_summaries:
metric_summaries[metric_name] = {
'values': [],
'unit': metric.get('unit', ''),
'count': 0
}
metric_summaries[metric_name]['values'].append(value)
metric_summaries[metric_name]['count'] += 1
# Calculate statistics for each metric
aggregated_metrics = {}
for metric_name, summary in metric_summaries.items():
values = summary['values']
aggregated_metrics[metric_name] = {
'count': len(values),
'min': min(values),
'max': max(values),
'avg': statistics.mean(values),
'median': statistics.median(values),
'std_dev': statistics.stdev(values) if len(values) > 1 else 0,
'unit': summary['unit']
}
result = {
'service_name': service_name,
'window_start': window.start,
'window_end': window.end,
'metrics': aggregated_metrics,
'timestamp': datetime.now(timezone.utc).isoformat()
}
collector.collect(json.dumps(result))
class RealTimeDataProcessor:
"""Main class for real-time data processing with Apache Flink"""
def __init__(self):
# Create Flink execution environment
self.env = StreamExecutionEnvironment.get_execution_environment()
self.env.set_parallelism(4) # Adjust based on your cluster
# Enable checkpointing for fault tolerance
self.env.enable_checkpointing(60000) # Checkpoint every minute
def setup_transaction_processing(self):
"""Setup real-time transaction processing pipeline"""
logger.info("Setting up transaction processing pipeline...")
# Kafka consumer for transactions
kafka_consumer = FlinkKafkaConsumer(
topics='transactions',
deserialization_schema=SimpleStringSchema(),
properties={
'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
'group.id': 'transaction-processor',
'auto.offset.reset': 'latest'
}
)
# Create transaction stream
transaction_stream = self.env.add_source(kafka_consumer)
# Parse JSON transactions
parsed_transactions = transaction_stream.map(EventDeserializer(), output_type=Types.PICKLED_BYTE_ARRAY())
# Fraud detection
fraud_alerts = (parsed_transactions
.filter(FraudDetectionFilter())
.map(lambda t: json.dumps({
'alert_type': 'potential_fraud',
'transaction_id': t.get('transaction_id'),
'user_id': t.get('user_id'),
'amount': t.get('amount'),
'risk_score': t.get('risk_score'),
'timestamp': datetime.now(timezone.utc).isoformat(),
'severity': 'high'
}), output_type=Types.STRING()))
# Transaction aggregations (5-minute windows)
transaction_aggregates = (parsed_transactions
.key_by(lambda x: 'global') # Global aggregation
.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
.apply(TransactionAggregator()))
# Send results to output topics
self.send_to_kafka(fraud_alerts, 'fraud-alerts')
self.send_to_kafka(transaction_aggregates, 'transaction-analytics')
def setup_user_activity_processing(self):
"""Setup real-time user activity processing pipeline"""
logger.info("Setting up user activity processing pipeline...")
# Kafka consumer for user events
kafka_consumer = FlinkKafkaConsumer(
topics='user-events',
deserialization_schema=SimpleStringSchema(),
properties={
'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
'group.id': 'user-activity-processor',
'auto.offset.reset': 'latest'
}
)
# Create user activity stream
activity_stream = self.env.add_source(kafka_consumer)
# Parse JSON events
parsed_events = activity_stream.map(EventDeserializer(), output_type=Types.PICKLED_BYTE_ARRAY())
# User session analysis (1-minute windows)
session_analytics = (parsed_events
.key_by(lambda x: 'global')
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.apply(UserActivityAggregator()))
# Real-time user segmentation
user_segments = (parsed_events
.key_by(lambda x: x.get('user_id', 'unknown'))
.map(self.calculate_user_segment, output_type=Types.STRING()))
# Send results to output topics
self.send_to_kafka(session_analytics, 'user-analytics')
self.send_to_kafka(user_segments, 'user-segments')
def calculate_user_segment(self, event: Dict[str, Any]) -> str:
"""Calculate user segment based on activity patterns"""
user_id = event.get('user_id', 'unknown')
event_type = event.get('event_type', '')
# Simple segmentation logic (in production, use ML models)
segment = 'regular'
if event_type == 'purchase':
segment = 'buyer'
elif event_type in ['search', 'click']:
segment = 'browser'
elif event_type in ['login', 'logout']:
segment = 'authenticated'
result = {
'user_id': user_id,
'segment': segment,
'event_type': event_type,
'timestamp': datetime.now(timezone.utc).isoformat()
}
return json.dumps(result)
def send_to_kafka(self, stream, topic: str):
"""Send processed data to Kafka topic"""
kafka_producer = FlinkKafkaProducer(
topic=topic,
serialization_schema=SimpleStringSchema(),
producer_config={
'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
'acks': 'all',
'retries': 3,
'batch.size': 16384,
'linger.ms': 5
}
)
stream.add_sink(kafka_producer)
def run_processing_pipeline(self):
"""Run the complete data processing pipeline"""
logger.info("Starting real-time data processing pipeline...")
# Setup all processing pipelines
self.setup_transaction_processing()
self.setup_user_activity_processing()
# Setup system metrics processing
metrics_processor = SystemMetricsProcessor(self.env)
metrics_processor.setup_metric_processing()
# Execute the job
try:
self.env.execute("Real-Time Data Processing Pipeline")
except Exception as e:
logger.error(f"Pipeline execution failed: {e}")
raise
if __name__ == "__main__":
# Initialize and run the processor
processor = RealTimeDataProcessor()
processor.run_processing_pipeline()
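One operational detail the script above assumes away: PyFlink does not bundle the Kafka connector, so the connector JAR has to be on the job's classpath before `FlinkKafkaConsumer`/`FlinkKafkaProducer` can be instantiated. A hedged sketch follows; the JAR path and version are assumptions that must match your Flink deployment.

```python
# stream_processors/register_connectors.py -- make the Kafka connector visible to PyFlink
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Point this at the flink-sql-connector-kafka JAR that matches your Flink version
env.add_jars("file:///opt/flink/lib/flink-sql-connector-kafka-1.17.1.jar")
```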
Data Lake Storage with Apache Iceberg
# storage/iceberg_writer.py
import json
import pyarrow as pa
import pyarrow.parquet as pq
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import (
NestedField, StringType, DoubleType, TimestampType,
IntegerType, BooleanType, MapType
)
import pandas as pd
from datetime import datetime, timezone
import logging
from typing import Dict, Any, List
import boto3
from confluent_kafka import Consumer
import threading
import time
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class IcebergDataLakeWriter:
"""Write streaming data to Apache Iceberg tables in data lake"""
def __init__(self, catalog_name: str = 'glue_catalog'):
# Initialize Iceberg catalog (AWS Glue in this example)
self.catalog = load_catalog(
name=catalog_name,
**{
'type': 'glue',
'warehouse': 's3://your-data-lake-bucket/warehouse/',
'io-impl': 'pyiceberg.io.pyarrow.PyArrowFileIO'
}
)
self.consumer_config = {
'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
'group.id': 'iceberg-writer',
'auto.offset.reset': 'earliest',
'enable.auto.commit': True,
'auto.commit.interval.ms': 5000,
'fetch.min.bytes': 1024,
'fetch.max.wait.ms': 500
}
# Buffer for batch writing
self.batch_size = 1000
self.flush_interval = 60 # seconds
# Data buffers
self.user_events_buffer = []
self.transactions_buffer = []
self.metrics_buffer = []
# Schemas for different data types
self.setup_iceberg_schemas()
def setup_iceberg_schemas(self):
"""Define Iceberg table schemas"""
# User events schema
self.user_events_schema = Schema(
NestedField(1, "event_id", StringType(), required=True),
NestedField(2, "user_id", StringType(), required=True),
NestedField(3, "session_id", StringType(), required=True),
NestedField(4, "event_type", StringType(), required=True),
NestedField(5, "timestamp", TimestampType(), required=True),
NestedField(6, "properties", MapType(7, StringType(), 8, StringType(), value_required=False), required=False),
NestedField(9, "user_agent", StringType(), required=False),
NestedField(10, "ip_address", StringType(), required=False),
NestedField(11, "country", StringType(), required=False),
NestedField(12, "city", StringType(), required=False),
NestedField(13, "year", IntegerType(), required=True),
NestedField(14, "month", IntegerType(), required=True),
NestedField(15, "day", IntegerType(), required=True),
NestedField(16, "hour", IntegerType(), required=True)
)
# Transactions schema
self.transactions_schema = Schema(
NestedField(1, "transaction_id", StringType(), required=True),
NestedField(2, "user_id", StringType(), required=True),
NestedField(3, "amount", DoubleType(), required=True),
NestedField(4, "currency", StringType(), required=True),
NestedField(5, "merchant_id", StringType(), required=True),
NestedField(6, "merchant_category", StringType(), required=True),
NestedField(7, "timestamp", TimestampType(), required=True),
NestedField(8, "status", StringType(), required=True),
NestedField(9, "payment_method", StringType(), required=True),
NestedField(10, "country", StringType(), required=False),
NestedField(11, "city", StringType(), required=False),
NestedField(12, "risk_score", DoubleType(), required=True),
NestedField(13, "year", IntegerType(), required=True),
NestedField(14, "month", IntegerType(), required=True),
NestedField(15, "day", IntegerType(), required=True),
NestedField(16, "hour", IntegerType(), required=True)
)
# System metrics schema
self.metrics_schema = Schema(
NestedField(1, "metric_id", StringType(), required=True),
NestedField(2, "service_name", StringType(), required=True),
NestedField(3, "metric_name", StringType(), required=True),
NestedField(4, "value", DoubleType(), required=True),
NestedField(5, "unit", StringType(), required=True),
NestedField(6, "timestamp", TimestampType(), required=True),
NestedField(7, "host", StringType(), required=True),
NestedField(8, "environment", StringType(), required=False),
NestedField(9, "region", StringType(), required=False),
NestedField(10, "year", IntegerType(), required=True),
NestedField(11, "month", IntegerType(), required=True),
NestedField(12, "day", IntegerType(), required=True),
NestedField(13, "hour", IntegerType(), required=True)
)
def create_tables_if_not_exists(self):
"""Create Iceberg tables if they don't exist"""
namespace = "data_lake"
# Create namespace if it doesn't exist
try:
self.catalog.create_namespace(namespace)
except Exception as e:
logger.info(f"Namespace {namespace} might already exist: {e}")
# Create tables
# Each table is partitioned by its year/month/day columns (identity transforms);
# the source field IDs below match the NestedField IDs declared in the schemas above
tables = [
("user_events", self.user_events_schema, (13, 14, 15)),
("transactions", self.transactions_schema, (13, 14, 15)),
("system_metrics", self.metrics_schema, (10, 11, 12))
]
for table_name, schema, (year_id, month_id, day_id) in tables:
table_identifier = f"{namespace}.{table_name}"
partition_spec = PartitionSpec(
PartitionField(source_id=year_id, field_id=1000, transform=IdentityTransform(), name="year"),
PartitionField(source_id=month_id, field_id=1001, transform=IdentityTransform(), name="month"),
PartitionField(source_id=day_id, field_id=1002, transform=IdentityTransform(), name="day")
)
try:
self.catalog.create_table(
identifier=table_identifier,
schema=schema,
partition_spec=partition_spec
)
logger.info(f"Created table: {table_identifier}")
except Exception as e:
logger.info(f"Table {table_identifier} might already exist: {e}")
def process_user_event(self, event_data: Dict[str, Any]):
"""Process and buffer user event for batch writing"""
try:
timestamp = datetime.fromisoformat(event_data['timestamp'].replace('Z', '+00:00'))
location = event_data.get('location', {})
processed_event = {
'event_id': event_data['event_id'],
'user_id': event_data['user_id'],
'session_id': event_data['session_id'],
'event_type': event_data['event_type'],
'timestamp': timestamp,
'properties': event_data.get('properties', {}),
'user_agent': event_data.get('user_agent', ''),
'ip_address': event_data.get('ip_address', ''),
'country': location.get('country', ''),
'city': location.get('city', ''),
'year': timestamp.year,
'month': timestamp.month,
'day': timestamp.day,
'hour': timestamp.hour
}
self.user_events_buffer.append(processed_event)
if len(self.user_events_buffer) >= self.batch_size:
self.flush_user_events()
except Exception as e:
logger.error(f"Error processing user event: {e}")
def process_transaction(self, transaction_data: Dict[str, Any]):
"""Process and buffer transaction for batch writing"""
try:
timestamp = datetime.fromisoformat(transaction_data['timestamp'].replace('Z', '+00:00'))
location = transaction_data.get('location', {})
processed_transaction = {
'transaction_id': transaction_data['transaction_id'],
'user_id': transaction_data['user_id'],
'amount': float(transaction_data['amount']),
'currency': transaction_data['currency'],
'merchant_id': transaction_data['merchant_id'],
'merchant_category': transaction_data['merchant_category'],
'timestamp': timestamp,
'status': transaction_data['status'],
'payment_method': transaction_data['payment_method'],
'country': location.get('country', ''),
'city': location.get('city', ''),
'risk_score': float(transaction_data['risk_score']),
'year': timestamp.year,
'month': timestamp.month,
'day': timestamp.day,
'hour': timestamp.hour
}
self.transactions_buffer.append(processed_transaction)
if len(self.transactions_buffer) >= self.batch_size:
self.flush_transactions()
except Exception as e:
logger.error(f"Error processing transaction: {e}")
def process_metric(self, metric_data: Dict[str, Any]):
"""Process and buffer system metric for batch writing"""
try:
timestamp = datetime.fromisoformat(metric_data['timestamp'].replace('Z', '+00:00'))
tags = metric_data.get('tags', {})
processed_metric = {
'metric_id': metric_data['metric_id'],
'service_name': metric_data['service_name'],
'metric_name': metric_data['metric_name'],
'value': float(metric_data['value']),
'unit': metric_data['unit'],
'timestamp': timestamp,
'host': metric_data['host'],
'environment': tags.get('environment', ''),
'region': tags.get('region', ''),
'year': timestamp.year,
'month': timestamp.month,
'day': timestamp.day,
'hour': timestamp.hour
}
self.metrics_buffer.append(processed_metric)
if len(self.metrics_buffer) >= self.batch_size:
self.flush_metrics()
except Exception as e:
logger.error(f"Error processing metric: {e}")
def flush_user_events(self):
"""Write buffered user events to Iceberg table"""
if not self.user_events_buffer:
return
try:
table = self.catalog.load_table("data_lake.user_events")
# Convert to PyArrow table
df = pd.DataFrame(self.user_events_buffer)
arrow_table = pa.Table.from_pandas(df)
# Append to Iceberg table
table.append(arrow_table)
logger.info(f"Flushed {len(self.user_events_buffer)} user events to Iceberg")
self.user_events_buffer.clear()
except Exception as e:
logger.error(f"Error flushing user events: {e}")
def flush_transactions(self):
"""Write buffered transactions to Iceberg table"""
if not self.transactions_buffer:
return
try:
table = self.catalog.load_table("data_lake.transactions")
df = pd.DataFrame(self.transactions_buffer)
arrow_table = pa.Table.from_pandas(df)
table.append(arrow_table)
logger.info(f"Flushed {len(self.transactions_buffer)} transactions to Iceberg")
self.transactions_buffer.clear()
except Exception as e:
logger.error(f"Error flushing transactions: {e}")
def flush_metrics(self):
"""Write buffered metrics to Iceberg table"""
if not self.metrics_buffer:
return
try:
table = self.catalog.load_table("data_lake.system_metrics")
df = pd.DataFrame(self.metrics_buffer)
arrow_table = pa.Table.from_pandas(df)
table.append(arrow_table)
logger.info(f"Flushed {len(self.metrics_buffer)} metrics to Iceberg")
self.metrics_buffer.clear()
except Exception as e:
logger.error(f"Error flushing metrics: {e}")
def flush_all_buffers(self):
"""Flush all data buffers"""
self.flush_user_events()
self.flush_transactions()
self.flush_metrics()
def start_kafka_consumers(self):
"""Start Kafka consumers for different topics"""
def consume_topic(topic: str, process_func):
"""Generic topic consumer"""
consumer = Consumer({
**self.consumer_config,
'group.id': f'iceberg-writer-{topic}'
})
consumer.subscribe([topic])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
logger.error(f"Consumer error: {msg.error()}")
continue
try:
data = json.loads(msg.value().decode('utf-8'))
process_func(data)
except json.JSONDecodeError as e:
logger.error(f"Failed to decode message: {e}")
except KeyboardInterrupt:
logger.info(f"Stopping consumer for {topic}")
finally:
consumer.close()
# Start consumer threads
topics_and_processors = [
('user-events', self.process_user_event),
('transactions', self.process_transaction),
('system-metrics', self.process_metric)
]
for topic, process_func in topics_and_processors:
thread = threading.Thread(
target=consume_topic,
args=(topic, process_func),
daemon=True
)
thread.start()
logger.info(f"Started consumer for {topic}")
def start_flush_scheduler(self):
"""Start periodic buffer flushing"""
def flush_periodically():
while True:
time.sleep(self.flush_interval)
self.flush_all_buffers()
flush_thread = threading.Thread(target=flush_periodically, daemon=True)
flush_thread.start()
logger.info(f"Started periodic flushing every {self.flush_interval} seconds")
def run(self):
"""Run the Iceberg data lake writer"""
logger.info("Starting Iceberg data lake writer...")
# Create tables if they don't exist
self.create_tables_if_not_exists()
# Start Kafka consumers
self.start_kafka_consumers()
# Start flush scheduler
self.start_flush_scheduler()
# Keep the main thread alive
try:
while True:
time.sleep(60)
logger.info("Data lake writer is running...")
except KeyboardInterrupt:
logger.info("Shutting down data lake writer...")
self.flush_all_buffers()
if __name__ == "__main__":
writer = IcebergDataLakeWriter()
writer.run()
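Once data lands in Iceberg, the same catalog serves ad-hoc batch reads alongside the streaming path. Here is a minimal sketch with pyiceberg's scan API; the warehouse bucket, filter, and column list are assumptions.

```python
# storage/query_example.py -- read recent transactions back from the data lake
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import EqualTo

catalog = load_catalog(
    "glue_catalog",
    type="glue",
    warehouse="s3://your-data-lake-bucket/warehouse/",
)
transactions = catalog.load_table("data_lake.transactions")

# Push the filter down to Iceberg and read only the columns we need
df = transactions.scan(
    row_filter=EqualTo("merchant_category", "online"),
    selected_fields=("transaction_id", "amount", "risk_score", "timestamp"),
    limit=1000,
).to_pandas()

print(df.describe())
```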
Real-Time Analytics Dashboard
# analytics/real_time_dashboard.py
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import json
import redis
from confluent_kafka import Consumer
import threading
import time
from datetime import datetime, timedelta
from typing import List
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RealTimeDashboard:
"""Real-time analytics dashboard using Streamlit"""
def __init__(self):
# Redis for caching dashboard data
self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
# Kafka consumer configuration
self.consumer_config = {
'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
'group.id': 'dashboard-consumer',
'auto.offset.reset': 'latest',
'enable.auto.commit': True
}
# Initialize session state
if 'data_initialized' not in st.session_state:
st.session_state.data_initialized = False
self.setup_data_consumers()
def setup_data_consumers(self):
"""Setup Kafka consumers for real-time data"""
if st.session_state.data_initialized:
return
def consume_analytics_data():
"""Consume processed analytics data"""
consumer = Consumer({
**self.consumer_config,
'group.id': 'dashboard-analytics'
})
topics = [
'transaction-analytics',
'user-analytics',
'service-metrics-aggregated',
'fraud-alerts'
]
consumer.subscribe(topics)
while True:
try:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
logger.error(f"Consumer error: {msg.error()}")
continue
topic = msg.topic()
data = json.loads(msg.value().decode('utf-8'))
# Store in Redis with expiration
self.redis_client.setex(
f"dashboard:{topic}:latest",
300, # 5 minutes TTL
json.dumps(data)
)
# Keep historical data (last hour)
timestamp = int(time.time())
self.redis_client.zadd(
f"dashboard:{topic}:history",
{json.dumps(data): timestamp}
)
# Remove old data (older than 1 hour)
cutoff = timestamp - 3600
self.redis_client.zremrangebyscore(
f"dashboard:{topic}:history",
0, cutoff
)
except Exception as e:
logger.error(f"Error in analytics consumer: {e}")
time.sleep(5)
# Start consumer in background thread
if not st.session_state.data_initialized:
consumer_thread = threading.Thread(
target=consume_analytics_data,
daemon=True
)
consumer_thread.start()
st.session_state.data_initialized = True
def get_latest_data(self, data_type: str) -> dict:
"""Get latest data for dashboard"""
try:
data = self.redis_client.get(f"dashboard:{data_type}:latest")
if data:
return json.loads(data)
return {}
except Exception as e:
logger.error(f"Error getting latest data for {data_type}: {e}")
return {}
def get_historical_data(self, data_type: str, hours: int = 1) -> List[dict]:
"""Get historical data for charts"""
try:
cutoff = int(time.time()) - (hours * 3600)
data_list = self.redis_client.zrangebyscore(
f"dashboard:{data_type}:history",
cutoff, '+inf'
)
return [json.loads(data) for data in data_list]
except Exception as e:
logger.error(f"Error getting historical data for {data_type}: {e}")
return []
def render_transaction_analytics(self):
"""Render transaction analytics section"""
st.subheader("π³ Transaction Analytics")
# Get latest transaction data
latest_data = self.get_latest_data('transaction-analytics')
if latest_data:
# KPI metrics
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric(
"Total Volume",
f"${latest_data.get('total_amount', 0):,.2f}",
delta=None
)
with col2:
st.metric(
"Transaction Count",
f"{latest_data.get('transaction_count', 0):,}",
delta=None
)
with col3:
st.metric(
"Average Amount",
f"${latest_data.get('avg_amount', 0):,.2f}",
delta=None
)
with col4:
st.metric(
"Unique Users",
f"{latest_data.get('unique_users', 0):,}",
delta=None
)
# Category breakdown
if 'category_breakdown' in latest_data:
st.subheader("Transaction Categories")
categories = latest_data['category_breakdown']
category_df = pd.DataFrame([
{'Category': cat, 'Count': data['count'], 'Amount': data['total_amount']}
for cat, data in categories.items()
])
col1, col2 = st.columns(2)
with col1:
fig = px.pie(
category_df,
values='Count',
names='Category',
title='Transactions by Category (Count)'
)
st.plotly_chart(fig)
with col2:
fig = px.pie(
category_df,
values='Amount',
names='Category',
title='Transaction Volume by Category'
)
st.plotly_chart(fig)
# Historical trend
historical_data = self.get_historical_data('transaction-analytics', hours=2)
if historical_data:
st.subheader("Transaction Trends")
trend_df = pd.DataFrame([
{
'timestamp': pd.to_datetime(data['timestamp']),
'total_amount': data.get('total_amount', 0),
'transaction_count': data.get('transaction_count', 0),
'avg_risk_score': data.get('avg_risk_score', 0)
}
for data in historical_data
])
fig = make_subplots(
rows=2, cols=2,
subplot_titles=[
'Transaction Volume', 'Transaction Count',
'Average Risk Score', 'Unique Users'
]
)
fig.add_trace(
go.Scatter(
x=trend_df['timestamp'],
y=trend_df['total_amount'],
name='Volume'
),
row=1, col=1
)
fig.add_trace(
go.Scatter(
x=trend_df['timestamp'],
y=trend_df['transaction_count'],
name='Count'
),
row=1, col=2
)
fig.add_trace(
go.Scatter(
x=trend_df['timestamp'],
y=trend_df['avg_risk_score'],
name='Risk Score'
),
row=2, col=1
)
fig.update_layout(height=600, showlegend=False)
st.plotly_chart(fig)
def render_user_analytics(self):
"""Render user activity analytics section"""
st.subheader("π₯ User Activity Analytics")
latest_data = self.get_latest_data('user-analytics')
if latest_data:
# User activity metrics
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric(
"Total Events",
f"{latest_data.get('total_events', 0):,}",
delta=None
)
with col2:
st.metric(
"Unique Users",
f"{latest_data.get('unique_users', 0):,}",
delta=None
)
with col3:
st.metric(
"Active Sessions",
f"{latest_data.get('unique_sessions', 0):,}",
delta=None
)
with col4:
avg_events_per_user = (
latest_data.get('total_events', 0) /
max(latest_data.get('unique_users', 1), 1)
)
st.metric(
"Events per User",
f"{avg_events_per_user:.1f}",
delta=None
)
# Event type distribution
if 'event_type_distribution' in latest_data:
st.subheader("Event Types")
event_types = latest_data['event_type_distribution']
event_df = pd.DataFrame([
{'Event Type': event, 'Count': count}
for event, count in event_types.items()
])
fig = px.bar(
event_df,
x='Event Type',
y='Count',
title='Event Distribution'
)
st.plotly_chart(fig)
# Top countries
if 'top_countries' in latest_data:
st.subheader("Top Countries")
countries = latest_data['top_countries']
country_df = pd.DataFrame([
{'Country': country, 'Users': count}
for country, count in countries.items()
])
fig = px.bar(
country_df,
x='Country',
y='Users',
title='Users by Country'
)
st.plotly_chart(fig)
def render_system_metrics(self):
"""Render system performance metrics"""
st.subheader("π₯οΈ System Performance")
latest_data = self.get_latest_data('service-metrics-aggregated')
if latest_data and 'metrics' in latest_data:
service_name = latest_data.get('service_name', 'Unknown Service')
st.write(f"**Service:** {service_name}")
metrics = latest_data['metrics']
# Create metrics grid
metric_names = list(metrics.keys())
cols = st.columns(min(len(metric_names), 4))
for i, metric_name in enumerate(metric_names):
metric_data = metrics[metric_name]
col_idx = i % len(cols)
with cols[col_idx]:
st.metric(
f"{metric_name.replace('_', ' ').title()}",
f"{metric_data.get('avg', 0):.2f} {metric_data.get('unit', '')}",
delta=f"Max: {metric_data.get('max', 0):.2f}"
)
# Detailed metrics chart
if len(metric_names) > 0:
selected_metric = st.selectbox(
"Select metric for detailed view:",
metric_names
)
if selected_metric:
metric_info = metrics[selected_metric]
col1, col2 = st.columns(2)
with col1:
st.json({
"Average": metric_info.get('avg', 0),
"Minimum": metric_info.get('min', 0),
"Maximum": metric_info.get('max', 0),
"Median": metric_info.get('median', 0),
"Std Dev": metric_info.get('std_dev', 0),
"Count": metric_info.get('count', 0)
})
def render_fraud_alerts(self):
"""Render fraud detection alerts"""
st.subheader("π¨ Fraud Alerts")
# Get recent alerts
try:
alerts_data = self.redis_client.zrevrange(
"dashboard:fraud-alerts:history",
0, 9, # Last 10 alerts
withscores=True
)
if alerts_data:
alerts = []
for alert_data, timestamp in alerts_data:
alert = json.loads(alert_data)
alert['timestamp'] = datetime.fromtimestamp(timestamp)
alerts.append(alert)
# Display alerts
for alert in alerts:
with st.expander(
f"β οΈ {alert.get('alert_type', 'Unknown')} - "
f"{alert['timestamp'].strftime('%H:%M:%S')}",
expanded=len(alerts) <= 3
):
col1, col2 = st.columns(2)
with col1:
st.write(f"**Transaction ID:** {alert.get('transaction_id', 'N/A')}")
st.write(f"**User ID:** {alert.get('user_id', 'N/A')}")
st.write(f"**Amount:** ${alert.get('amount', 0):,.2f}")
with col2:
st.write(f"**Risk Score:** {alert.get('risk_score', 0):.3f}")
st.write(f"**Severity:** {alert.get('severity', 'Unknown')}")
st.write(f"**Time:** {alert['timestamp'].strftime('%Y-%m-%d %H:%M:%S')}")
else:
st.info("No recent fraud alerts")
except Exception as e:
st.error(f"Error loading fraud alerts: {e}")
def render_dashboard(self):
"""Main dashboard rendering function"""
st.set_page_config(
page_title="Real-Time Data Analytics Dashboard",
layout="wide"
)
st.title("Real-Time Data Analytics Dashboard")
st.markdown("---")
# Auto-refresh
if st.button("π Refresh Data"):
st.experimental_rerun()
# Add auto-refresh every 30 seconds
# Note: Streamlit does not execute <script> tags injected via st.markdown, so this
# reload snippet may silently do nothing; see the component-based sketch after this listing
st.markdown(
"""
<script>
setTimeout(function(){
window.location.reload(1);
}, 30000);
</script>
""",
unsafe_allow_html=True
)
# Render different sections
try:
self.render_transaction_analytics()
st.markdown("---")
self.render_user_analytics()
st.markdown("---")
self.render_system_metrics()
st.markdown("---")
self.render_fraud_alerts()
except Exception as e:
st.error(f"Error rendering dashboard: {e}")
logger.error(f"Dashboard rendering error: {e}")
# Footer
st.markdown("---")
st.markdown(
"**Real-Time Data Pipeline Dashboard** | "
f"Last updated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
)
if __name__ == "__main__":
dashboard = RealTimeDashboard()
dashboard.render_dashboard()
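A note on the auto-refresh above: Streamlit does not execute `<script>` tags injected through `st.markdown`, so the reload snippet is best treated as a placeholder. A hedged alternative is the third-party streamlit-autorefresh component; the package name and API are assumed from its public docs.

```python
# analytics/autorefresh_example.py -- component-based refresh instead of injected JS
# pip install streamlit-autorefresh
from streamlit_autorefresh import st_autorefresh

# Re-run the Streamlit script every 30 seconds
st_autorefresh(interval=30_000, key="dashboard-autorefresh")
```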
Deployment Configuration
# kubernetes/data-pipeline.yaml
apiVersion: v1
kind: Namespace
metadata:
name: data-pipeline
---
# Kafka Cluster
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: data-cluster
namespace: data-pipeline
spec:
kafka:
version: 3.6.0
replicas: 3
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2
inter.broker.protocol.version: "3.6"
storage:
type: persistent-claim
size: 500Gi
class: fast-ssd
zookeeper:
replicas: 3
storage:
type: persistent-claim
size: 100Gi
class: fast-ssd
entityOperator:
topicOperator: {}
userOperator: {}
---
# Schema Registry
apiVersion: apps/v1
kind: Deployment
metadata:
name: schema-registry
namespace: data-pipeline
spec:
replicas: 2
selector:
matchLabels:
app: schema-registry
template:
metadata:
labels:
app: schema-registry
spec:
containers:
- name: schema-registry
image: confluentinc/cp-schema-registry:7.4.0
env:
- name: SCHEMA_REGISTRY_HOST_NAME
value: "schema-registry"
- name: SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS
value: "data-cluster-kafka-bootstrap:9092"
- name: SCHEMA_REGISTRY_LISTENERS
value: "http://0.0.0.0:8081"
ports:
- containerPort: 8081
resources:
requests:
memory: "1Gi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "1000m"
---
# Flink Job Manager
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-jobmanager
namespace: data-pipeline
spec:
replicas: 1
selector:
matchLabels:
app: flink
component: jobmanager
template:
metadata:
labels:
app: flink
component: jobmanager
spec:
containers:
- name: jobmanager
image: apache/flink:1.18-java11
args: ["jobmanager"]
env:
- name: FLINK_PROPERTIES
value: |
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 4
parallelism.default: 4
jobmanager.memory.process.size: 2g
taskmanager.memory.process.size: 4g
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob-server
- containerPort: 8081
name: webui
resources:
requests:
memory: "2Gi"
cpu: "1000m"
limits:
memory: "4Gi"
cpu: "2000m"
---
# Flink Task Manager
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-taskmanager
namespace: data-pipeline
spec:
replicas: 3
selector:
matchLabels:
app: flink
component: taskmanager
template:
metadata:
labels:
app: flink
component: taskmanager
spec:
containers:
- name: taskmanager
image: apache/flink:1.18-java11
args: ["taskmanager"]
env:
- name: FLINK_PROPERTIES
value: |
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.process.size: 4g
ports:
- containerPort: 6122
name: rpc
- containerPort: 6125
name: query-state
resources:
requests:
memory: "4Gi"
cpu: "2000m"
limits:
memory: "8Gi"
cpu: "4000m"
---
# Redis for Dashboard
apiVersion: apps/v1
kind: Deployment
metadata:
name: redis
namespace: data-pipeline
spec:
replicas: 1
selector:
matchLabels:
app: redis
template:
metadata:
labels:
app: redis
spec:
containers:
- name: redis
image: redis:7-alpine
ports:
- containerPort: 6379
resources:
requests:
memory: "256Mi"
cpu: "100m"
limits:
memory: "512Mi"
cpu: "200m"
volumeMounts:
- name: redis-data
mountPath: /data
volumes:
- name: redis-data
persistentVolumeClaim:
claimName: redis-pvc
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: redis-pvc
namespace: data-pipeline
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 10Gi
storageClassName: fast-ssd
---
# Services
apiVersion: v1
kind: Service
metadata:
name: schema-registry-service
namespace: data-pipeline
spec:
selector:
app: schema-registry
ports:
- port: 8081
targetPort: 8081
type: ClusterIP
---
apiVersion: v1
kind: Service
metadata:
name: flink-jobmanager
namespace: data-pipeline
spec:
selector:
app: flink
component: jobmanager
ports:
- name: rpc
port: 6123
- name: blob-server
port: 6124
- name: webui
port: 8081
type: LoadBalancer
---
apiVersion: v1
kind: Service
metadata:
name: redis-service
namespace: data-pipeline
spec:
selector:
app: redis
ports:
- port: 6379
targetPort: 6379
type: ClusterIP
Conclusion
This comprehensive real-time data pipeline architecture provides enterprise-grade streaming analytics capabilities:
Key Achievements
- High-Throughput Ingestion: Apache Kafka handles millions of events per second
- Real-Time Processing: Apache Flink provides sub-second stream processing
- Scalable Storage: Apache Iceberg enables petabyte-scale data lake storage
- Live Analytics: Real-time dashboard with interactive visualizations
- Fault Tolerance: Built-in resilience and automatic recovery mechanisms
- Schema Evolution: Avro schema registry for backward compatibility
Business Benefits
- Instant Insights: React to business events as they happen
- Fraud Prevention: Real-time fraud detection and alerting
- Operational Excellence: Live monitoring of system performance
- Cost Efficiency: Pay only for resources you use with cloud-native architecture
- Scalability: Linear scaling to handle any data volume
Next Steps
- Machine Learning Integration: Add real-time ML model serving for predictions
- Advanced Analytics: Implement complex event processing and pattern detection
- Data Governance: Add comprehensive data lineage and quality monitoring
- Multi-Region Deployment: Scale across multiple geographic regions
- Cost Optimization: Implement intelligent data tiering and lifecycle management
This real-time data pipeline architecture transforms raw streaming data into actionable business insights, enabling data-driven decision making at the speed of business.
Ready to build your real-time data platform? Contact our data engineering team for a comprehensive data strategy consultation and implementation roadmap.