JSONL for Data Streaming

Stream millions of events per second with JSONL - the format built for real-time data pipelines, Kafka, and event-driven architectures

Why JSONL for Streaming?

Perfect for Streaming

JSONL's line-based format makes it ideal for streaming data. Each line is a complete event that can be processed independently, allowing for true streaming without waiting for entire payloads to arrive.

  • Process events as they arrive
  • No buffering required
  • Minimal latency
  • Fault-tolerant streaming
  • Easy to replay and reprocess
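
A minimal sketch of this line-at-a-time processing, reading newline-delimited events from standard input (the handle_event function is a placeholder for real processing logic):

import json
import sys

def handle_event(event):
    """Placeholder handler; replace with real processing logic."""
    print(f"user={event.get('user_id')} action={event.get('action')}")

# Each line is a complete event, so it can be handled the moment it arrives;
# there is no need to buffer the whole stream or wait for a closing bracket.
for line in sys.stdin:
    line = line.strip()
    if not line:
        continue  # skip keep-alive blank lines
    try:
        handle_event(json.loads(line))
    except json.JSONDecodeError:
        # A corrupt line only affects itself; the rest of the stream keeps flowing
        continue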

Streaming Platforms

Major streaming platforms work naturally with JSON events, which become JSONL as soon as they are written to files or logs:

  • Apache Kafka: JSON message payloads, exported as JSONL via Kafka Connect
  • AWS Kinesis: JSON records per shard; Firehose delivers them newline-delimited to S3
  • Apache Pulsar: Schema-on-read with JSON payloads
  • RabbitMQ: JSON message payloads
  • Redis Streams: Field-value entries, commonly carrying JSON payloads


Apache Kafka with JSONL

Kafka Producer (Python)

Produce JSON events to Kafka topics for real-time processing.

from kafka import KafkaProducer
import json
import uuid
from datetime import datetime

# Create producer with JSON serialization
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None,
    acks='all',  # Wait for all replicas
    compression_type='gzip',  # Compress messages
    max_in_flight_requests_per_connection=5,
    retries=3
)

# Send user events
def send_user_event(user_id, action, metadata=None):
    event = {
        'event_id': str(uuid.uuid4()),
        'user_id': user_id,
        'action': action,
        'timestamp': datetime.utcnow().isoformat(),
        'metadata': metadata or {}
    }

    # Send to topic with user_id as key for partitioning
    future = producer.send(
        'user-events',
        key=str(user_id),
        value=event
    )

    # Wait for confirmation
    try:
        record_metadata = future.get(timeout=10)
        print(f"Sent to partition {record_metadata.partition} at offset {record_metadata.offset}")
    except Exception as e:
        print(f"Failed to send event: {e}")

# Usage
send_user_event(12345, 'login', {'ip': '192.168.1.100', 'device': 'mobile'})
send_user_event(12345, 'view_product', {'product_id': 'prod_789', 'category': 'electronics'})
send_user_event(12345, 'add_to_cart', {'product_id': 'prod_789', 'quantity': 1})

# Flush and close
producer.flush()
producer.close()

Kafka Consumer (Python)

from kafka import KafkaConsumer
import json

# Create consumer
consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='event-processor-group',
    auto_offset_reset='earliest',  # Start from beginning if no offset
    enable_auto_commit=True,
    max_poll_records=500
)

# Handlers for each action type
def process_purchase(event):
    # Send confirmation email, update inventory, record in database
    pass

def update_cart(event):
    # Update the user's shopping cart
    pass

def track_analytics(event):
    # Forward to the analytics platform
    pass

# Process events in real-time
for message in consumer:
    event = message.value

    print(f"Processing event: {event['action']} for user {event['user_id']}")
    print(f"Partition: {message.partition}, Offset: {message.offset}")

    # Dispatch based on action type
    if event['action'] == 'purchase':
        process_purchase(event)
    elif event['action'] == 'add_to_cart':
        update_cart(event)
    elif event['action'] == 'view_product':
        track_analytics(event)

Kafka Streams (Java)

Stream processing with stateful transformations and aggregations.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.time.Duration;
import java.util.Properties;

public class UserEventProcessor {
    public static void main(String[] args) {
        // Streams configuration (application ID, brokers, default serdes)
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-event-processor");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        ObjectMapper mapper = new ObjectMapper();

        // Read events stream
        KStream<String, String> events = builder.stream("user-events");

        // Parse JSON and drop malformed records
        KStream<String, JsonNode> parsedEvents = events
            .mapValues(value -> {
                try {
                    return mapper.readTree(value);
                } catch (Exception e) {
                    return null;
                }
            })
            .filter((key, value) -> value != null);

        // Count purchases by user (windowed aggregation)
        KTable<Windowed<String>, Long> purchaseCounts = parsedEvents
            .filter((key, value) -> value.get("action").asText().equals("purchase"))
            .groupBy((key, value) -> value.get("user_id").asText())
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
            .count();

        // Alert on high-value purchases (re-serialize to JSON strings for the output topic)
        parsedEvents
            .filter((key, value) ->
                value.get("action").asText().equals("purchase") &&
                value.get("amount").asDouble() > 1000.0
            )
            .mapValues(JsonNode::toString)
            .to("high-value-purchases");

        // Calculate average cart value
        parsedEvents
            .filter((key, value) -> value.get("action").asText().equals("add_to_cart"))
            .groupByKey()
            .aggregate(
                () -> new CartAggregate(),
                (key, value, aggregate) -> aggregate.add(value),
                Materialized.as("cart-aggregates")
            );

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();
    }
}

KafkaJS (Node.js)

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
});

// Producer
const producer = kafka.producer();

async function sendEvent(topic, key, event) {
  await producer.send({
    topic,
    messages: [
      {
        key,
        value: JSON.stringify(event),
        headers: {
          'correlation-id': event.correlationId,
        },
      },
    ],
  });
}

// Consumer
const consumer = kafka.consumer({ groupId: 'event-processors' });

async function run() {
  await producer.connect();
  await consumer.connect();
  await consumer.subscribe({ topic: 'user-events', fromBeginning: false });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const event = JSON.parse(message.value.toString());

      console.log({
        partition,
        offset: message.offset,
        value: event,
      });

      // Process event
      await processEvent(event);
    },
  });
}

run().catch(console.error);

async function processEvent(event) {
  switch (event.action) {
    case 'purchase':
      await handlePurchase(event);
      break;
    case 'refund':
      await handleRefund(event);
      break;
    default:
      console.log(`Unknown action: ${event.action}`);
  }
}

Kafka Connect JSONL Sink

Export Kafka topics to JSONL files for archival or batch processing.

{
  "name": "jsonl-file-sink",
  "config": {
    "connector.class": "FileStreamSink",
    "tasks.max": "1",
    "file": "/data/events.jsonl",
    "topics": "user-events",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}

# Load connector
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @jsonl-sink-connector.json

# Result: Continuous JSONL export
# /data/events.jsonl contains:
{"event_id": "evt_001", "user_id": 123, "action": "login", "timestamp": "2025-01-15T10:00:00Z"}
{"event_id": "evt_002", "user_id": 124, "action": "view", "timestamp": "2025-01-15T10:00:05Z"}
{"event_id": "evt_003", "user_id": 123, "action": "purchase", "timestamp": "2025-01-15T10:01:00Z"}

AWS Kinesis Data Streams

Kinesis Producer

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')
stream_name = 'user-events-stream'

def put_record(data, partition_key):
    """Put single record to Kinesis"""
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(data),
        PartitionKey=partition_key
    )
    return response['SequenceNumber']

def put_records_batch(records):
    """Batch put up to 500 records"""
    kinesis_records = [
        {
            'Data': json.dumps(record['data']),
            'PartitionKey': record['partition_key']
        }
        for record in records
    ]

    response = kinesis.put_records(
        StreamName=stream_name,
        Records=kinesis_records
    )

    # Check for failures
    failed_count = response['FailedRecordCount']
    if failed_count > 0:
        print(f"Failed to put {failed_count} records")

# Send events
event = {
    'event_id': 'evt_12345',
    'user_id': 98765,
    'action': 'purchase',
    'product_id': 'prod_456',
    'amount': 99.99,
    'timestamp': datetime.utcnow().isoformat()
}

sequence_number = put_record(event, partition_key=str(event['user_id']))
print(f"Event sent with sequence number: {sequence_number}")

# Batch sending for better throughput
events = [
    {'data': {'action': 'login', 'user_id': i}, 'partition_key': str(i)}
    for i in range(100)
]
put_records_batch(events)

Kinesis Consumer

import boto3
import json
import time

kinesis = boto3.client('kinesis', region_name='us-east-1')
stream_name = 'user-events-stream'

def get_shard_iterator(shard_id, iterator_type='LATEST'):
    """Get iterator for a shard"""
    response = kinesis.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType=iterator_type
    )
    return response['ShardIterator']

def process_records():
    """Poll records shard by shard (for production, use the KCL or one worker per shard)"""
    # Get all shards
    response = kinesis.describe_stream(StreamName=stream_name)
    shards = response['StreamDescription']['Shards']

    for shard in shards:
        shard_id = shard['ShardId']
        shard_iterator = get_shard_iterator(shard_id)

        while shard_iterator:
            response = kinesis.get_records(
                ShardIterator=shard_iterator,
                Limit=100
            )

            records = response['Records']
            for record in records:
                data = json.loads(record['Data'])
                print(f"Processing: {data}")

                # Process event
                handle_event(data)

            # Move on to the next shard once this one is drained
            if not records:
                break

            # Get next iterator
            shard_iterator = response['NextShardIterator']

            # Rate limiting
            time.sleep(1)

def handle_event(event):
    """Process individual event"""
    action = event.get('action')
    if action == 'purchase':
        process_purchase(event)
    elif action == 'refund':
        process_refund(event)

Kinesis Data Firehose

Automatically deliver streaming data to S3, Redshift, or Elasticsearch as JSONL.

import boto3
import json

firehose = boto3.client('firehose', region_name='us-east-1')
delivery_stream = 'events-to-s3'

def send_to_firehose(records):
    """Send records to Firehose for S3 delivery"""
    firehose_records = [
        {'Data': json.dumps(record) + '\n'}  # Add newline; Firehose concatenates records as-is
        for record in records
    ]

    response = firehose.put_record_batch(
        DeliveryStreamName=delivery_stream,
        Records=firehose_records
    )

    return response['FailedPutCount']

# Send events
events = [
    {'user_id': 123, 'action': 'login', 'timestamp': '2025-01-15T10:00:00Z'},
    {'user_id': 124, 'action': 'purchase', 'amount': 49.99, 'timestamp': '2025-01-15T10:01:00Z'}
]

failed = send_to_firehose(events)
print(f"Failed records: {failed}")

# Firehose Configuration (Terraform)
resource "aws_kinesis_firehose_delivery_stream" "events" {
  name        = "events-to-s3"
  destination = "s3"

  s3_configuration {
    role_arn   = aws_iam_role.firehose.arn
    bucket_arn = aws_s3_bucket.events.arn
    prefix     = "events/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/"

    # Records are written newline-delimited, so the S3 objects are valid JSONL
    cloudwatch_logging_options {
      enabled = true
    }
  }
}

Change Data Capture (CDC)

Debezium CDC to Kafka

Stream database changes as JSONL events for real-time data synchronization.

# Debezium PostgreSQL Connector Configuration
{
  "name": "postgres-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres.example.com",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "secret",
    "database.dbname": "myapp",
    "database.server.name": "myapp-db",
    "table.include.list": "public.users,public.orders",
    "plugin.name": "pgoutput",

    # JSON output format
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",

    # Kafka topic configuration
    "topic.prefix": "cdc",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false"
  }
}

# Example CDC event format
{
  "before": null,
  "after": {
    "id": 12345,
    "email": "[email protected]",
    "name": "John Doe",
    "created_at": "2025-01-15T10:00:00Z"
  },
  "source": {
    "version": "1.9.0",
    "connector": "postgresql",
    "name": "myapp-db",
    "ts_ms": 1705315200000,
    "db": "myapp",
    "schema": "public",
    "table": "users"
  },
  "op": "c",  // c=create, u=update, d=delete
  "ts_ms": 1705315200000
}

CDC Consumer

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'cdc.public.users',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='cdc-processor'
)

def handle_insert(record):
    """Sync a new record to the search index, cache, etc."""
    print(f"New record: {record}")
    # Update Elasticsearch
    # Invalidate cache
    # Send to analytics

def handle_update(before, after):
    """Handle record updates"""
    print(f"Updated: {before} -> {after}")
    # Track field changes
    # Update downstream systems

def handle_delete(record):
    """Handle record deletions"""
    print(f"Deleted: {record}")
    # Remove from search index
    # Archive for compliance

for message in consumer:
    event = message.value

    operation = event['op']
    table = event['source']['table']

    if operation == 'c':  # Create
        handle_insert(event['after'])
    elif operation == 'u':  # Update
        handle_update(event['before'], event['after'])
    elif operation == 'd':  # Delete
        handle_delete(event['before'])

CDC Use Cases

Database Replication

Real-time replication from PostgreSQL to MongoDB, keeping multiple databases in sync without custom code.

Search Index Sync

Automatically update Elasticsearch whenever database records change, ensuring search results are always current.
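
A sketch of this sync, assuming the Debezium topics configured above and the official elasticsearch Python client (the index name and ID convention are illustrative):

import json
from kafka import KafkaConsumer
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

consumer = KafkaConsumer(
    'cdc.public.users',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='search-indexer'
)

for message in consumer:
    event = message.value
    if event['op'] in ('c', 'u'):   # create or update: (re)index the new row state
        row = event['after']
        es.index(index='users', id=row['id'], document=row)
    elif event['op'] == 'd':        # delete: remove it from the index
        es.delete(index='users', id=event['before']['id'])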

Cache Invalidation

Invalidate Redis cache entries when underlying data changes, preventing stale data issues.
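
A minimal sketch of this pattern, assuming the Debezium envelope shown above and the redis-py client (the user:<id> cache key convention is an assumption):

import json
import redis
from kafka import KafkaConsumer

cache = redis.Redis(host='localhost', port=6379)

consumer = KafkaConsumer(
    'cdc.public.users',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='cache-invalidator'
)

for message in consumer:
    event = message.value
    if event['op'] in ('u', 'd'):  # an update or delete makes the cached copy stale
        record = event['after'] or event['before']
        # Hypothetical cache key convention: user:<id>
        cache.delete(f"user:{record['id']}")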

Audit Logging

Capture all database changes for compliance, debugging, and analytics without application code changes.
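
Because each change event is a self-contained JSON object, an audit trail can be as simple as appending every CDC message to a dated JSONL file (a sketch; the file layout is an assumption):

import json
from datetime import datetime, timezone
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'cdc.public.users', 'cdc.public.orders',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='audit-logger'
)

for message in consumer:
    # One JSONL file per day, e.g. audit-2025-01-15.jsonl
    day = datetime.now(timezone.utc).strftime('%Y-%m-%d')
    with open(f'audit-{day}.jsonl', 'a') as f:
        f.write(json.dumps(message.value) + '\n')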

Event Sourcing

Event Store with JSONL

Store immutable events as JSONL for complete system history and state reconstruction.

# Event Store Format
{"event_id": "evt_001", "aggregate_id": "order_12345", "event_type": "OrderCreated", "timestamp": "2025-01-15T10:00:00Z", "data": {"user_id": 98765, "items": [{"product_id": "prod_1", "quantity": 2}], "total": 99.99}, "version": 1}
{"event_id": "evt_002", "aggregate_id": "order_12345", "event_type": "OrderItemAdded", "timestamp": "2025-01-15T10:01:00Z", "data": {"product_id": "prod_2", "quantity": 1, "price": 49.99}, "version": 2}
{"event_id": "evt_003", "aggregate_id": "order_12345", "event_type": "OrderConfirmed", "timestamp": "2025-01-15T10:05:00Z", "data": {"confirmation_id": "conf_abc123", "payment_method": "credit_card"}, "version": 3}
{"event_id": "evt_004", "aggregate_id": "order_12345", "event_type": "OrderShipped", "timestamp": "2025-01-15T14:00:00Z", "data": {"tracking_number": "TRACK123", "carrier": "UPS"}, "version": 4}

# Python Event Store Implementation
import json
import uuid
from datetime import datetime
from typing import List, Dict

class EventStore:
    def __init__(self, filepath='events.jsonl'):
        self.filepath = filepath

    def append_event(self, aggregate_id: str, event_type: str, data: Dict, version: int):
        """Append event to store"""
        event = {
            'event_id': str(uuid.uuid4()),
            'aggregate_id': aggregate_id,
            'event_type': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'data': data,
            'version': version
        }

        with open(self.filepath, 'a') as f:
            f.write(json.dumps(event) + '\n')

        return event['event_id']

    def get_events(self, aggregate_id: str) -> List[Dict]:
        """Get all events for an aggregate"""
        events = []
        with open(self.filepath, 'r') as f:
            for line in f:
                event = json.loads(line)
                if event['aggregate_id'] == aggregate_id:
                    events.append(event)
        return events

    def rebuild_state(self, aggregate_id: str) -> Dict:
        """Reconstruct current state from events"""
        events = self.get_events(aggregate_id)
        state = {}

        for event in events:
            state = self.apply_event(state, event)

        return state

    def apply_event(self, state: Dict, event: Dict) -> Dict:
        """Apply event to state"""
        event_type = event['event_type']
        data = event['data']

        if event_type == 'OrderCreated':
            state = {
                'order_id': event['aggregate_id'],
                'user_id': data['user_id'],
                'items': data['items'],
                'total': data['total'],
                'status': 'created'
            }
        elif event_type == 'OrderItemAdded':
            state['items'].append({
                'product_id': data['product_id'],
                'quantity': data['quantity']
            })
            state['total'] += data['price']
        elif event_type == 'OrderConfirmed':
            state['status'] = 'confirmed'
            state['confirmation_id'] = data['confirmation_id']
        elif event_type == 'OrderShipped':
            state['status'] = 'shipped'
            state['tracking_number'] = data['tracking_number']

        return state

# Usage
store = EventStore()

# Append events
store.append_event('order_12345', 'OrderCreated', {'user_id': 98765, 'items': [], 'total': 0}, 1)
store.append_event('order_12345', 'OrderItemAdded', {'product_id': 'prod_1', 'quantity': 2, 'price': 99.99}, 2)
store.append_event('order_12345', 'OrderConfirmed', {'confirmation_id': 'conf_123'}, 3)

# Rebuild current state
current_state = store.rebuild_state('order_12345')
print(current_state)

Snapshots for Performance

class EventStoreWithSnapshots(EventStore):
    def __init__(self, filepath='events.jsonl', snapshot_interval=100):
        super().__init__(filepath)
        self.snapshot_file = filepath.replace('.jsonl', '_snapshots.jsonl')
        self.snapshot_interval = snapshot_interval

    def get_latest_snapshot(self, aggregate_id: str):
        """Get most recent snapshot"""
        snapshots = []
        try:
            with open(self.snapshot_file, 'r') as f:
                for line in f:
                    snapshot = json.loads(line)
                    if snapshot['aggregate_id'] == aggregate_id:
                        snapshots.append(snapshot)
        except FileNotFoundError:
            return None

        return snapshots[-1] if snapshots else None

    def save_snapshot(self, aggregate_id: str, state: Dict, version: int):
        """Save state snapshot"""
        snapshot = {
            'aggregate_id': aggregate_id,
            'state': state,
            'version': version,
            'timestamp': datetime.utcnow().isoformat()
        }

        with open(self.snapshot_file, 'a') as f:
            f.write(json.dumps(snapshot) + '\n')

    def rebuild_state(self, aggregate_id: str) -> Dict:
        """Rebuild from latest snapshot + subsequent events"""
        snapshot = self.get_latest_snapshot(aggregate_id)

        if snapshot:
            state = snapshot['state']
            from_version = snapshot['version']
        else:
            state = {}
            from_version = 0

        # Apply events after snapshot
        events = self.get_events(aggregate_id)
        events_to_apply = [e for e in events if e['version'] > from_version]

        for event in events_to_apply:
            state = self.apply_event(state, event)

        # Create a new snapshot once enough events have accumulated
        if events and len(events) % self.snapshot_interval == 0:
            self.save_snapshot(aggregate_id, state, len(events))

        return state

Real-Time Streaming to Clients

WebSocket JSONL Streaming

// Server (Node.js with ws)
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws) => {
  console.log('Client connected');

  // Stream events to client as JSONL
  const interval = setInterval(() => {
    const event = {
      type: 'stock_update',
      symbol: 'AAPL',
      price: (Math.random() * 200).toFixed(2),
      timestamp: new Date().toISOString()
    };

    // Send as JSON string (one per message)
    ws.send(JSON.stringify(event));
  }, 1000);

  ws.on('close', () => {
    clearInterval(interval);
    console.log('Client disconnected');
  });
});

// Client (Browser)
const ws = new WebSocket('ws://localhost:8080');

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('Received:', data);

  // Update UI with real-time data
  updateStockPrice(data.symbol, data.price);
};

ws.onopen = () => {
  console.log('Connected to stream');
};

ws.onerror = (error) => {
  console.error('WebSocket error:', error);
};

Server-Sent Events (SSE)

// Server (Express)
const express = require('express');
const app = express();

app.get('/events', (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  // Send events every second
  const interval = setInterval(() => {
    const event = {
      id: Date.now(),
      type: 'update',
      data: {
        metric: 'active_users',
        value: Math.floor(Math.random() * 1000)
      }
    };

    // SSE format: data: JSON\n\n
    res.write(`data: ${JSON.stringify(event)}\n\n`);
  }, 1000);

  req.on('close', () => {
    clearInterval(interval);
  });
});

app.listen(3000);

// Client (Browser)
const eventSource = new EventSource('/events');

eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('Event received:', data);

  // Update dashboard
  updateMetric(data.data.metric, data.data.value);
};

eventSource.onerror = (error) => {
  console.error('SSE error:', error);
  eventSource.close();
};

Stream Processing Frameworks

Apache Flink

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.json4s._
import org.json4s.jackson.JsonMethods._

object EventProcessor {
  // Required by json4s extract[T]
  implicit val formats: Formats = DefaultFormats

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Read from Kafka
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "flink-consumer")

    val consumer = new FlinkKafkaConsumer[String](
      "user-events",
      new SimpleStringSchema(),
      properties
    )

    val stream = env.addSource(consumer)

    // Parse JSON events
    val events = stream
      .map(json => parse(json))
      .filter(event => (event \ "action").extract[String] == "purchase")

    // Window aggregation
    val revenue = events
      .keyBy(event => (event \ "user_id").extract[Int])
      .timeWindow(Time.minutes(5))
      .aggregate(new RevenueAggregator)

    // Alert on anomalies
    revenue
      .filter(_.total > 10000)
      .addSink(new AlertSink)

    env.execute("Event Processor")
  }
}

Apache Spark Structured Streaming

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.appName("EventProcessor").getOrCreate()

# Read from Kafka as streaming DataFrame
events_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "user-events") \
    .load()

# Parse JSON
schema = StructType([
    StructField("user_id", IntegerType()),
    StructField("action", StringType()),
    StructField("amount", DoubleType()),
    StructField("timestamp", StringType())
])

parsed_df = events_df \
    .selectExpr("CAST(value AS STRING) as json") \
    .select(from_json(col("json"), schema).alias("data")) \
    .select("data.*") \
    .withColumn("timestamp", to_timestamp(col("timestamp")))  # watermark/window need a real timestamp column

# Aggregate by window
windowed_revenue = parsed_df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(col("timestamp"), "5 minutes"),
        col("user_id")
    ) \
    .agg(
        sum("amount").alias("total_amount"),
        count("*").alias("purchase_count")
    )

# Write to console
query = windowed_revenue \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

query.awaitTermination()

Best Practices

Event Design

  • Include unique event ID and timestamp in every event
  • Use clear, descriptive event types (UserLoggedIn vs Action1)
  • Include all context needed to process event independently
  • Use consistent field names across all events
  • Version your event schemas for backward compatibility (see the envelope sketch after this list)
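
One way to apply these guidelines is a small helper that stamps every event with an ID, timestamp, and schema version before it is produced (a sketch; the schema_version field name is an assumption, the other fields follow the examples above):

import json
import uuid
from datetime import datetime, timezone

def make_event(event_type, data, schema_version=1):
    """Wrap a payload in a consistent, versioned envelope and return it as one JSONL line."""
    envelope = {
        'event_id': str(uuid.uuid4()),        # unique ID enables deduplication downstream
        'event_type': event_type,             # descriptive name, e.g. 'UserLoggedIn'
        'schema_version': schema_version,     # bump when the payload shape changes
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'data': data                          # all context needed to process independently
    }
    return json.dumps(envelope)

print(make_event('UserLoggedIn', {'user_id': 12345, 'ip': '192.168.1.100'}))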

Performance

  • Batch events when possible (Kafka batch size, Kinesis PutRecords)
  • Enable compression (gzip, snappy, lz4) for network efficiency
  • Use appropriate partition keys for even load distribution
  • Monitor consumer lag and scale accordingly
  • Implement backpressure handling for slow consumers

Reliability

  • Implement idempotent consumers to handle duplicate events (see the consumer sketch after this list)
  • Use at-least-once delivery with deduplication
  • Set up dead letter queues for failed events
  • Implement retry logic with exponential backoff
  • Monitor and alert on processing failures
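
A minimal sketch of an idempotent consumer with deduplication and a dead letter topic, building on the Kafka consumer above (the in-memory seen-ID set, the dead letter topic name, and the process_event handler are assumptions; durable deduplication would use Redis or a database):

import json
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='idempotent-processor'
)
dlq_producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

seen_event_ids = set()  # in-memory dedup store; use Redis or a database for durability

for message in consumer:
    event = message.value
    event_id = event.get('event_id')

    if event_id in seen_event_ids:
        continue  # duplicate delivery; expected under at-least-once semantics

    try:
        process_event(event)          # hypothetical handler, as in the examples above
        seen_event_ids.add(event_id)  # mark as processed only after success
    except Exception:
        # Route failures to a dead letter topic for later inspection and replay
        dlq_producer.send('user-events-dlq', value=event)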

Data Management

  • Define retention policies for stream data
  • Archive events to cold storage (S3, Glacier) for compliance
  • Implement data quality checks on ingestion (see the validation sketch after this list)
  • Use schema registry for event validation
  • Document event formats and maintain changelog
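
As an example of validating events on ingestion, a sketch using the jsonschema library (the schema is illustrative; a schema registry such as Confluent Schema Registry plays the same role centrally):

import json
from jsonschema import validate, ValidationError

# Illustrative schema for the user events shown above
EVENT_SCHEMA = {
    "type": "object",
    "required": ["event_id", "user_id", "action", "timestamp"],
    "properties": {
        "event_id": {"type": "string"},
        "user_id": {"type": "integer"},
        "action": {"type": "string"},
        "timestamp": {"type": "string"}
    }
}

def validate_line(line):
    """Parse one JSONL line and check it against the schema; return None if it fails."""
    try:
        event = json.loads(line)
        validate(instance=event, schema=EVENT_SCHEMA)
        return event
    except (json.JSONDecodeError, ValidationError):
        return None  # in practice, route rejects to a dead letter file or topic

good = validate_line('{"event_id": "evt_1", "user_id": 123, "action": "login", "timestamp": "2025-01-15T10:00:00Z"}')
bad = validate_line('{"user_id": "not-an-int"}')
print(good is not None, bad is None)  # True True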
