JSONL for Log Processing

Why structured logging with JSONL has become the industry standard for modern application monitoring and observability

Why JSONL for Logging?

Structured vs Unstructured

Traditional plain-text logs are hard to parse and query. JSONL provides structure while maintaining the simplicity of line-based logging - perfect for real-time streaming and analysis.

Plain Text:

2025-01-15 10:30:45 ERROR Database connection failed

JSONL:

{"timestamp":"2025-01-15T10:30:45Z","level":"error","msg":"Database connection failed","db":"users","host":"prod-01"}

Industry Adoption

Major logging platforms and tools have standardized on JSONL:

  • Elasticsearch: Native JSONL ingestion
  • Splunk: JSON event format support
  • Datadog: JSON structured logs
  • Fluentd/Logstash: JSONL as default format
  • CloudWatch: Structured logging support

Jump to Topic

Application Logging

Node.js with Winston

Winston is the most popular logging library for Node.js with built-in JSONL support.

const winston = require('winston');

// Configure Winston for JSONL output
const logger = winston.createLogger({
  level: 'info',
  format: winston.format.json(),
  transports: [
    // Write all logs to application.jsonl
    new winston.transports.File({
      filename: 'application.jsonl',
      format: winston.format.combine(
        winston.format.timestamp(),
        winston.format.json()
      )
    }),
    // Also log to console for development
    new winston.transports.Console({
      format: winston.format.simple()
    })
  ]
});

// Add request context middleware (Express)
const addContext = (req, res, next) => {
  req.logger = logger.child({
    request_id: req.id,
    user_id: req.user?.id,
    ip: req.ip,
    user_agent: req.get('user-agent')
  });
  next();
};

// Log examples
logger.info('Server started', { port: 3000, env: 'production' });

logger.error('Database query failed', {
  query: 'SELECT * FROM users',
  error: err.message,
  stack: err.stack,
  duration_ms: 1250
});

logger.warn('High memory usage', {
  memory_used_mb: 512,
  memory_limit_mb: 1024,
  threshold_percent: 80
});

// With request context
app.get('/api/users', addContext, async (req, res) => {
  req.logger.info('Fetching users', {
    page: req.query.page,
    limit: req.query.limit
  });
  // ... handler code
});

Pro Tip: Use child loggers to automatically include context like request IDs in all logs within a request.

Python with Structlog

Structlog makes structured logging in Python simple and performant.

import structlog
import logging
import sys

# Configure structlog for JSONL output
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)

# Create logger
logger = structlog.get_logger()

# Basic logging
logger.info("server_started", port=8000, workers=4)

logger.error(
    "database_connection_failed",
    host="db.example.com",
    port=5432,
    retry_count=3,
    exc_info=True
)

# Add context for multiple log entries
log_with_context = logger.bind(
    user_id=12345,
    request_id="req_abc123",
    session_id="sess_xyz789"
)

log_with_context.info("user_action", action="login", ip="192.168.1.100")
log_with_context.info("user_action", action="view_profile")

# Django/Flask middleware example
class LoggingMiddleware:
    def __init__(self, app):
        self.app = app

    def __call__(self, environ, start_response):
        request_id = environ.get('HTTP_X_REQUEST_ID', 'unknown')
        logger = structlog.get_logger().bind(
            request_id=request_id,
            path=environ.get('PATH_INFO'),
            method=environ.get('REQUEST_METHOD')
        )

        logger.info("request_started")
        try:
            return self.app(environ, start_response)
        finally:
            logger.info("request_completed")

Go with Zerolog

Zerolog is a zero-allocation JSON logger for Go with excellent performance.

package main

import (
    "os"
    "github.com/rs/zerolog"
    "github.com/rs/zerolog/log"
)

