⭐ Featured Article

Real-Time Data Pipeline Architecture: Streaming Analytics at Scale

Design and implement high-throughput real-time data pipelines using Apache Kafka, Apache Flink, and modern data lake architecture for streaming analytics.

📅 November 5, 2024 · ⏱️ 25 min read
#Data Engineering #Apache Kafka #Stream Processing #Analytics #Big Data

The Data Streaming Revolution

In today's digital economy, the ability to process and analyze data in real time isn't just an advantage; it's a necessity. Whether it's fraud detection, personalized recommendations, or operational monitoring, businesses need to act on data as it flows through their systems.

This guide walks you through building a complete real-time data pipeline that can handle millions of events per second, deliver sub-second analytics, and scale horizontally as demand grows.

Why Real-Time Data Processing Matters

💼 Business Impact

  • Instant Decision Making: React to events as they happen
  • Enhanced User Experience: Real-time personalization and recommendations
  • Fraud Prevention: Detect and stop fraudulent activity immediately
  • Operational Excellence: Monitor and optimize systems in real-time

⚡ Technical Benefits

  • Low Latency: Process events in milliseconds
  • High Throughput: Handle millions of events per second
  • Fault Tolerance: Built-in resilience and automatic recovery
  • Scalability: Linear scaling with demand

Architecture Overview

Our real-time data pipeline architecture consists of four main layers:

  • Ingestion: a three-broker Apache Kafka cluster with Schema Registry and Kafka Connect receives events from producers
  • Stream Processing: Apache Flink jobs handle fraud detection, windowed aggregations, and alerting
  • Storage: an Apache Iceberg data lake persists events for historical and batch analysis
  • Serving and Analytics: ksqlDB and a real-time dashboard expose results to users and downstream systems

Complete Data Pipeline Implementation

Apache Kafka Setup and Configuration

# docker-compose.yml for Kafka Cluster
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      - zookeeper-data:/var/lib/zookeeper/data
      - zookeeper-logs:/var/lib/zookeeper/log

  kafka-1:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka-1
    container_name: kafka-1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_LOG_SEGMENT_BYTES: 1073741824
      KAFKA_NUM_PARTITIONS: 12
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
    volumes:
      - kafka-1-data:/var/lib/kafka/data

  kafka-2:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka-2
    container_name: kafka-2
    depends_on:
      - zookeeper
    ports:
      - "9093:9093"
      - "9102:9102"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:29093,PLAINTEXT_HOST://localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9102
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
    volumes:
      - kafka-2-data:/var/lib/kafka/data

  kafka-3:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka-3
    container_name: kafka-3
    depends_on:
      - zookeeper
    ports:
      - "9094:9094"
      - "9103:9103"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:29094,PLAINTEXT_HOST://localhost:9094
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9103
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
    volumes:
      - kafka-3-data:/var/lib/kafka/data

  schema-registry:
    image: confluentinc/cp-schema-registry:7.4.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka-1:29092,kafka-2:29093,kafka-3:29094'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.4.0
    hostname: connect
    container_name: connect
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'kafka-1:29092,kafka-2:29093,kafka-3:29094'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    volumes:
      - ./connectors:/usr/share/confluent-hub-components

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.4.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
      - schema-registry
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "kafka-1:29092,kafka-2:29093,kafka-3:29094"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 3
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

volumes:
  zookeeper-data:
  zookeeper-logs:
  kafka-1-data:
  kafka-2-data:
  kafka-3-data:
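
Because automatic topic creation is disabled in the broker configuration above (KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'), the pipeline's topics need to be created up front. The sketch below uses confluent-kafka's AdminClient to do this; the topic list mirrors the producers and Flink jobs later in this article, and the partition and replication values simply echo the broker defaults (12 partitions, replication factor 3). Adjust both to your own sizing.

# admin/create_topics.py (illustrative path) - create the pipeline topics up front
from confluent_kafka.admin import AdminClient, NewTopic

BOOTSTRAP = "localhost:9092,localhost:9093,localhost:9094"

# Topics used by the producers and Flink jobs in this article
TOPICS = [
    "user-events", "transactions", "system-metrics",
    "fraud-alerts", "transaction-analytics", "user-analytics",
    "user-segments", "service-metrics-aggregated", "metric-alerts",
]

admin = AdminClient({"bootstrap.servers": BOOTSTRAP})

# Mirror the broker defaults from docker-compose: 12 partitions, replication factor 3
new_topics = [NewTopic(t, num_partitions=12, replication_factor=3) for t in TOPICS]

# create_topics() returns a dict of topic -> future; resolve each one
for topic, future in admin.create_topics(new_topics).items():
    try:
        future.result()  # raises if creation failed (e.g. topic already exists)
        print(f"Created topic {topic}")
    except Exception as exc:
        print(f"Topic {topic}: {exc}")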

Data Producer Implementation

# producers/event_producer.py
import json
import time
import random
from datetime import datetime, timezone
from typing import Dict, Any, List
from confluent_kafka import Producer, avro
from confluent_kafka.avro import AvroProducer
from confluent_kafka.avro.serializer import SerializerError
import logging
from dataclasses import dataclass, asdict
from threading import Thread
import uuid

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class UserEvent:
    """User interaction event"""
    event_id: str
    user_id: str
    session_id: str
    event_type: str
    timestamp: str
    properties: Dict[str, Any]
    user_agent: str
    ip_address: str
    location: Dict[str, str]

@dataclass
class TransactionEvent:
    """Financial transaction event"""
    transaction_id: str
    user_id: str
    amount: float
    currency: str
    merchant_id: str
    merchant_category: str
    timestamp: str
    status: str
    payment_method: str
    location: Dict[str, str]
    risk_score: float

@dataclass
class SystemMetric:
    """System performance metric"""
    metric_id: str
    service_name: str
    metric_name: str
    value: float
    unit: str
    timestamp: str
    tags: Dict[str, str]
    host: str

class EventProducer:
    def __init__(self, bootstrap_servers: str, schema_registry_url: str):
        # Kafka producer configuration
        self.producer_config = {
            'bootstrap.servers': bootstrap_servers,
            'client.id': f'event-producer-{uuid.uuid4()}',
            'acks': 'all',  # Wait for all replicas
            'retries': 10,
            'retry.backoff.ms': 1000,
            'batch.size': 16384,
            'linger.ms': 5,  # Small batching for low latency
            'compression.type': 'snappy',
            'max.in.flight.requests.per.connection': 5,
            'enable.idempotence': True,
            'request.timeout.ms': 30000,
        }

        # Avro producer for schema-enabled topics
        self.avro_producer_config = {
            **self.producer_config,
            'schema.registry.url': schema_registry_url
        }

        # Initialize producers
        self.producer = Producer(self.producer_config)

        # Avro schemas
        self.user_event_schema = """
        {
            "type": "record",
            "name": "UserEvent",
            "fields": [
                {"name": "event_id", "type": "string"},
                {"name": "user_id", "type": "string"},
                {"name": "session_id", "type": "string"},
                {"name": "event_type", "type": "string"},
                {"name": "timestamp", "type": "string"},
                {"name": "properties", "type": {"type": "map", "values": "string"}},
                {"name": "user_agent", "type": "string"},
                {"name": "ip_address", "type": "string"},
                {"name": "location", "type": {"type": "map", "values": "string"}}
            ]
        }
        """

        self.transaction_schema = """
        {
            "type": "record",
            "name": "TransactionEvent",
            "fields": [
                {"name": "transaction_id", "type": "string"},
                {"name": "user_id", "type": "string"},
                {"name": "amount", "type": "double"},
                {"name": "currency", "type": "string"},
                {"name": "merchant_id", "type": "string"},
                {"name": "merchant_category", "type": "string"},
                {"name": "timestamp", "type": "string"},
                {"name": "status", "type": "string"},
                {"name": "payment_method", "type": "string"},
                {"name": "location", "type": {"type": "map", "values": "string"}},
                {"name": "risk_score", "type": "double"}
            ]
        }
        """

        # Parse the Avro schema strings into schema objects expected by AvroProducer
        self.user_event_schema = avro.loads(self.user_event_schema)
        self.transaction_schema = avro.loads(self.transaction_schema)
        self.key_schema = avro.loads('{"type": "string"}')

        # Initialize Avro producer (record keys are plain strings)
        self.avro_producer = AvroProducer(
            self.avro_producer_config,
            default_key_schema=self.key_schema,
            default_value_schema=None
        )

        self.running = False

    def delivery_callback(self, err, msg):
        """Callback for delivery reports"""
        if err:
            logger.error(f'Message delivery failed: {err}')
        else:
            logger.debug(f'Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}')

    def produce_user_event(self, event: UserEvent, topic: str = 'user-events'):
        """Produce user interaction events"""
        try:
            # Convert event to dict and serialize properties
            event_dict = asdict(event)
            event_dict['properties'] = {k: str(v) for k, v in event.properties.items()}

            self.avro_producer.produce(
                topic=topic,
                key=event.user_id,
                value=event_dict,
                value_schema=self.user_event_schema,
                callback=self.delivery_callback
            )

        except SerializerError as e:
            logger.error(f"Serialization error: {e}")
        except Exception as e:
            logger.error(f"Error producing user event: {e}")

    def produce_transaction_event(self, event: TransactionEvent, topic: str = 'transactions'):
        """Produce financial transaction events"""
        try:
            event_dict = asdict(event)
            event_dict['location'] = {k: str(v) for k, v in event.location.items()}

            self.avro_producer.produce(
                topic=topic,
                key=event.user_id,
                value=event_dict,
                value_schema=self.transaction_schema,
                callback=self.delivery_callback
            )

        except SerializerError as e:
            logger.error(f"Serialization error: {e}")
        except Exception as e:
            logger.error(f"Error producing transaction event: {e}")

    def produce_system_metric(self, metric: SystemMetric, topic: str = 'system-metrics'):
        """Produce system performance metrics"""
        try:
            metric_dict = asdict(metric)

            # Use JSON serialization for metrics (simpler schema)
            self.producer.produce(
                topic=topic,
                key=metric.service_name,
                value=json.dumps(metric_dict),
                callback=self.delivery_callback
            )

        except Exception as e:
            logger.error(f"Error producing system metric: {e}")

    def generate_sample_user_events(self, count: int = 1000):
        """Generate sample user interaction events"""
        event_types = ['page_view', 'click', 'purchase', 'login', 'logout', 'search']
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
        ]
        locations = [
            {'country': 'US', 'state': 'CA', 'city': 'San Francisco'},
            {'country': 'US', 'state': 'NY', 'city': 'New York'},
            {'country': 'UK', 'state': 'England', 'city': 'London'},
            {'country': 'DE', 'state': 'Bavaria', 'city': 'Munich'}
        ]

        for i in range(count):
            event = UserEvent(
                event_id=str(uuid.uuid4()),
                user_id=f"user_{random.randint(1, 10000)}",
                session_id=str(uuid.uuid4()),
                event_type=random.choice(event_types),
                timestamp=datetime.now(timezone.utc).isoformat(),
                properties={
                    'page_url': f'/page/{random.randint(1, 100)}',
                    'referrer': 'https://google.com',
                    'product_id': str(random.randint(1000, 9999)),
                    'category': random.choice(['electronics', 'clothing', 'books', 'home'])
                },
                user_agent=random.choice(user_agents),
                ip_address=f"{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}",
                location=random.choice(locations)
            )

            self.produce_user_event(event)

            # Add some delay to simulate realistic event timing
            if i % 100 == 0:
                time.sleep(0.1)

    def generate_sample_transactions(self, count: int = 500):
        """Generate sample financial transactions"""
        currencies = ['USD', 'EUR', 'GBP', 'JPY']
        statuses = ['completed', 'pending', 'failed', 'cancelled']
        payment_methods = ['credit_card', 'debit_card', 'paypal', 'bank_transfer']
        merchant_categories = ['grocery', 'restaurant', 'gas_station', 'retail', 'online']

        for i in range(count):
            transaction = TransactionEvent(
                transaction_id=str(uuid.uuid4()),
                user_id=f"user_{random.randint(1, 10000)}",
                amount=round(random.uniform(10.0, 1000.0), 2),
                currency=random.choice(currencies),
                merchant_id=f"merchant_{random.randint(1, 1000)}",
                merchant_category=random.choice(merchant_categories),
                timestamp=datetime.now(timezone.utc).isoformat(),
                status=random.choice(statuses),
                payment_method=random.choice(payment_methods),
                location={
                    'country': random.choice(['US', 'UK', 'DE', 'FR']),
                    'city': random.choice(['New York', 'London', 'Berlin', 'Paris'])
                },
                risk_score=round(random.uniform(0.0, 1.0), 3)
            )

            self.produce_transaction_event(transaction)

            if i % 50 == 0:
                time.sleep(0.1)

    def generate_sample_metrics(self, count: int = 200):
        """Generate sample system performance metrics"""
        services = ['web-server', 'api-server', 'database', 'cache', 'search-engine']
        metrics = ['cpu_usage', 'memory_usage', 'disk_usage', 'network_io', 'response_time']
        hosts = ['host-001', 'host-002', 'host-003', 'host-004', 'host-005']

        for i in range(count):
            service = random.choice(services)
            metric_name = random.choice(metrics)

            # Generate realistic values based on metric type
            if metric_name in ['cpu_usage', 'memory_usage', 'disk_usage']:
                value = random.uniform(0, 100)
                unit = 'percent'
            elif metric_name == 'network_io':
                value = random.uniform(0, 1000)
                unit = 'mbps'
            else:  # response_time
                value = random.uniform(50, 2000)
                unit = 'milliseconds'

            metric = SystemMetric(
                metric_id=str(uuid.uuid4()),
                service_name=service,
                metric_name=metric_name,
                value=round(value, 2),
                unit=unit,
                timestamp=datetime.now(timezone.utc).isoformat(),
                tags={
                    'environment': 'production',
                    'region': random.choice(['us-west-2', 'eu-west-1', 'ap-southeast-1']),
                    'datacenter': random.choice(['dc1', 'dc2', 'dc3'])
                },
                host=random.choice(hosts)
            )

            self.produce_system_metric(metric)

            if i % 25 == 0:
                time.sleep(0.1)

    def start_continuous_generation(self):
        """Start continuous event generation in background threads"""
        self.running = True

        def user_events_thread():
            while self.running:
                self.generate_sample_user_events(100)
                time.sleep(5)

        def transactions_thread():
            while self.running:
                self.generate_sample_transactions(50)
                time.sleep(10)

        def metrics_thread():
            while self.running:
                self.generate_sample_metrics(20)
                time.sleep(2)

        # Start background threads
        Thread(target=user_events_thread, daemon=True).start()
        Thread(target=transactions_thread, daemon=True).start()
        Thread(target=metrics_thread, daemon=True).start()

        logger.info("Started continuous event generation")

    def stop(self):
        """Stop event generation and flush producers"""
        self.running = False
        self.producer.flush(timeout=30)
        self.avro_producer.flush(timeout=30)
        logger.info("Stopped event generation and flushed producers")

if __name__ == "__main__":
    # Initialize producer
    producer = EventProducer(
        bootstrap_servers='localhost:9092,localhost:9093,localhost:9094',
        schema_registry_url='http://localhost:8081'
    )

    try:
        # Generate initial sample data
        logger.info("Generating sample data...")
        producer.generate_sample_user_events(1000)
        producer.generate_sample_transactions(500)
        producer.generate_sample_metrics(200)

        # Start continuous generation
        producer.start_continuous_generation()

        # Keep running
        while True:
            time.sleep(60)
            logger.info("Producer is running...")

    except KeyboardInterrupt:
        logger.info("Shutting down producer...")
        producer.stop()
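
Before wiring up Flink, it is worth confirming that events are actually flowing. The minimal sketch below tails the JSON-encoded system-metrics topic with a plain consumer; the Avro topics (user-events, transactions) would instead need an Avro-aware consumer pointed at the Schema Registry.

# producers/verify_metrics.py (illustrative path) - tail a few metrics to verify the pipeline
import json
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
    'group.id': 'verify-metrics',       # throwaway consumer group
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['system-metrics'])

try:
    seen = 0
    while seen < 10:                    # print the first 10 metrics, then exit
        msg = consumer.poll(timeout=1.0)
        if msg is None or msg.error():
            continue
        metric = json.loads(msg.value().decode('utf-8'))
        print(metric['service_name'], metric['metric_name'], metric['value'], metric['unit'])
        seen += 1
finally:
    consumer.close()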

Apache Flink Stream Processing

# stream_processors/flink_processor.py
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import MapFunction, FilterFunction, WindowFunction
from pyflink.datastream.window import TumblingProcessingTimeWindows
from pyflink.common.time import Time
import json
import logging
from datetime import datetime, timezone
from typing import Dict, Any, List
import statistics

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class EventDeserializer(MapFunction):
    """Deserialize JSON events from Kafka"""

    def map(self, value: str) -> Dict[str, Any]:
        try:
            return json.loads(value)
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse JSON: {e}")
            return {}

class FraudDetectionFilter(FilterFunction):
    """Real-time fraud detection for transactions"""

    def __init__(self):
        self.fraud_patterns = {
            'high_amount_threshold': 5000.0,
            'high_risk_score_threshold': 0.8,
            'suspicious_countries': ['XX', 'YY'],  # Example country codes
            'velocity_threshold': 3  # Max transactions per minute
        }

    def filter(self, transaction: Dict[str, Any]) -> bool:
        """Return True if transaction is potentially fraudulent"""
        try:
            amount = float(transaction.get('amount', 0))
            risk_score = float(transaction.get('risk_score', 0))
            country = transaction.get('location', {}).get('country', '')

            # High amount check
            if amount > self.fraud_patterns['high_amount_threshold']:
                return True

            # High risk score check
            if risk_score > self.fraud_patterns['high_risk_score_threshold']:
                return True

            # Suspicious country check
            if country in self.fraud_patterns['suspicious_countries']:
                return True

            return False

        except (ValueError, KeyError) as e:
            logger.error(f"Error in fraud detection: {e}")
            return False

class TransactionAggregator(WindowFunction):
    """Aggregate transactions within time windows"""

    def apply(self, key: str, window, inputs):
        """Aggregate transaction data for the window (PyFlink WindowFunction yields results)"""
        transactions = list(inputs)
        if not transactions:
            return

        # Calculate aggregations
        total_amount = sum(float(t.get('amount', 0)) for t in transactions)
        avg_amount = total_amount / len(transactions)
        transaction_count = len(transactions)
        unique_users = len(set(t.get('user_id') for t in transactions))

        # Calculate risk statistics
        risk_scores = [float(t.get('risk_score', 0)) for t in transactions]
        avg_risk_score = statistics.mean(risk_scores) if risk_scores else 0
        max_risk_score = max(risk_scores) if risk_scores else 0

        # Group by merchant category
        category_stats = {}
        for transaction in transactions:
            category = transaction.get('merchant_category', 'unknown')
            if category not in category_stats:
                category_stats[category] = {'count': 0, 'total_amount': 0}
            category_stats[category]['count'] += 1
            category_stats[category]['total_amount'] += float(transaction.get('amount', 0))

        # Create aggregated result
        result = {
            'window_start': window.start,
            'window_end': window.end,
            'total_amount': round(total_amount, 2),
            'avg_amount': round(avg_amount, 2),
            'transaction_count': transaction_count,
            'unique_users': unique_users,
            'avg_risk_score': round(avg_risk_score, 3),
            'max_risk_score': round(max_risk_score, 3),
            'category_breakdown': category_stats,
            'timestamp': datetime.now(timezone.utc).isoformat()
        }

        yield json.dumps(result)

class UserActivityAggregator(WindowFunction):
    """Aggregate user activity within time windows"""

    def apply(self, key: str, window, inputs):
        """Aggregate user activity for the window (PyFlink WindowFunction yields results)"""
        events = list(inputs)
        if not events:
            return

        # Calculate event type distribution
        event_type_counts = {}
        for event in events:
            event_type = event.get('event_type', 'unknown')
            event_type_counts[event_type] = event_type_counts.get(event_type, 0) + 1

        # Calculate unique sessions and users
        unique_sessions = len(set(e.get('session_id') for e in events))
        unique_users = len(set(e.get('user_id') for e in events))

        # Location analysis
        locations = [e.get('location', {}) for e in events]
        countries = [l.get('country') for l in locations if l.get('country')]
        country_counts = {}
        for country in countries:
            country_counts[country] = country_counts.get(country, 0) + 1

        result = {
            'window_start': window.start,
            'window_end': window.end,
            'total_events': len(events),
            'unique_sessions': unique_sessions,
            'unique_users': unique_users,
            'event_type_distribution': event_type_counts,
            'top_countries': dict(sorted(country_counts.items(), key=lambda x: x[1], reverse=True)[:5]),
            'timestamp': datetime.now(timezone.utc).isoformat()
        }

        yield json.dumps(result)

class SystemMetricsProcessor:
    """Process system metrics for real-time monitoring"""

    def __init__(self, env: StreamExecutionEnvironment):
        self.env = env

    def setup_metric_processing(self):
        """Setup metric processing pipeline"""
        # Kafka consumer for system metrics
        kafka_consumer = FlinkKafkaConsumer(
            topics='system-metrics',
            deserialization_schema=SimpleStringSchema(),
            properties={
                'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
                'group.id': 'metrics-processor'
            }
        )

        # Create data stream
        metrics_stream = self.env.add_source(kafka_consumer)

        # Parse JSON metrics
        parsed_metrics = metrics_stream.map(EventDeserializer(), output_type=Types.PICKLED_BYTE_ARRAY())

        # Filter for high-severity metrics
        critical_metrics = parsed_metrics.filter(self.is_critical_metric)

        # Aggregate metrics by service
        service_aggregates = (parsed_metrics
                            .key_by(lambda x: x.get('service_name', 'unknown'))
                            .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                            .apply(MetricAggregator()))

        # Send alerts for critical metrics
        critical_alerts = critical_metrics.map(self.create_alert, output_type=Types.STRING())

        # Send aggregated metrics to output topics
        self.send_to_kafka(service_aggregates, 'service-metrics-aggregated')
        self.send_to_kafka(critical_alerts, 'metric-alerts')

    def is_critical_metric(self, metric: Dict[str, Any]) -> bool:
        """Check if metric indicates a critical condition"""
        metric_name = metric.get('metric_name', '')
        value = float(metric.get('value', 0))

        # Define thresholds for different metrics
        thresholds = {
            'cpu_usage': 90.0,
            'memory_usage': 85.0,
            'disk_usage': 90.0,
            'response_time': 5000.0  # milliseconds
        }

        threshold = thresholds.get(metric_name)
        return threshold is not None and value > threshold

    def create_alert(self, metric: Dict[str, Any]) -> str:
        """Create alert message for critical metrics"""
        alert = {
            'alert_type': 'metric_threshold_exceeded',
            'service': metric.get('service_name'),
            'metric': metric.get('metric_name'),
            'value': metric.get('value'),
            'threshold_exceeded': True,
            'severity': 'critical',
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'host': metric.get('host'),
            'tags': metric.get('tags', {})
        }
        return json.dumps(alert)

    def send_to_kafka(self, stream, topic: str):
        """Send processed data to Kafka topic"""
        kafka_producer = FlinkKafkaProducer(
            topic=topic,
            serialization_schema=SimpleStringSchema(),
            producer_config={
                'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
                'acks': 'all',
                'retries': 3
            }
        )
        stream.add_sink(kafka_producer)

class MetricAggregator(WindowFunction):
    """Aggregate system metrics within time windows"""

    def apply(self, key: str, window, inputs):
        """Aggregate metrics for a service within the window (PyFlink WindowFunction yields results)"""
        metrics = list(inputs)
        if not metrics:
            return

        service_name = key
        metric_summaries = {}

        # Group metrics by metric name
        for metric in metrics:
            metric_name = metric.get('metric_name', 'unknown')
            value = float(metric.get('value', 0))

            if metric_name not in metric_summaries:
                metric_summaries[metric_name] = {
                    'values': [],
                    'unit': metric.get('unit', ''),
                    'count': 0
                }

            metric_summaries[metric_name]['values'].append(value)
            metric_summaries[metric_name]['count'] += 1

        # Calculate statistics for each metric
        aggregated_metrics = {}
        for metric_name, summary in metric_summaries.items():
            values = summary['values']
            aggregated_metrics[metric_name] = {
                'count': len(values),
                'min': min(values),
                'max': max(values),
                'avg': statistics.mean(values),
                'median': statistics.median(values),
                'std_dev': statistics.stdev(values) if len(values) > 1 else 0,
                'unit': summary['unit']
            }

        result = {
            'service_name': service_name,
            'window_start': window.start,
            'window_end': window.end,
            'metrics': aggregated_metrics,
            'timestamp': datetime.now(timezone.utc).isoformat()
        }

        yield json.dumps(result)

class RealTimeDataProcessor:
    """Main class for real-time data processing with Apache Flink"""

    def __init__(self):
        # Create Flink execution environment
        self.env = StreamExecutionEnvironment.get_execution_environment()
        self.env.set_parallelism(4)  # Adjust based on your cluster

        # Enable checkpointing for fault tolerance
        self.env.enable_checkpointing(60000)  # Checkpoint every minute

    def setup_transaction_processing(self):
        """Setup real-time transaction processing pipeline"""
        logger.info("Setting up transaction processing pipeline...")

        # Kafka consumer for transactions
        kafka_consumer = FlinkKafkaConsumer(
            topics='transactions',
            deserialization_schema=SimpleStringSchema(),
            properties={
                'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
                'group.id': 'transaction-processor',
                'auto.offset.reset': 'latest'
            }
        )

        # Create transaction stream
        transaction_stream = self.env.add_source(kafka_consumer)

        # Parse JSON transactions
        parsed_transactions = transaction_stream.map(EventDeserializer(), output_type=Types.PICKLED_BYTE_ARRAY())

        # Fraud detection
        fraud_alerts = (parsed_transactions
                       .filter(FraudDetectionFilter())
                       .map(lambda t: json.dumps({
                           'alert_type': 'potential_fraud',
                           'transaction_id': t.get('transaction_id'),
                           'user_id': t.get('user_id'),
                           'amount': t.get('amount'),
                           'risk_score': t.get('risk_score'),
                           'timestamp': datetime.now(timezone.utc).isoformat(),
                           'severity': 'high'
                       }), output_type=Types.STRING()))

        # Transaction aggregations (5-minute windows)
        transaction_aggregates = (parsed_transactions
                                .key_by(lambda x: 'global')  # Global aggregation
                                .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
                                .apply(TransactionAggregator()))

        # Send results to output topics
        self.send_to_kafka(fraud_alerts, 'fraud-alerts')
        self.send_to_kafka(transaction_aggregates, 'transaction-analytics')

    def setup_user_activity_processing(self):
        """Setup real-time user activity processing pipeline"""
        logger.info("Setting up user activity processing pipeline...")

        # Kafka consumer for user events
        kafka_consumer = FlinkKafkaConsumer(
            topics='user-events',
            deserialization_schema=SimpleStringSchema(),
            properties={
                'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
                'group.id': 'user-activity-processor',
                'auto.offset.reset': 'latest'
            }
        )

        # Create user activity stream
        activity_stream = self.env.add_source(kafka_consumer)

        # Parse JSON events
        parsed_events = activity_stream.map(EventDeserializer(), output_type=Types.PICKLED_BYTE_ARRAY())

        # User session analysis (1-minute windows)
        session_analytics = (parsed_events
                           .key_by(lambda x: 'global')
                           .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                           .apply(UserActivityAggregator()))

        # Real-time user segmentation
        user_segments = (parsed_events
                        .key_by(lambda x: x.get('user_id', 'unknown'))
                        .map(self.calculate_user_segment, output_type=Types.STRING()))

        # Send results to output topics
        self.send_to_kafka(session_analytics, 'user-analytics')
        self.send_to_kafka(user_segments, 'user-segments')

    def calculate_user_segment(self, event: Dict[str, Any]) -> str:
        """Calculate user segment based on activity patterns"""
        user_id = event.get('user_id', 'unknown')
        event_type = event.get('event_type', '')

        # Simple segmentation logic (in production, use ML models)
        segment = 'regular'

        if event_type == 'purchase':
            segment = 'buyer'
        elif event_type in ['search', 'click']:
            segment = 'browser'
        elif event_type in ['login', 'logout']:
            segment = 'authenticated'

        result = {
            'user_id': user_id,
            'segment': segment,
            'event_type': event_type,
            'timestamp': datetime.now(timezone.utc).isoformat()
        }

        return json.dumps(result)

    def send_to_kafka(self, stream, topic: str):
        """Send processed data to Kafka topic"""
        kafka_producer = FlinkKafkaProducer(
            topic=topic,
            serialization_schema=SimpleStringSchema(),
            producer_config={
                'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
                'acks': 'all',
                'retries': 3,
                'batch.size': 16384,
                'linger.ms': 5
            }
        )
        stream.add_sink(kafka_producer)

    def run_processing_pipeline(self):
        """Run the complete data processing pipeline"""
        logger.info("Starting real-time data processing pipeline...")

        # Setup all processing pipelines
        self.setup_transaction_processing()
        self.setup_user_activity_processing()

        # Setup system metrics processing
        metrics_processor = SystemMetricsProcessor(self.env)
        metrics_processor.setup_metric_processing()

        # Execute the job
        try:
            self.env.execute("Real-Time Data Processing Pipeline")
        except Exception as e:
            logger.error(f"Pipeline execution failed: {e}")
            raise

if __name__ == "__main__":
    # Initialize and run the processor
    processor = RealTimeDataProcessor()
    processor.run_processing_pipeline()
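
The docker-compose stack also includes a ksqldb-server container, which lets analysts query the same topics with SQL instead of writing Flink jobs. As a minimal, illustrative sketch, the snippet below registers a ksqlDB stream over the fraud-alerts topic through the REST API; the stream name and column list are assumptions based on the alert JSON emitted by the Flink job above.

# stream_processors/register_ksql_stream.py (illustrative path) - expose fraud alerts to ksqlDB
import requests

KSQL_URL = "http://localhost:8088/ksql"

# Columns mirror the fraud alert JSON produced by the Flink fraud detection step
statement = """
CREATE STREAM IF NOT EXISTS fraud_alerts (
    alert_type VARCHAR,
    transaction_id VARCHAR,
    user_id VARCHAR,
    amount DOUBLE,
    risk_score DOUBLE,
    severity VARCHAR
) WITH (
    KAFKA_TOPIC = 'fraud-alerts',
    VALUE_FORMAT = 'JSON'
);
"""

response = requests.post(
    KSQL_URL,
    headers={"Content-Type": "application/vnd.ksql.v1+json; charset=utf-8"},
    json={"ksql": statement, "streamsProperties": {}},
)
response.raise_for_status()
print(response.json())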

Data Lake Storage with Apache Iceberg

# storage/iceberg_writer.py
import json
import pyarrow as pa
import pyarrow.parquet as pq
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import (
    NestedField, StringType, DoubleType, TimestampType,
    IntegerType, BooleanType, MapType
)
import pandas as pd
from datetime import datetime, timezone
import logging
from typing import Dict, Any, List
import boto3
from confluent_kafka import Consumer
import threading
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class IcebergDataLakeWriter:
    """Write streaming data to Apache Iceberg tables in data lake"""

    def __init__(self, catalog_name: str = 'glue_catalog'):
        # Initialize Iceberg catalog (AWS Glue in this example)
        self.catalog = load_catalog(
            name=catalog_name,
            **{
                'type': 'glue',
                'warehouse': 's3://your-data-lake-bucket/warehouse/',
                'py-io-impl': 'pyiceberg.io.pyarrow.PyArrowFileIO'
            }
        )

        self.consumer_config = {
            'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
            'group.id': 'iceberg-writer',
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': True,
            'auto.commit.interval.ms': 5000,
            'fetch.min.bytes': 1024,
            'fetch.max.wait.ms': 500
        }

        # Buffer for batch writing
        self.batch_size = 1000
        self.flush_interval = 60  # seconds

        # Data buffers
        self.user_events_buffer = []
        self.transactions_buffer = []
        self.metrics_buffer = []

        # Schemas for different data types
        self.setup_iceberg_schemas()

    def setup_iceberg_schemas(self):
        """Define Iceberg table schemas"""

        # User events schema
        self.user_events_schema = Schema(
            NestedField(1, "event_id", StringType(), required=True),
            NestedField(2, "user_id", StringType(), required=True),
            NestedField(3, "session_id", StringType(), required=True),
            NestedField(4, "event_type", StringType(), required=True),
            NestedField(5, "timestamp", TimestampType(), required=True),
            NestedField(6, "properties", MapType(7, StringType(), 8, StringType(), value_required=False), required=False),
            NestedField(9, "user_agent", StringType(), required=False),
            NestedField(10, "ip_address", StringType(), required=False),
            NestedField(11, "country", StringType(), required=False),
            NestedField(12, "city", StringType(), required=False),
            NestedField(13, "year", IntegerType(), required=True),
            NestedField(14, "month", IntegerType(), required=True),
            NestedField(15, "day", IntegerType(), required=True),
            NestedField(16, "hour", IntegerType(), required=True)
        )

        # Transactions schema
        self.transactions_schema = Schema(
            NestedField(1, "transaction_id", StringType(), required=True),
            NestedField(2, "user_id", StringType(), required=True),
            NestedField(3, "amount", DoubleType(), required=True),
            NestedField(4, "currency", StringType(), required=True),
            NestedField(5, "merchant_id", StringType(), required=True),
            NestedField(6, "merchant_category", StringType(), required=True),
            NestedField(7, "timestamp", TimestampType(), required=True),
            NestedField(8, "status", StringType(), required=True),
            NestedField(9, "payment_method", StringType(), required=True),
            NestedField(10, "country", StringType(), required=False),
            NestedField(11, "city", StringType(), required=False),
            NestedField(12, "risk_score", DoubleType(), required=True),
            NestedField(13, "year", IntegerType(), required=True),
            NestedField(14, "month", IntegerType(), required=True),
            NestedField(15, "day", IntegerType(), required=True),
            NestedField(16, "hour", IntegerType(), required=True)
        )

        # System metrics schema
        self.metrics_schema = Schema(
            NestedField(1, "metric_id", StringType(), required=True),
            NestedField(2, "service_name", StringType(), required=True),
            NestedField(3, "metric_name", StringType(), required=True),
            NestedField(4, "value", DoubleType(), required=True),
            NestedField(5, "unit", StringType(), required=True),
            NestedField(6, "timestamp", TimestampType(), required=True),
            NestedField(7, "host", StringType(), required=True),
            NestedField(8, "environment", StringType(), required=False),
            NestedField(9, "region", StringType(), required=False),
            NestedField(10, "year", IntegerType(), required=True),
            NestedField(11, "month", IntegerType(), required=True),
            NestedField(12, "day", IntegerType(), required=True),
            NestedField(13, "hour", IntegerType(), required=True)
        )

    def create_tables_if_not_exists(self):
        """Create Iceberg tables if they don't exist"""
        namespace = "data_lake"

        # Create namespace if it doesn't exist
        try:
            self.catalog.create_namespace(namespace)
        except Exception as e:
            logger.info(f"Namespace {namespace} might already exist: {e}")

        # Create tables
        tables = [
            ("user_events", self.user_events_schema),
            ("transactions", self.transactions_schema),
            ("system_metrics", self.metrics_schema)
        ]

        for table_name, schema in tables:
            table_identifier = f"{namespace}.{table_name}"

            # Identity partitioning on the pre-computed year/month/day columns
            partition_spec = PartitionSpec(
                *[
                    PartitionField(
                        source_id=schema.find_field(col).field_id,
                        field_id=1000 + i,
                        transform=IdentityTransform(),
                        name=col
                    )
                    for i, col in enumerate(["year", "month", "day"])
                ]
            )

            try:
                self.catalog.create_table(
                    identifier=table_identifier,
                    schema=schema,
                    partition_spec=partition_spec
                )
                logger.info(f"Created table: {table_identifier}")
            except Exception as e:
                logger.info(f"Table {table_identifier} might already exist: {e}")

    def process_user_event(self, event_data: Dict[str, Any]):
        """Process and buffer user event for batch writing"""
        try:
            timestamp = datetime.fromisoformat(event_data['timestamp'].replace('Z', '+00:00'))
            location = event_data.get('location', {})

            processed_event = {
                'event_id': event_data['event_id'],
                'user_id': event_data['user_id'],
                'session_id': event_data['session_id'],
                'event_type': event_data['event_type'],
                'timestamp': timestamp,
                'properties': event_data.get('properties', {}),
                'user_agent': event_data.get('user_agent', ''),
                'ip_address': event_data.get('ip_address', ''),
                'country': location.get('country', ''),
                'city': location.get('city', ''),
                'year': timestamp.year,
                'month': timestamp.month,
                'day': timestamp.day,
                'hour': timestamp.hour
            }

            self.user_events_buffer.append(processed_event)

            if len(self.user_events_buffer) >= self.batch_size:
                self.flush_user_events()

        except Exception as e:
            logger.error(f"Error processing user event: {e}")

    def process_transaction(self, transaction_data: Dict[str, Any]):
        """Process and buffer transaction for batch writing"""
        try:
            timestamp = datetime.fromisoformat(transaction_data['timestamp'].replace('Z', '+00:00'))
            location = transaction_data.get('location', {})

            processed_transaction = {
                'transaction_id': transaction_data['transaction_id'],
                'user_id': transaction_data['user_id'],
                'amount': float(transaction_data['amount']),
                'currency': transaction_data['currency'],
                'merchant_id': transaction_data['merchant_id'],
                'merchant_category': transaction_data['merchant_category'],
                'timestamp': timestamp,
                'status': transaction_data['status'],
                'payment_method': transaction_data['payment_method'],
                'country': location.get('country', ''),
                'city': location.get('city', ''),
                'risk_score': float(transaction_data['risk_score']),
                'year': timestamp.year,
                'month': timestamp.month,
                'day': timestamp.day,
                'hour': timestamp.hour
            }

            self.transactions_buffer.append(processed_transaction)

            if len(self.transactions_buffer) >= self.batch_size:
                self.flush_transactions()

        except Exception as e:
            logger.error(f"Error processing transaction: {e}")

    def process_metric(self, metric_data: Dict[str, Any]):
        """Process and buffer system metric for batch writing"""
        try:
            timestamp = datetime.fromisoformat(metric_data['timestamp'].replace('Z', '+00:00'))
            tags = metric_data.get('tags', {})

            processed_metric = {
                'metric_id': metric_data['metric_id'],
                'service_name': metric_data['service_name'],
                'metric_name': metric_data['metric_name'],
                'value': float(metric_data['value']),
                'unit': metric_data['unit'],
                'timestamp': timestamp,
                'host': metric_data['host'],
                'environment': tags.get('environment', ''),
                'region': tags.get('region', ''),
                'year': timestamp.year,
                'month': timestamp.month,
                'day': timestamp.day,
                'hour': timestamp.hour
            }

            self.metrics_buffer.append(processed_metric)

            if len(self.metrics_buffer) >= self.batch_size:
                self.flush_metrics()

        except Exception as e:
            logger.error(f"Error processing metric: {e}")

    def flush_user_events(self):
        """Write buffered user events to Iceberg table"""
        if not self.user_events_buffer:
            return

        try:
            table = self.catalog.load_table("data_lake.user_events")

            # Convert to PyArrow table
            df = pd.DataFrame(self.user_events_buffer)
            arrow_table = pa.Table.from_pandas(df)

            # Append to Iceberg table
            table.append(arrow_table)

            logger.info(f"Flushed {len(self.user_events_buffer)} user events to Iceberg")
            self.user_events_buffer.clear()

        except Exception as e:
            logger.error(f"Error flushing user events: {e}")

    def flush_transactions(self):
        """Write buffered transactions to Iceberg table"""
        if not self.transactions_buffer:
            return

        try:
            table = self.catalog.load_table("data_lake.transactions")

            df = pd.DataFrame(self.transactions_buffer)
            arrow_table = pa.Table.from_pandas(df)

            table.append(arrow_table)

            logger.info(f"Flushed {len(self.transactions_buffer)} transactions to Iceberg")
            self.transactions_buffer.clear()

        except Exception as e:
            logger.error(f"Error flushing transactions: {e}")

    def flush_metrics(self):
        """Write buffered metrics to Iceberg table"""
        if not self.metrics_buffer:
            return

        try:
            table = self.catalog.load_table("data_lake.system_metrics")

            df = pd.DataFrame(self.metrics_buffer)
            arrow_table = pa.Table.from_pandas(df)

            table.append(arrow_table)

            logger.info(f"Flushed {len(self.metrics_buffer)} metrics to Iceberg")
            self.metrics_buffer.clear()

        except Exception as e:
            logger.error(f"Error flushing metrics: {e}")

    def flush_all_buffers(self):
        """Flush all data buffers"""
        self.flush_user_events()
        self.flush_transactions()
        self.flush_metrics()

    def start_kafka_consumers(self):
        """Start Kafka consumers for different topics"""

        def consume_topic(topic: str, process_func):
            """Generic topic consumer"""
            consumer = Consumer({
                **self.consumer_config,
                'group.id': f'iceberg-writer-{topic}'
            })
            consumer.subscribe([topic])

            try:
                while True:
                    msg = consumer.poll(timeout=1.0)
                    if msg is None:
                        continue

                    if msg.error():
                        logger.error(f"Consumer error: {msg.error()}")
                        continue

                    try:
                        data = json.loads(msg.value().decode('utf-8'))
                        process_func(data)
                    except json.JSONDecodeError as e:
                        logger.error(f"Failed to decode message: {e}")

            except KeyboardInterrupt:
                logger.info(f"Stopping consumer for {topic}")
            finally:
                consumer.close()

        # Start consumer threads
        topics_and_processors = [
            ('user-events', self.process_user_event),
            ('transactions', self.process_transaction),
            ('system-metrics', self.process_metric)
        ]

        for topic, process_func in topics_and_processors:
            thread = threading.Thread(
                target=consume_topic,
                args=(topic, process_func),
                daemon=True
            )
            thread.start()
            logger.info(f"Started consumer for {topic}")

    def start_flush_scheduler(self):
        """Start periodic buffer flushing"""
        def flush_periodically():
            while True:
                time.sleep(self.flush_interval)
                self.flush_all_buffers()

        flush_thread = threading.Thread(target=flush_periodically, daemon=True)
        flush_thread.start()
        logger.info(f"Started periodic flushing every {self.flush_interval} seconds")

    def run(self):
        """Run the Iceberg data lake writer"""
        logger.info("Starting Iceberg data lake writer...")

        # Create tables if they don't exist
        self.create_tables_if_not_exists()

        # Start Kafka consumers
        self.start_kafka_consumers()

        # Start flush scheduler
        self.start_flush_scheduler()

        # Keep the main thread alive
        try:
            while True:
                time.sleep(60)
                logger.info("Data lake writer is running...")
        except KeyboardInterrupt:
            logger.info("Shutting down data lake writer...")
            self.flush_all_buffers()

if __name__ == "__main__":
    writer = IcebergDataLakeWriter()
    writer.run()
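
Once events land in Iceberg, the same tables can be read back in batch for backfills and ad-hoc analysis. The sketch below is a minimal example of loading one day's partition of the transactions table into pandas with pyiceberg; the catalog properties mirror the writer above, and the filter values are purely illustrative.

# storage/query_transactions.py (illustrative path) - batch-read one day's transactions
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import And, EqualTo

catalog = load_catalog(
    name='glue_catalog',
    **{
        'type': 'glue',
        'warehouse': 's3://your-data-lake-bucket/warehouse/',
        'py-io-impl': 'pyiceberg.io.pyarrow.PyArrowFileIO'
    }
)

table = catalog.load_table("data_lake.transactions")

# Prune down to a single day's partition (year/month/day values are illustrative)
day_filter = And(
    And(EqualTo("year", 2024), EqualTo("month", 11)),
    EqualTo("day", 5),
)

df = table.scan(row_filter=day_filter).to_pandas()
print(df.groupby("merchant_category")["amount"].sum().sort_values(ascending=False))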

Real-Time Analytics Dashboard

# analytics/real_time_dashboard.py
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import json
import redis
from confluent_kafka import Consumer
import threading
import time
from datetime import datetime, timedelta
from typing import List
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class RealTimeDashboard:
    """Real-time analytics dashboard using Streamlit"""

    def __init__(self):
        # Redis for caching dashboard data
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)

        # Kafka consumer configuration
        self.consumer_config = {
            'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
            'group.id': 'dashboard-consumer',
            'auto.offset.reset': 'latest',
            'enable.auto.commit': True
        }

        # Initialize session state
        if 'data_initialized' not in st.session_state:
            st.session_state.data_initialized = False

        self.setup_data_consumers()

    def setup_data_consumers(self):
        """Setup Kafka consumers for real-time data"""
        if st.session_state.data_initialized:
            return

        def consume_analytics_data():
            """Consume processed analytics data"""
            consumer = Consumer({
                **self.consumer_config,
                'group.id': 'dashboard-analytics'
            })

            topics = [
                'transaction-analytics',
                'user-analytics',
                'service-metrics-aggregated',
                'fraud-alerts'
            ]
            consumer.subscribe(topics)

            while True:
                try:
                    msg = consumer.poll(timeout=1.0)
                    if msg is None:
                        continue

                    if msg.error():
                        logger.error(f"Consumer error: {msg.error()}")
                        continue

                    topic = msg.topic()
                    data = json.loads(msg.value().decode('utf-8'))

                    # Store in Redis with expiration
                    self.redis_client.setex(
                        f"dashboard:{topic}:latest",
                        300,  # 5 minutes TTL
                        json.dumps(data)
                    )

                    # Keep historical data (last hour)
                    timestamp = int(time.time())
                    self.redis_client.zadd(
                        f"dashboard:{topic}:history",
                        {json.dumps(data): timestamp}
                    )

                    # Remove old data (older than 1 hour)
                    cutoff = timestamp - 3600
                    self.redis_client.zremrangebyscore(
                        f"dashboard:{topic}:history",
                        0, cutoff
                    )

                except Exception as e:
                    logger.error(f"Error in analytics consumer: {e}")
                    time.sleep(5)

        # Start consumer in background thread
        if not st.session_state.data_initialized:
            consumer_thread = threading.Thread(
                target=consume_analytics_data,
                daemon=True
            )
            consumer_thread.start()
            st.session_state.data_initialized = True

    def get_latest_data(self, data_type: str) -> dict:
        """Get latest data for dashboard"""
        try:
            data = self.redis_client.get(f"dashboard:{data_type}:latest")
            if data:
                return json.loads(data)
            return {}
        except Exception as e:
            logger.error(f"Error getting latest data for {data_type}: {e}")
            return {}

    def get_historical_data(self, data_type: str, hours: int = 1) -> List[dict]:
        """Get historical data for charts"""
        try:
            cutoff = int(time.time()) - (hours * 3600)
            data_list = self.redis_client.zrangebyscore(
                f"dashboard:{data_type}:history",
                cutoff, '+inf'
            )
            return [json.loads(data) for data in data_list]
        except Exception as e:
            logger.error(f"Error getting historical data for {data_type}: {e}")
            return []

    def render_transaction_analytics(self):
        """Render transaction analytics section"""
        st.subheader("πŸ’³ Transaction Analytics")

        # Get latest transaction data
        latest_data = self.get_latest_data('transaction-analytics')

        if latest_data:
            # KPI metrics
            col1, col2, col3, col4 = st.columns(4)

            with col1:
                st.metric(
                    "Total Volume",
                    f"${latest_data.get('total_amount', 0):,.2f}",
                    delta=None
                )

            with col2:
                st.metric(
                    "Transaction Count",
                    f"{latest_data.get('transaction_count', 0):,}",
                    delta=None
                )

            with col3:
                st.metric(
                    "Average Amount",
                    f"${latest_data.get('avg_amount', 0):,.2f}",
                    delta=None
                )

            with col4:
                st.metric(
                    "Unique Users",
                    f"{latest_data.get('unique_users', 0):,}",
                    delta=None
                )

            # Category breakdown
            if 'category_breakdown' in latest_data:
                st.subheader("Transaction Categories")

                categories = latest_data['category_breakdown']
                category_df = pd.DataFrame([
                    {'Category': cat, 'Count': data['count'], 'Amount': data['total_amount']}
                    for cat, data in categories.items()
                ])

                col1, col2 = st.columns(2)

                with col1:
                    fig = px.pie(
                        category_df,
                        values='Count',
                        names='Category',
                        title='Transactions by Category (Count)'
                    )
                    st.plotly_chart(fig)

                with col2:
                    fig = px.pie(
                        category_df,
                        values='Amount',
                        names='Category',
                        title='Transaction Volume by Category'
                    )
                    st.plotly_chart(fig)

        # Historical trend
        historical_data = self.get_historical_data('transaction-analytics', hours=2)
        if historical_data:
            st.subheader("Transaction Trends")

            trend_df = pd.DataFrame([
                {
                    'timestamp': pd.to_datetime(data['timestamp']),
                    'total_amount': data.get('total_amount', 0),
                    'transaction_count': data.get('transaction_count', 0),
                    'avg_risk_score': data.get('avg_risk_score', 0),
                    'unique_users': data.get('unique_users', 0)
                }
                for data in historical_data
            ])

            fig = make_subplots(
                rows=2, cols=2,
                subplot_titles=[
                    'Transaction Volume', 'Transaction Count',
                    'Average Risk Score', 'Unique Users'
                ]
            )

            fig.add_trace(
                go.Scatter(
                    x=trend_df['timestamp'],
                    y=trend_df['total_amount'],
                    name='Volume'
                ),
                row=1, col=1
            )

            fig.add_trace(
                go.Scatter(
                    x=trend_df['timestamp'],
                    y=trend_df['transaction_count'],
                    name='Count'
                ),
                row=1, col=2
            )

            fig.add_trace(
                go.Scatter(
                    x=trend_df['timestamp'],
                    y=trend_df['avg_risk_score'],
                    name='Risk Score'
                ),
                row=2, col=1
            )

            fig.add_trace(
                go.Scatter(
                    x=trend_df['timestamp'],
                    y=trend_df['unique_users'],
                    name='Unique Users'
                ),
                row=2, col=2
            )

            fig.update_layout(height=600, showlegend=False)
            st.plotly_chart(fig)

    def render_user_analytics(self):
        """Render user activity analytics section"""
        st.subheader("πŸ‘₯ User Activity Analytics")

        latest_data = self.get_latest_data('user-analytics')

        if latest_data:
            # User activity metrics
            col1, col2, col3, col4 = st.columns(4)

            with col1:
                st.metric(
                    "Total Events",
                    f"{latest_data.get('total_events', 0):,}",
                    delta=None
                )

            with col2:
                st.metric(
                    "Unique Users",
                    f"{latest_data.get('unique_users', 0):,}",
                    delta=None
                )

            with col3:
                st.metric(
                    "Active Sessions",
                    f"{latest_data.get('unique_sessions', 0):,}",
                    delta=None
                )

            with col4:
                avg_events_per_user = (
                    latest_data.get('total_events', 0) /
                    max(latest_data.get('unique_users', 1), 1)
                )
                st.metric(
                    "Events per User",
                    f"{avg_events_per_user:.1f}",
                    delta=None
                )

            # Event type distribution
            if 'event_type_distribution' in latest_data:
                st.subheader("Event Types")

                event_types = latest_data['event_type_distribution']
                event_df = pd.DataFrame([
                    {'Event Type': event, 'Count': count}
                    for event, count in event_types.items()
                ])

                fig = px.bar(
                    event_df,
                    x='Event Type',
                    y='Count',
                    title='Event Distribution'
                )
                st.plotly_chart(fig)

            # Top countries
            if 'top_countries' in latest_data:
                st.subheader("Top Countries")

                countries = latest_data['top_countries']
                country_df = pd.DataFrame([
                    {'Country': country, 'Users': count}
                    for country, count in countries.items()
                ])

                fig = px.bar(
                    country_df,
                    x='Country',
                    y='Users',
                    title='Users by Country'
                )
                st.plotly_chart(fig)

    def render_system_metrics(self):
        """Render system performance metrics"""
        st.subheader("πŸ–₯️ System Performance")

        latest_data = self.get_latest_data('service-metrics-aggregated')

        if latest_data and 'metrics' in latest_data:
            service_name = latest_data.get('service_name', 'Unknown Service')
            st.write(f"**Service:** {service_name}")

            metrics = latest_data['metrics']

            # Create metrics grid
            metric_names = list(metrics.keys())
            cols = st.columns(min(len(metric_names), 4))

            for i, metric_name in enumerate(metric_names):
                metric_data = metrics[metric_name]
                col_idx = i % len(cols)

                with cols[col_idx]:
                    st.metric(
                        f"{metric_name.replace('_', ' ').title()}",
                        f"{metric_data.get('avg', 0):.2f} {metric_data.get('unit', '')}",
                        delta=f"Max: {metric_data.get('max', 0):.2f}"
                    )

            # Detailed metrics chart
            if len(metric_names) > 0:
                selected_metric = st.selectbox(
                    "Select metric for detailed view:",
                    metric_names
                )

                if selected_metric:
                    metric_info = metrics[selected_metric]

                    col1, col2 = st.columns(2)

                    with col1:
                        st.json({
                            "Average": metric_info.get('avg', 0),
                            "Minimum": metric_info.get('min', 0),
                            "Maximum": metric_info.get('max', 0),
                            "Median": metric_info.get('median', 0),
                            "Std Dev": metric_info.get('std_dev', 0),
                            "Count": metric_info.get('count', 0)
                        })

    def render_fraud_alerts(self):
        """Render fraud detection alerts"""
        st.subheader("🚨 Fraud Alerts")

        # Get recent alerts
        try:
            alerts_data = self.redis_client.zrevrange(
                "dashboard:fraud-alerts:history",
                0, 9,  # Last 10 alerts
                withscores=True
            )

            if alerts_data:
                alerts = []
                for alert_data, timestamp in alerts_data:
                    alert = json.loads(alert_data)
                    alert['timestamp'] = datetime.fromtimestamp(timestamp)
                    alerts.append(alert)

                # Display alerts
                for alert in alerts:
                    with st.expander(
                        f"⚠️ {alert.get('alert_type', 'Unknown')} - "
                        f"{alert['timestamp'].strftime('%H:%M:%S')}",
                        expanded=len(alerts) <= 3
                    ):
                        col1, col2 = st.columns(2)

                        with col1:
                            st.write(f"**Transaction ID:** {alert.get('transaction_id', 'N/A')}")
                            st.write(f"**User ID:** {alert.get('user_id', 'N/A')}")
                            st.write(f"**Amount:** ${alert.get('amount', 0):,.2f}")

                        with col2:
                            st.write(f"**Risk Score:** {alert.get('risk_score', 0):.3f}")
                            st.write(f"**Severity:** {alert.get('severity', 'Unknown')}")
                            st.write(f"**Time:** {alert['timestamp'].strftime('%Y-%m-%d %H:%M:%S')}")
            else:
                st.info("No recent fraud alerts")

        except Exception as e:
            st.error(f"Error loading fraud alerts: {e}")

    def render_dashboard(self):
        """Main dashboard rendering function"""
        st.set_page_config(
            page_title="Real-Time Data Analytics Dashboard",
            page_icon="πŸ“Š",
            layout="wide"
        )

        st.title("πŸš€ Real-Time Data Analytics Dashboard")
        st.markdown("---")

        # Manual refresh (newer Streamlit releases expose this as st.rerun())
        if st.button("πŸ”„ Refresh Data"):
            st.experimental_rerun()

        # Note: <script> tags injected through st.markdown are sanitized and do
        # not execute, so a JavaScript reload cannot drive the auto-refresh.
        # Instead, the script sleeps briefly and reruns itself at the end of
        # render_dashboard() below.

        # Render different sections
        try:
            self.render_transaction_analytics()
            st.markdown("---")

            self.render_user_analytics()
            st.markdown("---")

            self.render_system_metrics()
            st.markdown("---")

            self.render_fraud_alerts()

        except Exception as e:
            st.error(f"Error rendering dashboard: {e}")
            logger.error(f"Dashboard rendering error: {e}")

        # Footer
        st.markdown("---")
        st.markdown(
            "**Real-Time Data Pipeline Dashboard** | "
            f"Last updated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        )

        # Simple auto-refresh: wait ~30 seconds, then rerun the script
        time.sleep(30)
        st.experimental_rerun()

if __name__ == "__main__":
    dashboard = RealTimeDashboard()
    dashboard.render_dashboard()
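
To try the dashboard locally, save the module as dashboard.py and launch it with streamlit run dashboard.py. Before the Flink jobs are wired up, you can also smoke-test the page by publishing a sample aggregate straight to one of the topics the background consumer subscribes to. Below is a minimal sketch, assuming the confluent-kafka Producer and the local broker addresses used above; the payload fields simply mirror what render_transaction_analytics() reads and are illustrative, not a fixed schema.

# smoke_test_producer.py -- illustrative helper, not part of the pipeline jobs
import json
import time

from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094'
})

# Sample aggregate shaped like the fields the dashboard reads from Redis
sample = {
    'timestamp': time.strftime('%Y-%m-%dT%H:%M:%S'),
    'total_amount': 12500.75,
    'transaction_count': 42,
    'avg_amount': 297.64,
    'unique_users': 18,
    'avg_risk_score': 0.12,
    'category_breakdown': {
        'groceries': {'count': 20, 'total_amount': 4200.00},
        'electronics': {'count': 22, 'total_amount': 8300.75}
    }
}

# Publish to the topic the dashboard's background consumer subscribes to
producer.produce('transaction-analytics', value=json.dumps(sample).encode('utf-8'))
producer.flush()

Once the message lands, the consumer thread caches it in Redis and the next page refresh should display the sample figures.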

Deployment Configuration

# kubernetes/data-pipeline.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: data-pipeline
---
# Kafka Cluster
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: data-cluster
  namespace: data-pipeline
spec:
  kafka:
    version: 3.6.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.6"
    storage:
      type: persistent-claim
      size: 500Gi
      class: fast-ssd
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 100Gi
      class: fast-ssd
  entityOperator:
    topicOperator: {}
    userOperator: {}
---
# Schema Registry
apiVersion: apps/v1
kind: Deployment
metadata:
  name: schema-registry
  namespace: data-pipeline
spec:
  replicas: 2
  selector:
    matchLabels:
      app: schema-registry
  template:
    metadata:
      labels:
        app: schema-registry
    spec:
      containers:
      - name: schema-registry
        image: confluentinc/cp-schema-registry:7.4.0
        env:
        - name: SCHEMA_REGISTRY_HOST_NAME
          value: "schema-registry"
        - name: SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS
          value: "data-cluster-kafka-bootstrap:9092"
        - name: SCHEMA_REGISTRY_LISTENERS
          value: "http://0.0.0.0:8081"
        ports:
        - containerPort: 8081
        resources:
          requests:
            memory: "1Gi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "1000m"
---
# Flink Job Manager
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: data-pipeline
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: apache/flink:1.18-java11
        args: ["jobmanager"]
        env:
        - name: FLINK_PROPERTIES
          value: |
            jobmanager.rpc.address: flink-jobmanager
            taskmanager.numberOfTaskSlots: 4
            parallelism.default: 4
            jobmanager.memory.process.size: 2g
            taskmanager.memory.process.size: 4g
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
          limits:
            memory: "4Gi"
            cpu: "2000m"
---
# Flink Task Manager
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: data-pipeline
spec:
  replicas: 3
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: apache/flink:1.18-java11
        args: ["taskmanager"]
        env:
        - name: FLINK_PROPERTIES
          value: |
            jobmanager.rpc.address: flink-jobmanager
            taskmanager.numberOfTaskSlots: 4
            taskmanager.memory.process.size: 4g
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        resources:
          requests:
            memory: "4Gi"
            cpu: "2000m"
          limits:
            memory: "8Gi"
            cpu: "4000m"
---
# Redis for Dashboard
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis
  namespace: data-pipeline
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redis
  template:
    metadata:
      labels:
        app: redis
    spec:
      containers:
      - name: redis
        image: redis:7-alpine
        ports:
        - containerPort: 6379
        resources:
          requests:
            memory: "256Mi"
            cpu: "100m"
          limits:
            memory: "512Mi"
            cpu: "200m"
        volumeMounts:
        - name: redis-data
          mountPath: /data
      volumes:
      - name: redis-data
        persistentVolumeClaim:
          claimName: redis-pvc
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: redis-pvc
  namespace: data-pipeline
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
  storageClassName: fast-ssd
---
# Services
apiVersion: v1
kind: Service
metadata:
  name: schema-registry-service
  namespace: data-pipeline
spec:
  selector:
    app: schema-registry
  ports:
  - port: 8081
    targetPort: 8081
  type: ClusterIP
---
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: data-pipeline
spec:
  selector:
    app: flink
    component: jobmanager
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  type: LoadBalancer
---
apiVersion: v1
kind: Service
metadata:
  name: redis-service
  namespace: data-pipeline
spec:
  selector:
    app: redis
  ports:
  - port: 6379
    targetPort: 6379
  type: ClusterIP
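
When the dashboard itself runs inside the data-pipeline namespace, it should not point at localhost: Redis and Kafka are reached through the in-cluster service names defined above (redis-service and the Strimzi-generated data-cluster-kafka-bootstrap). Below is a minimal sketch of a settings helper, assuming the connection details are supplied as environment variables on the dashboard pod; the variable names are illustrative and do not appear in the manifests above.

# config.py -- illustrative settings helper for the in-cluster dashboard
import os

# Fall back to the local development endpoints when the variables are unset
REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
REDIS_PORT = int(os.getenv('REDIS_PORT', '6379'))
KAFKA_BOOTSTRAP_SERVERS = os.getenv(
    'KAFKA_BOOTSTRAP_SERVERS',
    'localhost:9092,localhost:9093,localhost:9094'
)

# In Kubernetes these would typically be set to, for example:
#   REDIS_HOST=redis-service.data-pipeline.svc.cluster.local
#   KAFKA_BOOTSTRAP_SERVERS=data-cluster-kafka-bootstrap.data-pipeline.svc.cluster.local:9092

RealTimeDashboard.__init__ would then build its Redis client and consumer configuration from these values instead of the hard-coded localhost addresses.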

Conclusion

This comprehensive real-time data pipeline architecture provides enterprise-grade streaming analytics capabilities:

βœ… Key Achievements

  1. High-Throughput Ingestion: Apache Kafka handles millions of events per second
  2. Real-Time Processing: Apache Flink provides sub-second stream processing
  3. Scalable Storage: Apache Iceberg enables petabyte-scale data lake storage
  4. Live Analytics: Real-time dashboard with interactive visualizations
  5. Fault Tolerance: Built-in resilience and automatic recovery mechanisms
  6. Schema Evolution: Avro schema registry for backward compatibility

πŸš€ Business Benefits

  • Instant Insights: React to business events as they happen
  • Fraud Prevention: Real-time fraud detection and alerting
  • Operational Excellence: Live monitoring of system performance
  • Cost Efficiency: Pay only for resources you use with cloud-native architecture
  • Scalability: Scale out near-linearly by adding Kafka partitions, brokers, and Flink task managers as volume grows

πŸ“ˆ Next Steps

  1. Machine Learning Integration: Add real-time ML model serving for predictions
  2. Advanced Analytics: Implement complex event processing and pattern detection
  3. Data Governance: Add comprehensive data lineage and quality monitoring
  4. Multi-Region Deployment: Scale across multiple geographic regions
  5. Cost Optimization: Implement intelligent data tiering and lifecycle management

This real-time data pipeline architecture transforms raw streaming data into actionable business insights, enabling data-driven decision making at the speed of business.


Ready to build your real-time data platform? Contact our data engineering team for a comprehensive data strategy consultation and implementation roadmap.
