Why structured logging with JSONL has become the industry standard for modern application monitoring and observability
Traditional plain-text logs are hard to parse and query. JSONL adds machine-readable structure while keeping the simplicity of line-based logging, which makes it well suited to real-time streaming and analysis.
Plain Text:
2025-01-15 10:30:45 ERROR Database connection failed
JSONL:
{"timestamp":"2025-01-15T10:30:45Z","level":"error","msg":"Database connection failed","db":"users","host":"prod-01"}
Major logging libraries and platforms support JSONL output natively. The sections below show typical setups across languages and shipping tools:
Winston, one of the most popular logging libraries for Node.js, writes JSONL out of the box via its JSON format.
const winston = require('winston');

// Configure Winston for JSONL output
const logger = winston.createLogger({
  level: 'info',
  format: winston.format.json(),
  transports: [
    // Write all logs to application.jsonl
    new winston.transports.File({
      filename: 'application.jsonl',
      format: winston.format.combine(
        winston.format.timestamp(),
        winston.format.json()
      )
    }),
    // Also log to console for development
    new winston.transports.Console({
      format: winston.format.simple()
    })
  ]
});

// Add request context middleware (Express)
const addContext = (req, res, next) => {
  req.logger = logger.child({
    request_id: req.id, // assumes a request-ID middleware has set req.id
    user_id: req.user?.id,
    ip: req.ip,
    user_agent: req.get('user-agent')
  });
  next();
};

// Log examples
logger.info('Server started', { port: 3000, env: 'production' });

// Inside a catch (err) block:
logger.error('Database query failed', {
  query: 'SELECT * FROM users',
  error: err.message,
  stack: err.stack,
  duration_ms: 1250
});

logger.warn('High memory usage', {
  memory_used_mb: 512,
  memory_limit_mb: 1024,
  threshold_percent: 80
});

// With request context
app.get('/api/users', addContext, async (req, res) => {
  req.logger.info('Fetching users', {
    page: req.query.page,
    limit: req.query.limit
  });
  // ... handler code
});
Pro Tip: Use child loggers to automatically include context like request IDs in all logs within a request.
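With the addContext middleware above, a single req.logger.info('Fetching users') call would emit a line roughly like this (field values are illustrative):
{"level":"info","message":"Fetching users","request_id":"req_abc123","user_id":42,"ip":"203.0.113.7","user_agent":"Mozilla/5.0","page":"1","limit":"20","timestamp":"2025-01-15T10:31:02.123Z"}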
Structlog makes structured logging in Python simple and performant.
import structlog
import logging
import sys

# Route stdlib logging (which structlog wraps below) to stdout as raw JSON lines
logging.basicConfig(format="%(message)s", stream=sys.stdout, level=logging.INFO)

# Configure structlog for JSONL output
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)

# Create logger
logger = structlog.get_logger()

# Basic logging
logger.info("server_started", port=8000, workers=4)

logger.error(
    "database_connection_failed",
    host="db.example.com",
    port=5432,
    retry_count=3,
    exc_info=True
)

# Add context for multiple log entries
log_with_context = logger.bind(
    user_id=12345,
    request_id="req_abc123",
    session_id="sess_xyz789"
)
log_with_context.info("user_action", action="login", ip="192.168.1.100")
log_with_context.info("user_action", action="view_profile")

# WSGI middleware example (works with Django, Flask, etc.)
class LoggingMiddleware:
    def __init__(self, app):
        self.app = app

    def __call__(self, environ, start_response):
        request_id = environ.get('HTTP_X_REQUEST_ID', 'unknown')
        logger = structlog.get_logger().bind(
            request_id=request_id,
            path=environ.get('PATH_INFO'),
            method=environ.get('REQUEST_METHOD')
        )
        logger.info("request_started")
        try:
            return self.app(environ, start_response)
        finally:
            logger.info("request_completed")
Zerolog is a zero-allocation JSON logger for Go with excellent performance.
package main

import (
	"errors"
	"net/http"
	"os"
	"time"

	"github.com/rs/zerolog"
	"github.com/rs/zerolog/log"
)

func main() {
	// Configure logger for JSONL file output
	file, err := os.OpenFile(
		"application.jsonl",
		os.O_APPEND|os.O_CREATE|os.O_WRONLY,
		0644,
	)
	if err != nil {
		panic(err)
	}
	defer file.Close()

	log.Logger = zerolog.New(file).With().Timestamp().Caller().Logger()

	// Basic logging
	log.Info().
		Str("event", "server_start").
		Int("port", 8080).
		Str("env", "production").
		Msg("Server starting")

	// Error logging with context
	err = connectDatabase()
	if err != nil {
		log.Error().
			Err(err).
			Str("host", "db.example.com").
			Int("port", 5432).
			Int("retry", 3).
			Msg("Database connection failed")
	}
}

// connectDatabase stands in for real application code
func connectDatabase() error {
	return errors.New("connection refused")
}

// statusWriter wraps http.ResponseWriter so the middleware can record the status code
type statusWriter struct {
	http.ResponseWriter
	status int
}

func (w *statusWriter) WriteHeader(code int) {
	w.status = code
	w.ResponseWriter.WriteHeader(code)
}

// HTTP middleware example
func loggingMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		start := time.Now()

		// Create logger with request context
		logger := log.With().
			Str("request_id", r.Header.Get("X-Request-ID")).
			Str("method", r.Method).
			Str("path", r.URL.Path).
			Str("remote_addr", r.RemoteAddr).
			Logger()

		logger.Info().Msg("request_started")

		// Wrap response writer to capture status
		wrapped := &statusWriter{ResponseWriter: w, status: http.StatusOK}
		next.ServeHTTP(wrapped, r)

		logger.Info().
			Int("status", wrapped.status).
			Dur("duration_ms", time.Since(start)).
			Msg("request_completed")
	})
}
Configure Logback for JSON structured logging in Spring Boot applications; the LogstashEncoder used below is provided by the logstash-logback-encoder dependency.
<!-- logback-spring.xml -->
<configuration>
  <appender name="FILE" class="ch.qos.logback.core.FileAppender">
    <file>application.jsonl</file>
    <encoder class="net.logstash.logback.encoder.LogstashEncoder">
      <includeContext>false</includeContext>
      <timestampPattern>yyyy-MM-dd'T'HH:mm:ss.SSS'Z'</timestampPattern>
    </encoder>
  </appender>
  <root level="INFO">
    <appender-ref ref="FILE" />
  </root>
</configuration>
// Java application code
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class UserService {
    private static final Logger logger = LoggerFactory.getLogger(UserService.class);

    public User getUser(Long userId) {
        // Add context to MDC (Mapped Diagnostic Context)
        MDC.put("user_id", userId.toString());
        MDC.put("operation", "get_user");
        try {
            logger.info("Fetching user from database");
            User user = userRepository.findById(userId);
            if (user == null) {
                logger.warn("User not found");
                return null;
            }
            logger.info("User retrieved successfully");
            return user;
        } catch (Exception e) {
            logger.error("Error fetching user", e);
            throw e;
        } finally {
            MDC.clear();
        }
    }
}
In Rust, the tracing crate together with tracing-subscriber's JSON formatter writes structured events as JSONL.
use tracing::{info, error, warn};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

fn main() {
    // Configure JSON formatter writing to application.jsonl
    let file = std::fs::File::create("application.jsonl")
        .expect("Failed to create log file");

    tracing_subscriber::registry()
        .with(
            tracing_subscriber::fmt::layer()
                .json()
                .with_writer(std::sync::Arc::new(file))
        )
        .init();

    info!(port = 8080, env = "production", "Server started");

    // Structured logging with context
    let user_id = 12345;
    let request_id = "req_abc123";
    info!(
        user_id = user_id,
        request_id = request_id,
        action = "login",
        "User logged in"
    );

    // Error logging
    if let Err(e) = connect_database() {
        error!(
            error = %e,
            host = "db.example.com",
            port = 5432,
            "Database connection failed"
        );
    }
}

// Stand-in for real application code
fn connect_database() -> Result<(), std::io::Error> {
    Err(std::io::Error::new(std::io::ErrorKind::Other, "connection refused"))
}
Ingest JSONL logs directly into Elasticsearch using the Bulk API for high-throughput indexing.
# The Bulk API expects NDJSON (JSONL): each document is preceded by an action line,
# and the request body must end with a trailing newline
curl -X POST "localhost:9200/_bulk" \
  -H "Content-Type: application/x-ndjson" \
  --data-binary @logs.jsonl
# logs.jsonl format:
{"index": {"_index": "logs-2025.01.15"}}
{"timestamp": "2025-01-15T10:30:00Z", "level": "info", "message": "Server started", "service": "api"}
{"index": {"_index": "logs-2025.01.15"}}
{"timestamp": "2025-01-15T10:30:05Z", "level": "error", "message": "Database timeout", "service": "api"}
{"index": {"_index": "logs-2025.01.15"}}
{"timestamp": "2025-01-15T10:30:10Z", "level": "warn", "message": "High memory usage", "service": "worker"}
Process JSONL logs with Logstash for filtering, enrichment, and routing.
# logstash.conf
input {
  file {
    path => "/var/log/application.jsonl"
    codec => "json_lines"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  # Parse timestamp
  date {
    match => ["timestamp", "ISO8601"]
    target => "@timestamp"
  }

  # Add hostname (from the HOSTNAME environment variable)
  mutate {
    add_field => { "host" => "${HOSTNAME}" }
  }

  # Extract error details
  if [level] == "error" {
    mutate {
      add_tag => ["error"]
      add_field => { "alert" => "true" }
    }
  }

  # GeoIP lookup for IP addresses
  if [ip] {
    geoip {
      source => "ip"
      target => "geoip"
    }
  }

  # User-Agent parsing
  if [user_agent] {
    useragent {
      source => "user_agent"
      target => "ua"
    }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logs-%{+YYYY.MM.dd}"
  }

  # Also route errors to a separate index
  if [level] == "error" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "errors-%{+YYYY.MM.dd}"
    }
  }
}
Once logs are indexed in Elasticsearch, query them from Kibana: the search bar accepts Lucene-syntax queries like the examples below, and aggregations run in the Dev Tools console.
# Find all errors in the last hour
level:error AND @timestamp:[now-1h TO now]
# Search for specific user activity
user_id:12345 AND action:(login OR logout)
# Find slow database queries
service:database AND duration_ms:>1000
# Search across multiple fields
message:"connection failed" OR error.message:"connection failed"
# Complex boolean query
(level:error OR level:warn) AND service:api AND NOT status:404
# Aggregation query (in Kibana Dev Tools)
GET /logs-*/_search
{
  "size": 0,
  "aggs": {
    "errors_by_service": {
      "terms": {
        "field": "service.keyword",
        "size": 10
      },
      "aggs": {
        "error_count": {
          "filter": {
            "term": { "level": "error" }
          }
        }
      }
    }
  }
}
Ship JSONL logs to Elasticsearch with Filebeat for lightweight, efficient log forwarding.
# filebeat.yml
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/application.jsonl
    json.keys_under_root: true
    json.add_error_key: true
    json.message_key: message

    # Optional: Add fields
    fields:
      environment: production
      datacenter: us-east-1
    fields_under_root: true

    # Multiline support for stack traces
    multiline.type: pattern
    multiline.pattern: '^\s'
    multiline.negate: false
    multiline.match: after

# Output to Elasticsearch
output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "logs-%{+yyyy.MM.dd}"

# Or output to Logstash instead (only one output may be enabled at a time)
#output.logstash:
#  hosts: ["localhost:5044"]

# Enable modules
filebeat.modules:
  - module: nginx
    access:
      enabled: true
      var.paths: ["/var/log/nginx/access.jsonl"]
Fluentd can tail JSONL files, enrich each record with filters, and route events to multiple destinations.
# fluent.conf
<source>
  @type tail
  path /var/log/application.jsonl
  pos_file /var/log/td-agent/application.pos
  tag application.logs
  <parse>
    @type json
    time_key timestamp
    time_format %Y-%m-%dT%H:%M:%S%z
  </parse>
</source>

# Filter: Add hostname and enrich data
<filter application.logs>
  @type record_transformer
  <record>
    hostname ${hostname}
    tag ${tag}
    env production
  </record>
</filter>

# Filter: Parse user agent
<filter application.logs>
  @type parser
  key_name user_agent
  reserve_data true
  <parse>
    @type user_agent
  </parse>
</filter>

# Output: a single <match> receives the events; @type copy fans them out
# to Elasticsearch and to S3 for archival
<match application.logs>
  @type copy

  <store>
    @type elasticsearch
    host localhost
    port 9200
    index_name logs-%Y.%m.%d
    type_name _doc
    <buffer>
      @type file
      path /var/log/td-agent/buffer/logs
      flush_interval 10s
      chunk_limit_size 5M
    </buffer>
  </store>

  <store>
    @type s3
    s3_bucket my-logs-bucket
    s3_region us-east-1
    path logs/%Y/%m/%d/
    <buffer time>
      @type file
      path /var/log/td-agent/buffer/s3
      timekey 3600 # 1 hour
      timekey_wait 10m
      chunk_limit_size 256m
    </buffer>
    <format>
      @type json
    </format>
  </store>
</match>
// Node.js: fluent-logger
const logger = require('fluent-logger');

logger.configure('application', {
  host: 'localhost',
  port: 24224,
  timeout: 3.0
});

// Send structured logs
logger.emit('user.action', {
  user_id: 12345,
  action: 'login',
  ip: '192.168.1.100',
  timestamp: new Date().toISOString()
});

logger.emit('database.query', {
  query: 'SELECT * FROM users',
  duration_ms: 45,
  rows: 100
});
# Python: fluent-logger-python
from fluent import sender

logger = sender.FluentSender('application', host='localhost', port=24224)

logger.emit('user.action', {
    'user_id': 12345,
    'action': 'login',
    'ip': '192.168.1.100'
})

logger.emit('database.query', {
    'query': 'SELECT * FROM users',
    'duration_ms': 45,
    'rows': 100
})
Collect logs from multiple services into a central location for analysis.
# Architecture:
# App Servers -> Log Shipper -> Message Queue -> Processor -> Storage
# Example flow:
1. Applications write JSONL to local files
2. Filebeat/Fluentd ships logs to Kafka
3. Logstash consumes from Kafka and processes
4. Elasticsearch indexes for searching
5. S3 archives for long-term storage
# Sample log aggregation script
import json
import boto3
from kafka import KafkaConsumer
from elasticsearch import Elasticsearch

# Consume from Kafka
consumer = KafkaConsumer(
    'application-logs',
    bootstrap_servers=['kafka:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

es = Elasticsearch(['http://elasticsearch:9200'])
s3 = boto3.client('s3')

for message in consumer:
    log_entry = message.value

    # Index in Elasticsearch for searching
    es.index(
        index=f"logs-{log_entry['timestamp'][:10]}",
        document=log_entry
    )

    # Archive errors to S3
    if log_entry['level'] == 'error':
        s3.put_object(
            Bucket='error-logs',
            Key=f"errors/{log_entry['timestamp']}.json",
            Body=json.dumps(log_entry)
        )
# Separate logs by tenant/customer for SaaS applications
{"tenant_id": "acme_corp", "timestamp": "2025-01-15T10:00:00Z", "user": "[email protected]", "action": "login"}
{"tenant_id": "widgets_inc", "timestamp": "2025-01-15T10:00:01Z", "user": "[email protected]", "action": "purchase"}
# Logstash routing by tenant
filter {
  if [tenant_id] {
    mutate {
      add_field => { "[@metadata][target_index]" => "logs-%{tenant_id}-%{+YYYY.MM.dd}" }
    }
  }
}

output {
  elasticsearch {
    index => "%{[@metadata][target_index]}"
  }
}

# Query specific tenant logs
GET /logs-acme_corp-*/_search
{
  "query": {
    "match": {
      "user": "[email protected]"
    }
  }
}
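If Logstash is not in the picture, the same per-tenant separation can be done directly on the JSONL files. A minimal Python sketch (file naming is assumed):
import json

# Split a combined JSONL file into one file per tenant
handles = {}
with open("application.jsonl") as src:
    for line in src:
        tenant = json.loads(line).get("tenant_id", "unknown")
        if tenant not in handles:
            handles[tenant] = open(f"logs-{tenant}.jsonl", "a")
        handles[tenant].write(line)
for handle in handles.values():
    handle.close()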
# Implement log retention policies with time-based indices
# Elasticsearch ILM (Index Lifecycle Management) policy
PUT _ilm/policy/logs-policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "50GB",
            "max_age": "1d"
          }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "shrink": {
            "number_of_shards": 1
          },
          "forcemerge": {
            "max_num_segments": 1
          }
        }
      },
      "cold": {
        "min_age": "30d",
        "actions": {
          "freeze": {}
        }
      },
      "delete": {
        "min_age": "90d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}
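The policy only takes effect once indices reference it. A minimal sketch, assuming the cluster is reachable on localhost:9200 and the Python requests library is available, that registers a (hypothetically named) index template pointing new logs-* indices at the policy:
import requests

# Hypothetical index template applying logs-policy to new logs-* indices
template = {
    "index_patterns": ["logs-*"],
    "template": {
        "settings": {
            "index.lifecycle.name": "logs-policy",
            "index.lifecycle.rollover_alias": "logs"
        }
    }
}
resp = requests.put("http://localhost:9200/_index_template/logs-template", json=template)
resp.raise_for_status()
# Rollover additionally needs an initial index carrying the "logs" write alias.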
# Python script to archive old logs to S3
import gzip
import json
import boto3
from datetime import datetime, timedelta, timezone

def archive_old_logs(days_old=30, bucket='my-logs-bucket'):  # bucket name is a placeholder
    cutoff_date = datetime.now(timezone.utc) - timedelta(days=days_old)

    current_logs = []
    archive_logs = []
    with open('application.jsonl', 'r') as infile:
        for line in infile:
            log = json.loads(line)
            # Accept both "+00:00" and trailing-"Z" timestamps
            log_date = datetime.fromisoformat(log['timestamp'].replace('Z', '+00:00'))
            if log_date < cutoff_date:
                archive_logs.append(line)
            else:
                current_logs.append(line)

    # Write current logs back
    with open('application.jsonl', 'w') as outfile:
        outfile.writelines(current_logs)

    # Compress old logs and upload the archive to S3
    archive_name = f"logs-{cutoff_date.strftime('%Y-%m')}.jsonl.gz"
    with gzip.open(archive_name, 'wt') as zipfile:
        zipfile.writelines(archive_logs)
    boto3.client('s3').upload_file(archive_name, bucket, f"archive/{archive_name}")
For real-time monitoring, tail a JSONL file and react to new entries as they are written, for example with Python's watchdog library.
import json
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class LogFileHandler(FileSystemEventHandler):
    def __init__(self, filepath):
        self.filepath = filepath
        self.file = open(filepath, 'r')
        # Move to end of file
        self.file.seek(0, 2)

    def on_modified(self, event):
        if event.src_path == self.filepath:
            # Read new lines
            for line in self.file:
                try:
                    log_entry = json.loads(line)
                    self.process_log(log_entry)
                except json.JSONDecodeError:
                    pass

    def process_log(self, log_entry):
        # Real-time processing
        if log_entry.get('level') == 'error':
            self.alert_on_error(log_entry)
        if log_entry.get('duration_ms', 0) > 1000:
            self.alert_slow_request(log_entry)

    def alert_on_error(self, log):
        print(f"ERROR ALERT: {log.get('message')}")
        # Send to Slack, PagerDuty, etc.

    def alert_slow_request(self, log):
        print(f"SLOW REQUEST: {log.get('path')} took {log['duration_ms']}ms")

# Watch log file
handler = LogFileHandler('/var/log/application.jsonl')
observer = Observer()
observer.schedule(handler, path='/var/log', recursive=False)
observer.start()

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
observer.join()
The same stream can drive threshold-based alerting.
import json
import time
from collections import defaultdict
from datetime import datetime, timedelta

class LogAlerter:
    def __init__(self):
        self.error_counts = defaultdict(int)
        self.last_reset = datetime.now()

    def process_log_stream(self, log_file):
        with open(log_file, 'r') as f:
            # Follow file like 'tail -f'
            f.seek(0, 2)
            while True:
                line = f.readline()
                if not line:
                    time.sleep(0.1)
                    continue
                try:
                    log = json.loads(line)
                    self.analyze(log)
                except json.JSONDecodeError:
                    continue

    def analyze(self, log):
        # Reset counters every minute
        if datetime.now() - self.last_reset > timedelta(minutes=1):
            self.check_thresholds()
            self.error_counts.clear()
            self.last_reset = datetime.now()

        # Count errors by service
        if log.get('level') == 'error':
            service = log.get('service', 'unknown')
            self.error_counts[service] += 1

        # Immediate alerts
        if 'out of memory' in log.get('message', '').lower():
            self.send_alert('CRITICAL', 'Out of memory error detected', log)
        if log.get('status_code') == 500:
            self.send_alert('HIGH', '500 error detected', log)

    def check_thresholds(self):
        for service, count in self.error_counts.items():
            if count > 100:  # More than 100 errors per minute
                self.send_alert(
                    'HIGH',
                    f'{service} error rate exceeded: {count}/min',
                    {'service': service, 'count': count}
                )

    def send_alert(self, severity, message, context):
        # Integrate with alerting systems
        print(f"[{severity}] {message}")
        print(f"Context: {json.dumps(context, indent=2)}")
        # Send to Slack, PagerDuty, Email, etc.

# Example usage
LogAlerter().process_log_stream('/var/log/application.jsonl')
Ship JSONL logs to AWS CloudWatch Logs and query them with CloudWatch Logs Insights.
import json
import boto3
from datetime import datetime, timedelta

logs = boto3.client('logs')
log_group = '/aws/application/api'
log_stream = f"instance-{datetime.now().strftime('%Y-%m-%d')}"

# Create log stream if it doesn't exist
try:
    logs.create_log_stream(logGroupName=log_group, logStreamName=log_stream)
except logs.exceptions.ResourceAlreadyExistsException:
    pass

# Send structured logs
events = []
with open('application.jsonl', 'r') as f:
    for line in f:
        log = json.loads(line)
        events.append({
            'timestamp': int(datetime.fromisoformat(log['timestamp'].replace('Z', '+00:00')).timestamp() * 1000),
            'message': json.dumps(log)
        })

# Events must be in chronological order; batch upload (max 10,000 events or 1 MB)
events.sort(key=lambda e: e['timestamp'])
logs.put_log_events(
    logGroupName=log_group,
    logStreamName=log_stream,
    logEvents=events
)

# Query with CloudWatch Logs Insights
query = '''
fields @timestamp, level, message, duration_ms
| filter level = "error"
| sort @timestamp desc
| limit 100
'''

response = logs.start_query(
    logGroupName=log_group,
    startTime=int((datetime.now() - timedelta(hours=1)).timestamp()),
    endTime=int(datetime.now().timestamp()),
    queryString=query
)
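start_query is asynchronous, so the results are fetched separately. A minimal sketch continuing from the code above (the one-second poll interval is arbitrary):
import time

# Poll until the Insights query finishes, then print the result rows
while True:
    result = logs.get_query_results(queryId=response['queryId'])
    if result['status'] in ('Complete', 'Failed', 'Cancelled'):
        break
    time.sleep(1)

for row in result.get('results', []):
    print({field['field']: field['value'] for field in row})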
Send JSONL entries to Google Cloud Logging as structured log records.
from google.cloud import logging
import json

client = logging.Client()
logger = client.logger('application-logs')

# Map application levels to Cloud Logging severities
severity_map = {
    'debug': 'DEBUG',
    'info': 'INFO',
    'warn': 'WARNING',
    'error': 'ERROR',
    'fatal': 'CRITICAL'
}

# Read and upload JSONL logs
with open('application.jsonl', 'r') as f:
    for line in f:
        log_entry = json.loads(line)
        logger.log_struct(
            log_entry,
            severity=severity_map.get(log_entry.get('level'), 'DEFAULT')
        )

# Query logs
from google.cloud.logging import DESCENDING

filter_str = '''
resource.type="gce_instance"
severity="ERROR"
timestamp>="2025-01-15T00:00:00Z"
'''

for entry in client.list_entries(filter_=filter_str, order_by=DESCENDING):
    print(f"{entry.timestamp}: {entry.payload}")
Ingest JSONL into Azure Monitor through the Logs Ingestion API, then query it with KQL.
from azure.monitor.ingestion import LogsIngestionClient
from azure.identity import DefaultAzureCredential
from datetime import timedelta
import json

credential = DefaultAzureCredential()
endpoint = "https://<data-collection-endpoint>.ingest.monitor.azure.com"
rule_id = "dcr-xxxxxxxxxxxxx"
stream_name = "Custom-ApplicationLogs"

client = LogsIngestionClient(endpoint=endpoint, credential=credential)

# Read JSONL and upload
logs_data = []
with open('application.jsonl', 'r') as f:
    for line in f:
        logs_data.append(json.loads(line))

# Upload in batches
client.upload(rule_id=rule_id, stream_name=stream_name, logs=logs_data)

# Query with KQL (Kusto Query Language)
from azure.monitor.query import LogsQueryClient

query_client = LogsQueryClient(credential)
query = """
ApplicationLogs
| where TimeGenerated > ago(1h)
| where Level == "error"
| project TimeGenerated, Message, Service, Host
| order by TimeGenerated desc
| limit 100
"""

response = query_client.query_workspace(
    workspace_id="<workspace-id>",
    query=query,
    timespan=timedelta(hours=1)
)