Production-ready patterns for building robust JSONL systems
Every record should have a unique identifier for deduplication, updates, and cross-referencing.
Avoid
{"name": "Alice", "age": 30}
{"name": "Bob", "age": 25}
Best Practice
{"id": "usr_001", "name": "Alice", "age": 30}
{"id": "usr_002", "name": "Bob", "age": 25}
Recommended ID strategies: UUIDs, ULIDs, auto-increment integers, or composite keys (e.g., "user_123", "order_2025-11-11_001").
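As a minimal sketch, here is one way to generate IDs like these in Python; the usr_ prefix and zero-padded counter are illustrative conventions, not requirements.
# Sketch: two common ID strategies (prefix and padding are arbitrary choices)
import uuid
from datetime import datetime, timezone

def uuid_id(prefix="usr"):
    # Globally unique without any coordination between writers
    return f"{prefix}_{uuid.uuid4()}"

def composite_id(entity, counter):
    # Composite key: entity + date + zero-padded sequence number
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    return f"{entity}_{today}_{counter:03d}"

print(uuid_id())                 # e.g. usr_6f1d2c3a-...
print(composite_id("order", 1))  # e.g. order_2025-11-11_001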
Timestamps enable time-series analysis, debugging, and auditing. Use ISO 8601 format.
{"id": "evt_001", "type": "click", "timestamp": "2025-11-11T14:30:00Z", "user_id": "usr_123"}
{"id": "evt_002", "type": "purchase", "timestamp": "2025-11-11T14:35:22Z", "user_id": "usr_456"}
Common timestamp fields:
- created_at - When record was created
- updated_at - When record was last modified
- timestamp - Event occurrence time
- processed_at - When record was processed

When storing different record types in the same file, include a type discriminator field.
{"type": "user", "id": "usr_001", "name": "Alice", "email": "[email protected]"}
{"type": "product", "id": "prd_001", "name": "Widget", "price": 99.99}
{"type": "order", "id": "ord_001", "user_id": "usr_001", "product_id": "prd_001", "quantity": 2}
This pattern enables filtering, routing, and schema validation based on record type.
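For instance, a small dispatch table keyed on the type field can route each record to the right handler; a sketch (the handler names and the mixed.jsonl filename are hypothetical):
# Sketch: route mixed records by their "type" discriminator
import json

def handle_user(rec):
    print("user:", rec["id"])

def handle_product(rec):
    print("product:", rec["id"])

def handle_order(rec):
    print("order:", rec["id"])

HANDLERS = {"user": handle_user, "product": handle_product, "order": handle_order}

with open('mixed.jsonl', 'r') as f:
    for line in f:
        rec = json.loads(line)
        handler = HANDLERS.get(rec.get("type"))
        if handler is None:
            print("skipping unknown type:", rec.get("type"))
            continue
        handler(rec)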
Include schema version for backward compatibility when evolving data structures.
{"schema_version": "1.0", "id": "usr_001", "name": "Alice"}
{"schema_version": "2.0", "id": "usr_002", "first_name": "Bob", "last_name": "Smith"}
Migration strategy: Write parsers that handle multiple schema versions gracefully, allowing gradual rollout.
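A sketch of one such version-aware parser, normalizing the two example versions above into a single shape (the split-on-space fallback for v1 names is an assumption):
# Sketch: normalize v1 and v2 user records into one internal shape
import json

def parse_user(rec):
    version = rec.get("schema_version", "1.0")
    if version == "1.0":
        # v1 stored a single "name" field; split it on a best-effort basis
        first, _, last = rec["name"].partition(" ")
    elif version == "2.0":
        first, last = rec["first_name"], rec["last_name"]
    else:
        raise ValueError(f"Unsupported schema_version: {version}")
    return {"id": rec["id"], "first_name": first, "last_name": last}

with open('users.jsonl', 'r') as f:
    for line in f:
        print(parse_user(json.loads(line)))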
Flat structures are easier to query, index, and convert to other formats like CSV.
Acceptable but Complex
{"user": {
"id": 1,
"name": {
"first": "Alice",
"last": "Smith"
}
}}
Better for Analytics
{"user_id": 1,
"first_name": "Alice",
"last_name": "Smith"}
Trade-off: Nesting is fine for complex data, but consider flattening for data warehouse ingestion and SQL queries.
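When flattening is needed, a small recursive helper usually suffices; a sketch, with an underscore separator as an arbitrary choice:
# Sketch: flatten nested objects into underscore-separated keys
def flatten(obj, parent_key="", sep="_"):
    flat = {}
    for key, value in obj.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten(value, new_key, sep))
        else:
            flat[new_key] = value
    return flat

nested = {"user": {"id": 1, "name": {"first": "Alice", "last": "Smith"}}}
print(flatten(nested))
# {'user_id': 1, 'user_name_first': 'Alice', 'user_name_last': 'Smith'}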
Define and enforce schemas using JSON Schema for data quality.
# Python example with jsonschema
from jsonschema import FormatChecker, ValidationError, validate
import json

schema = {
    "type": "object",
    "required": ["id", "name", "email"],
    "properties": {
        "id": {"type": "string"},
        "name": {"type": "string"},
        "email": {"type": "string", "format": "email"}
    }
}

with open('users.jsonl', 'r') as f:
    for line_num, line in enumerate(f, 1):
        try:
            obj = json.loads(line)
            # Format checks (e.g. email) only run when a FormatChecker is supplied
            validate(instance=obj, schema=schema, format_checker=FormatChecker())
        except ValidationError as e:
            print(f"Line {line_num} validation error: {e.message}")
Handle parse errors without crashing the entire pipeline. Log errors and continue processing.
# Python: Skip malformed lines, log errors
import json
import logging
logging.basicConfig(level=logging.ERROR)
processed_count = 0
error_count = 0
with open('data.jsonl', 'r') as f:
for line_num, line in enumerate(f, 1):
try:
obj = json.loads(line)
process(obj)
processed_count += 1
except json.JSONDecodeError as e:
logging.error(f"Line {line_num}: {e} | Content: {line[:100]}")
error_count += 1
except Exception as e:
logging.error(f"Line {line_num}: Processing error: {e}")
error_count += 1
print(f"Processed: {processed_count}, Errors: {error_count}")
Write failed records to a separate file for later investigation and reprocessing.
# Python: Write errors to dead letter file
import json
with open('data.jsonl', 'r') as fin, \
open('processed.jsonl', 'w') as fout, \
open('errors.jsonl', 'w') as ferr:
for line_num, line in enumerate(fin, 1):
try:
obj = json.loads(line)
result = process(obj)
fout.write(json.dumps(result) + '\n')
except Exception as e:
error_record = {
"line_num": line_num,
"error": str(e),
"raw_line": line.strip()
}
ferr.write(json.dumps(error_record) + '\n')
For critical pipelines, stop immediately on errors to prevent cascading failures.
# Go: Fail fast on parse errors
package main
import (
"bufio"
"encoding/json"
"fmt"
"log"
"os"
)
func main() {
file, err := os.Open("data.jsonl")
if err != nil {
log.Fatal(err)
}
defer file.Close()
scanner := bufio.NewScanner(file)
lineNum := 0
for scanner.Scan() {
lineNum++
var record map[string]interface{}
if err := json.Unmarshal(scanner.Bytes(), &record); err != nil {
log.Fatalf("Parse error on line %d: %v", lineNum, err)
}
// Process record...
}
if err := scanner.Err(); err != nil {
log.Fatal(err)
}
}
For transient errors (network, rate limits), implement exponential backoff retry.
# Python: Retry with exponential backoff
import json
import time
import requests
def process_with_retry(obj, max_retries=3):
for attempt in range(max_retries):
try:
response = requests.post('https://api.example.com', json=obj)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
if attempt == max_retries - 1:
raise
wait_time = 2 ** attempt # Exponential backoff: 1s, 2s, 4s
print(f"Retry {attempt + 1}/{max_retries} after {wait_time}s")
time.sleep(wait_time)
with open('data.jsonl', 'r') as f:
for line in f:
obj = json.loads(line)
result = process_with_retry(obj)
For multi-GB files, save progress to resume from failures without reprocessing.
# Python: Checkpoint every 10,000 records
import json
import os

CHECKPOINT_FILE = 'progress.txt'
CHECKPOINT_INTERVAL = 10000

# Resume from last checkpoint
start_line = 0
if os.path.exists(CHECKPOINT_FILE):
    with open(CHECKPOINT_FILE, 'r') as f:
        start_line = int(f.read().strip())

with open('huge.jsonl', 'r') as f:
    # Skip lines that were already processed
    for _ in range(start_line):
        next(f)
    for line_num, line in enumerate(f, start=start_line):
        obj = json.loads(line)
        process(obj)
        # Save checkpoint periodically
        if (line_num + 1) % CHECKPOINT_INTERVAL == 0:
            with open(CHECKPOINT_FILE, 'w') as cf:
                cf.write(str(line_num + 1))

# Cleanup checkpoint on success
if os.path.exists(CHECKPOINT_FILE):
    os.remove(CHECKPOINT_FILE)
JSONL typically compresses by 70-90% with gzip. Store large files compressed and decompress on the fly when reading.
# Python: Transparent gzip compression
import gzip
import json
# Write compressed JSONL
with gzip.open('data.jsonl.gz', 'wt') as f:
for record in records:
f.write(json.dumps(record) + '\n')
# Read compressed JSONL (streaming)
with gzip.open('data.jsonl.gz', 'rt') as f:
for line in f:
obj = json.loads(line)
Compression tools comparison:
| Tool | Compression | Speed | Streaming | Use Case |
|---|---|---|---|---|
| gzip | Good (70-80%) | Fast | Yes | General purpose |
| bzip2 | Better (80-90%) | Slow | Yes | Archival |
| xz | Best (85-95%) | Very slow | Yes | Long-term storage |
| zstd | Great (75-85%) | Very fast | Yes | Real-time pipelines |
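For zstd, the third-party zstandard package offers a gzip-like interface; a sketch, assuming the package is installed and its zstandard.open() helper is available (present in recent versions):
# Sketch: zstd-compressed JSONL with the zstandard package (assumed installed)
import json
import zstandard

# Write compressed JSONL
with zstandard.open('data.jsonl.zst', 'wt') as f:
    for record in [{"id": 1}, {"id": 2}]:
        f.write(json.dumps(record) + '\n')

# Read compressed JSONL (streaming)
with zstandard.open('data.jsonl.zst', 'rt') as f:
    for line in f:
        obj = json.loads(line)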
Split data by time, geography, or category for faster querying and parallel processing.
# Directory structure for partitioned data
data/
year=2025/
month=01/
day=01/
events.jsonl.gz
day=02/
events.jsonl.gz
month=02/
day=01/
events.jsonl.gz
Benefits: Query only relevant partitions, parallelize processing, easier retention policies.
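A sketch of writing records into this layout, deriving the partition from each record's timestamp (the append-to-gzip approach and field names are assumptions):
# Sketch: append records into year=/month=/day= partitions by timestamp
import gzip
import json
import os
from datetime import datetime

def partition_path(record, root="data"):
    ts = datetime.fromisoformat(record["timestamp"].replace("Z", "+00:00"))
    return os.path.join(
        root,
        f"year={ts.year:04d}", f"month={ts.month:02d}", f"day={ts.day:02d}",
        "events.jsonl.gz",
    )

record = {"id": "evt_001", "type": "click", "timestamp": "2025-11-11T14:30:00Z"}
path = partition_path(record)
os.makedirs(os.path.dirname(path), exist_ok=True)
with gzip.open(path, 'at') as f:  # append one compressed line
    f.write(json.dumps(record) + '\n')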
Set up alerts for abnormal file sizes or record counts to detect pipeline issues early.
# Bash: Daily file size monitoring
#!/bin/bash
FILE="data/$(date +%Y-%m-%d).jsonl"
LINE_COUNT=$(wc -l < "$FILE")
FILE_SIZE=$(du -h "$FILE" | cut -f1)
echo "Date: $(date)"
echo "Records: $LINE_COUNT"
echo "File size: $FILE_SIZE"
# Alert if too small (possible failure)
if [ "$LINE_COUNT" -lt 1000 ]; then
echo "WARNING: Low record count!"
# Send alert (email, Slack, PagerDuty, etc.)
fi
Write to temporary files and rename atomically to prevent partial file reads.
# Python: Atomic file write
import os
import json
import tempfile
def write_jsonl_atomic(records, filepath):
# Write to temp file in same directory
dir_path = os.path.dirname(filepath)
fd, temp_path = tempfile.mkstemp(dir=dir_path, suffix='.tmp')
try:
with os.fdopen(fd, 'w') as f:
for record in records:
f.write(json.dumps(record) + '\n')
# Atomic rename (POSIX guarantees atomicity)
os.rename(temp_path, filepath)
except:
# Cleanup on failure
if os.path.exists(temp_path):
os.remove(temp_path)
raise
This prevents readers from seeing incomplete files during writes.
Serve JSONL over HTTP with chunked transfer encoding for large result sets.
# Node.js: Express streaming endpoint
const express = require('express');
const fs = require('fs');
const readline = require('readline');
const app = express();
app.get('/api/data', async (req, res) => {
res.setHeader('Content-Type', 'application/x-ndjson');
res.setHeader('Transfer-Encoding', 'chunked');
const fileStream = fs.createReadStream('data.jsonl');
const rl = readline.createInterface({ input: fileStream });
for await (const line of rl) {
res.write(line + '\n');
}
res.end();
});
app.listen(3000);
Clients can process results as they arrive, reducing latency and memory usage.
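On the client side, a streaming consumer can parse each line as soon as it arrives; for example, a sketch using Python's requests with stream=True against the endpoint above:
# Sketch: consume a streamed NDJSON response line by line
import json
import requests

with requests.get('http://localhost:3000/api/data', stream=True) as resp:
    resp.raise_for_status()
    for line in resp.iter_lines(decode_unicode=True):
        if not line:  # skip blank keep-alive lines
            continue
        obj = json.loads(line)
        # process obj as soon as it arrives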
Track schema conformance, null rates, and data distribution over time.
# Python: Data quality profiling
import json
from collections import defaultdict
stats = {
'total_records': 0,
'field_counts': defaultdict(int),
'null_counts': defaultdict(int),
'type_errors': []
}
with open('data.jsonl', 'r') as f:
for line_num, line in enumerate(f, 1):
obj = json.loads(line)
stats['total_records'] += 1
for key, value in obj.items():
stats['field_counts'][key] += 1
if value is None:
stats['null_counts'][key] += 1
# Calculate percentages
for field, count in stats['field_counts'].items():
coverage = (count / stats['total_records']) * 100
null_rate = (stats['null_counts'][field] / count) * 100
print(f"{field}: {coverage:.1f}% coverage, {null_rate:.1f}% null")
Always use buffered readers and writers; unbuffered I/O can be 10-100x slower. Python's open() is buffered by default, but the default buffer is small for large files.
Slower (Default Buffer)
with open('data.jsonl', 'r') as f:
for line in f: # Default buffering
obj = json.loads(line)
Fast (Explicit Buffer)
with open('data.jsonl', 'r', buffering=1024*1024) as f:
for line in f:
obj = json.loads(line)
Tip: Python defaults to 8KB buffer. Increasing to 1MB can significantly improve throughput on large files.
Split JSONL files and process chunks in parallel using multiprocessing.
# Python: Parallel processing with multiprocessing
from multiprocessing import Pool
import json
def process_chunk(filename):
results = []
with open(filename, 'r') as f:
for line in f:
obj = json.loads(line)
result = expensive_computation(obj)
results.append(result)
return results
# Split file into chunks (use split command or custom splitter)
chunk_files = ['chunk-01.jsonl', 'chunk-02.jsonl', 'chunk-03.jsonl', 'chunk-04.jsonl']
# Process in parallel (one process per CPU core)
with Pool() as pool:
all_results = pool.map(process_chunk, chunk_files)
# Flatten results
final_results = [item for sublist in all_results for item in sublist]
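If you prefer not to shell out to split, a simple line-count splitter can produce the chunk files; a sketch with an arbitrary chunk size and naming scheme:
# Sketch: split a JSONL file into fixed-size chunk files
def split_jsonl(path, lines_per_chunk=100_000):
    chunk_files = []
    with open(path, 'r') as f:
        chunk, chunk_id = [], 1
        for line in f:
            chunk.append(line)
            if len(chunk) == lines_per_chunk:
                name = f"chunk-{chunk_id:02d}.jsonl"
                with open(name, 'w') as out:
                    out.writelines(chunk)
                chunk_files.append(name)
                chunk, chunk_id = [], chunk_id + 1
        if chunk:  # write the remainder
            name = f"chunk-{chunk_id:02d}.jsonl"
            with open(name, 'w') as out:
                out.writelines(chunk)
            chunk_files.append(name)
    return chunk_files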
Standard library JSON parsers are often slow. Use optimized alternatives.
# Use orjson (3-5x faster than standard json)
import orjson
with open('data.jsonl', 'rb') as f:
for line in f:
obj = orjson.loads(line) # Much faster than json.loads()
// Use simdjson for ultra-fast parsing (Node.js)
const simdjson = require('simdjson');
// 2-4x faster than JSON.parse() for large objects
const obj = simdjson.parse(line);
// Use jsoniter (faster than encoding/json)
import jsoniter "github.com/json-iterator/go"
var json = jsoniter.ConfigCompatibleWithStandardLibrary
var record MyStruct
json.Unmarshal(line, &record)
If you only need specific records, use grep or fast text search before JSON parsing.
# Filter with grep first, then parse
grep '"status":"active"' users.jsonl | python process.py
# In Python: Skip parsing if not needed
import json
with open('events.jsonl', 'r') as f:
for line in f:
# Quick string check before expensive parsing
if '"event_type":"click"' not in line:
continue
obj = json.loads(line)
process(obj)
This can provide 10x+ speedup when filtering is selective.
For random access to specific records, build an index mapping record IDs to file offsets.
# Python: Build and use offset index
import json
# Build index (one-time operation)
index = {} # {record_id: file_offset}
offset = 0
with open('data.jsonl', 'rb') as f:
while True:
line = f.readline()
if not line:
break
obj = json.loads(line)
index[obj['id']] = offset
offset = f.tell()
# Save index
with open('data.index.json', 'w') as f:
json.dump(index, f)
# Fast random access using index
with open('data.jsonl', 'rb') as f:
target_offset = index['record_12345']
f.seek(target_offset)
line = f.readline()
obj = json.loads(line)
Enables O(1) random access instead of O(n) linear scan.
Use JSONL for large-scale, record-oriented data that benefits from streaming or append-only operations. For everything else, regular JSON is usually the better choice.