func main() {
    // Configure logger for JSONL file output
    file, err := os.OpenFile(
        "application.jsonl",
        os.O_APPEND|os.O_CREATE|os.O_WRONLY,
        0644,
    )
    if err != nil {
        panic(err)
    }
    defer file.Close()

    log.Logger = zerolog.New(file).With().Timestamp().Caller().Logger()

    // Basic logging
    log.Info().
        Str("event", "server_start").
        Int("port", 8080).
        Str("env", "production").
        Msg("Server starting")

    // Error logging with context
    err = connectDatabase()
    if err != nil {
        log.Error().
            Err(err).
            Str("host", "db.example.com").
            Int("port", 5432).
            Int("retry", 3).
            Msg("Database connection failed")
    }

    // HTTP middleware example
    func loggingMiddleware(next http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            start := time.Now()

            // Create logger with request context
            logger := log.With().
                Str("request_id", r.Header.Get("X-Request-ID")).
                Str("method", r.Method).
                Str("path", r.URL.Path).
                Str("remote_addr", r.RemoteAddr).
                Logger()

            logger.Info().Msg("request_started")

            // Wrap response writer to capture status
            wrapped := &statusWriter{ResponseWriter: w}
            next.ServeHTTP(wrapped, r)

            logger.Info().
                Int("status", wrapped.status).
                Dur("duration_ms", time.Since(start)).
                Msg("request_completed")
        })
    }
}

Java with Logback

Configure Logback for JSON structured logging in Spring Boot applications.

<!-- logback-spring.xml -->
<configuration>
    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
        <file>application.jsonl</file>
        <encoder class="net.logstash.logback.encoder.LogstashEncoder">
            <includeContext>false</includeContext>
            <timestampPattern>yyyy-MM-dd'T'HH:mm:ss.SSS'Z'</timestampPattern>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="FILE" />
    </root>
</configuration>
// Java application code
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class UserService {
    private static final Logger logger = LoggerFactory.getLogger(UserService.class);

    public User getUser(Long userId) {
        // Add context to MDC (Mapped Diagnostic Context)
        MDC.put("user_id", userId.toString());
        MDC.put("operation", "get_user");

        try {
            logger.info("Fetching user from database");
            User user = userRepository.findById(userId);

            if (user == null) {
                logger.warn("User not found");
                return null;
            }

            logger.info("User retrieved successfully");
            return user;
        } catch (Exception e) {
            logger.error("Error fetching user", e);
            throw e;
        } finally {
            MDC.clear();
        }
    }
}

Rust with Tracing

use tracing::{info, error, warn};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

fn main() {
    // Configure JSON formatter
    let file = std::fs::File::create("application.jsonl")
        .expect("Failed to create log file");

    tracing_subscriber::registry()
        .with(
            tracing_subscriber::fmt::layer()
                .json()
                .with_writer(std::sync::Arc::new(file))
        )
        .init();

    info!(port = 8080, env = "production", "Server started");

    // Structured logging with context
    let user_id = 12345;
    let request_id = "req_abc123";

    info!(
        user_id = user_id,
        request_id = request_id,
        action = "login",
        "User logged in"
    );

    // Error logging
    if let Err(e) = connect_database() {
        error!(
            error = %e,
            host = "db.example.com",
            port = 5432,
            "Database connection failed"
        );
    }
}

ELK Stack Integration

Elasticsearch Bulk API

Ingest JSONL logs directly into Elasticsearch using the Bulk API for high-throughput indexing.

# Bulk API expects NDJSON (JSONL) format
# Each document needs an action line followed by the document

curl -X POST "localhost:9200/_bulk" \
  -H "Content-Type: application/x-ndjson" \
  --data-binary @logs.jsonl

# logs.jsonl format:
{"index": {"_index": "logs-2025.01.15"}}
{"timestamp": "2025-01-15T10:30:00Z", "level": "info", "message": "Server started", "service": "api"}
{"index": {"_index": "logs-2025.01.15"}}
{"timestamp": "2025-01-15T10:30:05Z", "level": "error", "message": "Database timeout", "service": "api"}
{"index": {"_index": "logs-2025.01.15"}}
{"timestamp": "2025-01-15T10:30:10Z", "level": "warn", "message": "High memory usage", "service": "worker"}

