Building Real-Time Analytics with Kafka, Flink, and ClickHouse
Architecture and implementation of streaming data pipelines for real-time analytics, handling millions of events per second with sub-second latency.
Introduction: The Real-Time Analytics Challenge
Modern applications generate millions of events per second - user clicks, sensor data, financial transactions, IoT telemetry. Traditional batch processing (running jobs nightly or hourly) is too slow for use cases that demand sub-second insights: fraud detection, recommendation engines, operational dashboards, and anomaly detection.
Streaming architectures process data as it arrives, delivering immediate insights and triggering automated actions. This post covers production-ready streaming pipelines built with Apache Kafka, Apache Flink, and cloud-native services.
Common use cases:
- Fraud Detection: Analyze transactions in real time to block suspicious activity
- Personalization: Update recommendations instantly based on user behavior
- Operational Monitoring: Dashboard metrics with <1 second latency
- IoT Analytics: Process sensor data from thousands of devices
- Log Analytics: Real-time error detection and alerting
Architecture Overview
A typical pipeline is built from five layers:
1. Stream Ingestion: Kafka, Kinesis, Pub/Sub (a minimal producer sketch follows this list)
2. Stream Processing: Flink, Spark Streaming, Kafka Streams
3. State Management: RocksDB, Redis, DynamoDB
4. Data Sink: Elasticsearch, ClickHouse, TimescaleDB, S3
5. Monitoring: Prometheus, Grafana, Datadog
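To make the ingestion layer concrete, here is a minimal sketch of a producer publishing JSON events to the user-events topic consumed by the Flink job later in this post. It assumes the confluent-kafka Python client and a broker at kafka:9092; the field names mirror the user_events schema defined below, and the specific values are illustrative.

```python
# Minimal sketch: publish a JSON user event to the "user-events" Kafka topic.
# Assumes the confluent-kafka client (pip install confluent-kafka) and a broker at kafka:9092.
import json
import time

from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "kafka:9092"})

event = {
    "user_id": "u-123",                   # illustrative values
    "event_type": "page_view",
    "event_timestamp": int(time.time()),  # epoch seconds, matching the Flink schema below
    "page_url": "/pricing",
    "session_id": "s-456",
    "device_type": "mobile",
    "country": "US",
}

# Key by user_id so all events for a user land on the same partition (preserves per-user ordering)
producer.produce("user-events", key=event["user_id"], value=json.dumps(event))
producer.flush()
```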
Stream Processing with Apache Flink
Why Flink for Streaming?
Flink provides true event-time processing, exactly-once semantics, and low-latency state management - making it ideal for production real-time analytics.
1. Event Time Processing: Handle late-arriving data correctly
2. Stateful Processing: Maintain aggregations, windows, and joins
3. Exactly-Once Semantics: No duplicate processing
4. High Throughput: Process millions of events per second
5. Fault Tolerance: Automatic recovery from failures
Common Streaming Patterns:
1. Windowed Aggregations: Count/sum events over time windows
2. Stream Enrichment: Join streaming data with reference tables
3. Pattern Detection: Identify sequences (e.g., user journey analysis)
4. Anomaly Detection: ML models on streaming data
5. Stream-to-Stream Joins: Correlate events from multiple sources
```python
# Flink SQL for Real-Time Analytics
# Example: real-time user activity dashboard
from pyflink.table import EnvironmentSettings, TableEnvironment

# Set up the Flink Table environment in streaming mode
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
# Configure Kafka source
table_env.execute_sql("""
CREATE TABLE user_events (
user_id STRING,
event_type STRING,
event_timestamp BIGINT,
page_url STRING,
session_id STRING,
device_type STRING,
country STRING,
-- processing-time attribute, needed for the JDBC lookup join below
proc_time AS PROCTIME(),
-- event_timestamp is epoch seconds
event_time AS TO_TIMESTAMP(FROM_UNIXTIME(event_timestamp)),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user-events',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'flink-analytics',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)
""")
# Real-time aggregation: events per minute by device type
table_env.execute_sql("""
CREATE TEMPORARY VIEW events_per_minute AS
SELECT
window_start,
window_end,
device_type,
COUNT(*) as event_count,
COUNT(DISTINCT user_id) as unique_users,
COUNT(DISTINCT session_id) as sessions
FROM TABLE(
TUMBLE(TABLE user_events, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end, device_type
""")
# Stream enrichment: Join with user profile data
table_env.execute_sql("""
CREATE TABLE user_profiles (
user_id STRING,
plan_type STRING,
signup_date STRING,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://postgres:5432/users',
'table-name' = 'user_profiles',
'lookup.cache.max-rows' = '10000',
'lookup.cache.ttl' = '1 hour'
)
""")
# Enriched event stream
enriched_events = table_env.sql_query("""
SELECT
e.user_id,
e.event_type,
e.event_time,
e.page_url,
e.device_type,
p.plan_type,
p.signup_date
FROM user_events e
LEFT JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS p
ON e.user_id = p.user_id
""")
# Anomaly detection: Users with >100 events in 5 minutes
anomalies = table_env.sql_query("""
SELECT
user_id,
window_start,
COUNT(*) as event_count
FROM TABLE(
TUMBLE(TABLE user_events, DESCRIPTOR(event_time), INTERVAL '5' MINUTE)
)
GROUP BY window_start, user_id
HAVING COUNT(*) > 100
""")
# Write to Elasticsearch for real-time dashboards
table_env.execute_sql("""
CREATE TABLE es_events_per_minute (
window_start TIMESTAMP(3),
device_type STRING,
event_count BIGINT,
unique_users BIGINT,
sessions BIGINT,
PRIMARY KEY (window_start, device_type) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://elasticsearch:9200',
'index' = 'events-per-minute',
'document-id.key-delimiter' = '-',
'sink.flush-on-checkpoint' = 'true',
'sink.bulk-flush.max-actions' = '1000',
'sink.bulk-flush.interval' = '1s',
'format' = 'json'
)
""")
# Insert aggregated data into Elasticsearch
table_env.execute_sql("""
INSERT INTO es_events_per_minute
SELECT window_start, device_type, event_count, unique_users, sessions
FROM events_per_minute
""")
# Write anomalies to Kafka for alerting
table_env.execute_sql("""
CREATE TABLE kafka_anomalies (
user_id STRING,
window_start TIMESTAMP(3),
event_count BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'user-anomalies',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
)
""")
table_env.execute_sql("""
INSERT INTO kafka_anomalies
SELECT * FROM anomalies
""")
# Each INSERT INTO above is submitted as its own Flink job by execute_sql();
# to bundle them into a single job, collect them in a StatementSet instead:
#   stmt_set = table_env.create_statement_set()
#   stmt_set.add_insert_sql("INSERT INTO es_events_per_minute ...")
#   stmt_set.add_insert_sql("INSERT INTO kafka_anomalies ...")
#   stmt_set.execute()
```
State Management and Windowing
Stateful Stream Processing:
- Running aggregations (count, sum, avg over time windows)
- Session tracking (user activity within a session)
- Pattern detection (sequences of events)
- Stream joins (correlating events from multiple streams)
Windowing Strategies (a Flink SQL sketch of the sliding and session variants follows this list):
- Tumbling windows (fixed, non-overlapping). Use case: metrics every 1 minute, e.g. "Events per minute by device type".
- Sliding (hopping) windows (overlapping). Use case: moving averages and trend detection, e.g. "Average response time over the last 5 minutes, updated every 30 seconds".
- Session windows (gap-based). Use case: user session analytics, e.g. "User activity within a session (15-minute inactivity timeout)".
- Count/custom windows. Use case: custom aggregation logic, e.g. "Alert after 10 failed login attempts".
State Backend Options:
- RocksDB state backend: disk-based and scales to terabytes of state; supports incremental checkpoints; slightly higher latency than in-memory.
- Heap (HashMap) state backend: faster access (<1 ms); limited by JVM memory; use for small state (<1 GB).
- State TTL: auto-clean old state to reduce the memory/disk footprint, e.g. keep only the last 7 days of user activity.
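A minimal configuration sketch for these options, assuming Flink 1.15+ configuration keys and the table_env from the pipeline above; the S3 bucket is hypothetical:

```python
# Sketch: RocksDB state backend, incremental checkpoints, and state TTL via the Table API config.
config = table_env.get_config()
config.set("state.backend", "rocksdb")                   # disk-based state backend
config.set("state.backend.incremental", "true")          # incremental checkpoints for large state
config.set("execution.checkpointing.interval", "60 s")   # checkpoint every minute
config.set("state.checkpoints.dir", "s3://my-flink-checkpoints/analytics")  # hypothetical bucket
config.set("table.exec.state.ttl", "7 d")                # expire state not touched for ~7 days
```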
Production Considerations
Scalability:
- Partitioning and parallelism: increase Kafka partitions and Flink parallelism together; a common rule of thumb is one Flink task per Kafka partition (e.g. 12 partitions = parallelism of 12; see the sketch after this list).
- Backpressure: monitor task queue/buffer utilization; add Flink task managers if backpressure is detected; consider sampling if processing still can't keep up.
- Resource tuning: CPU-bound jobs need more parallelism; memory-bound jobs need a larger heap or RocksDB; network-bound jobs benefit from compact serialization (Avro, Protobuf).
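A sketch of matching default parallelism to the partition count, using the table_env from above (the value 12 mirrors the example in this list):

```python
# Sketch: default parallelism matched to the 12 Kafka partitions from the example above.
table_env.get_config().set("parallelism.default", "12")
# Individual sinks can also be pinned via the 'sink.parallelism' connector option.
```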
Fault Tolerance:
- Checkpointing: checkpoint every 1-5 minutes (a trade-off between recovery time and overhead); use incremental checkpoints for large RocksDB state; store checkpoints in durable storage (S3/HDFS).
- End-to-end exactly-once: requires idempotent sinks or transactional writes; for Kafka sinks use a transactional producer, for database sinks use upserts keyed by event ID (see the sketch after this list).
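For the Kafka sink defined earlier, end-to-end exactly-once comes down to exactly-once checkpointing plus the connector's delivery-guarantee options; a sketch assuming the Flink Kafka SQL connector (the table name kafka_anomalies_eo is illustrative):

```python
# Sketch: exactly-once variant of the anomaly sink (requires exactly-once checkpointing).
table_env.get_config().set("execution.checkpointing.mode", "EXACTLY_ONCE")

table_env.execute_sql("""
CREATE TABLE kafka_anomalies_eo (
    user_id STRING,
    window_start TIMESTAMP(3),
    event_count BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-anomalies',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json',
    'sink.delivery-guarantee' = 'exactly-once',
    'sink.transactional-id-prefix' = 'anomaly-sink'
)
""")
```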
Monitoring & Alerting:
- Key metrics: throughput (events/second processed), end-to-end latency (p50, p95, p99), backpressure (task buffer utilization), checkpoint duration, and state size growth over time.
- Alert conditions: backpressure above 80% for 5 minutes, checkpoint failures, job restarts, p99 latency above 5 seconds, consumer lag above 1 million messages.
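Most of these metrics are exported through Flink's metric reporters (Prometheus, Datadog); as a quick sketch, checkpoint duration and state size can also be pulled from the JobManager's REST API. The endpoint address is an assumption:

```python
# Sketch: pull checkpoint duration and state size for running jobs from the Flink REST API.
# Assumes the JobManager REST endpoint is reachable at http://flink-jobmanager:8081.
import requests

BASE = "http://flink-jobmanager:8081"

for job in requests.get(f"{BASE}/jobs", timeout=5).json()["jobs"]:
    stats = requests.get(f"{BASE}/jobs/{job['id']}/checkpoints", timeout=5).json()
    latest = (stats.get("latest") or {}).get("completed") or {}
    print(job["id"], job["status"],
          "checkpoint duration (ms):", latest.get("end_to_end_duration"),
          "state size (bytes):", latest.get("state_size"))
```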
Cost Optimization:
- Compute: use spot instances for non-critical jobs (50-70% savings); right-size task managers (4-8 cores is typical); auto-scale based on consumer lag.
- Storage: compress checkpoints (Snappy, ZSTD); use S3 Intelligent-Tiering for old checkpoints; clean up old savepoints.
- Network: co-locate Kafka and Flink in the same AZ; compress Kafka messages (see the sketch below); batch small messages.
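A sketch of producer-side compression and batching on the Kafka sink, assuming zstd is enabled on the brokers; the Kafka SQL connector forwards 'properties.*' keys to the underlying producer, and the table name is illustrative:

```python
# Sketch: the anomaly sink with producer-side compression and batching enabled.
# compression.type=zstd  -> compress record batches on the producer
# linger.ms=50           -> wait up to 50 ms to build larger batches
# batch.size=131072      -> 128 KB producer batches
table_env.execute_sql("""
CREATE TABLE kafka_anomalies_compressed (
    user_id STRING,
    window_start TIMESTAMP(3),
    event_count BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-anomalies',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json',
    'properties.compression.type' = 'zstd',
    'properties.linger.ms' = '50',
    'properties.batch.size' = '131072'
)
""")
```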
Example production deployment:
- Traffic: 1M events/second
- Infrastructure: 10 Kafka brokers, 20 Flink task managers
- Latency: p99 < 500 ms
- Cost: ~$15K/month (AWS)
- Availability: 99.9%