JSONL by Language

Complete guides for working with JSONL in Python, JavaScript, Go, Java, and Rust

Python

Reading JSONL (Standard Library)

The simplest approach using Python's built-in json module:

import json

# Basic line-by-line reading
with open('data.jsonl', 'r', encoding='utf-8') as f:
    for line in f:
        # Skip empty lines
        line = line.strip()
        if not line:
            continue

        # Parse JSON
        record = json.loads(line)
        print(record['name'])

# With error handling
with open('data.jsonl', 'r', encoding='utf-8') as f:
    for line_num, line in enumerate(f, 1):
        try:
            record = json.loads(line)
            process(record)
        except json.JSONDecodeError as e:
            print(f"Error on line {line_num}: {e}")
            continue

Writing JSONL

import json

# Write list of dicts to JSONL
data = [
    {"id": 1, "name": "Alice", "age": 30},
    {"id": 2, "name": "Bob", "age": 25},
]

with open('output.jsonl', 'w', encoding='utf-8') as f:
    for record in data:
        # Write JSON + newline
        f.write(json.dumps(record) + '\n')

# Append to existing file
with open('output.jsonl', 'a', encoding='utf-8') as f:
    new_record = {"id": 3, "name": "Charlie", "age": 35}
    f.write(json.dumps(new_record) + '\n')

# Keep non-ASCII characters readable with ensure_ascii=False
with open('output.jsonl', 'w', encoding='utf-8') as f:
    for record in data:
        f.write(json.dumps(record, ensure_ascii=False) + '\n')
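
# Note: ensure_ascii only changes how non-ASCII characters are written;
# both outputs below are valid JSON for the same record:
#   json.dumps({"name": "José"})                      # '{"name": "Jos\u00e9"}'
#   json.dumps({"name": "José"}, ensure_ascii=False)  # '{"name": "José"}'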

High-Performance Parsing (orjson)

orjson is 3-5x faster than the standard library json module and natively serializes additional types such as datetime and UUID:

# Install: pip install orjson
import orjson

# Read JSONL (note: read as binary 'rb')
with open('data.jsonl', 'rb') as f:
    for line in f:
        record = orjson.loads(line)
        print(record['name'])

# Write JSONL (note: orjson.dumps returns bytes)
with open('output.jsonl', 'wb') as f:
    for record in data:
        # orjson.dumps() returns bytes, add b'\n'
        f.write(orjson.dumps(record) + b'\n')

# Serialization options: OPT_APPEND_NEWLINE appends the trailing b'\n' for you.
# Avoid OPT_INDENT_2 for JSONL: indented output spans multiple lines per record.
with open('output.jsonl', 'wb') as f:
    for record in data:
        json_bytes = orjson.dumps(
            record,
            option=orjson.OPT_APPEND_NEWLINE
        )
        f.write(json_bytes)

Performance tip: orjson is especially fast on large objects and serializes datetime, date, time, and UUID values natively, which the standard json module cannot do without a custom default= handler.
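
For example, a record with a datetime field can be written directly (a small sketch; the field names are illustrative):

import datetime
import orjson

record = {"id": 1, "created_at": datetime.datetime(2024, 1, 15, 9, 30)}

# orjson renders the datetime as an ISO 8601 / RFC 3339 string
line = orjson.dumps(record)
# b'{"id":1,"created_at":"2024-01-15T09:30:00"}'

# json.dumps(record) would raise TypeError without a default= handler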

Working with Pandas

Pandas has native JSONL support via the lines=True parameter:

import pandas as pd

# Read JSONL into DataFrame
df = pd.read_json('data.jsonl', lines=True)
print(df.head())

# Write DataFrame to JSONL
df.to_json('output.jsonl', orient='records', lines=True)

# Read compressed JSONL
df = pd.read_json('data.jsonl.gz', lines=True, compression='gzip')

# Chunked reading for large files (memory efficient)
chunk_size = 10000
chunks = []

for chunk in pd.read_json('large.jsonl', lines=True, chunksize=chunk_size):
    # Process each chunk
    filtered = chunk[chunk['age'] > 25]
    chunks.append(filtered)

# Combine all chunks
result = pd.concat(chunks, ignore_index=True)

# Stream processing without loading all into memory
with pd.read_json('large.jsonl', lines=True, chunksize=10000) as reader:
    for chunk in reader:
        # Process and write incrementally
        # (to_json's mode='a' requires pandas >= 2.0 with lines=True and orient='records')
        processed = transform(chunk)
        processed.to_json('output.jsonl', orient='records', lines=True, mode='a')

Streaming Large Files

Process files larger than available RAM:

import json

def stream_jsonl(filepath, batch_size=1000):
    """Generator that yields batches of records"""
    batch = []

    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            try:
                record = json.loads(line)
                batch.append(record)

                if len(batch) >= batch_size:
                    yield batch
                    batch = []
            except json.JSONDecodeError:
                continue

    # Yield remaining records
    if batch:
        yield batch

# Usage: process in batches
for batch in stream_jsonl('huge.jsonl', batch_size=5000):
    # Process batch (5000 records at a time)
    results = bulk_process(batch)
    save_results(results)

# Filter and transform streaming
def filter_transform_jsonl(input_path, output_path, condition):
    with open(input_path, 'r') as fin, open(output_path, 'w') as fout:
        for line in fin:
            record = json.loads(line)

            # Apply filter
            if condition(record):
                # Transform
                transformed = transform(record)
                fout.write(json.dumps(transformed) + '\n')

# Example: Extract active users
filter_transform_jsonl(
    'users.jsonl',
    'active_users.jsonl',
    lambda r: r.get('status') == 'active'
)

Working with Compressed JSONL

import gzip
import json

# Read gzipped JSONL (streaming)
with gzip.open('data.jsonl.gz', 'rt', encoding='utf-8') as f:
    for line in f:
        record = json.loads(line)
        process(record)

# Write gzipped JSONL
with gzip.open('output.jsonl.gz', 'wt', encoding='utf-8') as f:
    for record in data:
        f.write(json.dumps(record) + '\n')

# Transparent compression wrapper
def open_jsonl(filepath, mode='r'):
    """Open JSONL file, handling .gz automatically"""
    if filepath.endswith('.gz'):
        return gzip.open(filepath, mode + 't', encoding='utf-8')
    else:
        return open(filepath, mode, encoding='utf-8')

# Usage works for both compressed and uncompressed
with open_jsonl('data.jsonl.gz', 'r') as f:
    for line in f:
        record = json.loads(line)

# Read bzip2, xz, or zstd
import bz2
import lzma

# bzip2
with bz2.open('data.jsonl.bz2', 'rt') as f:
    for line in f:
        record = json.loads(line)

# xz/lzma
with lzma.open('data.jsonl.xz', 'rt') as f:
    for line in f:
        record = json.loads(line)
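
zstd is not covered by the standard library; a minimal sketch assuming the third-party zstandard package (pip install zstandard) and a hypothetical file named data.jsonl.zst:

import io
import json
import zstandard

with open('data.jsonl.zst', 'rb') as raw:
    dctx = zstandard.ZstdDecompressor()
    with dctx.stream_reader(raw) as compressed:
        text = io.TextIOWrapper(compressed, encoding='utf-8')
        for line in text:
            record = json.loads(line)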

Parallel Processing

from multiprocessing import Pool
import json

def process_chunk(lines):
    """Process a chunk of lines"""
    results = []
    for line in lines:
        try:
            record = json.loads(line)
            result = expensive_computation(record)
            results.append(result)
        except json.JSONDecodeError:
            continue
    return results

def parallel_process_jsonl(filepath, num_workers=4, chunk_size=10000):
    """Process JSONL in parallel"""
    # Read file into chunks (note: this buffers every line in memory;
    # see the lazier imap variant below for very large files)
    chunks = []
    current_chunk = []

    with open(filepath, 'r') as f:
        for line in f:
            current_chunk.append(line)

            if len(current_chunk) >= chunk_size:
                chunks.append(current_chunk)
                current_chunk = []

    if current_chunk:
        chunks.append(current_chunk)

    # Process chunks in parallel
    with Pool(processes=num_workers) as pool:
        all_results = pool.map(process_chunk, chunks)

    # Flatten results
    final_results = [item for sublist in all_results for item in sublist]
    return final_results

# Usage
results = parallel_process_jsonl('data.jsonl', num_workers=8)

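# Lazier variant (sketch): feed chunks to the pool as they are read using
# Pool.imap, instead of buffering every line of the file in memory first.
def iter_line_chunks(filepath, chunk_size=10000):
    chunk = []
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            chunk.append(line)
            if len(chunk) >= chunk_size:
                yield chunk
                chunk = []
    if chunk:
        yield chunk

def parallel_process_jsonl_lazy(filepath, num_workers=4, chunk_size=10000):
    results = []
    with Pool(processes=num_workers) as pool:
        for chunk_results in pool.imap(process_chunk, iter_line_chunks(filepath, chunk_size)):
            results.extend(chunk_results)
    return results
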
# Alternative: concurrent.futures for more control
from concurrent.futures import ProcessPoolExecutor, as_completed

def process_file_parallel(filepath):
    # Split file into parts
    chunks = split_file_into_chunks(filepath, num_chunks=10)

    with ProcessPoolExecutor(max_workers=10) as executor:
        # Submit all chunks
        futures = {executor.submit(process_chunk_file, chunk): chunk
                   for chunk in chunks}

        # Collect results as they complete
        for future in as_completed(futures):
            chunk = futures[future]
            try:
                result = future.result()
                print(f"Chunk {chunk} completed")
            except Exception as e:
                print(f"Chunk {chunk} failed: {e}")

Streaming with ijson

ijson provides iterative JSON parsing for ultra-large files:

# Install: pip install ijson
import ijson

# Stream large JSONL files with minimal memory
with open('large.jsonl', 'rb') as f:
    # Parse each JSON object as it's encountered
    objects = ijson.items(f, '', multiple_values=True)

    for obj in objects:
        print(obj['name'])
        process(obj)

# Memory-efficient: only loads one object at a time
# Perfect for files larger than available RAM

# Advanced: extract specific fields only
with open('data.jsonl', 'rb') as f:
    # Only yield the 'name' field from each top-level object
    names = ijson.items(f, 'name', multiple_values=True)
    for name in names:
        print(name)

# Combine with generators for pipelines
def extract_active_users(filepath):
    with open(filepath, 'rb') as f:
        objects = ijson.items(f, '', multiple_values=True)
        for obj in objects:
            if obj.get('status') == 'active':
                yield obj

# Use in pipeline
for user in extract_active_users('users.jsonl'):
    process(user)

Best for: records containing very large nested objects, or when you want event-based parsing; for typical JSONL, plain line-by-line json.loads already streams without loading the whole file.

Using jsonlines Library

The jsonlines library provides a clean API for JSONL operations:

# Install: pip install jsonlines
import jsonlines

# Read JSONL
with jsonlines.open('data.jsonl') as reader:
    for record in reader:
        print(record['name'])

# Write JSONL
with jsonlines.open('output.jsonl', mode='w') as writer:
    writer.write({'name': 'Alice', 'age': 30})
    writer.write({'name': 'Bob', 'age': 25})
    # Or write multiple at once
    writer.write_all([
        {'name': 'Charlie', 'age': 35},
        {'name': 'Diana', 'age': 28}
    ])

# Read compressed JSONL (jsonlines does not decompress .gz itself;
# wrap a gzip text stream in jsonlines.Reader instead)
import gzip
with gzip.open('data.jsonl.gz', 'rt', encoding='utf-8') as f:
    reader = jsonlines.Reader(f)
    for record in reader:
        process(record)

# Skip invalid lines
with jsonlines.open('data.jsonl', mode='r') as reader:
    for record in reader.iter(skip_invalid=True):
        process(record)

# Type checking with dataclasses
from dataclasses import dataclass

@dataclass
class User:
    id: int
    name: str
    age: int

with jsonlines.open('users.jsonl') as reader:
    for obj in reader:
        user = User(**obj)  # Convert to dataclass
        print(user.name)

Complete Python Example: ETL Pipeline

#!/usr/bin/env python3
"""
ETL Pipeline: Read JSONL, transform, filter, write output
Handles large files, errors, and compression
"""
import json
import gzip
import logging
from typing import Iterator, Dict, Any
from pathlib import Path

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def read_jsonl(filepath: str) -> Iterator[Dict[str, Any]]:
    """Read JSONL file (handles .gz compression)"""
    open_func = gzip.open if filepath.endswith('.gz') else open

    with open_func(filepath, 'rt', encoding='utf-8') as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue

            try:
                yield json.loads(line)
            except json.JSONDecodeError as e:
                logger.error(f"Parse error line {line_num}: {e}")


def write_jsonl(records: Iterator[Dict[str, Any]], filepath: str):
    """Write records to JSONL file (handles .gz compression)"""
    open_func = gzip.open if filepath.endswith('.gz') else open

    with open_func(filepath, 'wt', encoding='utf-8') as f:
        for record in records:
            f.write(json.dumps(record) + '\n')


def transform(record: Dict[str, Any]) -> Dict[str, Any]:
    """Transform record (example: add computed field)"""
    record['full_name'] = f"{record['first_name']} {record['last_name']}"
    record['is_adult'] = record['age'] >= 18
    return record


def filter_record(record: Dict[str, Any]) -> bool:
    """Filter logic (example: active users only)"""
    return record.get('status') == 'active'


def etl_pipeline(input_file: str, output_file: str):
    """Complete ETL pipeline"""
    logger.info(f"Starting ETL: {input_file} -> {output_file}")

    processed = 0
    written = 0

    def pipeline():
        nonlocal processed, written
        for record in read_jsonl(input_file):
            processed += 1
            transformed = transform(record)
            if filter_record(transformed):
                written += 1
                yield transformed

    write_jsonl(pipeline(), output_file)

    logger.info(f"ETL complete. Read: {processed}, Written: {written}")


if __name__ == '__main__':
    etl_pipeline('input.jsonl.gz', 'output.jsonl.gz')

JavaScript / Node.js

Reading JSONL (Node.js)

const fs = require('fs');
const readline = require('readline');

// Basic line-by-line reading
async function readJSONL(filepath) {
    const fileStream = fs.createReadStream(filepath);

    const rl = readline.createInterface({
        input: fileStream,
        crlfDelay: Infinity  // Handle both \n and \r\n
    });

    for await (const line of rl) {
        if (!line.trim()) continue;  // Skip empty lines

        try {
            const record = JSON.parse(line);
            console.log(record.name);
        } catch (err) {
            console.error('Parse error:', err.message);
        }
    }
}

// Usage
readJSONL('data.jsonl');

// Callback-based approach
function readJSONLCallback(filepath, onRecord, onComplete) {
    const fileStream = fs.createReadStream(filepath);
    const rl = readline.createInterface({ input: fileStream });

    rl.on('line', (line) => {
        try {
            const record = JSON.parse(line);
            onRecord(record);
        } catch (err) {
            console.error('Parse error:', err);
        }
    });

    rl.on('close', onComplete);
}

Writing JSONL

const fs = require('fs');

// Basic writing
const data = [
    { id: 1, name: 'Alice', age: 30 },
    { id: 2, name: 'Bob', age: 25 }
];

const writeStream = fs.createWriteStream('output.jsonl');

data.forEach(record => {
    writeStream.write(JSON.stringify(record) + '\n');
});

writeStream.end();

// Async/await with promises
async function writeJSONL(records, filepath) {
    const writeStream = fs.createWriteStream(filepath);

    for (const record of records) {
        const json = JSON.stringify(record) + '\n';

        // Wait if buffer is full
        if (!writeStream.write(json)) {
            await new Promise(resolve => writeStream.once('drain', resolve));
        }
    }

    writeStream.end();
    return new Promise(resolve => writeStream.once('finish', resolve));
}

// Usage (call from within an async function)
await writeJSONL(data, 'output.jsonl');

// Append to existing file
const appendStream = fs.createWriteStream('output.jsonl', { flags: 'a' });
appendStream.write(JSON.stringify({ id: 3, name: 'Charlie' }) + '\n');
appendStream.end();

High-Performance Streaming (ndjson)

The ndjson npm package provides optimized streaming:

// Install: npm install ndjson
const fs = require('fs');
const ndjson = require('ndjson');

// Read JSONL with streaming parser
fs.createReadStream('data.jsonl')
    .pipe(ndjson.parse())
    .on('data', (record) => {
        console.log(record.name);
    })
    .on('end', () => {
        console.log('Done');
    });

// Write JSONL with streaming serializer
const writeStream = fs.createWriteStream('output.jsonl');
const stringify = ndjson.stringify();

stringify.pipe(writeStream);

// Write objects
stringify.write({ id: 1, name: 'Alice' });
stringify.write({ id: 2, name: 'Bob' });
stringify.end();

// Transform pipeline
fs.createReadStream('input.jsonl')
    .pipe(ndjson.parse())
    .pipe(ndjson.stringify())
    .pipe(fs.createWriteStream('output.jsonl'));

Transform Streams

const { Transform } = require('stream');
const fs = require('fs');
const ndjson = require('ndjson');

// Custom transform stream
class FilterTransform extends Transform {
    constructor(filterFn) {
        super({ objectMode: true });
        this.filterFn = filterFn;
    }

    _transform(record, encoding, callback) {
        if (this.filterFn(record)) {
            // Transform record
            record.processed = true;
            record.timestamp = Date.now();
            this.push(record);
        }
        callback();
    }
}

// Usage: filter active users
const filterActive = new FilterTransform(r => r.status === 'active');

fs.createReadStream('users.jsonl')
    .pipe(ndjson.parse())
    .pipe(filterActive)
    .pipe(ndjson.stringify())
    .pipe(fs.createWriteStream('active_users.jsonl'));

// Pipeline with multiple transforms
const { pipeline } = require('stream/promises');

async function processJSONL() {
    await pipeline(
        fs.createReadStream('input.jsonl'),
        ndjson.parse(),
        new FilterTransform(r => r.age > 25),
        ndjson.stringify(),
        fs.createWriteStream('output.jsonl')
    );
    console.log('Pipeline complete');
}

processJSONL();

Compressed JSONL

const fs = require('fs');
const zlib = require('zlib');
const readline = require('readline');

// Read gzipped JSONL
async function readGzippedJSONL(filepath) {
    const fileStream = fs.createReadStream(filepath);
    const gunzip = zlib.createGunzip();
    const rl = readline.createInterface({
        input: fileStream.pipe(gunzip),
        crlfDelay: Infinity
    });

    for await (const line of rl) {
        const record = JSON.parse(line);
        console.log(record);
    }
}

// Write gzipped JSONL
async function writeGzippedJSONL(records, filepath) {
    const writeStream = fs.createWriteStream(filepath);
    const gzip = zlib.createGzip();

    gzip.pipe(writeStream);

    for (const record of records) {
        gzip.write(JSON.stringify(record) + '\n');
    }

    gzip.end();
    return new Promise(resolve => writeStream.on('finish', resolve));
}

// With ndjson
const ndjson = require('ndjson');

fs.createReadStream('data.jsonl.gz')
    .pipe(zlib.createGunzip())
    .pipe(ndjson.parse())
    .on('data', record => {
        console.log(record);
    });

HTTP Streaming API

const express = require('express');
const fs = require('fs');
const ndjson = require('ndjson');

const app = express();

// Streaming JSONL endpoint
app.get('/api/data', (req, res) => {
    res.setHeader('Content-Type', 'application/x-ndjson');
    res.setHeader('Transfer-Encoding', 'chunked');

    fs.createReadStream('data.jsonl')
        .pipe(ndjson.parse())
        .pipe(ndjson.stringify())
        .pipe(res);
});

// Query database and stream results
app.get('/api/users', async (req, res) => {
    res.setHeader('Content-Type', 'application/x-ndjson');

    const cursor = db.collection('users').find().stream();

    cursor.on('data', (doc) => {
        res.write(JSON.stringify(doc) + '\n');
    });

    cursor.on('end', () => {
        res.end();
    });
});

app.listen(3000);

// Client: consume streaming JSONL
const https = require('https');
const readline = require('readline');

https.get('https://api.example.com/data', (res) => {
    const rl = readline.createInterface({ input: res });

    rl.on('line', (line) => {
        const record = JSON.parse(line);
        console.log(record);
    });
});

Browser Usage (Fetch API)

// Streaming JSONL in the browser
async function fetchStreamingJSONL(url) {
    const response = await fetch(url);
    const reader = response.body.getReader();
    const decoder = new TextDecoder();

    let buffer = '';

    while (true) {
        const { done, value } = await reader.read();

        if (done) break;

        // Decode chunk
        buffer += decoder.decode(value, { stream: true });

        // Process complete lines
        const lines = buffer.split('\n');
        buffer = lines.pop(); // Keep incomplete line in buffer

        for (const line of lines) {
            if (line.trim()) {
                const record = JSON.parse(line);
                console.log(record);
                // Update UI with record
            }
        }
    }
}

// Usage
fetchStreamingJSONL('https://api.example.com/data.jsonl');

// With async iterator (modern browsers)
async function* parseJSONLStream(response) {
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop();

        for (const line of lines) {
            if (line.trim()) {
                yield JSON.parse(line);
            }
        }
    }
}

// Usage with for-await
const response = await fetch('https://api.example.com/data.jsonl');
for await (const record of parseJSONLStream(response)) {
    console.log(record);
}

Complete Node.js Example: API + Streaming

// Complete example: JSONL API with filtering
const express = require('express');
const fs = require('fs');
const ndjson = require('ndjson');
const { Transform } = require('stream');

const app = express();

// Custom filter transform
class FilterStream extends Transform {
    constructor(query) {
        super({ objectMode: true });
        this.query = query;
    }

    _transform(record, encoding, callback) {
        // Apply filters
        let match = true;

        if (this.query.minAge && record.age < this.query.minAge) {
            match = false;
        }

        if (this.query.status && record.status !== this.query.status) {
            match = false;
        }

        if (match) {
            this.push(record);
        }

        callback();
    }
}

// Streaming JSONL endpoint with query params
app.get('/api/users', (req, res) => {
    const query = {
        minAge: parseInt(req.query.minAge) || 0,
        status: req.query.status
    };

    res.setHeader('Content-Type', 'application/x-ndjson');
    res.setHeader('Transfer-Encoding', 'chunked');

    const filterStream = new FilterStream(query);

    fs.createReadStream('users.jsonl')
        .pipe(ndjson.parse())
        .pipe(filterStream)
        .pipe(ndjson.stringify())
        .pipe(res)
        .on('error', (err) => {
            console.error('Stream error:', err);
            res.end();
        });
});

app.listen(3000, () => {
    console.log('API listening on port 3000');
});

Go

Reading JSONL (bufio.Scanner)

package main

import (
    "bufio"
    "encoding/json"
    "fmt"
    "log"
    "os"
)

type User struct {
    ID    int    `json:"id"`
    Name  string `json:"name"`
    Email string `json:"email"`
}

func main() {
    file, err := os.Open("data.jsonl")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)

    // Increase buffer size for large lines (default is 64KB)
    const maxCapacity = 1024 * 1024 // 1MB
    buf := make([]byte, maxCapacity)
    scanner.Buffer(buf, maxCapacity)

    lineNum := 0
    for scanner.Scan() {
        lineNum++
        line := scanner.Bytes()

        var user User
        if err := json.Unmarshal(line, &user); err != nil {
            log.Printf("Error on line %d: %v", lineNum, err)
            continue
        }

        fmt.Printf("%s\n", user.Name)
    }

    if err := scanner.Err(); err != nil {
        log.Fatal(err)
    }
}

Writing JSONL

package main

import (
    "bufio"
    "encoding/json"
    "log"
    "os"
)

type User struct {
    ID    int    `json:"id"`
    Name  string `json:"name"`
    Email string `json:"email"`
}

func main() {
    users := []User{
        {ID: 1, Name: "Alice", Email: "[email protected]"},
        {ID: 2, Name: "Bob", Email: "[email protected]"},
    }

    file, err := os.Create("output.jsonl")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    // Use buffered writer for better performance
    writer := bufio.NewWriter(file)
    defer writer.Flush()

    encoder := json.NewEncoder(writer)

    for _, user := range users {
        if err := encoder.Encode(&user); err != nil {
            log.Printf("Error encoding user %d: %v", user.ID, err)
            continue
        }
    }
}

// Append to existing file
func appendToJSONL(user User, filepath string) error {
    file, err := os.OpenFile(filepath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
    if err != nil {
        return err
    }
    defer file.Close()

    encoder := json.NewEncoder(file)
    return encoder.Encode(&user)
}

Streaming with bufio.Reader

package main

import (
    "bufio"
    "encoding/json"
    "io"
    "log"
    "os"
)

type Record struct {
    ID   int    `json:"id"`
    Data string `json:"data"`
}

// Stream JSONL with bufio.Reader (unlike Scanner, there is no fixed maximum line length)
func streamJSONL(filepath string, processFn func(Record) error) error {
    file, err := os.Open(filepath)
    if err != nil {
        return err
    }
    defer file.Close()

    reader := bufio.NewReader(file)

    for {
        line, err := reader.ReadBytes('\n')

        // Process whatever was read, including a final line without a trailing newline
        if len(line) > 0 {
            var record Record
            if parseErr := json.Unmarshal(line, &record); parseErr != nil {
                log.Printf("Parse error: %v", parseErr)
            } else if procErr := processFn(record); procErr != nil {
                return procErr
            }
        }

        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }
    }

    return nil
}

func main() {
    err := streamJSONL("data.jsonl", func(r Record) error {
        // Process each record
        log.Printf("Processing ID %d", r.ID)
        return nil
    })

    if err != nil {
        log.Fatal(err)
    }
}

Concurrent Processing with Goroutines

package main

import (
    "bufio"
    "encoding/json"
    "log"
    "os"
    "sync"
)

type Record struct {
    ID   int    `json:"id"`
    Data string `json:"data"`
}

// Concurrent JSONL processing
func processConcurrent(filepath string, numWorkers int) error {
    file, err := os.Open(filepath)
    if err != nil {
        return err
    }
    defer file.Close()

    // Channel for distributing work
    lines := make(chan []byte, numWorkers*2)
    results := make(chan Record, numWorkers*2)

    // Start worker goroutines
    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for line := range lines {
                var record Record
                if err := json.Unmarshal(line, &record); err != nil {
                    log.Printf("Parse error: %v", err)
                    continue
                }

                // Process record
                processed := processRecord(record)
                results <- processed
            }
        }()
    }

    // Result collector
    var resultsWg sync.WaitGroup
    resultsWg.Add(1)
    go func() {
        defer resultsWg.Done()
        for result := range results {
            // Handle results
            log.Printf("Processed: %+v", result)
        }
    }()

    // Read file and distribute lines
    scanner := bufio.NewScanner(file)
    buf := make([]byte, 1024*1024)
    scanner.Buffer(buf, 1024*1024)

    for scanner.Scan() {
        // Make a copy of line (scanner reuses buffer)
        lineCopy := make([]byte, len(scanner.Bytes()))
        copy(lineCopy, scanner.Bytes())
        lines <- lineCopy
    }

    close(lines)
    wg.Wait()
    close(results)
    resultsWg.Wait()

    return scanner.Err()
}

func processRecord(r Record) Record {
    // Expensive processing here
    return r
}

func main() {
    if err := processConcurrent("data.jsonl", 10); err != nil {
        log.Fatal(err)
    }
}

Compressed JSONL

package main

import (
    "bufio"
    "compress/gzip"
    "encoding/json"
    "log"
    "os"
)

type Record struct {
    ID   int    `json:"id"`
    Data string `json:"data"`
}

// Read gzipped JSONL
func readGzippedJSONL(filepath string) error {
    file, err := os.Open(filepath)
    if err != nil {
        return err
    }
    defer file.Close()

    gzReader, err := gzip.NewReader(file)
    if err != nil {
        return err
    }
    defer gzReader.Close()

    scanner := bufio.NewScanner(gzReader)
    buf := make([]byte, 1024*1024)
    scanner.Buffer(buf, 1024*1024)

    for scanner.Scan() {
        var record Record
        if err := json.Unmarshal(scanner.Bytes(), &record); err != nil {
            log.Printf("Parse error: %v", err)
            continue
        }

        // Process record
        log.Printf("%+v", record)
    }

    return scanner.Err()
}

// Write gzipped JSONL
func writeGzippedJSONL(records []Record, filepath string) error {
    file, err := os.Create(filepath)
    if err != nil {
        return err
    }
    defer file.Close()

    gzWriter := gzip.NewWriter(file)
    defer gzWriter.Close()

    writer := bufio.NewWriter(gzWriter)
    defer writer.Flush()

    encoder := json.NewEncoder(writer)

    for _, record := range records {
        if err := encoder.Encode(&record); err != nil {
            return err
        }
    }

    return nil
}

func main() {
    // Read
    if err := readGzippedJSONL("data.jsonl.gz"); err != nil {
        log.Fatal(err)
    }

    // Write
    records := []Record{{ID: 1, Data: "test"}}
    if err := writeGzippedJSONL(records, "output.jsonl.gz"); err != nil {
        log.Fatal(err)
    }
}

Generic JSONL Processing (Go 1.18+)

package main

import (
    "bufio"
    "encoding/json"
    "io"
    "os"
)

// Generic JSONL reader
func ReadJSONL[T any](filepath string) ([]T, error) {
    file, err := os.Open(filepath)
    if err != nil {
        return nil, err
    }
    defer file.Close()

    var results []T
    scanner := bufio.NewScanner(file)

    for scanner.Scan() {
        var record T
        if err := json.Unmarshal(scanner.Bytes(), &record); err != nil {
            continue
        }
        results = append(results, record)
    }

    return results, scanner.Err()
}

// Generic JSONL writer
func WriteJSONL[T any](records []T, filepath string) error {
    file, err := os.Create(filepath)
    if err != nil {
        return err
    }
    defer file.Close()

    writer := bufio.NewWriter(file)
    defer writer.Flush()

    encoder := json.NewEncoder(writer)

    for _, record := range records {
        if err := encoder.Encode(&record); err != nil {
            return err
        }
    }

    return nil
}

// Streaming generic processor
func StreamJSONL[T any](r io.Reader, processFn func(T) error) error {
    scanner := bufio.NewScanner(r)

    for scanner.Scan() {
        var record T
        if err := json.Unmarshal(scanner.Bytes(), &record); err != nil {
            continue
        }

        if err := processFn(record); err != nil {
            return err
        }
    }

    return scanner.Err()
}

type User struct {
    Name string `json:"name"`
    Age  int    `json:"age"`
}

func main() {
    // Type-safe reading
    users, _ := ReadJSONL[User]("users.jsonl")

    // Type-safe writing
    WriteJSONL(users, "output.jsonl")

    // Streaming
    file, _ := os.Open("users.jsonl")
    defer file.Close()

    StreamJSONL(file, func(u User) error {
        // Process user
        return nil
    })
}

Complete Go Example: Concurrent ETL Pipeline

package main

import (
    "bufio"
    "compress/gzip"
    "encoding/json"
    "log"
    "os"
    "sync"
)

type InputRecord struct {
    ID     int    `json:"id"`
    Status string `json:"status"`
    Value  int    `json:"value"`
}

type OutputRecord struct {
    ID        int    `json:"id"`
    Processed bool   `json:"processed"`
    Result    int    `json:"result"`
}

func main() {
    if err := runETL("input.jsonl.gz", "output.jsonl.gz", 10); err != nil {
        log.Fatal(err)
    }
}

func runETL(inputPath, outputPath string, numWorkers int) error {
    // Open input file
    inFile, err := os.Open(inputPath)
    if err != nil {
        return err
    }
    defer inFile.Close()

    gzReader, err := gzip.NewReader(inFile)
    if err != nil {
        return err
    }
    defer gzReader.Close()

    // Open output file
    outFile, err := os.Create(outputPath)
    if err != nil {
        return err
    }
    defer outFile.Close()

    gzWriter := gzip.NewWriter(outFile)
    defer gzWriter.Close()

    // Channels
    inputs := make(chan InputRecord, numWorkers*2)
    outputs := make(chan OutputRecord, numWorkers*2)

    // Workers
    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for input := range inputs {
                // Filter
                if input.Status != "active" {
                    continue
                }

                // Transform
                output := OutputRecord{
                    ID:        input.ID,
                    Processed: true,
                    Result:    input.Value * 2,
                }

                outputs <- output
            }
        }()
    }

    // Writer goroutine
    var writerWg sync.WaitGroup
    writerWg.Add(1)
    go func() {
        defer writerWg.Done()
        encoder := json.NewEncoder(gzWriter)
        for output := range outputs {
            if err := encoder.Encode(&output); err != nil {
                log.Printf("Encode error: %v", err)
            }
        }
    }()

    // Read and distribute
    scanner := bufio.NewScanner(gzReader)
    buf := make([]byte, 1024*1024)
    scanner.Buffer(buf, 1024*1024)

    for scanner.Scan() {
        var record InputRecord
        if err := json.Unmarshal(scanner.Bytes(), &record); err != nil {
            continue
        }
        inputs <- record
    }

    // Cleanup
    close(inputs)
    wg.Wait()
    close(outputs)
    writerWg.Wait()

    return scanner.Err()
}

Java

Setup and Dependencies

Add Jackson or Gson to your project for JSON processing:

// Maven - Jackson
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.15.2</version>
</dependency>

// Gradle - Jackson
implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.2'

// Maven - Gson
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.10.1</version>
</dependency>

// Gradle - Gson
implementation 'com.google.code.gson:gson:2.10.1'

Reading JSONL with Jackson

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class JacksonJSONLReader {
    public static class User {
        public int id;
        public String name;
        public String email;
    }

    public static void main(String[] args) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        List<User> users = new ArrayList<>();

        try (BufferedReader reader = new BufferedReader(new FileReader("data.jsonl"))) {
            String line;
            int lineNum = 0;

            while ((line = reader.readLine()) != null) {
                lineNum++;

                // Skip empty lines
                if (line.trim().isEmpty()) {
                    continue;
                }

                try {
                    User user = mapper.readValue(line, User.class);
                    users.add(user);
                    System.out.println(user.name);
                } catch (IOException e) {
                    System.err.println("Error on line " + lineNum + ": " + e.getMessage());
                }
            }
        }

        System.out.println("Loaded " + users.size() + " users");
    }
}

Writing JSONL with Jackson

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class JacksonJSONLWriter {
    public static class User {
        public int id;
        public String name;
        public String email;

        public User(int id, String name, String email) {
            this.id = id;
            this.name = name;
            this.email = email;
        }
    }

    public static void main(String[] args) throws IOException {
        ObjectMapper mapper = new ObjectMapper();

        List<User> users = Arrays.asList(
            new User(1, "Alice", "[email protected]"),
            new User(2, "Bob", "[email protected]"),
            new User(3, "Charlie", "[email protected]")
        );

        try (BufferedWriter writer = new BufferedWriter(new FileWriter("output.jsonl"))) {
            for (User user : users) {
                String json = mapper.writeValueAsString(user);
                writer.write(json);
                writer.newLine();
            }
        }

        System.out.println("Wrote " + users.size() + " users to output.jsonl");
    }
}

Streaming Large Files with Jackson

Reading and mapping one record per line keeps memory usage low, even for very large files:

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.function.Consumer;

public class StreamingJSONLProcessor {
    public static class Record {
        public int id;
        public String data;
    }

    public static void streamJSONL(String filepath, Consumer<Record> processor) throws IOException {
        ObjectMapper mapper = new ObjectMapper();

        try (BufferedReader reader = new BufferedReader(new FileReader(filepath), 8192)) {
            String line;

            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue;
                }

                try {
                    Record record = mapper.readValue(line, Record.class);
                    processor.accept(record);
                } catch (IOException e) {
                    System.err.println("Parse error: " + e.getMessage());
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Process each record as it's read (low memory usage)
        streamJSONL("large.jsonl", record -> {
            System.out.println("Processing: " + record.id);
            // Do expensive processing here
        });
    }
}

Reading JSONL with Gson

import com.google.gson.Gson;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class GsonJSONLReader {
    public static class User {
        int id;
        String name;
        String email;
    }

    public static void main(String[] args) throws IOException {
        Gson gson = new Gson();
        List<User> users = new ArrayList<>();

        try (BufferedReader reader = new BufferedReader(new FileReader("data.jsonl"))) {
            String line;

            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue;
                }

                User user = gson.fromJson(line, User.class);
                users.add(user);
                System.out.println(user.name);
            }
        }

        System.out.println("Total users: " + users.size());
    }
}

Writing JSONL with Gson

import com.google.gson.Gson;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class GsonJSONLWriter {
    public static class User {
        int id;
        String name;
        String email;

        User(int id, String name, String email) {
            this.id = id;
            this.name = name;
            this.email = email;
        }
    }

    public static void main(String[] args) throws IOException {
        Gson gson = new Gson();

        List<User> users = Arrays.asList(
            new User(1, "Alice", "[email protected]"),
            new User(2, "Bob", "[email protected]")
        );

        try (BufferedWriter writer = new BufferedWriter(new FileWriter("output.jsonl"))) {
            for (User user : users) {
                String json = gson.toJson(user);
                writer.write(json);
                writer.newLine();
            }
        }
    }
}

Parallel Processing with Streams

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelJSONLProcessor {
    public static class Record {
        public int id;
        public String data;
        public String status;
    }

    public static void main(String[] args) throws IOException {
        ObjectMapper mapper = new ObjectMapper();

        // Read and process in parallel
        List<Record> results = Files.lines(Paths.get("data.jsonl"))
            .parallel()
            .filter(line -> !line.trim().isEmpty())
            .map(line -> {
                try {
                    return mapper.readValue(line, Record.class);
                } catch (IOException e) {
                    return null;
                }
            })
            .filter(record -> record != null)
            .filter(record -> "active".equals(record.status))
            .map(record -> {
                // Expensive processing
                record.data = processData(record.data);
                return record;
            })
            .collect(Collectors.toList());

        System.out.println("Processed " + results.size() + " records");
    }

    private static String processData(String data) {
        // Simulate expensive processing
        return data.toUpperCase();
    }
}

Complete Java Example: ETL Pipeline

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.*;
import java.util.concurrent.*;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class JSONLETLPipeline {
    private static final ObjectMapper mapper = new ObjectMapper();

    public static class InputRecord {
        public int id;
        public String status;
        public int value;
    }

    public static class OutputRecord {
        public int id;
        public boolean processed;
        public int result;

        public OutputRecord(int id, boolean processed, int result) {
            this.id = id;
            this.processed = processed;
            this.result = result;
        }
    }

    public static void main(String[] args) throws Exception {
        runETL("input.jsonl.gz", "output.jsonl.gz", 10);
    }

    public static void runETL(String inputPath, String outputPath, int numThreads)
            throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        BlockingQueue<OutputRecord> outputQueue = new LinkedBlockingQueue<>(1000);

        // Writer thread
        CompletableFuture<Void> writerFuture = CompletableFuture.runAsync(() -> {
            try (BufferedWriter writer = new BufferedWriter(
                    new OutputStreamWriter(
                        new GZIPOutputStream(new FileOutputStream(outputPath))))) {

                while (true) {
                    OutputRecord record = outputQueue.poll(1, TimeUnit.SECONDS);
                    if (record == null) continue;

                    if (record.id == -1) break; // Sentinel value

                    String json = mapper.writeValueAsString(record);
                    writer.write(json);
                    writer.newLine();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        // Read and process
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(
                    new GZIPInputStream(new FileInputStream(inputPath))))) {

            reader.lines()
                .parallel()
                .filter(line -> !line.trim().isEmpty())
                .forEach(line -> {
                    try {
                        InputRecord input = mapper.readValue(line, InputRecord.class);

                        // Filter
                        if (!"active".equals(input.status)) {
                            return;
                        }

                        // Transform
                        OutputRecord output = new OutputRecord(
                            input.id,
                            true,
                            input.value * 2
                        );

                        outputQueue.put(output);
                    } catch (Exception e) {
                        System.err.println("Error: " + e.getMessage());
                    }
                });
        }

        // Signal completion
        outputQueue.put(new OutputRecord(-1, false, 0));

        // Wait for writer
        writerFuture.get();
        executor.shutdown();

        System.out.println("ETL pipeline complete");
    }
}

Best Practices

  • Use BufferedReader/Writer: Provides significant performance improvement for file I/O
  • Reuse ObjectMapper: Creating new instances is expensive - use a singleton
  • Handle errors gracefully: One bad line shouldn't crash your entire pipeline
  • Use try-with-resources: Ensures proper cleanup of file handles
  • Consider streaming for large files: Process line-by-line to minimize memory usage

Rust

Setup and Dependencies

Add serde and serde_json to your Cargo.toml:

[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

Reading JSONL

use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader};

#[derive(Debug, Deserialize, Serialize)]
struct User {
    id: u32,
    name: String,
    email: String,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.jsonl")?;
    let reader = BufReader::new(file);

    for (line_num, line) in reader.lines().enumerate() {
        let line = line?;

        // Skip empty lines
        if line.trim().is_empty() {
            continue;
        }

        // Parse JSON
        match serde_json::from_str::<User>(&line) {
            Ok(user) => {
                println!("User: {} ({})", user.name, user.email);
            }
            Err(e) => {
                eprintln!("Error on line {}: {}", line_num + 1, e);
            }
        }
    }

    Ok(())
}

Writing JSONL

use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufWriter, Write};

#[derive(Debug, Deserialize, Serialize)]
struct User {
    id: u32,
    name: String,
    email: String,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let users = vec![
        User {
            id: 1,
            name: "Alice".to_string(),
            email: "[email protected]".to_string(),
        },
        User {
            id: 2,
            name: "Bob".to_string(),
            email: "[email protected]".to_string(),
        },
    ];

    let file = File::create("output.jsonl")?;
    let mut writer = BufWriter::new(file);

    for user in users {
        // Serialize to JSON string
        let json = serde_json::to_string(&user)?;

        // Write JSON + newline
        writeln!(writer, "{}", json)?;
    }

    // Ensure all data is written
    writer.flush()?;

    println!("Successfully wrote JSONL file");
    Ok(())
}

Memory-Efficient Streaming

Process large files line-by-line with minimal memory:

use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader};

#[derive(Debug, Deserialize)]
struct Record {
    id: u32,
    data: String,
}

fn stream_jsonl<F>(filepath: &str, mut processor: F) -> Result<(), Box<dyn std::error::Error>>
where
    F: FnMut(Record) -> Result<(), Box<dyn std::error::Error>>,
{
    let file = File::open(filepath)?;
    let reader = BufReader::new(file);

    for line in reader.lines() {
        let line = line?;

        if line.trim().is_empty() {
            continue;
        }

        match serde_json::from_str::<Record>(&line) {
            Ok(record) => {
                processor(record)?;
            }
            Err(e) => {
                eprintln!("Parse error: {}", e);
                continue;
            }
        }
    }

    Ok(())
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Process each record as it's read
    stream_jsonl("large.jsonl", |record| {
        println!("Processing ID: {}", record.id);
        // Do expensive work here
        Ok(())
    })?;

    Ok(())
}

Robust Error Handling

use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader};

#[derive(Debug, Deserialize)]
struct User {
    id: u32,
    name: String,
}

#[derive(Debug)]
struct ParseStats {
    total_lines: usize,
    successful: usize,
    errors: usize,
}

fn read_jsonl_with_stats(filepath: &str) -> Result<(Vec<User>, ParseStats), std::io::Error> {
    let file = File::open(filepath)?;
    let reader = BufReader::new(file);

    let mut users = Vec::new();
    let mut stats = ParseStats {
        total_lines: 0,
        successful: 0,
        errors: 0,
    };

    for (line_num, line) in reader.lines().enumerate() {
        stats.total_lines += 1;
        let line = line?;

        if line.trim().is_empty() {
            continue;
        }

        match serde_json::from_str::<User>(&line) {
            Ok(user) => {
                users.push(user);
                stats.successful += 1;
            }
            Err(e) => {
                eprintln!("Line {}: {}", line_num + 1, e);
                stats.errors += 1;
            }
        }
    }

    Ok((users, stats))
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (users, stats) = read_jsonl_with_stats("data.jsonl")?;

    println!("Loaded {} users", users.len());
    println!("Stats: {:?}", stats);

    Ok(())
}

Compressed JSONL

Add flate2 for gzip support:

# Cargo.toml
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
flate2 = "1.0"

// src/main.rs
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter, Write};

#[derive(Debug, Deserialize, Serialize)]
struct Record {
    id: u32,
    data: String,
}

fn read_gzipped_jsonl(filepath: &str) -> Result<Vec<Record>, Box<dyn std::error::Error>> {
    let file = File::open(filepath)?;
    let decoder = GzDecoder::new(file);
    let reader = BufReader::new(decoder);

    let mut records = Vec::new();

    for line in reader.lines() {
        let line = line?;
        if line.trim().is_empty() {
            continue;
        }

        let record: Record = serde_json::from_str(&line)?;
        records.push(record);
    }

    Ok(records)
}

fn write_gzipped_jsonl(records: &[Record], filepath: &str) -> Result<(), Box<dyn std::error::Error>> {
    let file = File::create(filepath)?;
    let encoder = GzEncoder::new(file, Compression::default());
    let mut writer = BufWriter::new(encoder);

    for record in records {
        let json = serde_json::to_string(record)?;
        writeln!(writer, "{}", json)?;
    }

    writer.flush()?;
    Ok(())
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Read gzipped JSONL
    let records = read_gzipped_jsonl("data.jsonl.gz")?;
    println!("Loaded {} records", records.len());

    // Write gzipped JSONL
    write_gzipped_jsonl(&records, "output.jsonl.gz")?;
    println!("Wrote compressed JSONL");

    Ok(())
}

Parallel Processing with Rayon

Add rayon for easy parallelism:

# Cargo.toml
[dependencies]
rayon = "1.7"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

// src/main.rs
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader};

#[derive(Debug, Deserialize, Serialize)]
struct Record {
    id: u32,
    status: String,
    value: i32,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.jsonl")?;
    let reader = BufReader::new(file);

    // Read all lines into memory
    let lines: Vec<String> = reader.lines().filter_map(|l| l.ok()).collect();

    // Process in parallel
    let results: Vec<Record> = lines
        .par_iter()
        .filter_map(|line| {
            if line.trim().is_empty() {
                return None;
            }

            serde_json::from_str::<Record>(line).ok()
        })
        .filter(|record| record.status == "active")
        .map(|mut record| {
            // Expensive processing
            record.value *= 2;
            record
        })
        .collect();

    println!("Processed {} records", results.len());

    Ok(())
}

Complete Rust Example: ETL Pipeline

use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter, Write};

#[derive(Debug, Deserialize)]
struct InputRecord {
    id: u32,
    status: String,
    value: i32,
}

#[derive(Debug, Serialize)]
struct OutputRecord {
    id: u32,
    processed: bool,
    result: i32,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    run_etl("input.jsonl.gz", "output.jsonl.gz")?;
    Ok(())
}

fn run_etl(input_path: &str, output_path: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Read gzipped input
    let input_file = File::open(input_path)?;
    let decoder = GzDecoder::new(input_file);
    let reader = BufReader::new(decoder);

    // Read all lines
    let lines: Vec<String> = reader.lines().filter_map(|l| l.ok()).collect();

    println!("Processing {} lines...", lines.len());

    // Parallel ETL pipeline
    let results: Vec<OutputRecord> = lines
        .par_iter()
        .filter_map(|line| {
            if line.trim().is_empty() {
                return None;
            }

            // Parse
            serde_json::from_str::<InputRecord>(line).ok()
        })
        .filter(|input| {
            // Filter: only active records
            input.status == "active"
        })
        .map(|input| {
            // Transform
            OutputRecord {
                id: input.id,
                processed: true,
                result: input.value * 2,
            }
        })
        .collect();

    println!("Filtered to {} records", results.len());

    // Write gzipped output
    let output_file = File::create(output_path)?;
    let encoder = GzEncoder::new(output_file, Compression::default());
    let mut writer = BufWriter::new(encoder);

    for record in results {
        let json = serde_json::to_string(&record)?;
        writeln!(writer, "{}", json)?;
    }

    writer.flush()?;
    println!("ETL complete!");

    Ok(())
}

Best Practices

  • Use BufReader/BufWriter: Dramatically improves I/O performance
  • Leverage serde's derive macro: Auto-generate serialization code
  • Handle errors explicitly: Use Result types and proper error propagation
  • Use Rayon for parallelism: Easy parallel processing with minimal code changes
  • Flush writers explicitly: Ensures all data is written to disk
  • Consider memory vs speed tradeoffs: Line-by-line for huge files, bulk for better parallelism