Complete guides for working with JSONL in Python, JavaScript, Go, Java, and Rust
The simplest approach using Python's built-in json module:
import json

# Basic line-by-line reading
with open('data.jsonl', 'r', encoding='utf-8') as f:
    for line in f:
        # Skip empty lines
        line = line.strip()
        if not line:
            continue
        # Parse JSON
        record = json.loads(line)
        print(record['name'])

# With error handling
with open('data.jsonl', 'r', encoding='utf-8') as f:
    for line_num, line in enumerate(f, 1):
        try:
            record = json.loads(line)
            process(record)
        except json.JSONDecodeError as e:
            print(f"Error on line {line_num}: {e}")
            continue
import json

# Write list of dicts to JSONL
data = [
    {"id": 1, "name": "Alice", "age": 30},
    {"id": 2, "name": "Bob", "age": 25},
]

with open('output.jsonl', 'w', encoding='utf-8') as f:
    for record in data:
        # Write JSON + newline
        f.write(json.dumps(record) + '\n')

# Append to existing file
with open('output.jsonl', 'a', encoding='utf-8') as f:
    new_record = {"id": 3, "name": "Charlie", "age": 35}
    f.write(json.dumps(new_record) + '\n')

# Use ensure_ascii=False to write Unicode characters as-is instead of \u escapes
with open('output.jsonl', 'w', encoding='utf-8') as f:
    for record in data:
        f.write(json.dumps(record, ensure_ascii=False) + '\n')
orjson is 3-5x faster than the standard library json module and natively serializes additional types such as datetime and UUID:
# Install: pip install orjson
import orjson

# Read JSONL (note: read as binary 'rb')
with open('data.jsonl', 'rb') as f:
    for line in f:
        record = orjson.loads(line)
        print(record['name'])

# Write JSONL (note: orjson.dumps returns bytes)
with open('output.jsonl', 'wb') as f:
    for record in data:
        # orjson.dumps() returns bytes, add b'\n'
        f.write(orjson.dumps(record) + b'\n')

# Indented output is possible, but multi-line records break the JSONL format
with open('output.jsonl', 'wb') as f:
    for record in data:
        json_bytes = orjson.dumps(
            record,
            option=orjson.OPT_INDENT_2  # Pretty print (avoid for JSONL)
        )
        f.write(json_bytes + b'\n')
Performance tip: orjson is especially fast for large objects and handles datetime serialization automatically.
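For example, orjson serializes datetime and UUID values without a custom encoder. A minimal sketch (the event fields and file name are illustrative):

import uuid
from datetime import datetime, timezone

import orjson

# datetime and UUID are handled natively; no default= hook is needed
event = {
    "id": uuid.uuid4(),
    "created_at": datetime.now(timezone.utc),
    "action": "login",
}

with open('events.jsonl', 'ab') as f:
    f.write(orjson.dumps(event) + b'\n')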
Pandas has native JSONL support via the lines=True parameter:
import pandas as pd

# Read JSONL into DataFrame
df = pd.read_json('data.jsonl', lines=True)
print(df.head())

# Write DataFrame to JSONL
df.to_json('output.jsonl', orient='records', lines=True)

# Read compressed JSONL
df = pd.read_json('data.jsonl.gz', lines=True, compression='gzip')

# Chunked reading for large files (memory efficient)
chunk_size = 10000
chunks = []
for chunk in pd.read_json('large.jsonl', lines=True, chunksize=chunk_size):
    # Process each chunk
    filtered = chunk[chunk['age'] > 25]
    chunks.append(filtered)

# Combine all chunks
result = pd.concat(chunks, ignore_index=True)

# Stream processing without loading all into memory
with pd.read_json('large.jsonl', lines=True, chunksize=10000) as reader:
    for chunk in reader:
        # Process and write incrementally (mode='a' requires pandas 2.0+)
        processed = transform(chunk)
        processed.to_json('output.jsonl', orient='records', lines=True, mode='a')
Process files larger than available RAM:
import json

def stream_jsonl(filepath, batch_size=1000):
    """Generator that yields batches of records"""
    batch = []
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            try:
                record = json.loads(line)
                batch.append(record)
                if len(batch) >= batch_size:
                    yield batch
                    batch = []
            except json.JSONDecodeError:
                continue
    # Yield remaining records
    if batch:
        yield batch

# Usage: process in batches
for batch in stream_jsonl('huge.jsonl', batch_size=5000):
    # Process batch (5000 records at a time)
    results = bulk_process(batch)
    save_results(results)

# Filter and transform while streaming
def filter_transform_jsonl(input_path, output_path, condition):
    with open(input_path, 'r') as fin, open(output_path, 'w') as fout:
        for line in fin:
            record = json.loads(line)
            # Apply filter
            if condition(record):
                # Transform
                transformed = transform(record)
                fout.write(json.dumps(transformed) + '\n')

# Example: Extract active users
filter_transform_jsonl(
    'users.jsonl',
    'active_users.jsonl',
    lambda r: r.get('status') == 'active'
)
import gzip
import json

# Read gzipped JSONL (streaming)
with gzip.open('data.jsonl.gz', 'rt', encoding='utf-8') as f:
    for line in f:
        record = json.loads(line)
        process(record)

# Write gzipped JSONL
with gzip.open('output.jsonl.gz', 'wt', encoding='utf-8') as f:
    for record in data:
        f.write(json.dumps(record) + '\n')

# Transparent compression wrapper
def open_jsonl(filepath, mode='r'):
    """Open JSONL file, handling .gz automatically"""
    if filepath.endswith('.gz'):
        return gzip.open(filepath, mode + 't', encoding='utf-8')
    else:
        return open(filepath, mode, encoding='utf-8')

# Usage works for both compressed and uncompressed
with open_jsonl('data.jsonl.gz', 'r') as f:
    for line in f:
        record = json.loads(line)

# Read bzip2 or xz (zstd needs a third-party package such as zstandard)
import bz2
import lzma

# bzip2
with bz2.open('data.jsonl.bz2', 'rt') as f:
    for line in f:
        record = json.loads(line)

# xz/lzma
with lzma.open('data.jsonl.xz', 'rt') as f:
    for line in f:
        record = json.loads(line)
from multiprocessing import Pool
import json

def process_chunk(lines):
    """Process a chunk of lines"""
    results = []
    for line in lines:
        try:
            record = json.loads(line)
            result = expensive_computation(record)
            results.append(result)
        except json.JSONDecodeError:
            continue
    return results

def parallel_process_jsonl(filepath, num_workers=4, chunk_size=10000):
    """Process JSONL in parallel"""
    # Read file into chunks
    chunks = []
    current_chunk = []
    with open(filepath, 'r') as f:
        for line in f:
            current_chunk.append(line)
            if len(current_chunk) >= chunk_size:
                chunks.append(current_chunk)
                current_chunk = []
    if current_chunk:
        chunks.append(current_chunk)
    # Process chunks in parallel
    with Pool(processes=num_workers) as pool:
        all_results = pool.map(process_chunk, chunks)
    # Flatten results
    final_results = [item for sublist in all_results for item in sublist]
    return final_results

# Usage
results = parallel_process_jsonl('data.jsonl', num_workers=8)

# Alternative: concurrent.futures for more control
from concurrent.futures import ProcessPoolExecutor, as_completed

def process_file_parallel(filepath):
    # Split file into parts (split_file_into_chunks / process_chunk_file are placeholders)
    chunks = split_file_into_chunks(filepath, num_chunks=10)
    with ProcessPoolExecutor(max_workers=10) as executor:
        # Submit all chunks
        futures = {executor.submit(process_chunk_file, chunk): chunk
                   for chunk in chunks}
        # Collect results as they complete
        for future in as_completed(futures):
            chunk = futures[future]
            try:
                result = future.result()
                print(f"Chunk {chunk} completed")
            except Exception as e:
                print(f"Chunk {chunk} failed: {e}")
ijson provides iterative JSON parsing for ultra-large files:
# Install: pip install ijson
import ijson

# Stream large JSONL files with minimal memory
with open('large.jsonl', 'rb') as f:
    # Parse each JSON object as it's encountered
    objects = ijson.items(f, '', multiple_values=True)
    for obj in objects:
        print(obj['name'])
        process(obj)

# Memory-efficient: only loads one object at a time
# Perfect for files larger than available RAM

# Advanced: extract specific fields only
with open('data.jsonl', 'rb') as f:
    # Yield only the 'name' field from each top-level object
    names = ijson.items(f, 'name', multiple_values=True)
    for name in names:
        print(name)

# Combine with generators for pipelines
def extract_active_users(filepath):
    with open(filepath, 'rb') as f:
        objects = ijson.items(f, '', multiple_values=True)
        for obj in objects:
            if obj.get('status') == 'active':
                yield obj

# Use in pipeline
for user in extract_active_users('users.jsonl'):
    process(user)
Best for: Files too large to fit in memory, where you need true streaming parsing.
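Because ijson only needs a binary file-like object, it can read directly from a compressed stream as well. A minimal sketch combining it with gzip (the file name is illustrative, and process() is a placeholder as above):

import gzip

import ijson

# Stream a gzipped JSONL file without decompressing it to disk first
with gzip.open('huge.jsonl.gz', 'rb') as f:
    for obj in ijson.items(f, '', multiple_values=True):
        process(obj)  # one object in memory at a time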
The jsonlines library provides a clean API for JSONL operations:
# Install: pip install jsonlines
import jsonlines

# Read JSONL
with jsonlines.open('data.jsonl') as reader:
    for record in reader:
        print(record['name'])

# Write JSONL
with jsonlines.open('output.jsonl', mode='w') as writer:
    writer.write({'name': 'Alice', 'age': 30})
    writer.write({'name': 'Bob', 'age': 25})
    # Or write multiple at once
    writer.write_all([
        {'name': 'Charlie', 'age': 35},
        {'name': 'Diana', 'age': 28}
    ])

# Read with compression (jsonlines does not open .gz itself; wrap a gzip file object)
import gzip
with gzip.open('data.jsonl.gz', 'rt', encoding='utf-8') as f:
    reader = jsonlines.Reader(f)
    for record in reader:
        process(record)

# Skip invalid lines
with jsonlines.open('data.jsonl', mode='r') as reader:
    for record in reader.iter(skip_invalid=True):
        process(record)

# Type checking with dataclasses
from dataclasses import dataclass

@dataclass
class User:
    id: int
    name: str
    age: int

with jsonlines.open('users.jsonl') as reader:
    for obj in reader:
        user = User(**obj)  # Convert to dataclass
        print(user.name)
#!/usr/bin/env python3
"""
ETL Pipeline: Read JSONL, transform, filter, write output
Handles large files, errors, and compression
"""
import json
import gzip
import logging
from typing import Iterator, Dict, Any

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def read_jsonl(filepath: str) -> Iterator[Dict[str, Any]]:
    """Read JSONL file (handles .gz compression)"""
    open_func = gzip.open if filepath.endswith('.gz') else open
    with open_func(filepath, 'rt', encoding='utf-8') as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue
            try:
                yield json.loads(line)
            except json.JSONDecodeError as e:
                logger.error(f"Parse error line {line_num}: {e}")

def write_jsonl(records: Iterator[Dict[str, Any]], filepath: str):
    """Write records to JSONL file (handles .gz compression)"""
    open_func = gzip.open if filepath.endswith('.gz') else open
    with open_func(filepath, 'wt', encoding='utf-8') as f:
        for record in records:
            f.write(json.dumps(record) + '\n')

def transform(record: Dict[str, Any]) -> Dict[str, Any]:
    """Transform record (example: add computed field)"""
    record['full_name'] = f"{record['first_name']} {record['last_name']}"
    record['is_adult'] = record['age'] >= 18
    return record

def filter_record(record: Dict[str, Any]) -> bool:
    """Filter logic (example: active users only)"""
    return record.get('status') == 'active'

def etl_pipeline(input_file: str, output_file: str):
    """Complete ETL pipeline"""
    logger.info(f"Starting ETL: {input_file} -> {output_file}")
    processed = 0
    written = 0

    def records_to_write() -> Iterator[Dict[str, Any]]:
        nonlocal processed, written
        for record in read_jsonl(input_file):
            processed += 1
            record = transform(record)
            if filter_record(record):
                written += 1
                yield record

    write_jsonl(records_to_write(), output_file)
    logger.info(f"ETL complete. Processed: {processed}, Written: {written}")

if __name__ == '__main__':
    etl_pipeline('input.jsonl.gz', 'output.jsonl.gz')
const fs = require('fs');
const readline = require('readline');

// Basic line-by-line reading
async function readJSONL(filepath) {
  const fileStream = fs.createReadStream(filepath);
  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity // Handle both \n and \r\n
  });

  for await (const line of rl) {
    if (!line.trim()) continue; // Skip empty lines
    try {
      const record = JSON.parse(line);
      console.log(record.name);
    } catch (err) {
      console.error('Parse error:', err.message);
    }
  }
}

// Usage
readJSONL('data.jsonl');

// Callback-based approach
function readJSONLCallback(filepath, onRecord, onComplete) {
  const fileStream = fs.createReadStream(filepath);
  const rl = readline.createInterface({ input: fileStream });

  rl.on('line', (line) => {
    try {
      const record = JSON.parse(line);
      onRecord(record);
    } catch (err) {
      console.error('Parse error:', err);
    }
  });

  rl.on('close', onComplete);
}
const fs = require('fs');

// Basic writing
const data = [
  { id: 1, name: 'Alice', age: 30 },
  { id: 2, name: 'Bob', age: 25 }
];

const writeStream = fs.createWriteStream('output.jsonl');
data.forEach(record => {
  writeStream.write(JSON.stringify(record) + '\n');
});
writeStream.end();

// Async/await with promises
async function writeJSONL(records, filepath) {
  const writeStream = fs.createWriteStream(filepath);
  for (const record of records) {
    const json = JSON.stringify(record) + '\n';
    // Wait if buffer is full
    if (!writeStream.write(json)) {
      await new Promise(resolve => writeStream.once('drain', resolve));
    }
  }
  writeStream.end();
  return new Promise(resolve => writeStream.once('finish', resolve));
}

// Usage (call from an async function, or use top-level await in an ES module)
await writeJSONL(data, 'output.jsonl');

// Append to existing file
const appendStream = fs.createWriteStream('output.jsonl', { flags: 'a' });
appendStream.write(JSON.stringify({ id: 3, name: 'Charlie' }) + '\n');
appendStream.end();
The ndjson npm package provides optimized streaming:
// Install: npm install ndjson
const fs = require('fs');
const ndjson = require('ndjson');

// Read JSONL with streaming parser
fs.createReadStream('data.jsonl')
  .pipe(ndjson.parse())
  .on('data', (record) => {
    console.log(record.name);
  })
  .on('end', () => {
    console.log('Done');
  });

// Write JSONL with streaming serializer
const writeStream = fs.createWriteStream('output.jsonl');
const stringify = ndjson.stringify();
stringify.pipe(writeStream);

// Write objects
stringify.write({ id: 1, name: 'Alice' });
stringify.write({ id: 2, name: 'Bob' });
stringify.end();

// Transform pipeline
fs.createReadStream('input.jsonl')
  .pipe(ndjson.parse())
  .pipe(ndjson.stringify())
  .pipe(fs.createWriteStream('output.jsonl'));
const { Transform } = require('stream');
const fs = require('fs');
const ndjson = require('ndjson');

// Custom transform stream
class FilterTransform extends Transform {
  constructor(filterFn) {
    super({ objectMode: true });
    this.filterFn = filterFn;
  }

  _transform(record, encoding, callback) {
    if (this.filterFn(record)) {
      // Transform record
      record.processed = true;
      record.timestamp = Date.now();
      this.push(record);
    }
    callback();
  }
}

// Usage: filter active users
const filterActive = new FilterTransform(r => r.status === 'active');

fs.createReadStream('users.jsonl')
  .pipe(ndjson.parse())
  .pipe(filterActive)
  .pipe(ndjson.stringify())
  .pipe(fs.createWriteStream('active_users.jsonl'));

// Pipeline with multiple transforms
const { pipeline } = require('stream/promises');

async function processJSONL() {
  await pipeline(
    fs.createReadStream('input.jsonl'),
    ndjson.parse(),
    new FilterTransform(r => r.age > 25),
    ndjson.stringify(),
    fs.createWriteStream('output.jsonl')
  );
  console.log('Pipeline complete');
}

processJSONL();
const fs = require('fs');
const zlib = require('zlib');
const readline = require('readline');

// Read gzipped JSONL
async function readGzippedJSONL(filepath) {
  const fileStream = fs.createReadStream(filepath);
  const gunzip = zlib.createGunzip();
  const rl = readline.createInterface({
    input: fileStream.pipe(gunzip),
    crlfDelay: Infinity
  });

  for await (const line of rl) {
    const record = JSON.parse(line);
    console.log(record);
  }
}

// Write gzipped JSONL
async function writeGzippedJSONL(records, filepath) {
  const writeStream = fs.createWriteStream(filepath);
  const gzip = zlib.createGzip();
  gzip.pipe(writeStream);

  for (const record of records) {
    gzip.write(JSON.stringify(record) + '\n');
  }
  gzip.end();

  return new Promise(resolve => writeStream.on('finish', resolve));
}

// With ndjson
const ndjson = require('ndjson');

fs.createReadStream('data.jsonl.gz')
  .pipe(zlib.createGunzip())
  .pipe(ndjson.parse())
  .on('data', record => {
    console.log(record);
  });
const express = require('express');
const fs = require('fs');
const ndjson = require('ndjson');

const app = express();

// Streaming JSONL endpoint
app.get('/api/data', (req, res) => {
  res.setHeader('Content-Type', 'application/x-ndjson');
  res.setHeader('Transfer-Encoding', 'chunked');

  fs.createReadStream('data.jsonl')
    .pipe(ndjson.parse())
    .pipe(ndjson.stringify())
    .pipe(res);
});

// Query database and stream results
app.get('/api/users', async (req, res) => {
  res.setHeader('Content-Type', 'application/x-ndjson');

  const cursor = db.collection('users').find().stream();
  cursor.on('data', (doc) => {
    res.write(JSON.stringify(doc) + '\n');
  });
  cursor.on('end', () => {
    res.end();
  });
});

app.listen(3000);

// Client: consume streaming JSONL
const https = require('https');
const readline = require('readline');

https.get('https://api.example.com/data', (res) => {
  const rl = readline.createInterface({ input: res });
  rl.on('line', (line) => {
    const record = JSON.parse(line);
    console.log(record);
  });
});
// Streaming JSONL in the browser
async function fetchStreamingJSONL(url) {
  const response = await fetch(url);
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // Decode chunk
    buffer += decoder.decode(value, { stream: true });

    // Process complete lines
    const lines = buffer.split('\n');
    buffer = lines.pop(); // Keep incomplete line in buffer

    for (const line of lines) {
      if (line.trim()) {
        const record = JSON.parse(line);
        console.log(record);
        // Update UI with record
      }
    }
  }
}

// Usage
fetchStreamingJSONL('https://api.example.com/data.jsonl');
// With async iterator (modern browsers)
async function* parseJSONLStream(response) {
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop();

    for (const line of lines) {
      if (line.trim()) {
        yield JSON.parse(line);
      }
    }
  }
}

// Usage with for-await
const response = await fetch('https://api.example.com/data.jsonl');
for await (const record of parseJSONLStream(response)) {
  console.log(record);
}
// Complete example: JSONL API with filtering
const express = require('express');
const fs = require('fs');
const ndjson = require('ndjson');
const { Transform } = require('stream');

const app = express();

// Custom filter transform
class FilterStream extends Transform {
  constructor(query) {
    super({ objectMode: true });
    this.query = query;
  }

  _transform(record, encoding, callback) {
    // Apply filters
    let match = true;
    if (this.query.minAge && record.age < this.query.minAge) {
      match = false;
    }
    if (this.query.status && record.status !== this.query.status) {
      match = false;
    }
    if (match) {
      this.push(record);
    }
    callback();
  }
}

// Streaming JSONL endpoint with query params
app.get('/api/users', (req, res) => {
  const query = {
    minAge: parseInt(req.query.minAge) || 0,
    status: req.query.status
  };

  res.setHeader('Content-Type', 'application/x-ndjson');
  res.setHeader('Transfer-Encoding', 'chunked');

  const filterStream = new FilterStream(query);

  fs.createReadStream('users.jsonl')
    .pipe(ndjson.parse())
    .pipe(filterStream)
    .pipe(ndjson.stringify())
    .pipe(res)
    .on('error', (err) => {
      console.error('Stream error:', err);
      res.end();
    });
});

app.listen(3000, () => {
  console.log('API listening on port 3000');
});
package main

import (
    "bufio"
    "encoding/json"
    "fmt"
    "log"
    "os"
)

type User struct {
    ID    int    `json:"id"`
    Name  string `json:"name"`
    Email string `json:"email"`
}

func main() {
    file, err := os.Open("data.jsonl")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)

    // Increase buffer size for large lines (default is 64KB)
    const maxCapacity = 1024 * 1024 // 1MB
    buf := make([]byte, maxCapacity)
    scanner.Buffer(buf, maxCapacity)

    lineNum := 0
    for scanner.Scan() {
        lineNum++
        line := scanner.Bytes()

        var user User
        if err := json.Unmarshal(line, &user); err != nil {
            log.Printf("Error on line %d: %v", lineNum, err)
            continue
        }
        fmt.Printf("%s\n", user.Name)
    }

    if err := scanner.Err(); err != nil {
        log.Fatal(err)
    }
}
package main

import (
    "bufio"
    "encoding/json"
    "log"
    "os"
)

type User struct {
    ID    int    `json:"id"`
    Name  string `json:"name"`
    Email string `json:"email"`
}

func main() {
    users := []User{
        {ID: 1, Name: "Alice", Email: "[email protected]"},
        {ID: 2, Name: "Bob", Email: "[email protected]"},
    }

    file, err := os.Create("output.jsonl")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    // Use buffered writer for better performance
    writer := bufio.NewWriter(file)
    defer writer.Flush()

    encoder := json.NewEncoder(writer)
    for _, user := range users {
        if err := encoder.Encode(&user); err != nil {
            log.Printf("Error encoding user %d: %v", user.ID, err)
            continue
        }
    }
}

// Append to existing file
func appendToJSONL(user User, filepath string) error {
    file, err := os.OpenFile(filepath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
    if err != nil {
        return err
    }
    defer file.Close()

    encoder := json.NewEncoder(file)
    return encoder.Encode(&user)
}
package main

import (
    "bufio"
    "encoding/json"
    "io"
    "log"
    "os"
)

type Record struct {
    ID   int    `json:"id"`
    Data string `json:"data"`
}

// Stream JSONL with bufio.Reader, which handles lines of any length
// (bufio.Scanner errors out on lines longer than its buffer)
func streamJSONL(filepath string, processFn func(Record) error) error {
    file, err := os.Open(filepath)
    if err != nil {
        return err
    }
    defer file.Close()

    reader := bufio.NewReader(file)
    for {
        line, readErr := reader.ReadBytes('\n')
        // Process the line even at EOF, in case the file has no trailing newline
        if len(line) > 0 {
            var record Record
            if err := json.Unmarshal(line, &record); err != nil {
                log.Printf("Parse error: %v", err)
            } else if err := processFn(record); err != nil {
                return err
            }
        }
        if readErr == io.EOF {
            break
        }
        if readErr != nil {
            return readErr
        }
    }
    return nil
}

func main() {
    err := streamJSONL("data.jsonl", func(r Record) error {
        // Process each record
        log.Printf("Processing ID %d", r.ID)
        return nil
    })
    if err != nil {
        log.Fatal(err)
    }
}
package main

import (
    "bufio"
    "encoding/json"
    "log"
    "os"
    "sync"
)

type Record struct {
    ID   int    `json:"id"`
    Data string `json:"data"`
}

// Concurrent JSONL processing
func processConcurrent(filepath string, numWorkers int) error {
    file, err := os.Open(filepath)
    if err != nil {
        return err
    }
    defer file.Close()

    // Channel for distributing work
    lines := make(chan []byte, numWorkers*2)
    results := make(chan Record, numWorkers*2)

    // Start worker goroutines
    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for line := range lines {
                var record Record
                if err := json.Unmarshal(line, &record); err != nil {
                    log.Printf("Parse error: %v", err)
                    continue
                }
                // Process record
                processed := processRecord(record)
                results <- processed
            }
        }()
    }

    // Result collector
    var resultsWg sync.WaitGroup
    resultsWg.Add(1)
    go func() {
        defer resultsWg.Done()
        for result := range results {
            // Handle results
            log.Printf("Processed: %+v", result)
        }
    }()

    // Read file and distribute lines
    scanner := bufio.NewScanner(file)
    buf := make([]byte, 1024*1024)
    scanner.Buffer(buf, 1024*1024)

    for scanner.Scan() {
        // Make a copy of line (scanner reuses buffer)
        lineCopy := make([]byte, len(scanner.Bytes()))
        copy(lineCopy, scanner.Bytes())
        lines <- lineCopy
    }

    close(lines)
    wg.Wait()
    close(results)
    resultsWg.Wait()

    return scanner.Err()
}

func processRecord(r Record) Record {
    // Expensive processing here
    return r
}

func main() {
    if err := processConcurrent("data.jsonl", 10); err != nil {
        log.Fatal(err)
    }
}
package main

import (
    "bufio"
    "compress/gzip"
    "encoding/json"
    "log"
    "os"
)

type Record struct {
    ID   int    `json:"id"`
    Data string `json:"data"`
}

// Read gzipped JSONL
func readGzippedJSONL(filepath string) error {
    file, err := os.Open(filepath)
    if err != nil {
        return err
    }
    defer file.Close()

    gzReader, err := gzip.NewReader(file)
    if err != nil {
        return err
    }
    defer gzReader.Close()

    scanner := bufio.NewScanner(gzReader)
    buf := make([]byte, 1024*1024)
    scanner.Buffer(buf, 1024*1024)

    for scanner.Scan() {
        var record Record
        if err := json.Unmarshal(scanner.Bytes(), &record); err != nil {
            log.Printf("Parse error: %v", err)
            continue
        }
        // Process record
        log.Printf("%+v", record)
    }
    return scanner.Err()
}

// Write gzipped JSONL
func writeGzippedJSONL(records []Record, filepath string) error {
    file, err := os.Create(filepath)
    if err != nil {
        return err
    }
    defer file.Close()

    gzWriter := gzip.NewWriter(file)
    defer gzWriter.Close()

    writer := bufio.NewWriter(gzWriter)
    defer writer.Flush()

    encoder := json.NewEncoder(writer)
    for _, record := range records {
        if err := encoder.Encode(&record); err != nil {
            return err
        }
    }
    return nil
}

func main() {
    // Read
    if err := readGzippedJSONL("data.jsonl.gz"); err != nil {
        log.Fatal(err)
    }

    // Write
    records := []Record{{ID: 1, Data: "test"}}
    if err := writeGzippedJSONL(records, "output.jsonl.gz"); err != nil {
        log.Fatal(err)
    }
}
package main

import (
    "bufio"
    "encoding/json"
    "io"
    "os"
)

// Generic JSONL reader
func ReadJSONL[T any](filepath string) ([]T, error) {
    file, err := os.Open(filepath)
    if err != nil {
        return nil, err
    }
    defer file.Close()

    var results []T
    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        var record T
        if err := json.Unmarshal(scanner.Bytes(), &record); err != nil {
            continue
        }
        results = append(results, record)
    }
    return results, scanner.Err()
}

// Generic JSONL writer
func WriteJSONL[T any](records []T, filepath string) error {
    file, err := os.Create(filepath)
    if err != nil {
        return err
    }
    defer file.Close()

    writer := bufio.NewWriter(file)
    defer writer.Flush()

    encoder := json.NewEncoder(writer)
    for _, record := range records {
        if err := encoder.Encode(&record); err != nil {
            return err
        }
    }
    return nil
}

// Streaming generic processor
func StreamJSONL[T any](r io.Reader, processFn func(T) error) error {
    scanner := bufio.NewScanner(r)
    for scanner.Scan() {
        var record T
        if err := json.Unmarshal(scanner.Bytes(), &record); err != nil {
            continue
        }
        if err := processFn(record); err != nil {
            return err
        }
    }
    return scanner.Err()
}

type User struct {
    Name string `json:"name"`
    Age  int    `json:"age"`
}

func main() {
    // Type-safe reading
    users, _ := ReadJSONL[User]("users.jsonl")

    // Type-safe writing
    WriteJSONL(users, "output.jsonl")

    // Streaming
    file, _ := os.Open("users.jsonl")
    defer file.Close()
    StreamJSONL(file, func(u User) error {
        // Process user
        return nil
    })
}
package main

import (
    "bufio"
    "compress/gzip"
    "encoding/json"
    "log"
    "os"
    "sync"
)

type InputRecord struct {
    ID     int    `json:"id"`
    Status string `json:"status"`
    Value  int    `json:"value"`
}

type OutputRecord struct {
    ID        int  `json:"id"`
    Processed bool `json:"processed"`
    Result    int  `json:"result"`
}

func main() {
    if err := runETL("input.jsonl.gz", "output.jsonl.gz", 10); err != nil {
        log.Fatal(err)
    }
}

func runETL(inputPath, outputPath string, numWorkers int) error {
    // Open input file
    inFile, err := os.Open(inputPath)
    if err != nil {
        return err
    }
    defer inFile.Close()

    gzReader, err := gzip.NewReader(inFile)
    if err != nil {
        return err
    }
    defer gzReader.Close()

    // Open output file
    outFile, err := os.Create(outputPath)
    if err != nil {
        return err
    }
    defer outFile.Close()

    gzWriter := gzip.NewWriter(outFile)
    defer gzWriter.Close()

    // Channels
    inputs := make(chan InputRecord, numWorkers*2)
    outputs := make(chan OutputRecord, numWorkers*2)

    // Workers
    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for input := range inputs {
                // Filter
                if input.Status != "active" {
                    continue
                }
                // Transform
                output := OutputRecord{
                    ID:        input.ID,
                    Processed: true,
                    Result:    input.Value * 2,
                }
                outputs <- output
            }
        }()
    }

    // Writer goroutine
    var writerWg sync.WaitGroup
    writerWg.Add(1)
    go func() {
        defer writerWg.Done()
        encoder := json.NewEncoder(gzWriter)
        for output := range outputs {
            encoder.Encode(&output)
        }
    }()

    // Read and distribute
    scanner := bufio.NewScanner(gzReader)
    buf := make([]byte, 1024*1024)
    scanner.Buffer(buf, 1024*1024)

    for scanner.Scan() {
        var record InputRecord
        if err := json.Unmarshal(scanner.Bytes(), &record); err != nil {
            continue
        }
        inputs <- record
    }

    // Cleanup
    close(inputs)
    wg.Wait()
    close(outputs)
    writerWg.Wait()

    return scanner.Err()
}
Add Jackson or Gson to your project for JSON processing:
<!-- Maven - Jackson -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.15.2</version>
</dependency>

// Gradle - Jackson
implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.2'

<!-- Maven - Gson -->
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.10.1</version>
</dependency>

// Gradle - Gson
implementation 'com.google.code.gson:gson:2.10.1'
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class JacksonJSONLReader {

    public static class User {
        public int id;
        public String name;
        public String email;
    }

    public static void main(String[] args) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        List<User> users = new ArrayList<>();

        try (BufferedReader reader = new BufferedReader(new FileReader("data.jsonl"))) {
            String line;
            int lineNum = 0;
            while ((line = reader.readLine()) != null) {
                lineNum++;
                // Skip empty lines
                if (line.trim().isEmpty()) {
                    continue;
                }
                try {
                    User user = mapper.readValue(line, User.class);
                    users.add(user);
                    System.out.println(user.name);
                } catch (IOException e) {
                    System.err.println("Error on line " + lineNum + ": " + e.getMessage());
                }
            }
        }

        System.out.println("Loaded " + users.size() + " users");
    }
}
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class JacksonJSONLWriter {

    public static class User {
        public int id;
        public String name;
        public String email;

        public User(int id, String name, String email) {
            this.id = id;
            this.name = name;
            this.email = email;
        }
    }

    public static void main(String[] args) throws IOException {
        ObjectMapper mapper = new ObjectMapper();

        List<User> users = Arrays.asList(
            new User(1, "Alice", "[email protected]"),
            new User(2, "Bob", "[email protected]"),
            new User(3, "Charlie", "[email protected]")
        );

        try (BufferedWriter writer = new BufferedWriter(new FileWriter("output.jsonl"))) {
            for (User user : users) {
                String json = mapper.writeValueAsString(user);
                writer.write(json);
                writer.newLine();
            }
        }

        System.out.println("Wrote " + users.size() + " users to output.jsonl");
    }
}
Reading line-by-line with Jackson's ObjectMapper keeps memory usage low, even for very large files:
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.function.Consumer;

public class StreamingJSONLProcessor {

    public static class Record {
        public int id;
        public String data;
    }

    public static void streamJSONL(String filepath, Consumer<Record> processor) throws IOException {
        ObjectMapper mapper = new ObjectMapper();

        try (BufferedReader reader = new BufferedReader(new FileReader(filepath), 8192)) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue;
                }
                try {
                    Record record = mapper.readValue(line, Record.class);
                    processor.accept(record);
                } catch (IOException e) {
                    System.err.println("Parse error: " + e.getMessage());
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Process each record as it's read (low memory usage)
        streamJSONL("large.jsonl", record -> {
            System.out.println("Processing: " + record.id);
            // Do expensive processing here
        });
    }
}
import com.google.gson.Gson;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class GsonJSONLReader {

    public static class User {
        int id;
        String name;
        String email;
    }

    public static void main(String[] args) throws IOException {
        Gson gson = new Gson();
        List<User> users = new ArrayList<>();

        try (BufferedReader reader = new BufferedReader(new FileReader("data.jsonl"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue;
                }
                User user = gson.fromJson(line, User.class);
                users.add(user);
                System.out.println(user.name);
            }
        }

        System.out.println("Total users: " + users.size());
    }
}
import com.google.gson.Gson;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class GsonJSONLWriter {

    public static class User {
        int id;
        String name;
        String email;

        User(int id, String name, String email) {
            this.id = id;
            this.name = name;
            this.email = email;
        }
    }

    public static void main(String[] args) throws IOException {
        Gson gson = new Gson();

        List<User> users = Arrays.asList(
            new User(1, "Alice", "[email protected]"),
            new User(2, "Bob", "[email protected]")
        );

        try (BufferedWriter writer = new BufferedWriter(new FileWriter("output.jsonl"))) {
            for (User user : users) {
                String json = gson.toJson(user);
                writer.write(json);
                writer.newLine();
            }
        }
    }
}
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ParallelJSONLProcessor {

    public static class Record {
        public int id;
        public String data;
        public String status;
    }

    public static void main(String[] args) throws IOException {
        ObjectMapper mapper = new ObjectMapper();

        // Read and process in parallel (try-with-resources closes the file stream)
        try (Stream<String> lines = Files.lines(Paths.get("data.jsonl"))) {
            List<Record> results = lines
                .parallel()
                .filter(line -> !line.trim().isEmpty())
                .map(line -> {
                    try {
                        return mapper.readValue(line, Record.class);
                    } catch (IOException e) {
                        return null;
                    }
                })
                .filter(record -> record != null)
                .filter(record -> "active".equals(record.status))
                .map(record -> {
                    // Expensive processing
                    record.data = processData(record.data);
                    return record;
                })
                .collect(Collectors.toList());

            System.out.println("Processed " + results.size() + " records");
        }
    }

    private static String processData(String data) {
        // Simulate expensive processing
        return data.toUpperCase();
    }
}
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.*;
import java.util.concurrent.*;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class JSONLETLPipeline {

    private static final ObjectMapper mapper = new ObjectMapper();

    public static class InputRecord {
        public int id;
        public String status;
        public int value;
    }

    public static class OutputRecord {
        public int id;
        public boolean processed;
        public int result;

        public OutputRecord(int id, boolean processed, int result) {
            this.id = id;
            this.processed = processed;
            this.result = result;
        }
    }

    public static void main(String[] args) throws Exception {
        runETL("input.jsonl.gz", "output.jsonl.gz");
    }

    public static void runETL(String inputPath, String outputPath) throws Exception {
        BlockingQueue<OutputRecord> outputQueue = new LinkedBlockingQueue<>(1000);

        // Writer thread
        CompletableFuture<Void> writerFuture = CompletableFuture.runAsync(() -> {
            try (BufferedWriter writer = new BufferedWriter(
                    new OutputStreamWriter(
                        new GZIPOutputStream(new FileOutputStream(outputPath))))) {
                while (true) {
                    OutputRecord record = outputQueue.poll(1, TimeUnit.SECONDS);
                    if (record == null) continue;
                    if (record.id == -1) break; // Sentinel value
                    String json = mapper.writeValueAsString(record);
                    writer.write(json);
                    writer.newLine();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        // Read and process (the parallel stream uses the common fork-join pool)
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(
                    new GZIPInputStream(new FileInputStream(inputPath))))) {
            reader.lines()
                .parallel()
                .filter(line -> !line.trim().isEmpty())
                .forEach(line -> {
                    try {
                        InputRecord input = mapper.readValue(line, InputRecord.class);
                        // Filter
                        if (!"active".equals(input.status)) {
                            return;
                        }
                        // Transform
                        OutputRecord output = new OutputRecord(
                            input.id,
                            true,
                            input.value * 2
                        );
                        outputQueue.put(output);
                    } catch (Exception e) {
                        System.err.println("Error: " + e.getMessage());
                    }
                });
        }

        // Signal completion
        outputQueue.put(new OutputRecord(-1, false, 0));

        // Wait for writer
        writerFuture.get();
        System.out.println("ETL pipeline complete");
    }
}
Add serde and serde_json to your Cargo.toml:
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader};

#[derive(Debug, Deserialize, Serialize)]
struct User {
    id: u32,
    name: String,
    email: String,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.jsonl")?;
    let reader = BufReader::new(file);

    for (line_num, line) in reader.lines().enumerate() {
        let line = line?;

        // Skip empty lines
        if line.trim().is_empty() {
            continue;
        }

        // Parse JSON
        match serde_json::from_str::<User>(&line) {
            Ok(user) => {
                println!("User: {} ({})", user.name, user.email);
            }
            Err(e) => {
                eprintln!("Error on line {}: {}", line_num + 1, e);
            }
        }
    }

    Ok(())
}
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufWriter, Write};

#[derive(Debug, Deserialize, Serialize)]
struct User {
    id: u32,
    name: String,
    email: String,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let users = vec![
        User {
            id: 1,
            name: "Alice".to_string(),
            email: "[email protected]".to_string(),
        },
        User {
            id: 2,
            name: "Bob".to_string(),
            email: "[email protected]".to_string(),
        },
    ];

    let file = File::create("output.jsonl")?;
    let mut writer = BufWriter::new(file);

    for user in users {
        // Serialize to JSON string
        let json = serde_json::to_string(&user)?;
        // Write JSON + newline
        writeln!(writer, "{}", json)?;
    }

    // Ensure all data is written
    writer.flush()?;

    println!("Successfully wrote JSONL file");
    Ok(())
}
Process large files line-by-line with minimal memory:
use serde::Deserialize;
use std::fs::File;
use std::io::{BufRead, BufReader};

#[derive(Debug, Deserialize)]
struct Record {
    id: u32,
    data: String,
}

fn stream_jsonl<F>(filepath: &str, mut processor: F) -> Result<(), Box<dyn std::error::Error>>
where
    F: FnMut(Record) -> Result<(), Box<dyn std::error::Error>>,
{
    let file = File::open(filepath)?;
    let reader = BufReader::new(file);

    for line in reader.lines() {
        let line = line?;
        if line.trim().is_empty() {
            continue;
        }

        match serde_json::from_str::<Record>(&line) {
            Ok(record) => {
                processor(record)?;
            }
            Err(e) => {
                eprintln!("Parse error: {}", e);
                continue;
            }
        }
    }

    Ok(())
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Process each record as it's read
    stream_jsonl("large.jsonl", |record| {
        println!("Processing ID: {}", record.id);
        // Do expensive work here
        Ok(())
    })?;

    Ok(())
}
use serde::Deserialize;
use std::fs::File;
use std::io::{BufRead, BufReader};

#[derive(Debug, Deserialize)]
struct User {
    id: u32,
    name: String,
}

#[derive(Debug)]
struct ParseStats {
    total_lines: usize,
    successful: usize,
    errors: usize,
}

fn read_jsonl_with_stats(filepath: &str) -> Result<(Vec<User>, ParseStats), std::io::Error> {
    let file = File::open(filepath)?;
    let reader = BufReader::new(file);

    let mut users = Vec::new();
    let mut stats = ParseStats {
        total_lines: 0,
        successful: 0,
        errors: 0,
    };

    for (line_num, line) in reader.lines().enumerate() {
        stats.total_lines += 1;
        let line = line?;

        if line.trim().is_empty() {
            continue;
        }

        match serde_json::from_str::<User>(&line) {
            Ok(user) => {
                users.push(user);
                stats.successful += 1;
            }
            Err(e) => {
                eprintln!("Line {}: {}", line_num + 1, e);
                stats.errors += 1;
            }
        }
    }

    Ok((users, stats))
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (users, stats) = read_jsonl_with_stats("data.jsonl")?;
    println!("Loaded {} users", users.len());
    println!("Stats: {:?}", stats);
    Ok(())
}
Add flate2 for gzip support:
// Cargo.toml
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
flate2 = "1.0"
// src/main.rs
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter, Write};

#[derive(Debug, Deserialize, Serialize)]
struct Record {
    id: u32,
    data: String,
}

fn read_gzipped_jsonl(filepath: &str) -> Result<Vec<Record>, Box<dyn std::error::Error>> {
    let file = File::open(filepath)?;
    let decoder = GzDecoder::new(file);
    let reader = BufReader::new(decoder);

    let mut records = Vec::new();
    for line in reader.lines() {
        let line = line?;
        if line.trim().is_empty() {
            continue;
        }
        let record: Record = serde_json::from_str(&line)?;
        records.push(record);
    }
    Ok(records)
}

fn write_gzipped_jsonl(records: &[Record], filepath: &str) -> Result<(), Box<dyn std::error::Error>> {
    let file = File::create(filepath)?;
    let encoder = GzEncoder::new(file, Compression::default());
    let mut writer = BufWriter::new(encoder);

    for record in records {
        let json = serde_json::to_string(record)?;
        writeln!(writer, "{}", json)?;
    }

    writer.flush()?;
    Ok(())
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Read gzipped JSONL
    let records = read_gzipped_jsonl("data.jsonl.gz")?;
    println!("Loaded {} records", records.len());

    // Write gzipped JSONL
    write_gzipped_jsonl(&records, "output.jsonl.gz")?;
    println!("Wrote compressed JSONL");

    Ok(())
}
Add rayon for easy parallelism:
// Cargo.toml
[dependencies]
rayon = "1.7"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
// src/main.rs
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader};

#[derive(Debug, Deserialize, Serialize)]
struct Record {
    id: u32,
    status: String,
    value: i32,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.jsonl")?;
    let reader = BufReader::new(file);

    // Read all lines into memory
    let lines: Vec<String> = reader.lines().filter_map(|l| l.ok()).collect();

    // Process in parallel
    let results: Vec<Record> = lines
        .par_iter()
        .filter_map(|line| {
            if line.trim().is_empty() {
                return None;
            }
            serde_json::from_str::<Record>(line).ok()
        })
        .filter(|record| record.status == "active")
        .map(|mut record| {
            // Expensive processing
            record.value *= 2;
            record
        })
        .collect();

    println!("Processed {} records", results.len());
    Ok(())
}
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter, Write};

#[derive(Debug, Deserialize)]
struct InputRecord {
    id: u32,
    status: String,
    value: i32,
}

#[derive(Debug, Serialize)]
struct OutputRecord {
    id: u32,
    processed: bool,
    result: i32,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    run_etl("input.jsonl.gz", "output.jsonl.gz")?;
    Ok(())
}

fn run_etl(input_path: &str, output_path: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Read gzipped input
    let input_file = File::open(input_path)?;
    let decoder = GzDecoder::new(input_file);
    let reader = BufReader::new(decoder);

    // Read all lines
    let lines: Vec<String> = reader.lines().filter_map(|l| l.ok()).collect();
    println!("Processing {} lines...", lines.len());

    // Parallel ETL pipeline
    let results: Vec<OutputRecord> = lines
        .par_iter()
        .filter_map(|line| {
            if line.trim().is_empty() {
                return None;
            }
            // Parse
            serde_json::from_str::<InputRecord>(line).ok()
        })
        .filter(|input| {
            // Filter: only active records
            input.status == "active"
        })
        .map(|input| {
            // Transform
            OutputRecord {
                id: input.id,
                processed: true,
                result: input.value * 2,
            }
        })
        .collect();

    println!("Filtered to {} records", results.len());

    // Write gzipped output
    let output_file = File::create(output_path)?;
    let encoder = GzEncoder::new(output_file, Compression::default());
    let mut writer = BufWriter::new(encoder);

    for record in results {
        let json = serde_json::to_string(&record)?;
        writeln!(writer, "{}", json)?;
    }

    writer.flush()?;
    println!("ETL complete!");
    Ok(())
}