Logstash Pipeline

Process JSONL logs with Logstash for filtering, enrichment, and routing.

# logstash.conf
input {
  file {
    path => "/var/log/application.jsonl"
    codec => "json_lines"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  # Parse timestamp
  date {
    match => ["timestamp", "ISO8601"]
    target => "@timestamp"
  }

  # Add hostname
  mutate {
    add_field => { "host" => "%{HOSTNAME}" }
  }

  # Extract error details
  if [level] == "error" {
    mutate {
      add_tag => ["error"]
      add_field => { "alert" => "true" }
    }
  }

  # GeoIP lookup for IP addresses
  if [ip] {
    geoip {
      source => "ip"
      target => "geoip"
    }
  }

  # User-Agent parsing
  if [user_agent] {
    useragent {
      source => "user_agent"
      target => "ua"
    }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logs-%{+YYYY.MM.dd}"
  }

  # Also output errors to separate index
  if [level] == "error" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "errors-%{+YYYY.MM.dd}"
    }
  }
}

Kibana Queries

Once logs are in Elasticsearch, query them with Kibana's powerful search.

# Find all errors in the last hour
level:error AND @timestamp:[now-1h TO now]

# Search for specific user activity
user_id:12345 AND action:(login OR logout)

# Find slow database queries
service:database AND duration_ms:>1000

# Search across multiple fields
message:"connection failed" OR error.message:"connection failed"

# Complex boolean query
(level:error OR level:warn) AND service:api AND NOT status:404

# Aggregation query (in Kibana Dev Tools)
GET /logs-*/_search
{
  "size": 0,
  "aggs": {
    "errors_by_service": {
      "terms": {
        "field": "service.keyword",
        "size": 10
      },
      "aggs": {
        "error_count": {
          "filter": {
            "term": { "level": "error" }
          }
        }
      }
    }
  }
}

Filebeat Configuration

Ship JSONL logs to Elasticsearch with Filebeat for lightweight, efficient log forwarding.

# filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/application.jsonl
  json.keys_under_root: true
  json.add_error_key: true
  json.message_key: message

# Optional: Add fields
  fields:
    environment: production
    datacenter: us-east-1
  fields_under_root: true

# Multiline support for stack traces
  multiline.type: pattern
  multiline.pattern: '^\s'
  multiline.negate: false
  multiline.match: after

# Output to Elasticsearch
output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "logs-%{+yyyy.MM.dd}"

# Or output to Logstash
output.logstash:
  hosts: ["localhost:5044"]

# Enable modules
filebeat.modules:
- module: nginx
  access:
    enabled: true
    var.paths: ["/var/log/nginx/access.jsonl"]

Fluentd Log Aggregation

Fluentd Configuration

# fluent.conf
<source>
  @type tail
  path /var/log/application.jsonl
  pos_file /var/log/td-agent/application.pos
  tag application.logs

  <parse>
    @type json
    time_key timestamp
    time_format %Y-%m-%dT%H:%M:%S%z
  </parse>
</source>

# Filter: Add hostname and enrich data
<filter application.logs>
  @type record_transformer
  <record>
    hostname ${hostname}
    tag ${tag}
    env production
  </record>
</filter>

# Filter: Parse user agent
<filter application.logs>
  @type parser
  key_name user_agent
  reserve_data true
  <parse>
    @type user_agent
  </parse>
</filter>

# Output to Elasticsearch
<match application.logs>
  @type elasticsearch
  host localhost
  port 9200
  index_name logs-%Y.%m.%d
  type_name _doc

  <buffer>
    @type file
    path /var/log/td-agent/buffer/logs
    flush_interval 10s
    chunk_limit_size 5M
  </buffer>
</match>

