Master compression algorithms, parallel processing, memory optimization, and distributed systems for production-scale JSONL deployments
When you process terabytes of JSONL daily, naive approaches stop working. The techniques below can cut storage by roughly 90%, raise throughput by an order of magnitude, and make it practical to work with datasets that do not fit in memory.
The sections that follow cover compression, parallel processing, memory-efficient streaming, schema management, distributed processing, and indexing, starting with compression.
gzip is the most widely supported compression format, with a good balance of speed and compression ratio.
# Python: gzip compression and decompression
import gzip
import json
# Compress JSONL file
def compress_jsonl(input_file, output_file, compression_level=6):
"""
Compress JSONL with gzip
compression_level: 1 (fastest) to 9 (best compression)
"""
with open(input_file, 'rb') as f_in:
with gzip.open(output_file, 'wb', compresslevel=compression_level) as f_out:
# Copy in chunks to handle large files
chunk_size = 1024 * 1024 # 1MB chunks
while True:
chunk = f_in.read(chunk_size)
if not chunk:
break
f_out.write(chunk)
# Decompress and stream
def stream_gzip_jsonl(filepath):
"""Stream decompress gzipped JSONL"""
with gzip.open(filepath, 'rt', encoding='utf-8') as f:
for line in f:
yield json.loads(line)
# Usage
compress_jsonl('data.jsonl', 'data.jsonl.gz', compression_level=9)
# Process compressed file without decompressing to disk
for record in stream_gzip_jsonl('data.jsonl.gz'):
process(record)
# Append to a gzipped file (this rewrites the whole archive; gzip.open(filepath, 'at') would append a new gzip member without rewriting)
def append_to_gzip(filepath, new_records):
"""Append records to gzipped JSONL"""
# Read existing records
existing = []
try:
with gzip.open(filepath, 'rt') as f:
existing = [json.loads(line) for line in f]
except FileNotFoundError:
pass
# Combine and rewrite
all_records = existing + new_records
with gzip.open(filepath, 'wt') as f:
for record in all_records:
f.write(json.dumps(record) + '\n')
# Benchmark different compression levels
import os
import time
def benchmark_compression(input_file):
"""Test all gzip compression levels"""
results = []
for level in range(1, 10):
start = time.time()
compress_jsonl(input_file, f'test_{level}.gz', level)
duration = time.time() - start
size = os.path.getsize(f'test_{level}.gz')
original_size = os.path.getsize(input_file)
ratio = (1 - size / original_size) * 100
results.append({
'level': level,
'duration': duration,
'size': size,
'ratio': ratio,
'speed_mb_s': os.path.getsize(input_file) / 1024 / 1024 / duration
})
print(f"Level {level}: {ratio:.1f}% reduction, {duration:.2f}s, {results[-1]['speed_mb_s']:.1f} MB/s")
return results
Pro Tip: Level 6 offers the best speed/ratio tradeoff for most use cases. Use level 1 for maximum speed, level 9 for archival storage.
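If it helps to make that rule of thumb explicit in code, here is a small hedged sketch that wraps the compress_jsonl() function above behind named presets; the preset names and mapping are illustrative, not benchmark results.
# Hypothetical convenience wrapper around compress_jsonl() from above
GZIP_PRESETS = {'fast': 1, 'balanced': 6, 'archive': 9}

def compress_with_preset(input_file, output_file, preset='balanced'):
    """Compress using a named preset instead of a raw gzip level."""
    level = GZIP_PRESETS.get(preset, 6)
    compress_jsonl(input_file, output_file, compression_level=level)

# e.g. compress_with_preset('events.jsonl', 'events.jsonl.gz', preset='archive')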
Zstandard (zstd) offers better compression ratios than gzip with significantly faster compression/decompression speeds.
import io
import json
import os
import zstandard as zstd
# Compress with zstd
def compress_zstd(input_file, output_file, level=3):
"""
Compress JSONL with zstd
level: 1-22 (3 is default, 19+ for archival)
"""
cctx = zstd.ZstdCompressor(level=level)
with open(input_file, 'rb') as f_in:
with open(output_file, 'wb') as f_out:
f_out.write(cctx.compress(f_in.read()))
# Stream compression (better for large files)
def stream_compress_zstd(input_file, output_file, level=3):
"""Stream compress for memory efficiency"""
cctx = zstd.ZstdCompressor(level=level)
with open(input_file, 'rb') as f_in:
with open(output_file, 'wb') as f_out:
# Compress in chunks
compressor = cctx.stream_writer(f_out)
for chunk in iter(lambda: f_in.read(1024 * 1024), b''):
compressor.write(chunk)
compressor.flush(zstd.FLUSH_FRAME)
# Decompress and parse
def read_zstd_jsonl(filepath):
"""Read and parse zstd compressed JSONL"""
dctx = zstd.ZstdDecompressor()
with open(filepath, 'rb') as f:
with dctx.stream_reader(f) as reader:
text_stream = io.TextIOWrapper(reader, encoding='utf-8')
for line in text_stream:
yield json.loads(line)
# Train dictionary for better compression
def train_zstd_dictionary(sample_files, dict_size=100 * 1024):
"""Train custom dictionary on sample data"""
samples = []
for filepath in sample_files:
with open(filepath, 'rb') as f:
# Dictionaries are trained on many small samples, so use individual lines rather than whole files
samples.extend(f.readlines())
# Train dictionary
dict_data = zstd.train_dictionary(dict_size, samples)
return dict_data
# Compress with trained dictionary
def compress_with_dict(input_file, output_file, dict_data, level=3):
"""Compress using trained dictionary for better ratio"""
cctx = zstd.ZstdCompressor(level=level, dict_data=dict_data)
with open(input_file, 'rb') as f_in:
with open(output_file, 'wb') as f_out:
f_out.write(cctx.compress(f_in.read()))
# Comparison: zstd vs gzip
def compare_compression():
"""Compare different algorithms"""
import time
import gzip
input_file = 'large_data.jsonl'
original_size = os.path.getsize(input_file)
# Test gzip
start = time.time()
with open(input_file, 'rb') as f_in:
with gzip.open('test.gz', 'wb') as f_out:
f_out.write(f_in.read())
gzip_time = time.time() - start
gzip_size = os.path.getsize('test.gz')
# Test zstd
start = time.time()
compress_zstd(input_file, 'test.zst', level=3)
zstd_time = time.time() - start
zstd_size = os.path.getsize('test.zst')
print(f"Original: {original_size / 1024 / 1024:.1f} MB")
print(f"gzip: {gzip_size / 1024 / 1024:.1f} MB ({gzip_time:.2f}s) - {(1 - gzip_size/original_size)*100:.1f}% reduction")
print(f"zstd: {zstd_size / 1024 / 1024:.1f} MB ({zstd_time:.2f}s) - {(1 - zstd_size/original_size)*100:.1f}% reduction")
print(f"zstd is {gzip_time / zstd_time:.1f}x faster")
# Output example:
# Original: 1024.0 MB
# gzip: 102.4 MB (12.5s) - 90.0% reduction
# zstd: 81.9 MB (2.5s) - 92.0% reduction
# zstd is 5.0x faster
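Data compressed with a trained dictionary must be decompressed with the same dictionary. A minimal sketch, assuming dict_data comes from train_zstd_dictionary() above and the file was written by compress_with_dict():
import io
import json
import zstandard as zstd

def read_zstd_jsonl_with_dict(filepath, dict_data):
    """Stream-decompress JSONL that was compressed with a trained dictionary."""
    dctx = zstd.ZstdDecompressor(dict_data=dict_data)
    with open(filepath, 'rb') as f:
        with dctx.stream_reader(f) as reader:
            text_stream = io.TextIOWrapper(reader, encoding='utf-8')
            for line in text_stream:
                yield json.loads(line)

# dict_data = train_zstd_dictionary(['sample1.jsonl', 'sample2.jsonl'])
# compress_with_dict('data.jsonl', 'data.zst', dict_data)
# records = list(read_zstd_jsonl_with_dict('data.zst', dict_data))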
Brotli typically achieves the highest compression ratios of the formats covered here, making it well suited to web delivery and archival storage.
import brotli
import json
# Compress with brotli
def compress_brotli(input_file, output_file, quality=11):
"""
Compress JSONL with brotli
quality: 0-11 (11 is maximum compression)
"""
with open(input_file, 'rb') as f_in:
data = f_in.read()
compressed = brotli.compress(data, quality=quality)
with open(output_file, 'wb') as f_out:
f_out.write(compressed)
# Stream decompress
def read_brotli_jsonl(filepath):
"""Read brotli compressed JSONL"""
with open(filepath, 'rb') as f:
compressed = f.read()
decompressed = brotli.decompress(compressed)
text = decompressed.decode('utf-8')
for line in text.split('\n'):
if line.strip():
yield json.loads(line)
# HTTP response compression
from flask import Flask, Response
app = Flask(__name__)
@app.route('/api/data.jsonl')
def get_data_brotli():
"""Serve brotli-compressed JSONL over HTTP"""
def generate():
for record in get_records():
yield json.dumps(record) + '\n'
# Compress the full response body in memory (fine for modest payloads)
content = ''.join(generate()).encode('utf-8')
compressed = brotli.compress(content, quality=6)
return Response(
compressed,
mimetype='application/x-ndjson',
headers={
'Content-Encoding': 'br',
'Content-Length': str(len(compressed))
}
)
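In practice you would only send brotli to clients that advertise support for it. A hedged sketch building on the route above, checking the Accept-Encoding request header and falling back to an uncompressed body (the route name is illustrative):
from flask import request

@app.route('/api/data-negotiated.jsonl')
def get_data_negotiated():
    """Serve brotli only when the client accepts it."""
    body = ''.join(json.dumps(r) + '\n' for r in get_records()).encode('utf-8')
    if 'br' in request.headers.get('Accept-Encoding', ''):
        return Response(brotli.compress(body, quality=6),
                        mimetype='application/x-ndjson',
                        headers={'Content-Encoding': 'br'})
    return Response(body, mimetype='application/x-ndjson')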
LZ4 prioritizes raw speed over compression ratio, which makes it a good fit for intermediate files and hot data paths where CPU time matters more than disk space.
import lz4.frame
import json
import os
# LZ4 compression
def compress_lz4(input_file, output_file):
"""Ultra-fast LZ4 compression"""
with open(input_file, 'rb') as f_in:
with lz4.frame.open(output_file, 'wb') as f_out:
f_out.write(f_in.read())
# Stream processing
def process_lz4_jsonl(filepath):
"""Process LZ4 compressed JSONL"""
with lz4.frame.open(filepath, 'rt', encoding='utf-8') as f:
for line in f:
record = json.loads(line)
process(record)
# Benchmark: LZ4 vs others
def benchmark_all():
"""Compare all compression algorithms"""
import time
algorithms = {
'gzip': lambda i, o: compress_jsonl(i, o, 6),
'zstd': lambda i, o: compress_zstd(i, o, 3),
'brotli': lambda i, o: compress_brotli(i, o, 6),
'lz4': lambda i, o: compress_lz4(i, o)
}
results = {}
input_file = 'test.jsonl'
original_size = os.path.getsize(input_file)
for name, compress_fn in algorithms.items():
output = f'test.{name}'
start = time.time()
compress_fn(input_file, output)
duration = time.time() - start
size = os.path.getsize(output)
ratio = (1 - size / original_size) * 100
speed = original_size / 1024 / 1024 / duration
results[name] = {
'size_mb': size / 1024 / 1024,
'ratio': ratio,
'time': duration,
'speed_mb_s': speed
}
print(f"{name:8} {ratio:5.1f}% reduction, {speed:6.1f} MB/s")
return results
Streaming compression writes records through the compressor as they are produced, so no uncompressed intermediate file ever touches disk.
import gzip
import json
import os
from datetime import datetime
from typing import Iterator
class StreamingCompressor:
"""Compress JSONL records as they're generated"""
def __init__(self, output_file, compression='gzip', level=6):
self.output_file = output_file
self.compression = compression
self.level = level
self.compressor = None
def __enter__(self):
if self.compression == 'gzip':
self.compressor = gzip.open(
self.output_file, 'wt',
compresslevel=self.level
)
elif self.compression == 'zstd':
import zstandard as zstd
cctx = zstd.ZstdCompressor(level=self.level)
f = open(self.output_file, 'wb')
self.compressor = cctx.stream_writer(f, closefd=True)
return self
def __exit__(self, *args):
if self.compressor:
self.compressor.close()
def write_record(self, record: dict):
"""Write single record to compressed stream"""
line = json.dumps(record) + '\n'
if self.compression == 'zstd':
self.compressor.write(line.encode('utf-8'))
else:
self.compressor.write(line)
# Usage: Generate and compress on-the-fly
with StreamingCompressor('output.jsonl.gz', compression='gzip') as compressor:
for i in range(1000000):
record = {
'id': i,
'data': generate_data(),
'timestamp': datetime.now().isoformat()
}
compressor.write_record(record)
# Database export with streaming compression
def export_database_compressed(table, output_file):
"""Export database table to compressed JSONL"""
import psycopg2
conn = psycopg2.connect("dbname=mydb")
cursor = conn.cursor()
with StreamingCompressor(output_file) as compressor:
cursor.execute(f"SELECT * FROM {table}")
while True:
rows = cursor.fetchmany(1000)
if not rows:
break
for row in rows:
record = dict(zip([desc[0] for desc in cursor.description], row))
compressor.write_record(record)
cursor.close()
conn.close()
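One caveat with the export above: a default psycopg2 cursor pulls the entire result set into client memory before fetchmany() ever runs. A server-side (named) cursor keeps the export streaming; a sketch under that assumption, reusing StreamingCompressor:
def export_database_compressed_streaming(table, output_file):
    """Export via a server-side cursor so rows arrive in batches."""
    import psycopg2
    conn = psycopg2.connect("dbname=mydb")
    cursor = conn.cursor(name='jsonl_export')  # named cursor = server-side
    cursor.itersize = 1000                     # rows fetched per round trip
    with StreamingCompressor(output_file) as compressor:
        cursor.execute(f"SELECT * FROM {table}")
        columns = None
        for row in cursor:
            if columns is None:
                columns = [desc[0] for desc in cursor.description]
            compressor.write_record(dict(zip(columns, row)))
    cursor.close()
    conn.close()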
# Parallel compression with multiprocessing
from multiprocessing import Pool
import queue
def parallel_compress_worker(input_queue, output_file, worker_id):
"""Worker process for parallel compression"""
temp_file = f"{output_file}.part{worker_id}"
with gzip.open(temp_file, 'wt') as f:
while True:
try:
record = input_queue.get(timeout=1)
if record is None: # Poison pill
break
f.write(json.dumps(record) + '\n')
except queue.Empty:
continue
def parallel_compress(records: Iterator, output_file, num_workers=4):
"""Compress using multiple processes"""
from multiprocessing import Manager
manager = Manager()
input_queue = manager.Queue(maxsize=1000)
# Start workers
pool = Pool(num_workers)
workers = [
pool.apply_async(parallel_compress_worker, (input_queue, output_file, i))
for i in range(num_workers)
]
# Feed records to queue
for record in records:
input_queue.put(record)
# Send stop signal
for _ in range(num_workers):
input_queue.put(None)
# Wait for completion
for worker in workers:
worker.get()
pool.close()
pool.join()
# Merge part files
with gzip.open(output_file, 'wt') as f_out:
for i in range(num_workers):
part_file = f"{output_file}.part{i}"
with gzip.open(part_file, 'rt') as f_in:
for line in f_in:
f_out.write(line)
os.remove(part_file)
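Because the gzip format allows concatenated members, the merge step does not actually need to decompress and recompress the parts: Python's gzip reader handles multi-member files transparently. A sketch of a byte-level merge:
import os
import shutil

def merge_gzip_parts(output_file, num_workers):
    """Concatenate gzip members byte-for-byte instead of recompressing."""
    with open(output_file, 'wb') as f_out:
        for i in range(num_workers):
            part_file = f"{output_file}.part{i}"
            with open(part_file, 'rb') as f_in:
                shutil.copyfileobj(f_in, f_out)
            os.remove(part_file)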
Compression reduces I/O; for CPU-bound transformations, splitting the file across worker processes puts every core to work.
import multiprocessing as mp
import json
def process_chunk(chunk_data):
"""Process a chunk of JSONL records"""
chunk_id, lines = chunk_data
results = []
for line in lines:
record = json.loads(line)
# Process record...
processed = transform_record(record)
results.append(processed)
return results
def parallel_process_jsonl(input_file, output_file, num_processes=None):
"""Process JSONL file in parallel"""
if num_processes is None:
num_processes = mp.cpu_count()
# Read file and split into chunks
with open(input_file, 'r') as f:
lines = f.readlines()
chunk_size = len(lines) // num_processes
chunks = [
(i, lines[i*chunk_size:(i+1)*chunk_size])
for i in range(num_processes)
]
# Handle remainder
if len(lines) % num_processes:
chunks[-1] = (num_processes-1, lines[(num_processes-1)*chunk_size:])
# Process in parallel
with mp.Pool(num_processes) as pool:
results = pool.map(process_chunk, chunks)
# Flatten results and write
with open(output_file, 'w') as f:
for chunk_results in results:
for record in chunk_results:
f.write(json.dumps(record) + '\n')
# Memory-efficient streaming parallel processing
def stream_parallel_process(input_file, output_file, process_fn, chunk_size=10000):
"""Process large files without loading into memory"""
def reader_process(input_queue):
"""Read file and queue records"""
with open(input_file, 'r') as f:
batch = []
for line in f:
batch.append(line)
if len(batch) >= chunk_size:
input_queue.put(batch)
batch = []
if batch:
input_queue.put(batch)
input_queue.put(None) # Signal completion
def worker_process(input_queue, output_queue):
"""Process records from queue"""
while True:
batch = input_queue.get()
if batch is None:
input_queue.put(None)  # pass the sentinel on so every worker shuts down
break
results = []
for line in batch:
record = json.loads(line)
processed = process_fn(record)
results.append(processed)
output_queue.put(results)
output_queue.put(None)
def writer_process(output_queue, num_workers):
"""Write results to file"""
completed_workers = 0
with open(output_file, 'w') as f:
while completed_workers < num_workers:
results = output_queue.get()
if results is None:
completed_workers += 1
continue
for record in results:
f.write(json.dumps(record) + '\n')
# Create queues
input_queue = mp.Queue(maxsize=10)
output_queue = mp.Queue(maxsize=10)
# Start processes
num_workers = mp.cpu_count()
reader = mp.Process(target=reader_process, args=(input_queue,))
workers = [
mp.Process(target=worker_process, args=(input_queue, output_queue))
for _ in range(num_workers)
]
writer = mp.Process(target=writer_process, args=(output_queue, num_workers))
reader.start()
for w in workers:
w.start()
writer.start()
# Wait for completion
reader.join()
for w in workers:
w.join()
writer.join()
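A usage sketch for stream_parallel_process, assuming the default fork start method on Linux (the nested worker functions above are not picklable, so the spawn start method used on macOS and Windows would need module-level workers); the transformation shown is illustrative:
def add_processed_flag(record):
    """Example per-record transformation; defined at module level so workers can reach it."""
    record['processed'] = True
    return record

if __name__ == '__main__':
    stream_parallel_process('input.jsonl', 'output.jsonl', add_processed_flag)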
The same fan-out/fan-in pattern in Go uses goroutines and channels:
package main
import (
"bufio"
"encoding/json"
"fmt"
"os"
"runtime"
"strings"
"sync"
)
type Record struct {
ID int `json:"id"`
Data string `json:"data"`
}
func processRecord(record Record) Record {
// Process record...
record.Data = strings.ToUpper(record.Data)
return record
}
func parallelProcessJSONL(inputFile, outputFile string) error {
numWorkers := runtime.NumCPU()
// Channels
recordsChan := make(chan Record, 1000)
resultsChan := make(chan Record, 1000)
var wg sync.WaitGroup
// Start workers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for record := range recordsChan {
processed := processRecord(record)
resultsChan <- processed
}
}()
}
// Writer goroutine
writerDone := make(chan bool)
go func() {
f, _ := os.Create(outputFile)
defer f.Close()
writer := bufio.NewWriter(f)
defer writer.Flush()
for result := range resultsChan {
data, _ := json.Marshal(result)
writer.Write(data)
writer.WriteString("\n")
}
writerDone <- true
}()
// Read and distribute work
f, err := os.Open(inputFile)
if err != nil {
return err
}
defer f.Close()
scanner := bufio.NewScanner(f)
scanner.Buffer(make([]byte, 0, 1024*1024), 1024*1024) // raise the default 64 KB line limit for long JSON records
for scanner.Scan() {
var record Record
json.Unmarshal(scanner.Bytes(), &record)
recordsChan <- record
}
close(recordsChan)
wg.Wait()
close(resultsChan)
<-writerDone
return nil
}
func main() {
if err := parallelProcessJSONL("input.jsonl", "output.jsonl"); err != nil {
fmt.Println(err)
}
}
In Rust, Rayon parallelizes the per-record work with a parallel iterator:
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader, Write};
#[derive(Debug, Serialize, Deserialize)]
struct Record {
id: i32,
data: String,
}
fn process_record(record: Record) -> Record {
// Transform record
Record {
id: record.id,
data: record.data.to_uppercase(),
}
}
fn parallel_process_jsonl(input_path: &str, output_path: &str) -> std::io::Result<()> {
// Read all lines
let file = File::open(input_path)?;
let reader = BufReader::new(file);
let lines: Vec<String> = reader.lines().collect::<Result<_, _>>()?;
// Process in parallel with Rayon
let results: Vec<Record> = lines
.par_iter()
.filter_map(|line| serde_json::from_str::<Record>(line).ok())
.map(process_record)
.collect();
// Write results
let mut output = File::create(output_path)?;
for record in results {
let json = serde_json::to_string(&record)?;
writeln!(output, "{}", json)?;
}
Ok(())
}
fn main() {
parallel_process_jsonl("input.jsonl", "output.jsonl").unwrap();
}
The simplest memory optimization is to hold only one record at a time: a generator keeps memory flat no matter how large the file is.
import json
from typing import Iterator, Dict, Any
def stream_jsonl(filepath: str, buffer_size: int = 8192) -> Iterator[Dict[str, Any]]:
"""
Memory-efficient JSONL streaming
Uses only buffer_size bytes of memory regardless of file size
"""
with open(filepath, 'r', buffering=buffer_size) as f:
for line in f:
yield json.loads(line)
# Process a 100 GB file while holding only one line (plus a small read buffer) in memory
for record in stream_jsonl('huge_file.jsonl'):
process(record)
# Filter while streaming
def filter_stream(filepath: str, condition: callable) -> Iterator[Dict]:
"""Stream and filter without loading entire file"""
for record in stream_jsonl(filepath):
if condition(record):
yield record
# Usage: Extract specific records
matches = filter_stream('data.jsonl', lambda r: r.get('status') == 'active')
for match in matches:
print(match)
# Aggregate while streaming
def aggregate_stream(filepath: str, key: str) -> Dict:
"""Compute aggregates without loading full dataset"""
aggregates = {
'count': 0,
'sum': 0,
'min': float('inf'),
'max': float('-inf')
}
for record in stream_jsonl(filepath):
value = record.get(key, 0)
aggregates['count'] += 1
aggregates['sum'] += value
aggregates['min'] = min(aggregates['min'], value)
aggregates['max'] = max(aggregates['max'], value)
aggregates['average'] = aggregates['sum'] / aggregates['count']
return aggregates
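A short usage sketch for the streaming aggregator; the file and field names are placeholders, and note that records missing the field count as 0, which skews min():
stats = aggregate_stream('orders.jsonl', 'amount')
print(f"{stats['count']} records, avg {stats['average']:.2f} "
      f"(min {stats['min']}, max {stats['max']})")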
# Memory profiling
import tracemalloc
def profile_memory():
"""Profile memory usage of different approaches"""
# Bad: Load entire file
tracemalloc.start()
with open('data.jsonl', 'r') as f:
data = [json.loads(line) for line in f]
current, peak = tracemalloc.get_traced_memory()
print(f"Load all: {peak / 1024 / 1024:.1f} MB")
tracemalloc.stop()
# Good: Stream
tracemalloc.start()
for record in stream_jsonl('data.jsonl'):
pass
current, peak = tracemalloc.get_traced_memory()
print(f"Stream: {peak / 1024 / 1024:.1f} MB")
tracemalloc.stop()
# Output:
# Load all: 2048.5 MB
# Stream: 0.8 MB
Object pooling reuses record dictionaries across iterations, trimming allocation and garbage-collection overhead in tight loops.
import json
from collections import deque
class RecordPool:
"""Reuse record objects to reduce GC pressure"""
def __init__(self, max_size=1000):
self.pool = deque(maxlen=max_size)
def get(self):
"""Get record from pool or create new"""
return self.pool.pop() if self.pool else {}
def release(self, record):
"""Return record to pool"""
record.clear()
self.pool.append(record)
def process_with_pooling(filepath):
"""Process JSONL with object pooling"""
pool = RecordPool()
with open(filepath, 'r') as f:
for line in f:
record = pool.get()
record.update(json.loads(line))
# Process record...
process(record)
# Return to pool
pool.release(record)
Schemas change over time; embedding a version marker in every record lets old and new data coexist in the same file.
# Include a version in every record
{"_version": "1.0", "id": 1, "name": "Alice"}
{"_version": "2.0", "id": 2, "name": "Bob", "email": "[email protected]"}
{"_version": "2.1", "id": 3, "name": "Charlie", "email": "[email protected]", "verified": true}
# Python: Handle multiple versions
import json
from datetime import datetime
def parse_record(line: str) -> dict:
"""Parse record with version-specific logic"""
record = json.loads(line)
version = record.get('_version', '1.0')
if version == '1.0':
# Migrate v1 to current schema
record['email'] = f"{record['name'].lower()}@example.com"
record['verified'] = False
elif version == '2.0':
# Add missing fields from v2.1
record.setdefault('verified', False)
# Remove version field from working data
record.pop('_version', None)
return record
# Batch migration to new version
def migrate_schema(input_file, output_file, target_version='3.0'):
"""Migrate JSONL file to new schema version"""
def migrate_to_v3(record):
"""Upgrade any version to v3"""
# Parse with version handling
parsed = parse_record(json.dumps(record))
# Add v3 fields
parsed['created_at'] = datetime.now().isoformat()
parsed['_version'] = target_version
return parsed
with open(input_file, 'r') as f_in:
with open(output_file, 'w') as f_out:
for line in f_in:
record = json.loads(line)
migrated = migrate_to_v3(record)
f_out.write(json.dumps(migrated) + '\n')
Version-aware validation catches malformed or mis-versioned records before they reach downstream consumers.
import json
from jsonschema import validate, ValidationError
# Define schemas for each version
SCHEMAS = {
'1.0': {
'type': 'object',
'required': ['id', 'name'],
'properties': {
'id': {'type': 'integer'},
'name': {'type': 'string'}
}
},
'2.0': {
'type': 'object',
'required': ['id', 'name', 'email'],
'properties': {
'id': {'type': 'integer'},
'name': {'type': 'string'},
'email': {'type': 'string', 'format': 'email'}
}
}
}
def validate_jsonl(filepath, report_errors=True):
"""Validate all records against their declared schema"""
errors = []
valid_count = 0
with open(filepath, 'r') as f:
for line_num, line in enumerate(f, 1):
try:
record = json.loads(line)
version = record.get('_version', '1.0')
if version not in SCHEMAS:
errors.append({
'line': line_num,
'error': f'Unknown schema version: {version}'
})
continue
validate(instance=record, schema=SCHEMAS[version])
valid_count += 1
except json.JSONDecodeError as e:
errors.append({
'line': line_num,
'error': f'JSON parse error: {e}'
})
except ValidationError as e:
errors.append({
'line': line_num,
'error': f'Schema validation failed: {e.message}'
})
if report_errors:
for error in errors:
print(f"Line {error['line']}: {error['error']}")
return {
'valid': valid_count,
'errors': len(errors),
'error_details': errors
}
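A quick way to act on the report, for example to fail a pipeline run when bad records appear (the threshold is illustrative):
report = validate_jsonl('data.jsonl', report_errors=False)
print(f"{report['valid']} valid, {report['errors']} invalid")
if report['errors'] > 0:
    raise SystemExit(f"Validation failed: {report['errors']} bad records")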
When one machine is no longer enough, Apache Spark distributes JSONL reading and processing across a cluster.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, sum
# Initialize Spark
spark = SparkSession.builder \
.appName("JSONL Processing") \
.config("spark.executor.memory", "4g") \
.config("spark.executor.cores", "4") \
.getOrCreate()
# Read JSONL files (auto-parallelized)
df = spark.read.json("hdfs://data/*.jsonl")
# Process in parallel across cluster
result = df \
.filter(col("status") == "active") \
.groupBy("category") \
.agg(
count("*").alias("count"),
avg("price").alias("avg_price"),
sum("quantity").alias("total_quantity")
)
# Write results as JSONL (Spark writes a directory of part files at this path)
result.write.mode("overwrite").json("hdfs://output/aggregated.jsonl")
# Partition large datasets
df.repartition(100) \
.write \
.partitionBy("date", "category") \
.mode("overwrite") \
.json("hdfs://output/partitioned/")
Dask Bag offers similar scale-out with a pure-Python stack and an API close to the standard library.
import dask.bag as db
from dask.distributed import Client
import json
# Start distributed cluster
client = Client('scheduler-address:8786')
# Read JSONL with Dask
def parse_jsonl(filename):
with open(filename) as f:
for line in f:
yield json.loads(line)
# Distribute processing
bag = db.read_text('data/*.jsonl').map(json.loads)
# Parallel transformations
result = bag \
.filter(lambda r: r['status'] == 'active') \
.map(lambda r: {**r, 'processed': True}) \
.compute()
# Write results (serialize records back to JSON strings before writing text files)
bag.map(json.dumps).to_textfiles('output/*.jsonl', name_function=lambda i: f'part-{i:05d}')
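Calling .compute() pulls every result back to the driver; for large outputs it is usually better to keep the pipeline lazy and write directly from the workers. A sketch of the same filter-and-tag pipeline written out-of-core:
processed = (
    db.read_text('data/*.jsonl')
      .map(json.loads)
      .filter(lambda r: r['status'] == 'active')
      .map(lambda r: {**r, 'processed': True})
      .map(json.dumps)
)
processed.to_textfiles('output/active-*.jsonl')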
JSONL is a sequential format, but a byte-offset index adds fast random access by key.
import json
class JSONLIndex:
"""Build offset index for random access"""
def __init__(self, filepath):
self.filepath = filepath
self.index = {} # key -> byte offset
self.build_index()
def build_index(self):
"""Build index of record locations"""
with open(self.filepath, 'rb') as f:
offset = 0
for line in f:
record = json.loads(line)
key = record.get('id')
if key:
self.index[key] = offset
offset = f.tell()
def get(self, key):
"""Get record by key in O(1) time"""
if key not in self.index:
return None
offset = self.index[key]
with open(self.filepath, 'rb') as f:
f.seek(offset)
line = f.readline()
return json.loads(line)
def save_index(self, index_file):
"""Save index to disk"""
with open(index_file, 'w') as f:
json.dump(self.index, f)
def load_index(self, index_file):
"""Load pre-built index (JSON stores keys as strings, so convert numeric keys back)"""
with open(index_file, 'r') as f:
self.index = {int(k) if k.isdigit() else k: v for k, v in json.load(f).items()}
# Usage
index = JSONLIndex('large_file.jsonl')
record = index.get(12345) # Instant lookup
index.save_index('large_file.jsonl.index')
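For very large files the in-memory dict itself can become a problem. A hedged alternative keeps the offsets in an on-disk key-value store using the standard-library dbm module; the function and file names are illustrative:
import dbm
import json

def build_dbm_index(jsonl_path, index_path):
    """Store id -> byte offset in an on-disk dbm database."""
    with dbm.open(index_path, 'n') as db, open(jsonl_path, 'rb') as f:
        offset = 0
        for line in f:
            record = json.loads(line)
            if 'id' in record:
                db[str(record['id'])] = str(offset)
            offset = f.tell()

def dbm_lookup(jsonl_path, index_path, key):
    """Fetch a single record by id using the on-disk offset index."""
    with dbm.open(index_path, 'r') as db, open(jsonl_path, 'rb') as f:
        k = str(key)
        if k not in db:
            return None
        f.seek(int(db[k]))
        return json.loads(f.readline())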