Stream millions of events per second with JSONL - the format built for real-time data pipelines, Kafka, and event-driven architectures
JSONL's line-based format makes it ideal for streaming data. Each line is a complete event that can be processed independently, allowing for true streaming without waiting for entire payloads to arrive.
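To see why that matters in code, here is a minimal, framework-free sketch: each line is parsed and handled the moment it arrives, whether the lines come from a file, a socket, or an HTTP response body. The handle_event callback and the events.jsonl filename are placeholders for illustration.
import json

def stream_events(lines, handle_event):
    """Process a JSONL stream incrementally: every line is a complete event,
    so each one can be handled without buffering the rest of the payload."""
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank keep-alive lines
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # or route the malformed line to a dead-letter file
        handle_event(event)

# Usage: any iterable of lines works, e.g. an open JSONL file or sys.stdin
with open('events.jsonl', 'r', encoding='utf-8') as f:
    stream_events(f, lambda event: print(event))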
Major streaming platforms use JSONL for event serialization:
Produce JSON events to Kafka topics for real-time processing.
from kafka import KafkaProducer
import json
import uuid
from datetime import datetime
# Create producer with JSON serialization
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None,
acks='all', # Wait for all replicas
compression_type='gzip', # Compress messages
max_in_flight_requests_per_connection=5,
retries=3
)
# Send user events
def send_user_event(user_id, action, metadata=None):
event = {
'event_id': str(uuid.uuid4()),
'user_id': user_id,
'action': action,
'timestamp': datetime.utcnow().isoformat(),
'metadata': metadata or {}
}
# Send to topic with user_id as key for partitioning
future = producer.send(
'user-events',
key=str(user_id),
value=event
)
# Wait for confirmation
try:
record_metadata = future.get(timeout=10)
print(f"Sent to partition {record_metadata.partition} at offset {record_metadata.offset}")
except Exception as e:
print(f"Failed to send event: {e}")
# Usage
send_user_event(12345, 'login', {'ip': '192.168.1.100', 'device': 'mobile'})
send_user_event(12345, 'view_product', {'product_id': 'prod_789', 'category': 'electronics'})
send_user_event(12345, 'add_to_cart', {'product_id': 'prod_789', 'quantity': 1})
# Flush and close
producer.flush()
producer.close()
from kafka import KafkaConsumer
import json
# Create consumer
consumer = KafkaConsumer(
'user-events',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id='event-processor-group',
auto_offset_reset='earliest', # Start from beginning if no offset
enable_auto_commit=True,
max_poll_records=500
)
# Event handlers
def process_purchase(event):
    # Send confirmation email, update inventory, record in database
    pass

def update_cart(event):
    # Update user's shopping cart
    pass

def track_analytics(event):
    # Send to analytics platform
    pass

# Process events in real-time (handlers are defined above so the loop can call them)
for message in consumer:
    event = message.value
    print(f"Processing event: {event['action']} for user {event['user_id']}")
    print(f"Partition: {message.partition}, Offset: {message.offset}")
    # Process based on action type
    if event['action'] == 'purchase':
        process_purchase(event)
    elif event['action'] == 'add_to_cart':
        update_cart(event)
    elif event['action'] == 'view_product':
        track_analytics(event)
Stream processing with stateful transformations and aggregations.
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.util.Properties;
public class UserEventProcessor {
public static void main(String[] args) {
StreamsBuilder builder = new StreamsBuilder();
ObjectMapper mapper = new ObjectMapper();
// Read events stream
KStream<String, String> events = builder.stream("user-events");
// Parse JSON and filter
KStream<String, JsonNode> parsedEvents = events
.mapValues(value -> {
try {
return mapper.readTree(value);
} catch (Exception e) {
return null;
}
})
.filter((key, value) -> value != null);
// Count purchases by user (windowed aggregation)
KTable<Windowed<String>, Long> purchaseCounts = parsedEvents
.filter((key, value) -> value.get("action").asText().equals("purchase"))
.groupBy((key, value) -> value.get("user_id").asText())
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count();
// Alert on high-value purchases
parsedEvents
.filter((key, value) ->
value.get("action").asText().equals("purchase") &&
value.get("amount").asDouble() > 1000.0
)
.mapValues(JsonNode::toString) // serialize back to a JSON string before producing
.to("high-value-purchases");
// Calculate average cart value
parsedEvents
.filter((key, value) -> value.get("action").asText().equals("add_to_cart"))
.groupByKey()
.aggregate(
() -> new CartAggregate(),
(key, value, aggregate) -> aggregate.add(value),
Materialized.as("cart-aggregates")
);
// Streams configuration (example application.id and local broker, default String serdes)
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-event-processor");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
}
}
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
});
// Producer (the awaits below assume an async context, e.g. an ES module or an async main function)
const producer = kafka.producer();
await producer.connect();
async function sendEvent(topic, key, event) {
await producer.send({
topic,
messages: [
{
key,
value: JSON.stringify(event),
headers: {
'correlation-id': event.correlationId,
},
},
],
});
}
// Consumer
const consumer = kafka.consumer({ groupId: 'event-processors' });
await consumer.connect();
await consumer.subscribe({ topic: 'user-events', fromBeginning: false });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const event = JSON.parse(message.value.toString());
console.log({
partition,
offset: message.offset,
value: event,
});
// Process event
await processEvent(event);
},
});
async function processEvent(event) {
switch (event.action) {
case 'purchase':
await handlePurchase(event);
break;
case 'refund':
await handleRefund(event);
break;
default:
console.log(`Unknown action: ${event.action}`);
}
}
Export Kafka topics to JSONL files for archival or batch processing.
{
"name": "jsonl-file-sink",
"config": {
"connector.class": "FileStreamSink",
"tasks.max": "1",
"file": "/data/events.jsonl",
"topics": "user-events",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
}
# Load connector
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @jsonl-sink-connector.json
# Result: Continuous JSONL export
# /data/events.jsonl contains:
{"event_id": "evt_001", "user_id": 123, "action": "login", "timestamp": "2025-01-15T10:00:00Z"}
{"event_id": "evt_002", "user_id": 124, "action": "view", "timestamp": "2025-01-15T10:00:05Z"}
{"event_id": "evt_003", "user_id": 123, "action": "purchase", "timestamp": "2025-01-15T10:01:00Z"}
import boto3
import json
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='us-east-1')
stream_name = 'user-events-stream'
def put_record(data, partition_key):
"""Put single record to Kinesis"""
response = kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(data),
PartitionKey=partition_key
)
return response['SequenceNumber']
def put_records_batch(records):
"""Batch put up to 500 records"""
kinesis_records = [
{
'Data': json.dumps(record['data']),
'PartitionKey': record['partition_key']
}
for record in records
]
response = kinesis.put_records(
StreamName=stream_name,
Records=kinesis_records
)
# Check for failures
failed_count = response['FailedRecordCount']
if failed_count > 0:
print(f"Failed to put {failed_count} records")
# Send events
event = {
'event_id': 'evt_12345',
'user_id': 98765,
'action': 'purchase',
'product_id': 'prod_456',
'amount': 99.99,
'timestamp': datetime.utcnow().isoformat()
}
sequence_number = put_record(event, partition_key=str(event['user_id']))
print(f"Event sent with sequence number: {sequence_number}")
# Batch sending for better throughput
events = [
{'data': {'action': 'login', 'user_id': i}, 'partition_key': str(i)}
for i in range(100)
]
put_records_batch(events)
import boto3
import json
import time
kinesis = boto3.client('kinesis', region_name='us-east-1')
stream_name = 'user-events-stream'
def get_shard_iterator(shard_id, iterator_type='LATEST'):
"""Get iterator for a shard"""
response = kinesis.get_shard_iterator(
StreamName=stream_name,
ShardId=shard_id,
ShardIteratorType=iterator_type
)
return response['ShardIterator']
def process_records():
"""Process records from all shards"""
# Get all shards
response = kinesis.describe_stream(StreamName=stream_name)
shards = response['StreamDescription']['Shards']
for shard in shards:
shard_id = shard['ShardId']
shard_iterator = get_shard_iterator(shard_id)
while True:
response = kinesis.get_records(
ShardIterator=shard_iterator,
Limit=100
)
records = response['Records']
for record in records:
data = json.loads(record['Data'])
print(f"Processing: {data}")
# Process event
handle_event(data)
# Get next iterator
shard_iterator = response['NextShardIterator']
if not shard_iterator:
break
# Rate limiting
time.sleep(1)
def handle_event(event):
"""Process individual event"""
action = event.get('action')
if action == 'purchase':
process_purchase(event)
elif action == 'refund':
process_refund(event)
Automatically deliver streaming data to S3, Redshift, or Elasticsearch as JSONL.
import boto3
import json
firehose = boto3.client('firehose', region_name='us-east-1')
delivery_stream = 'events-to-s3'
def send_to_firehose(records):
"""Send records to Firehose for S3 delivery"""
firehose_records = [
{'Data': json.dumps(record) + '\n'}  # append the newline yourself; Firehose delivers records as-is
for record in records
]
response = firehose.put_record_batch(
DeliveryStreamName=delivery_stream,
Records=firehose_records
)
return response['FailedPutCount']
# Send events
events = [
{'user_id': 123, 'action': 'login', 'timestamp': '2025-01-15T10:00:00Z'},
{'user_id': 124, 'action': 'purchase', 'amount': 49.99, 'timestamp': '2025-01-15T10:01:00Z'}
]
failed = send_to_firehose(events)
print(f"Failed records: {failed}")
# Firehose Configuration (Terraform)
resource "aws_kinesis_firehose_delivery_stream" "events" {
name = "events-to-s3"
destination = "s3"
s3_configuration {
role_arn = aws_iam_role.firehose.arn
bucket_arn = aws_s3_bucket.events.arn
prefix = "events/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/"
# Records are delivered as-is, so the newline-delimited JSON lands in S3 as JSONL
cloudwatch_logging_options {
enabled = true
}
}
}
Stream database changes as JSONL events for real-time data synchronization.
# Debezium PostgreSQL Connector Configuration
{
"name": "postgres-cdc-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres.example.com",
"database.port": "5432",
"database.user": "debezium",
"database.password": "secret",
"database.dbname": "myapp",
"database.server.name": "myapp-db",
"table.include.list": "public.users,public.orders",
"plugin.name": "pgoutput",
# JSON output format
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
# Kafka topic configuration
"topic.prefix": "cdc",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false"
}
}
# Example CDC event format
{
"before": null,
"after": {
"id": 12345,
"email": "[email protected]",
"name": "John Doe",
"created_at": "2025-01-15T10:00:00Z"
},
"source": {
"version": "1.9.0",
"connector": "postgresql",
"name": "myapp-db",
"ts_ms": 1705315200000,
"db": "myapp",
"schema": "public",
"table": "users"
},
"op": "c", // c=create, u=update, d=delete
"ts_ms": 1705315200000
}
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'cdc.public.users',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id='cdc-processor'
)
def handle_insert(record):
    """Sync new record to search index, cache, etc."""
    print(f"New record: {record}")
    # Update Elasticsearch
    # Invalidate cache
    # Send to analytics

def handle_update(before, after):
    """Handle record updates"""
    print(f"Updated: {before} -> {after}")
    # Track field changes
    # Update downstream systems

def handle_delete(record):
    """Handle record deletions"""
    print(f"Deleted: {record}")
    # Remove from search index
    # Archive for compliance

# Route each change event to the matching handler (defined above)
for message in consumer:
    event = message.value
    operation = event['op']
    table = event['source']['table']
    if operation == 'c':  # Create
        handle_insert(event['after'])
    elif operation == 'u':  # Update
        handle_update(event['before'], event['after'])
    elif operation == 'd':  # Delete
        handle_delete(event['before'])
Real-time replication from PostgreSQL to MongoDB, keeping multiple databases in sync without custom code.
Automatically update Elasticsearch whenever database records change, ensuring search results are always current; a sketch of this pattern, combined with cache invalidation, appears below.
Invalidate Redis cache entries when underlying data changes, preventing stale data issues.
Capture all database changes for compliance, debugging, and analytics without application code changes.
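To make the search-sync and cache-invalidation ideas above concrete, here is a minimal sketch that consumes the cdc.public.users topic from the Debezium example and fans each change out to Elasticsearch and Redis. The index name, cache-key scheme, consumer group, and local endpoints are illustrative assumptions (elasticsearch-py 8.x and redis-py clients), not part of Debezium itself.
from kafka import KafkaConsumer
from elasticsearch import Elasticsearch, NotFoundError
import redis
import json

es = Elasticsearch("http://localhost:9200")         # assumed local Elasticsearch
cache = redis.Redis(host="localhost", port=6379)    # assumed local Redis

consumer = KafkaConsumer(
    'cdc.public.users',
    bootstrap_servers=['localhost:9092'],
    # Tombstone messages carry a null value, so guard before decoding
    value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None,
    group_id='search-and-cache-sync'
)

for message in consumer:
    event = message.value
    if event is None:
        continue
    row = event['after'] or event['before']
    cache_key = f"user:{row['id']}"                 # hypothetical cache-key scheme
    if event['op'] in ('c', 'u', 'r'):              # create, update, snapshot read
        # Keep the search index in step with the latest row state
        es.index(index='users', id=row['id'], document=event['after'])
    elif event['op'] == 'd':
        try:
            es.delete(index='users', id=row['id'])  # drop deleted rows from search
        except NotFoundError:
            pass
    # Invalidate the cached copy so readers re-fetch fresh data
    cache.delete(cache_key)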
Store immutable events as JSONL for complete system history and state reconstruction.
# Event Store Format
{"event_id": "evt_001", "aggregate_id": "order_12345", "event_type": "OrderCreated", "timestamp": "2025-01-15T10:00:00Z", "data": {"user_id": 98765, "items": [{"product_id": "prod_1", "quantity": 2}], "total": 99.99}, "version": 1}
{"event_id": "evt_002", "aggregate_id": "order_12345", "event_type": "OrderItemAdded", "timestamp": "2025-01-15T10:01:00Z", "data": {"product_id": "prod_2", "quantity": 1, "price": 49.99}, "version": 2}
{"event_id": "evt_003", "aggregate_id": "order_12345", "event_type": "OrderConfirmed", "timestamp": "2025-01-15T10:05:00Z", "data": {"confirmation_id": "conf_abc123", "payment_method": "credit_card"}, "version": 3}
{"event_id": "evt_004", "aggregate_id": "order_12345", "event_type": "OrderShipped", "timestamp": "2025-01-15T14:00:00Z", "data": {"tracking_number": "TRACK123", "carrier": "UPS"}, "version": 4}
# Python Event Store Implementation
import json
import uuid
from datetime import datetime
from typing import List, Dict
class EventStore:
def __init__(self, filepath='events.jsonl'):
self.filepath = filepath
def append_event(self, aggregate_id: str, event_type: str, data: Dict, version: int):
"""Append event to store"""
event = {
'event_id': str(uuid.uuid4()),
'aggregate_id': aggregate_id,
'event_type': event_type,
'timestamp': datetime.utcnow().isoformat(),
'data': data,
'version': version
}
with open(self.filepath, 'a') as f:
f.write(json.dumps(event) + '\n')
return event['event_id']
def get_events(self, aggregate_id: str) -> List[Dict]:
"""Get all events for an aggregate"""
events = []
with open(self.filepath, 'r') as f:
for line in f:
event = json.loads(line)
if event['aggregate_id'] == aggregate_id:
events.append(event)
return events
def rebuild_state(self, aggregate_id: str) -> Dict:
"""Reconstruct current state from events"""
events = self.get_events(aggregate_id)
state = {}
for event in events:
state = self.apply_event(state, event)
return state
def apply_event(self, state: Dict, event: Dict) -> Dict:
"""Apply event to state"""
event_type = event['event_type']
data = event['data']
if event_type == 'OrderCreated':
state = {
'order_id': event['aggregate_id'],
'user_id': data['user_id'],
'items': data['items'],
'total': data['total'],
'status': 'created'
}
elif event_type == 'OrderItemAdded':
state['items'].append({
'product_id': data['product_id'],
'quantity': data['quantity']
})
state['total'] += data['price']
elif event_type == 'OrderConfirmed':
state['status'] = 'confirmed'
state['confirmation_id'] = data['confirmation_id']
elif event_type == 'OrderShipped':
state['status'] = 'shipped'
state['tracking_number'] = data['tracking_number']
return state
# Usage
store = EventStore()
# Append events
store.append_event('order_12345', 'OrderCreated', {'user_id': 98765, 'items': [], 'total': 0}, 1)
store.append_event('order_12345', 'OrderItemAdded', {'product_id': 'prod_1', 'quantity': 2, 'price': 99.99}, 2)
store.append_event('order_12345', 'OrderConfirmed', {'confirmation_id': 'conf_123'}, 3)
# Rebuild current state
current_state = store.rebuild_state('order_12345')
print(current_state)
class EventStoreWithSnapshots(EventStore):
def __init__(self, filepath='events.jsonl', snapshot_interval=100):
super().__init__(filepath)
self.snapshot_file = filepath.replace('.jsonl', '_snapshots.jsonl')
self.snapshot_interval = snapshot_interval
def get_latest_snapshot(self, aggregate_id: str):
"""Get most recent snapshot"""
snapshots = []
try:
with open(self.snapshot_file, 'r') as f:
for line in f:
snapshot = json.loads(line)
if snapshot['aggregate_id'] == aggregate_id:
snapshots.append(snapshot)
except FileNotFoundError:
return None
return snapshots[-1] if snapshots else None
def save_snapshot(self, aggregate_id: str, state: Dict, version: int):
"""Save state snapshot"""
snapshot = {
'aggregate_id': aggregate_id,
'state': state,
'version': version,
'timestamp': datetime.utcnow().isoformat()
}
with open(self.snapshot_file, 'a') as f:
f.write(json.dumps(snapshot) + '\n')
def rebuild_state(self, aggregate_id: str) -> Dict:
"""Rebuild from latest snapshot + subsequent events"""
snapshot = self.get_latest_snapshot(aggregate_id)
if snapshot:
state = snapshot['state']
from_version = snapshot['version']
else:
state = {}
from_version = 0
# Apply events after snapshot
events = self.get_events(aggregate_id)
events_to_apply = [e for e in events if e['version'] > from_version]
for event in events_to_apply:
state = self.apply_event(state, event)
# Create new snapshot if needed
if events and len(events) % self.snapshot_interval == 0:
self.save_snapshot(aggregate_id, state, len(events))
return state
// Server (Node.js with ws)
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', (ws) => {
console.log('Client connected');
// Stream events to client as JSONL
const interval = setInterval(() => {
const event = {
type: 'stock_update',
symbol: 'AAPL',
price: (Math.random() * 200).toFixed(2),
timestamp: new Date().toISOString()
};
// Send as JSON string (one per message)
ws.send(JSON.stringify(event));
}, 1000);
ws.on('close', () => {
clearInterval(interval);
console.log('Client disconnected');
});
});
// Client (Browser)
const ws = new WebSocket('ws://localhost:8080');
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log('Received:', data);
// Update UI with real-time data
updateStockPrice(data.symbol, data.price);
};
ws.onopen = () => {
console.log('Connected to stream');
};
ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
// Server (Express)
const express = require('express');
const app = express();
app.get('/events', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// Send events every second
const interval = setInterval(() => {
const event = {
id: Date.now(),
type: 'update',
data: {
metric: 'active_users',
value: Math.floor(Math.random() * 1000)
}
};
// SSE format: data: JSON\n\n
res.write(`data: ${JSON.stringify(event)}\n\n`);
}, 1000);
req.on('close', () => {
clearInterval(interval);
});
});
app.listen(3000);
// Client (Browser)
const eventSource = new EventSource('/events');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log('Event received:', data);
// Update dashboard
updateMetric(data.data.metric, data.data.value);
};
eventSource.onerror = (error) => {
console.error('SSE error:', error);
eventSource.close();
};
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import java.util.Properties
import org.json4s._
import org.json4s.jackson.JsonMethods._
object EventProcessor {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
implicit val formats: Formats = DefaultFormats // required by json4s extract[...]
// Read from Kafka
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "flink-consumer")
val consumer = new FlinkKafkaConsumer[String](
"user-events",
new SimpleStringSchema(),
properties
)
val stream = env.addSource(consumer)
// Parse JSON events
val events = stream
.map(json => parse(json))
.filter(event => (event \ "action").extract[String] == "purchase")
// Window aggregation
val revenue = events
.keyBy(event => (event \ "user_id").extract[Int])
.timeWindow(Time.minutes(5))
.aggregate(new RevenueAggregator)
// Alert on anomalies
revenue
.filter(_.total > 10000)
.addSink(new AlertSink)
env.execute("Event Processor")
}
}
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder.appName("EventProcessor").getOrCreate()
# Read from Kafka as streaming DataFrame
events_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "user-events") \
.load()
# Parse JSON
schema = StructType([
StructField("user_id", IntegerType()),
StructField("action", StringType()),
StructField("amount", DoubleType()),
StructField("timestamp", StringType())
])
parsed_df = events_df \
.selectExpr("CAST(value AS STRING) as json") \
.select(from_json(col("json"), schema).alias("data")) \
.select("data.*")
# Aggregate by window
windowed_revenue = parsed_df \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
window(col("timestamp"), "5 minutes"),
col("user_id")
) \
.agg(
sum("amount").alias("total_amount"),
count("*").alias("purchase_count")
)
# Write to console
query = windowed_revenue \
.writeStream \
.outputMode("update") \
.format("console") \
.start()
query.awaitTermination()