# Output to S3 for archival
<match application.logs>
  @type s3
  s3_bucket my-logs-bucket
  s3_region us-east-1
  path logs/%Y/%m/%d/

  <buffer time>
    @type file
    path /var/log/td-agent/buffer/s3
    timekey 3600  # 1 hour
    timekey_wait 10m
    chunk_limit_size 256m
  </buffer>

  <format>
    @type json
  </format>
</match>

Application Integration

// Node.js: fluent-logger
const logger = require('fluent-logger');

logger.configure('application', {
  host: 'localhost',
  port: 24224,
  timeout: 3.0
});

// Send structured logs
logger.emit('user.action', {
  user_id: 12345,
  action: 'login',
  ip: '192.168.1.100',
  timestamp: new Date().toISOString()
});

logger.emit('database.query', {
  query: 'SELECT * FROM users',
  duration_ms: 45,
  rows: 100
});

// Python: fluent-logger-python
from fluent import sender
from fluent import event

logger = sender.FluentSender('application', host='localhost', port=24224)

logger.emit('user.action', {
    'user_id': 12345,
    'action': 'login',
    'ip': '192.168.1.100'
})

logger.emit('database.query', {
    'query': 'SELECT * FROM users',
    'duration_ms': 45,
    'rows': 100
})

Log Aggregation Patterns

Centralized Logging Architecture

Collect logs from multiple services into a central location for analysis.

# Architecture:
# App Servers -> Log Shipper -> Message Queue -> Processor -> Storage

# Example flow:
1. Applications write JSONL to local files
2. Filebeat/Fluentd ships logs to Kafka
3. Logstash consumes from Kafka and processes
4. Elasticsearch indexes for searching
5. S3 archives for long-term storage

# Sample log aggregation script
import json
import boto3
from kafka import KafkaConsumer
from elasticsearch import Elasticsearch

