Advanced JSONL Techniques

Master compression algorithms, parallel processing, memory optimization, and distributed systems for production-scale JSONL deployments

Enterprise-Scale JSONL

Why Advanced Techniques Matter

When processing terabytes of JSONL data daily, basic approaches fall short. The techniques covered here can reduce storage by up to roughly 90%, increase throughput by an order of magnitude, and enable processing of datasets that don't fit in memory.

  • Process 100GB+ files on 8GB RAM machines
  • Reduce storage costs by 80-95% with compression
  • Achieve 10-100x speedup with parallelization
  • Handle schema evolution without breaking changes
  • Scale horizontally across distributed clusters

Performance Benchmarks

Representative improvements observed in production systems (exact numbers depend on your data and hardware):

  • gzip: 85% size reduction, 50MB/s compression
  • zstd: 90% reduction, 500MB/s compression
  • brotli: 92% reduction, 30MB/s compression
  • Parallel: 16-core processes 20GB in 45 seconds
  • Streaming: Constant 50MB memory regardless of file size

Compression Algorithms

gzip - Universal Standard

gzip is the most widely supported compression format with excellent balance of speed and compression ratio.

# Python: gzip compression and decompression
import gzip
import json

# Compress JSONL file
def compress_jsonl(input_file, output_file, compression_level=6):
    """
    Compress JSONL with gzip
    compression_level: 1 (fastest) to 9 (best compression)
    """
    with open(input_file, 'rb') as f_in:
        with gzip.open(output_file, 'wb', compresslevel=compression_level) as f_out:
            # Copy in chunks to handle large files
            chunk_size = 1024 * 1024  # 1MB chunks
            while True:
                chunk = f_in.read(chunk_size)
                if not chunk:
                    break
                f_out.write(chunk)

# Decompress and stream
def stream_gzip_jsonl(filepath):
    """Stream decompress gzipped JSONL"""
    with gzip.open(filepath, 'rt', encoding='utf-8') as f:
        for line in f:
            yield json.loads(line)

# Usage
compress_jsonl('data.jsonl', 'data.jsonl.gz', compression_level=9)

# Process compressed file without decompressing to disk
for record in stream_gzip_jsonl('data.jsonl.gz'):
    process(record)

# Append to gzipped file (requires decompression)
def append_to_gzip(filepath, new_records):
    """Append records to gzipped JSONL"""
    # Read existing records
    existing = []
    try:
        with gzip.open(filepath, 'rt') as f:
            existing = [json.loads(line) for line in f]
    except FileNotFoundError:
        pass

    # Combine and rewrite
    all_records = existing + new_records

    with gzip.open(filepath, 'wt') as f:
        for record in all_records:
            f.write(json.dumps(record) + '\n')

# Benchmark different compression levels
import os
import time

def benchmark_compression(input_file):
    """Test all gzip compression levels"""
    results = []

    for level in range(1, 10):
        start = time.time()
        compress_jsonl(input_file, f'test_{level}.gz', level)
        duration = time.time() - start

        size = os.path.getsize(f'test_{level}.gz')
        original_size = os.path.getsize(input_file)
        ratio = (1 - size / original_size) * 100

        results.append({
            'level': level,
            'duration': duration,
            'size': size,
            'ratio': ratio,
            'speed_mb_s': os.path.getsize(input_file) / 1024 / 1024 / duration
        })

        print(f"Level {level}: {ratio:.1f}% reduction, {duration:.2f}s, {results[-1]['speed_mb_s']:.1f} MB/s")

    return results

Pro Tip: Level 6 offers the best speed/ratio tradeoff for most use cases. Use level 1 for maximum speed, level 9 for archival storage.

zstd - Modern High Performance

Zstandard (zstd) offers better compression ratios than gzip with significantly faster compression/decompression speeds.

import io
import json
import os
import zstandard as zstd

# Compress with zstd
def compress_zstd(input_file, output_file, level=3):
    """
    Compress JSONL with zstd
    level: 1-22 (3 is default, 19+ for archival)
    """
    cctx = zstd.ZstdCompressor(level=level)

    with open(input_file, 'rb') as f_in:
        with open(output_file, 'wb') as f_out:
            f_out.write(cctx.compress(f_in.read()))

# Stream compression (better for large files)
def stream_compress_zstd(input_file, output_file, level=3):
    """Stream compress for memory efficiency"""
    cctx = zstd.ZstdCompressor(level=level)

    with open(input_file, 'rb') as f_in:
        with open(output_file, 'wb') as f_out:
            # Compress in chunks
            compressor = cctx.stream_writer(f_out)
            for chunk in iter(lambda: f_in.read(1024 * 1024), b''):
                compressor.write(chunk)
            compressor.flush(zstd.FLUSH_FRAME)

# Decompress and parse
def read_zstd_jsonl(filepath):
    """Read and parse zstd compressed JSONL"""
    dctx = zstd.ZstdDecompressor()

    with open(filepath, 'rb') as f:
        with dctx.stream_reader(f) as reader:
            text_stream = io.TextIOWrapper(reader, encoding='utf-8')
            for line in text_stream:
                yield json.loads(line)

# Train dictionary for better compression
def train_zstd_dictionary(sample_files, dict_size=100 * 1024):
    """Train custom dictionary on many small samples"""
    samples = []

    for filepath in sample_files:
        with open(filepath, 'rb') as f:
            # Dictionary training works best with many small samples,
            # so use individual JSONL lines rather than whole files
            samples.extend(line for line in f if line.strip())

    # Train dictionary
    dict_data = zstd.train_dictionary(dict_size, samples)

    return dict_data

# Compress with trained dictionary
def compress_with_dict(input_file, output_file, dict_data, level=3):
    """Compress using trained dictionary for better ratio"""
    cctx = zstd.ZstdCompressor(level=level, dict_data=dict_data)

    with open(input_file, 'rb') as f_in:
        with open(output_file, 'wb') as f_out:
            f_out.write(cctx.compress(f_in.read()))

# Comparison: zstd vs gzip
def compare_compression():
    """Compare different algorithms"""
    import time
    import gzip

    input_file = 'large_data.jsonl'
    original_size = os.path.getsize(input_file)

    # Test gzip
    start = time.time()
    with open(input_file, 'rb') as f_in:
        with gzip.open('test.gz', 'wb') as f_out:
            f_out.write(f_in.read())
    gzip_time = time.time() - start
    gzip_size = os.path.getsize('test.gz')

    # Test zstd
    start = time.time()
    compress_zstd(input_file, 'test.zst', level=3)
    zstd_time = time.time() - start
    zstd_size = os.path.getsize('test.zst')

    print(f"Original: {original_size / 1024 / 1024:.1f} MB")
    print(f"gzip: {gzip_size / 1024 / 1024:.1f} MB ({gzip_time:.2f}s) - {(1 - gzip_size/original_size)*100:.1f}% reduction")
    print(f"zstd: {zstd_size / 1024 / 1024:.1f} MB ({zstd_time:.2f}s) - {(1 - zstd_size/original_size)*100:.1f}% reduction")
    print(f"zstd is {gzip_time / zstd_time:.1f}x faster")

# Output example:
# Original: 1024.0 MB
# gzip: 102.4 MB (12.5s) - 90.0% reduction
# zstd: 81.9 MB (2.5s) - 92.0% reduction
# zstd is 5.0x faster

brotli - Maximum Compression

Brotli achieves the highest compression ratios, ideal for web delivery and archival storage.

import brotli
import json

# Compress with brotli
def compress_brotli(input_file, output_file, quality=11):
    """
    Compress JSONL with brotli
    quality: 0-11 (11 is maximum compression)
    """
    with open(input_file, 'rb') as f_in:
        data = f_in.read()
        compressed = brotli.compress(data, quality=quality)

        with open(output_file, 'wb') as f_out:
            f_out.write(compressed)

# Decompress and iterate
def read_brotli_jsonl(filepath):
    """Read brotli-compressed JSONL (decompresses the whole file in memory)"""
    with open(filepath, 'rb') as f:
        compressed = f.read()
        decompressed = brotli.decompress(compressed)
        text = decompressed.decode('utf-8')

        for line in text.split('\n'):
            if line.strip():
                yield json.loads(line)

# HTTP response compression
from flask import Flask, Response

app = Flask(__name__)

@app.route('/api/data.jsonl')
def get_data_brotli():
    """Serve brotli-compressed JSONL over HTTP"""

    def generate():
        for record in get_records():
            yield json.dumps(record) + '\n'

    # Build the full payload in memory, then compress it
    content = ''.join(generate()).encode('utf-8')
    compressed = brotli.compress(content, quality=6)

    return Response(
        compressed,
        mimetype='application/x-ndjson',
        headers={
            'Content-Encoding': 'br',
            'Content-Length': str(len(compressed))
        }
    )

LZ4 - Extreme Speed

import lz4.frame
import json

# LZ4 compression
def compress_lz4(input_file, output_file):
    """Ultra-fast LZ4 compression"""
    with open(input_file, 'rb') as f_in:
        with lz4.frame.open(output_file, 'wb') as f_out:
            f_out.write(f_in.read())

# Stream processing
def process_lz4_jsonl(filepath):
    """Process LZ4 compressed JSONL"""
    with lz4.frame.open(filepath, 'rt', encoding='utf-8') as f:
        for line in f:
            record = json.loads(line)
            process(record)

# Benchmark: LZ4 vs others
def benchmark_all():
    """Compare all compression algorithms"""
    import os
    import time

    algorithms = {
        'gzip': lambda i, o: compress_jsonl(i, o, 6),
        'zstd': lambda i, o: compress_zstd(i, o, 3),
        'brotli': lambda i, o: compress_brotli(i, o, 6),
        'lz4': lambda i, o: compress_lz4(i, o)
    }

    results = {}
    input_file = 'test.jsonl'
    original_size = os.path.getsize(input_file)

    for name, compress_fn in algorithms.items():
        output = f'test.{name}'
        start = time.time()
        compress_fn(input_file, output)
        duration = time.time() - start

        size = os.path.getsize(output)
        ratio = (1 - size / original_size) * 100
        speed = original_size / 1024 / 1024 / duration

        results[name] = {
            'size_mb': size / 1024 / 1024,
            'ratio': ratio,
            'time': duration,
            'speed_mb_s': speed
        }

        print(f"{name:8} {ratio:5.1f}% reduction, {speed:6.1f} MB/s")

    return results

Streaming Compression

Compress While Streaming

import gzip
import json
from datetime import datetime
from typing import Iterator

class StreamingCompressor:
    """Compress JSONL records as they're generated"""

    def __init__(self, output_file, compression='gzip', level=6):
        self.output_file = output_file
        self.compression = compression
        self.level = level
        self.compressor = None

    def __enter__(self):
        if self.compression == 'gzip':
            self.compressor = gzip.open(
                self.output_file, 'wt',
                compresslevel=self.level
            )
        elif self.compression == 'zstd':
            import zstandard as zstd
            cctx = zstd.ZstdCompressor(level=self.level)
            f = open(self.output_file, 'wb')
            self.compressor = cctx.stream_writer(f, closefd=True)

        return self

    def __exit__(self, *args):
        if self.compressor:
            self.compressor.close()

    def write_record(self, record: dict):
        """Write single record to compressed stream"""
        line = json.dumps(record) + '\n'

        if self.compression == 'zstd':
            self.compressor.write(line.encode('utf-8'))
        else:
            self.compressor.write(line)

# Usage: Generate and compress on-the-fly
with StreamingCompressor('output.jsonl.gz', compression='gzip') as compressor:
    for i in range(1000000):
        record = {
            'id': i,
            'data': generate_data(),
            'timestamp': datetime.now().isoformat()
        }
        compressor.write_record(record)

# Database export with streaming compression
def export_database_compressed(table, output_file):
    """Export database table to compressed JSONL"""
    import psycopg2

    conn = psycopg2.connect("dbname=mydb")
    cursor = conn.cursor()

    with StreamingCompressor(output_file) as compressor:
        cursor.execute(f"SELECT * FROM {table}")

        while True:
            rows = cursor.fetchmany(1000)
            if not rows:
                break

            for row in rows:
                record = dict(zip([desc[0] for desc in cursor.description], row))
                compressor.write_record(record)

    cursor.close()
    conn.close()

# Parallel compression with multiprocessing
import os
import queue
from multiprocessing import Pool

def parallel_compress_worker(input_queue, output_file, worker_id):
    """Worker process for parallel compression"""
    temp_file = f"{output_file}.part{worker_id}"

    with gzip.open(temp_file, 'wt') as f:
        while True:
            try:
                record = input_queue.get(timeout=1)
                if record is None:  # Poison pill
                    break
                f.write(json.dumps(record) + '\n')
            except queue.Empty:
                continue

def parallel_compress(records: Iterator, output_file, num_workers=4):
    """Compress using multiple processes"""
    from multiprocessing import Manager

    manager = Manager()
    input_queue = manager.Queue(maxsize=1000)

    # Start workers
    pool = Pool(num_workers)
    workers = [
        pool.apply_async(parallel_compress_worker, (input_queue, output_file, i))
        for i in range(num_workers)
    ]

    # Feed records to queue
    for record in records:
        input_queue.put(record)

    # Send stop signal
    for _ in range(num_workers):
        input_queue.put(None)

    # Wait for completion
    for worker in workers:
        worker.get()

    pool.close()
    pool.join()

    # Merge part files
    with gzip.open(output_file, 'wt') as f_out:
        for i in range(num_workers):
            part_file = f"{output_file}.part{i}"
            with gzip.open(part_file, 'rt') as f_in:
                for line in f_in:
                    f_out.write(line)
            os.remove(part_file)

Parallel Processing

Python Multiprocessing

import multiprocessing as mp
import json
from typing import Callable

def process_chunk(chunk_data):
    """Process a chunk of JSONL records"""
    chunk_id, lines = chunk_data
    results = []

    for line in lines:
        record = json.loads(line)
        # Process record...
        processed = transform_record(record)
        results.append(processed)

    return results

def parallel_process_jsonl(input_file, output_file, num_processes=None):
    """Process JSONL file in parallel"""
    if num_processes is None:
        num_processes = mp.cpu_count()

    # Read file and split into chunks
    with open(input_file, 'r') as f:
        lines = f.readlines()

    chunk_size = len(lines) // num_processes
    chunks = [
        (i, lines[i*chunk_size:(i+1)*chunk_size])
        for i in range(num_processes)
    ]

    # Handle remainder
    if len(lines) % num_processes:
        chunks[-1] = (num_processes-1, lines[(num_processes-1)*chunk_size:])

    # Process in parallel
    with mp.Pool(num_processes) as pool:
        results = pool.map(process_chunk, chunks)

    # Flatten results and write
    with open(output_file, 'w') as f:
        for chunk_results in results:
            for record in chunk_results:
                f.write(json.dumps(record) + '\n')

# Memory-efficient streaming parallel processing
def stream_parallel_process(input_file, output_file, process_fn, chunk_size=10000):
    """Process large files without loading into memory"""
    def reader_process(input_queue, num_workers):
        """Read file and queue batches of lines"""
        with open(input_file, 'r') as f:
            batch = []
            for line in f:
                batch.append(line)
                if len(batch) >= chunk_size:
                    input_queue.put(batch)
                    batch = []
            if batch:
                input_queue.put(batch)
        # One sentinel per worker so every worker sees a stop signal
        for _ in range(num_workers):
            input_queue.put(None)

    def worker_process(input_queue, output_queue):
        """Process records from queue"""
        while True:
            batch = input_queue.get()
            if batch is None:
                break

            results = []
            for line in batch:
                record = json.loads(line)
                processed = process_fn(record)
                results.append(processed)

            output_queue.put(results)

        output_queue.put(None)

    def writer_process(output_queue, num_workers):
        """Write results to file"""
        completed_workers = 0
        with open(output_file, 'w') as f:
            while completed_workers < num_workers:
                results = output_queue.get()
                if results is None:
                    completed_workers += 1
                    continue

                for record in results:
                    f.write(json.dumps(record) + '\n')

    # Create queues
    input_queue = mp.Queue(maxsize=10)
    output_queue = mp.Queue(maxsize=10)

    # Start processes
    num_workers = mp.cpu_count()

    reader = mp.Process(target=reader_process, args=(input_queue, num_workers))
    workers = [
        mp.Process(target=worker_process, args=(input_queue, output_queue))
        for _ in range(num_workers)
    ]
    writer = mp.Process(target=writer_process, args=(output_queue, num_workers))

    reader.start()
    for w in workers:
        w.start()
    writer.start()

    # Wait for completion
    reader.join()
    for w in workers:
        w.join()
    writer.join()

Go Goroutines

package main

import (
    "bufio"
    "encoding/json"
    "os"
    "runtime"
    "strings"
    "sync"
)

type Record struct {
    ID   int    `json:"id"`
    Data string `json:"data"`
}

func processRecord(record Record) Record {
    // Process record...
    record.Data = strings.ToUpper(record.Data)
    return record
}

func parallelProcessJSONL(inputFile, outputFile string) error {
    numWorkers := runtime.NumCPU()

    // Channels
    recordsChan := make(chan Record, 1000)
    resultsChan := make(chan Record, 1000)
    var wg sync.WaitGroup

    // Start workers
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for record := range recordsChan {
                processed := processRecord(record)
                resultsChan <- processed
            }
        }()
    }

    // Writer goroutine
    writerDone := make(chan bool)
    go func() {
        f, _ := os.Create(outputFile)
        defer f.Close()
        writer := bufio.NewWriter(f)
        defer writer.Flush()

        for result := range resultsChan {
            data, _ := json.Marshal(result)
            writer.Write(data)
            writer.WriteString("\n")
        }
        writerDone <- true
    }()

    // Read and distribute work
    f, err := os.Open(inputFile)
    if err != nil {
        return err
    }
    defer f.Close()

    scanner := bufio.NewScanner(f)
    for scanner.Scan() {
        var record Record
        json.Unmarshal(scanner.Bytes(), &record)
        recordsChan <- record
    }

    close(recordsChan)
    wg.Wait()
    close(resultsChan)
    <-writerDone

    return nil
}

func main() {
    parallelProcessJSONL("input.jsonl", "output.jsonl")
}

Rust Rayon

use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader, Write};

#[derive(Debug, Serialize, Deserialize)]
struct Record {
    id: i32,
    data: String,
}

fn process_record(record: Record) -> Record {
    // Transform record
    Record {
        id: record.id,
        data: record.data.to_uppercase(),
    }
}

fn parallel_process_jsonl(input_path: &str, output_path: &str) -> std::io::Result<()> {
    // Read all lines
    let file = File::open(input_path)?;
    let reader = BufReader::new(file);
    let lines: Vec<String> = reader.lines().collect::<std::io::Result<Vec<_>>>()?;

    // Process in parallel with Rayon
    let results: Vec<Record> = lines
        .par_iter()
        .filter_map(|line| serde_json::from_str::<Record>(line).ok())
        .map(process_record)
        .collect();

    // Write results
    let mut output = File::create(output_path)?;
    for record in results {
        let json = serde_json::to_string(&record)?;
        writeln!(output, "{}", json)?;
    }

    Ok(())
}

fn main() {
    parallel_process_jsonl("input.jsonl", "output.jsonl").unwrap();
}

Memory-Efficient Parsing

Generator-Based Streaming

import json
from typing import Iterator, Dict, Any

def stream_jsonl(filepath: str, buffer_size: int = 8192) -> Iterator[Dict[str, Any]]:
    """
    Memory-efficient JSONL streaming
    Uses only buffer_size bytes of memory regardless of file size
    """
    with open(filepath, 'r', buffering=buffer_size) as f:
        for line in f:
            yield json.loads(line)

# Process a 100GB file with a small, constant memory footprint
for record in stream_jsonl('huge_file.jsonl'):
    process(record)

# Filter while streaming
def filter_stream(filepath: str, condition: callable) -> Iterator[Dict]:
    """Stream and filter without loading entire file"""
    for record in stream_jsonl(filepath):
        if condition(record):
            yield record

# Usage: Extract specific records
matches = filter_stream('data.jsonl', lambda r: r.get('status') == 'active')
for match in matches:
    print(match)

# Aggregate while streaming
def aggregate_stream(filepath: str, key: str) -> Dict:
    """Compute aggregates without loading full dataset"""
    aggregates = {
        'count': 0,
        'sum': 0,
        'min': float('inf'),
        'max': float('-inf')
    }

    for record in stream_jsonl(filepath):
        value = record.get(key, 0)
        aggregates['count'] += 1
        aggregates['sum'] += value
        aggregates['min'] = min(aggregates['min'], value)
        aggregates['max'] = max(aggregates['max'], value)

    aggregates['average'] = aggregates['sum'] / aggregates['count']
    return aggregates

# Memory profiling
import tracemalloc

def profile_memory():
    """Profile memory usage of different approaches"""

    # Bad: Load entire file
    tracemalloc.start()
    with open('data.jsonl', 'r') as f:
        data = [json.loads(line) for line in f]
    current, peak = tracemalloc.get_traced_memory()
    print(f"Load all: {peak / 1024 / 1024:.1f} MB")
    tracemalloc.stop()

    # Good: Stream
    tracemalloc.start()
    for record in stream_jsonl('data.jsonl'):
        pass
    current, peak = tracemalloc.get_traced_memory()
    print(f"Stream: {peak / 1024 / 1024:.1f} MB")
    tracemalloc.stop()

# Example output:
# Load all: 2048.5 MB
# Stream: 0.8 MB

Object Pooling

import json
from collections import deque

class RecordPool:
    """Reuse record objects to reduce GC pressure"""

    def __init__(self, max_size=1000):
        self.pool = deque(maxlen=max_size)

    def get(self):
        """Get record from pool or create new"""
        return self.pool.pop() if self.pool else {}

    def release(self, record):
        """Return record to pool"""
        record.clear()
        self.pool.append(record)

def process_with_pooling(filepath):
    """Process JSONL with object pooling"""
    pool = RecordPool()

    with open(filepath, 'r') as f:
        for line in f:
            record = pool.get()
            record.update(json.loads(line))

            # Process record...
            process(record)

            # Return to pool
            pool.release(record)

Schema Evolution & Versioning

Version Field Strategy

# Include version in every record
{"_version": "1.0", "id": 1, "name": "Alice"}
{"_version": "2.0", "id": 2, "name": "Bob", "email": "[email protected]"}
{"_version": "2.1", "id": 3, "name": "Charlie", "email": "[email protected]", "verified": true}

# Python: Handle multiple versions
import json
from datetime import datetime
def parse_record(line: str) -> dict:
    """Parse record with version-specific logic"""
    record = json.loads(line)
    version = record.get('_version', '1.0')

    if version == '1.0':
        # Migrate v1 to current schema
        record['email'] = f"{record['name'].lower()}@example.com"
        record['verified'] = False

    elif version == '2.0':
        # Add missing fields from v2.1
        record.setdefault('verified', False)

    # Remove version field from working data
    record.pop('_version', None)

    return record

# Batch migration to new version
def migrate_schema(input_file, output_file, target_version='3.0'):
    """Migrate JSONL file to new schema version"""

    def migrate_to_v3(record):
        """Upgrade any version to v3"""
        # Parse with version handling
        parsed = parse_record(json.dumps(record))

        # Add v3 fields
        parsed['created_at'] = datetime.now().isoformat()
        parsed['_version'] = target_version

        return parsed

    with open(input_file, 'r') as f_in:
        with open(output_file, 'w') as f_out:
            for line in f_in:
                record = json.loads(line)
                migrated = migrate_to_v3(record)
                f_out.write(json.dumps(migrated) + '\n')

Schema Validation

import json
from jsonschema import validate, ValidationError

# Define schemas for each version
SCHEMAS = {
    '1.0': {
        'type': 'object',
        'required': ['id', 'name'],
        'properties': {
            'id': {'type': 'integer'},
            'name': {'type': 'string'}
        }
    },
    '2.0': {
        'type': 'object',
        'required': ['id', 'name', 'email'],
        'properties': {
            'id': {'type': 'integer'},
            'name': {'type': 'string'},
            'email': {'type': 'string', 'format': 'email'}
        }
    }
}

def validate_jsonl(filepath, report_errors=True):
    """Validate all records against their declared schema"""
    errors = []
    valid_count = 0

    with open(filepath, 'r') as f:
        for line_num, line in enumerate(f, 1):
            try:
                record = json.loads(line)
                version = record.get('_version', '1.0')

                if version not in SCHEMAS:
                    errors.append({
                        'line': line_num,
                        'error': f'Unknown schema version: {version}'
                    })
                    continue

                validate(instance=record, schema=SCHEMAS[version])
                valid_count += 1

            except json.JSONDecodeError as e:
                errors.append({
                    'line': line_num,
                    'error': f'JSON parse error: {e}'
                })

            except ValidationError as e:
                errors.append({
                    'line': line_num,
                    'error': f'Schema validation failed: {e.message}'
                })

    if report_errors:
        for error in errors:
            print(f"Line {error['line']}: {error['error']}")

    return {
        'valid': valid_count,
        'errors': len(errors),
        'error_details': errors
    }

Distributed Processing

Apache Spark

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Initialize Spark
spark = SparkSession.builder \
    .appName("JSONL Processing") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "4") \
    .getOrCreate()

# Read JSONL files (auto-parallelized)
df = spark.read.json("hdfs://data/*.jsonl")

# Process in parallel across cluster
result = df \
    .filter(col("status") == "active") \
    .groupBy("category") \
    .agg(
        count("*").alias("count"),
        avg("price").alias("avg_price"),
        sum("quantity").alias("total_quantity")
    )

# Write results as JSONL
result.write.mode("overwrite").json("hdfs://output/aggregated.jsonl")

# Partition large datasets
df.repartition(100) \
    .write \
    .partitionBy("date", "category") \
    .mode("overwrite") \
    .json("hdfs://output/partitioned/")

Dask Distributed

import dask.bag as db
from dask.distributed import Client
import json

# Start distributed cluster
client = Client('scheduler-address:8786')

# Read JSONL with Dask
def parse_jsonl(filename):
    with open(filename) as f:
        for line in f:
            yield json.loads(line)

# Distribute processing (operations are lazy until computed or written)
bag = db.read_text('data/*.jsonl').map(json.loads)

# Parallel transformations
processed = bag \
    .filter(lambda r: r['status'] == 'active') \
    .map(lambda r: {**r, 'processed': True})

# Pull a small sample back to the driver for inspection
sample = processed.take(10)

# Write the full result as JSONL (serialize records back to strings first)
processed.map(json.dumps).to_textfiles('output/part-*.jsonl')

Indexing for Fast Access

Offset Index

import json

class JSONLIndex:
    """Build offset index for random access"""

    def __init__(self, filepath):
        self.filepath = filepath
        self.index = {}  # key -> byte offset
        self.build_index()

    def build_index(self):
        """Build index of record locations"""
        with open(self.filepath, 'rb') as f:
            offset = 0
            for line in f:
                record = json.loads(line)
                key = record.get('id')
                if key:
                    self.index[key] = offset
                offset = f.tell()

    def get(self, key):
        """Get record by key in O(1) time"""
        if key not in self.index:
            return None

        offset = self.index[key]
        with open(self.filepath, 'rb') as f:
            f.seek(offset)
            line = f.readline()
            return json.loads(line)

    def save_index(self, index_file):
        """Save index to disk"""
        with open(index_file, 'w') as f:
            json.dump(self.index, f)

    def load_index(self, index_file):
        """Load pre-built index (JSON stores keys as strings, so convert back)"""
        with open(index_file, 'r') as f:
            self.index = {
                int(k) if k.isdigit() else k: v
                for k, v in json.load(f).items()
            }

# Usage
index = JSONLIndex('large_file.jsonl')
record = index.get(12345)  # Instant lookup
index.save_index('large_file.jsonl.index')

Best Practices

Compression Strategy

  • Use zstd for best speed/ratio tradeoff in production
  • Use gzip for maximum compatibility
  • Train zstd dictionaries for 5-10% better compression (see the read-back sketch after this list)
  • Compress while streaming to avoid temporary files
  • Use LZ4 for real-time systems requiring minimum latency
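
The compress_with_dict example earlier only covers the write side. Below is a minimal read-side sketch, assuming the trained dictionary is persisted next to the data (the dict.bin filename is an assumption):

import io
import json
import zstandard as zstd

def save_dictionary(dict_data, path):
    """Persist a trained dictionary so readers can reuse it"""
    with open(path, 'wb') as f:
        f.write(dict_data.as_bytes())

def read_zstd_jsonl_with_dict(filepath, dict_path='dict.bin'):
    """Stream-read JSONL that was compressed with a trained dictionary"""
    with open(dict_path, 'rb') as f:
        dict_data = zstd.ZstdCompressionDict(f.read())

    dctx = zstd.ZstdDecompressor(dict_data=dict_data)
    with open(filepath, 'rb') as f:
        with dctx.stream_reader(f) as reader:
            text_stream = io.TextIOWrapper(reader, encoding='utf-8')
            for line in text_stream:
                yield json.loads(line)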

Performance Optimization

  • Always stream large files - never load entirely into memory
  • Use parallel processing for CPU-intensive transformations
  • Profile memory usage to identify bottlenecks
  • Partition large datasets for distributed processing (see the partitioning sketch after this list)
  • Build indexes for random access patterns
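
To act on the partitioning advice without a cluster, you can split one JSONL file into per-key files that downstream workers pick up independently. A minimal sketch, assuming records carry a category field (the field name and output layout are assumptions):

import json
import os

def partition_jsonl(input_file, output_dir, key='category'):
    """Split a JSONL file into one file per key value, streaming line by line"""
    os.makedirs(output_dir, exist_ok=True)
    handles = {}

    try:
        with open(input_file, 'r') as f:
            for line in f:
                record = json.loads(line)
                value = str(record.get(key, 'unknown'))
                if value not in handles:
                    path = os.path.join(output_dir, f'{key}={value}.jsonl')
                    handles[value] = open(path, 'w')
                handles[value].write(line)  # line already ends with '\n'
    finally:
        for handle in handles.values():
            handle.close()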

Schema Management

  • Include version field in every record
  • Maintain backward compatibility when evolving schemas
  • Validate records against declared schema version
  • Document schema changes in migration scripts
  • Test migration paths before deploying schema changes (see the test sketch after this list)
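
One way to honor "test migration paths" is a small unit test around parse_record from the versioning section; the expected values below follow its v1 and v2 defaults:

def test_v1_record_migrates_to_current_schema():
    """A v1 record should gain the fields introduced in later versions"""
    migrated = parse_record('{"_version": "1.0", "id": 1, "name": "Alice"}')
    assert migrated['email'] == 'alice@example.com'
    assert migrated['verified'] is False
    assert '_version' not in migrated

def test_v2_record_gets_verified_default():
    """A v2 record should pick up the 'verified' flag added in v2.1"""
    migrated = parse_record('{"_version": "2.0", "id": 2, "name": "Bob", "email": "bob@example.com"}')
    assert migrated['verified'] is False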

Distributed Systems

  • Use Spark or Dask for datasets larger than single machine RAM
  • Partition data by date or category for efficient queries
  • Compress partitions to reduce network transfer
  • Monitor cluster resource utilization
  • Implement retries for transient failures (see the retry sketch after this list)
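
For the retry advice, a small decorator with exponential backoff can wrap any per-batch operation such as uploading a compressed partition; the attempt counts, delays, exception types, and upload_partition helper are assumptions to adapt to your system:

import functools
import time

def retry_transient(max_attempts=3, base_delay=1.0, exceptions=(IOError, ConnectionError)):
    """Retry a function with exponential backoff on transient errors"""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts:
                        raise
                    time.sleep(base_delay * 2 ** (attempt - 1))
        return wrapper
    return decorator

@retry_transient(max_attempts=5)
def upload_partition(path):
    """Hypothetical upload of one compressed partition to remote storage"""
    ...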