# Consume from Kafka
consumer = KafkaConsumer(
    'application-logs',
    bootstrap_servers=['kafka:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

es = Elasticsearch(['http://elasticsearch:9200'])
s3 = boto3.client('s3')

for message in consumer:
    log_entry = message.value

    # Index in Elasticsearch for searching
    es.index(
        index=f"logs-{log_entry['timestamp'][:10]}",
        document=log_entry
    )

    # Archive to S3
    if log_entry['level'] == 'error':
        s3.put_object(
            Bucket='error-logs',
            Key=f"errors/{log_entry['timestamp']}.json",
            Body=json.dumps(log_entry)
        )

Multi-Tenant Log Segregation

# Separate logs by tenant/customer for SaaS applications

{"tenant_id": "acme_corp", "timestamp": "2025-01-15T10:00:00Z", "user": "[email protected]", "action": "login"}
{"tenant_id": "widgets_inc", "timestamp": "2025-01-15T10:00:01Z", "user": "[email protected]", "action": "purchase"}

# Logstash routing by tenant
filter {
  if [tenant_id] {
    mutate {
      add_field => { "[@metadata][target_index]" => "logs-%{tenant_id}-%{+YYYY.MM.dd}" }
    }
  }
}

output {
  elasticsearch {
    index => "%{[@metadata][target_index]}"
  }
}

# Query specific tenant logs
GET /logs-acme_corp-*/_search
{
  "query": {
    "match": {
      "user": "[email protected]"
    }
  }
}

Time-Series Log Retention

# Implement log retention policies with time-based indices

# Elasticsearch ILM (Index Lifecycle Management) policy
PUT _ilm/policy/logs-policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "50GB",
            "max_age": "1d"
          }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "shrink": {
            "number_of_shards": 1
          },
          "forcemerge": {
            "max_num_segments": 1
          }
        }
      },
      "cold": {
        "min_age": "30d",
        "actions": {
          "freeze": {}
        }
      },
      "delete": {
        "min_age": "90d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

# Python script to archive old logs to S3
import gzip
import json
from datetime import datetime, timedelta

def archive_old_logs(days_old=30):
    cutoff_date = datetime.now() - timedelta(days=days_old)

    with open('application.jsonl', 'r') as infile:
        current_logs = []
        archive_logs = []

        for line in infile:
            log = json.loads(line)
            log_date = datetime.fromisoformat(log['timestamp'])

            if log_date < cutoff_date:
                archive_logs.append(line)
            else:
                current_logs.append(line)

    # Write current logs back
    with open('application.jsonl', 'w') as outfile:
        outfile.writelines(current_logs)

    # Compress and archive old logs
    archive_name = f"logs-{cutoff_date.strftime('%Y-%m')}.jsonl.gz"
    with gzip.open(archive_name, 'wt') as zipfile:
        zipfile.writelines(archive_logs)

Real-Time Log Processing

Streaming with Python

import json
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class LogFileHandler(FileSystemEventHandler):
    def __init__(self, filepath):
        self.filepath = filepath
        self.file = open(filepath, 'r')
        # Move to end of file
        self.file.seek(0, 2)

    def on_modified(self, event):
        if event.src_path == self.filepath:
            # Read new lines
            for line in self.file:
                try:
                    log_entry = json.loads(line)
                    self.process_log(log_entry)
                except json.JSONDecodeError:
                    pass

    def process_log(self, log_entry):
        # Real-time processing
        if log_entry.get('level') == 'error':
            self.alert_on_error(log_entry)

        if log_entry.get('duration_ms', 0) > 1000:
            self.alert_slow_request(log_entry)

    def alert_on_error(self, log):
        print(f"ERROR ALERT: {log['message']}")
        # Send to Slack, PagerDuty, etc.

    def alert_slow_request(self, log):
        print(f"SLOW REQUEST: {log.get('path')} took {log['duration_ms']}ms")

# Watch log file
handler = LogFileHandler('/var/log/application.jsonl')
observer = Observer()
observer.schedule(handler, path='/var/log', recursive=False)
observer.start()

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
observer.join()

Real-Time Alerting

import json
from collections import defaultdict
from datetime import datetime, timedelta

class LogAlerter:
    def __init__(self):
        self.error_counts = defaultdict(int)
        self.last_reset = datetime.now()

    def process_log_stream(self, log_file):
        with open(log_file, 'r') as f:
            # Follow file like 'tail -f'
            f.seek(0, 2)
            while True:
                line = f.readline()
                if not line:
                    time.sleep(0.1)
                    continue

                try:
                    log = json.loads(line)
                    self.analyze(log)
                except json.JSONDecodeError:
                    continue

    def analyze(self, log):
        # Reset counters every minute
        if datetime.now() - self.last_reset > timedelta(minutes=1):
            self.check_thresholds()
            self.error_counts.clear()
            self.last_reset = datetime.now()

        # Count errors by service
        if log.get('level') == 'error':
            service = log.get('service', 'unknown')
            self.error_counts[service] += 1

        # Immediate alerts
        if 'out of memory' in log.get('message', '').lower():
            self.send_alert('CRITICAL', 'Out of memory error detected', log)

        if log.get('status_code') == 500:
            self.send_alert('HIGH', '500 error detected', log)

    def check_thresholds(self):
        for service, count in self.error_counts.items():
            if count > 100:  # More than 100 errors per minute
                self.send_alert(
                    'HIGH',
                    f'{service} error rate exceeded: {count}/min',
                    {'service': service, 'count': count}
                )

    def send_alert(self, severity, message, context):
        # Integrate with alerting systems
        print(f"[{severity}] {message}")
        print(f"Context: {json.dumps(context, indent=2)}")
        # Send to Slack, PagerDuty, Email, etc.

Cloud Logging Services

AWS CloudWatch Logs

import json
import boto3
from datetime import datetime

logs = boto3.client('logs')
log_group = '/aws/application/api'
log_stream = f"instance-{datetime.now().strftime('%Y-%m-%d')}"

# Create log stream if doesn't exist
try:
    logs.create_log_stream(logGroupName=log_group, logStreamName=log_stream)
except logs.exceptions.ResourceAlreadyExistsException:
    pass

# Send structured logs
events = []
with open('application.jsonl', 'r') as f:
    for line in f:
        log = json.loads(line)
        events.append({
            'timestamp': int(datetime.fromisoformat(log['timestamp']).timestamp() * 1000),
            'message': json.dumps(log)
        })

# Batch upload (max 10,000 events or 1 MB)
logs.put_log_events(
    logGroupName=log_group,
    logStreamName=log_stream,
    logEvents=events
)

# Query with CloudWatch Insights
query = '''
fields @timestamp, level, message, duration_ms
| filter level = "error"
| sort @timestamp desc
| limit 100
'''

response = logs.start_query(
    logGroupName=log_group,
    startTime=int((datetime.now() - timedelta(hours=1)).timestamp()),
    endTime=int(datetime.now().timestamp()),
    queryString=query
)

Google Cloud Logging

from google.cloud import logging
import json

client = logging.Client()
logger = client.logger('application-logs')

# Read and upload JSONL logs
with open('application.jsonl', 'r') as f:
    for line in f:
        log_entry = json.loads(line)

        # Map severity
        severity_map = {
            'debug': 'DEBUG',
            'info': 'INFO',
            'warn': 'WARNING',
            'error': 'ERROR',
            'fatal': 'CRITICAL'
        }

        logger.log_struct(
            log_entry,
            severity=severity_map.get(log_entry.get('level'), 'DEFAULT')
        )

# Query logs
from google.cloud.logging import DESCENDING

filter_str = '''
resource.type="gce_instance"
severity="ERROR"
timestamp>="2025-01-15T00:00:00Z"
'''

for entry in client.list_entries(filter_=filter_str, order_by=DESCENDING):
    print(f"{entry.timestamp}: {entry.payload}")

Azure Monitor Logs

from azure.monitor.ingestion import LogsIngestionClient
from azure.identity import DefaultAzureCredential
import json

credential = DefaultAzureCredential()
endpoint = "https://<data-collection-endpoint>.ingest.monitor.azure.com"
rule_id = "dcr-xxxxxxxxxxxxx"
stream_name = "Custom-ApplicationLogs"

client = LogsIngestionClient(endpoint=endpoint, credential=credential)

# Read JSONL and upload
logs_data = []
with open('application.jsonl', 'r') as f:
    for line in f:
        logs_data.append(json.loads(line))

# Upload in batches
client.upload(rule_id=rule_id, stream_name=stream_name, logs=logs_data)

# Query with KQL (Kusto Query Language)
from azure.monitor.query import LogsQueryClient

query_client = LogsQueryClient(credential)

query = """
ApplicationLogs
| where TimeGenerated > ago(1h)
| where Level == "error"
| project TimeGenerated, Message, Service, Host
| order by TimeGenerated desc
| limit 100
"""

response = query_client.query_workspace(
    workspace_id="<workspace-id>",
    query=query,
    timespan=timedelta(hours=1)
)

Best Practices

Log Structure

  • Always include timestamp, level, and message fields
  • Add request_id for tracing requests across services
  • Include service name and hostname for distributed systems
  • Use consistent field names across all services
  • Include environment (dev/staging/prod) in logs

Performance

  • Use async/non-blocking logging to avoid slowing down applications
  • Implement log sampling for high-volume endpoints
  • Compress logs (gzip) for storage efficiency
  • Use log levels appropriately (don't log everything as INFO)
  • Rotate log files automatically to prevent disk space issues

Security

  • Never log sensitive data (passwords, tokens, credit cards)
  • Redact PII (personally identifiable information) before logging
  • Encrypt log files at rest and in transit
  • Implement access controls for log viewing
  • Set up audit trails for who accessed logs

Storage & Retention

  • Define retention policies based on compliance requirements
  • Hot storage: Recent logs (7-30 days) for quick access
  • Cold storage: Archive older logs to S3/Glacier
  • Delete logs according to GDPR/compliance after retention period
  • Monitor storage costs and optimize accordingly

Related Resources