Debug common errors, validate files, fix encoding issues, and optimize performance with comprehensive troubleshooting strategies
Most JSONL problems fall into a few categories. This guide helps you quickly identify and fix issues with malformed JSON, encoding problems, parser errors, and performance bottlenecks.
Essential tools for troubleshooting JSONL files:
import json
import sys
from typing import Dict, List, Tuple
class JSONLValidator:
"""Comprehensive JSONL file validator"""
def __init__(self, filepath: str):
self.filepath = filepath
self.errors = []
self.warnings = []
self.stats = {
'total_lines': 0,
'valid_records': 0,
'empty_lines': 0,
'parse_errors': 0,
'total_bytes': 0
}
def validate(self) -> Dict:
"""Validate entire JSONL file"""
try:
with open(self.filepath, 'r', encoding='utf-8') as f:
line_num = 0
for line in f:
line_num += 1
self.stats['total_lines'] += 1
self.stats['total_bytes'] += len(line.encode('utf-8'))
# Check for empty lines
if not line.strip():
self.stats['empty_lines'] += 1
self.warnings.append({
'line': line_num,
'type': 'empty_line',
'message': 'Empty line (should be removed)'
})
continue
# Check for trailing whitespace
if line != line.strip() + '\n' and line != line.strip():
self.warnings.append({
'line': line_num,
'type': 'whitespace',
'message': 'Line has leading or trailing whitespace'
})
# Validate JSON
try:
record = json.loads(line)
self.stats['valid_records'] += 1
# Check if record is object (not array or primitive)
if not isinstance(record, dict):
self.warnings.append({
'line': line_num,
'type': 'not_object',
'message': f'Record is {type(record).__name__}, expected object/dict'
})
except json.JSONDecodeError as e:
self.stats['parse_errors'] += 1
self.errors.append({
'line': line_num,
'type': 'parse_error',
'message': str(e),
'content': line[:100] # First 100 chars
})
except UnicodeDecodeError as e:
self.errors.append({
'line': 0,
'type': 'encoding_error',
'message': f'File encoding error: {e}'
})
return self.get_report()
def get_report(self) -> Dict:
"""Generate validation report"""
return {
'valid': len(self.errors) == 0,
'stats': self.stats,
'errors': self.errors,
'warnings': self.warnings
}
def print_report(self):
"""Print human-readable report"""
report = self.get_report()
print(f"\n{'='*60}")
print(f"JSONL Validation Report: {self.filepath}")
print(f"{'='*60}\n")
print("Statistics:")
print(f" Total lines: {self.stats['total_lines']:,}")
print(f" Valid records: {self.stats['valid_records']:,}")
print(f" Empty lines: {self.stats['empty_lines']:,}")
print(f" Parse errors: {self.stats['parse_errors']:,}")
print(f" File size: {self.stats['total_bytes']:,} bytes")
if self.errors:
print(f"\n{len(self.errors)} ERRORS:")
for error in self.errors[:10]: # Show first 10
print(f"\n Line {error['line']}: {error['type']}")
print(f" {error['message']}")
if 'content' in error:
print(f" Content: {error['content']}")
if len(self.errors) > 10:
print(f"\n ... and {len(self.errors) - 10} more errors")
if self.warnings:
print(f"\n{len(self.warnings)} WARNINGS:")
for warning in self.warnings[:10]:
print(f" Line {warning['line']}: {warning['message']}")
if len(self.warnings) > 10:
print(f" ... and {len(self.warnings) - 10} more warnings")
if report['valid']:
print("\n✓ File is valid JSONL")
else:
print("\n✗ File has errors")
return report['valid']
# Usage
if __name__ == '__main__':
if len(sys.argv) < 2:
print("Usage: python validator.py ")
sys.exit(1)
validator = JSONLValidator(sys.argv[1])
validator.validate()
is_valid = validator.print_report()
sys.exit(0 if is_valid else 1)
Usage: python validator.py data.jsonl
# Validate with jq (each line must be valid JSON)
cat data.jsonl | jq -c . > /dev/null && echo "Valid JSONL" || echo "Invalid JSONL"
# Show line numbers of invalid JSON
awk '{print NR, $0}' data.jsonl | while read -r num line; do
echo "$line" | jq . > /dev/null 2>&1 || echo "Error on line $num"
done
# Count valid vs invalid lines (note: jq stops at the first parse error, so this
# undercounts on broken files; use the per-line loop above for an exact count)
total=$(wc -l < data.jsonl)
valid=$(cat data.jsonl | jq -c . 2>/dev/null | wc -l)
echo "Valid: $valid / $total"
# Find lines with common issues
grep -n '^\s' data.jsonl # Lines with leading whitespace
grep -n '\s$' data.jsonl # Lines with trailing whitespace
grep -n '^$' data.jsonl # Empty lines
# Validate and show specific errors
cat data.jsonl | while IFS= read -r line; do
echo "$line" | jq . > /dev/null 2>&1 || echo "Error: $line"
done
from jsonschema import validate, ValidationError
import json
# Define expected schema
SCHEMA = {
"type": "object",
"required": ["id", "name", "email"],
"properties": {
"id": {"type": "integer", "minimum": 1},
"name": {"type": "string", "minLength": 1},
"email": {"type": "string", "format": "email"},
"age": {"type": "integer", "minimum": 0, "maximum": 150}
}
}
def validate_jsonl_schema(filepath, schema):
"""Validate records against schema"""
errors = []
with open(filepath, 'r') as f:
for line_num, line in enumerate(f, 1):
try:
record = json.loads(line)
# Validate against schema
validate(instance=record, schema=schema)
except json.JSONDecodeError as e:
errors.append({
'line': line_num,
'error': 'JSON parse error',
'message': str(e)
})
except ValidationError as e:
errors.append({
'line': line_num,
'error': 'Schema validation failed',
'message': e.message,
'path': list(e.path)
})
return errors
# Run validation
errors = validate_jsonl_schema('data.jsonl', SCHEMA)
if errors:
print(f"Found {len(errors)} validation errors:")
for error in errors[:20]:
print(f" Line {error['line']}: {error['message']}")
else:
print("All records match schema")
UTF-8 BOM at start of file causes JSON parse errors
# Detect BOM
hexdump -C data.jsonl | head -n 1
# Look for: ef bb bf (UTF-8 BOM)
# Remove BOM
tail -c +4 data.jsonl > data_fixed.jsonl # Skip first 3 bytes
# Python: Remove BOM
with open('data.jsonl', 'r', encoding='utf-8-sig') as f:
content = f.read()
with open('data_fixed.jsonl', 'w', encoding='utf-8') as f:
f.write(content)
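utf-8-sig only strips a BOM at the very start of the stream; if several BOM-prefixed files were concatenated, a stray BOM can also sit at the start of later lines. A small sketch that removes both (strip_boms is our name):
def strip_boms(input_file, output_file):
    """Drop a UTF-8 BOM at the start of the file and at the start of any line."""
    with open(input_file, 'r', encoding='utf-8-sig') as f_in, \
         open(output_file, 'w', encoding='utf-8') as f_out:
        for line in f_in:
            f_out.write(line.lstrip('\ufeff'))  # per-line BOMs left over from concatenation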
File contains a mix of UTF-8 and Latin-1 encodings
import chardet
def detect_encoding(filepath):
"""Detect file encoding"""
with open(filepath, 'rb') as f:
        result = chardet.detect(f.read(10000))  # sample the first 10 KB; increase for long or mixed files
return result
# Auto-detect and convert
def fix_encoding(input_file, output_file):
"""Convert to UTF-8"""
encoding = detect_encoding(input_file)['encoding']
print(f"Detected encoding: {encoding}")
with open(input_file, 'r', encoding=encoding, errors='replace') as f_in:
with open(output_file, 'w', encoding='utf-8') as f_out:
for line in f_in:
f_out.write(line)
fix_encoding('data.jsonl', 'data_utf8.jsonl')
Corrupted characters in file
def clean_invalid_utf8(input_file, output_file):
"""Remove or replace invalid UTF-8"""
with open(input_file, 'rb') as f_in:
content = f_in.read()
# Replace invalid sequences
cleaned = content.decode('utf-8', errors='replace')
with open(output_file, 'w', encoding='utf-8') as f_out:
f_out.write(cleaned)
# Alternative: ignore invalid characters
with open('data.jsonl', 'r', encoding='utf-8', errors='ignore') as f:
for line in f:
process(line)
Windows (CRLF) vs Unix (LF) line endings
# Convert Windows to Unix
dos2unix data.jsonl
# Or with Python
def fix_line_endings(input_file, output_file):
"""Convert to Unix line endings"""
with open(input_file, 'rb') as f_in:
content = f_in.read()
# Replace CRLF with LF
content = content.replace(b'\r\n', b'\n')
with open(output_file, 'wb') as f_out:
f_out.write(content)
import json
import re
def find_malformed_records(filepath):
"""Identify and categorize malformed JSON"""
issues = {
'missing_quotes': [],
'trailing_commas': [],
'unescaped_quotes': [],
'incomplete_json': [],
'other': []
}
with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
for line_num, line in enumerate(f, 1):
try:
json.loads(line)
except json.JSONDecodeError as e:
content = line.strip()
# Categorize error
if 'Expecting property name' in str(e):
issues['missing_quotes'].append((line_num, content, str(e)))
elif 'trailing comma' in str(e).lower():
issues['trailing_commas'].append((line_num, content, str(e)))
elif 'Unterminated string' in str(e):
issues['unescaped_quotes'].append((line_num, content, str(e)))
elif 'Expecting value' in str(e):
issues['incomplete_json'].append((line_num, content, str(e)))
else:
issues['other'].append((line_num, content, str(e)))
return issues
# Auto-fix common issues
def auto_fix_jsonl(input_file, output_file):
"""Attempt to fix common JSON issues"""
fixed = 0
skipped = 0
with open(input_file, 'r', encoding='utf-8', errors='replace') as f_in:
with open(output_file, 'w', encoding='utf-8') as f_out:
for line in f_in:
try:
# Try parsing as-is
record = json.loads(line)
f_out.write(json.dumps(record) + '\n')
fixed += 1
except json.JSONDecodeError:
# Try common fixes
fixed_line = line.strip()
# Remove trailing commas
fixed_line = re.sub(r',(\s*[}\]])', r'\1', fixed_line)
# Try parsing again
try:
record = json.loads(fixed_line)
f_out.write(json.dumps(record) + '\n')
fixed += 1
except json.JSONDecodeError:
skipped += 1
print(f"Could not fix line: {line[:100]}")
print(f"Fixed: {fixed}, Skipped: {skipped}")
return fixed, skipped
# Report malformed records
issues = find_malformed_records('data.jsonl')
for issue_type, records in issues.items():
if records:
print(f"\n{issue_type.upper()}: {len(records)} records")
for line_num, content, error in records[:5]:
print(f" Line {line_num}: {error}")
print(f" {content[:100]}")
# Attempt auto-fix
auto_fix_jsonl('data.jsonl', 'data_fixed.jsonl')
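The auto-fixer above only strips trailing commas. Lines that are Python repr() output rather than JSON (single quotes, bare True/False/None) can often be recovered safely with ast.literal_eval; a hedged sketch (fix_python_repr_line is our name):
import ast
import json

def fix_python_repr_line(line):
    """Convert a Python-repr record (single quotes, True/False/None) to a JSON string.

    Returns None if the line cannot be interpreted as a literal dict.
    """
    try:
        obj = ast.literal_eval(line.strip())  # literals only, no code execution
    except (ValueError, SyntaxError):
        return None
    return json.dumps(obj) if isinstance(obj, dict) else None

# Example: {'name': 'Alice', 'active': True} -> {"name": "Alice", "active": true}
print(fix_python_repr_line("{'name': 'Alice', 'active': True}"))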
| Issue | Example | Fix |
|---|---|---|
| Trailing comma | {"a": 1, "b": 2,} | {"a": 1, "b": 2} |
| Unquoted keys | {name: "Alice"} | {"name": "Alice"} |
| Single quotes | {'name': 'Alice'} | {"name": "Alice"} |
| Unescaped quotes | {"text": "He said "hi""} | {"text": "He said \"hi\""} |
| Incomplete JSON | {"name": "Alice" | {"name": "Alice"} |
| Multiple records per line | {"a": 1}{"b": 2} | Split into separate lines (see the sketch below) |
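For the last row, multiple records jammed onto one line can be split without guesswork using json.JSONDecoder.raw_decode, which reports where each value ends; a sketch (split_concatenated is our name):
import json

def split_concatenated(line):
    """Split a line like '{"a": 1}{"b": 2}' into individual records."""
    decoder = json.JSONDecoder()
    records = []
    idx = 0
    text = line.strip()
    while idx < len(text):
        record, end = decoder.raw_decode(text, idx)  # (parsed value, index just past it)
        records.append(record)
        idx = end
        while idx < len(text) and text[idx].isspace():
            idx += 1  # skip whitespace between concatenated values
    return records

# Write each recovered record on its own line
for rec in split_concatenated('{"a": 1}{"b": 2}'):
    print(json.dumps(rec))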
import json
import traceback
def debug_parse_error(line, line_num):
"""Detailed parser error analysis"""
try:
json.loads(line)
return None
except json.JSONDecodeError as e:
error_info = {
'line_num': line_num,
'error_type': type(e).__name__,
'message': str(e),
'position': e.pos,
'line_content': line,
'error_location': line[max(0, e.pos-20):min(len(line), e.pos+20)]
}
# Show context around error
if e.pos < len(line):
error_info['char_at_error'] = repr(line[e.pos])
return error_info
# Analyze all errors in file
def analyze_all_errors(filepath):
"""Comprehensive error analysis"""
errors = []
with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
for line_num, line in enumerate(f, 1):
error = debug_parse_error(line, line_num)
if error:
errors.append(error)
# Categorize errors
error_types = {}
for error in errors:
msg = error['message']
error_types[msg] = error_types.get(msg, 0) + 1
print(f"Found {len(errors)} parse errors\n")
print("Error type distribution:")
for msg, count in sorted(error_types.items(), key=lambda x: -x[1]):
print(f" {count:4d} - {msg}")
print("\nFirst 5 errors with context:")
for error in errors[:5]:
print(f"\nLine {error['line_num']}: {error['message']}")
print(f" Position: {error['position']}")
print(f" Context: ...{error['error_location']}...")
if 'char_at_error' in error:
print(f" Character at error: {error['char_at_error']}")
analyze_all_errors('data.jsonl')
def extract_valid_records(input_file, output_file, error_file=None):
"""Separate valid and invalid records"""
valid_count = 0
error_count = 0
with open(input_file, 'r', encoding='utf-8', errors='replace') as f_in:
with open(output_file, 'w', encoding='utf-8') as f_out:
error_writer = None
if error_file:
error_writer = open(error_file, 'w', encoding='utf-8')
for line_num, line in enumerate(f_in, 1):
try:
record = json.loads(line)
f_out.write(json.dumps(record) + '\n')
valid_count += 1
except json.JSONDecodeError as e:
error_count += 1
if error_writer:
error_writer.write(f"Line {line_num}: {e}\n")
error_writer.write(line)
error_writer.write('\n')
if error_writer:
error_writer.close()
print(f"Valid: {valid_count}, Errors: {error_count}")
return valid_count, error_count
# Usage
extract_valid_records('data.jsonl', 'valid.jsonl', 'errors.txt')
import tracemalloc
import json
def profile_memory_usage(filepath):
"""Profile memory usage while processing"""
tracemalloc.start()
# Snapshot before
snapshot1 = tracemalloc.take_snapshot()
# Bad: Load entire file
with open(filepath, 'r') as f:
data = [json.loads(line) for line in f]
# Snapshot after
snapshot2 = tracemalloc.take_snapshot()
# Compare
top_stats = snapshot2.compare_to(snapshot1, 'lineno')
print("Memory usage (loading entire file):")
for stat in top_stats[:10]:
print(stat)
current, peak = tracemalloc.get_traced_memory()
print(f"\nCurrent memory: {current / 1024 / 1024:.1f} MB")
print(f"Peak memory: {peak / 1024 / 1024:.1f} MB")
tracemalloc.stop()
# Good: Stream processing
def stream_with_profiling(filepath):
"""Stream processing with memory monitoring"""
tracemalloc.start()
record_count = 0
for line in open(filepath, 'r'):
record = json.loads(line)
record_count += 1
if record_count % 10000 == 0:
current, peak = tracemalloc.get_traced_memory()
print(f"Processed {record_count}: {current / 1024 / 1024:.1f} MB")
tracemalloc.stop()
profile_memory_usage('large_file.jsonl')
stream_with_profiling('large_file.jsonl')
# Bad: Accumulates records in memory
def process_bad(filepath):
results = []
with open(filepath, 'r') as f:
for line in f:
record = json.loads(line)
processed = transform(record)
            results.append(processed)  # accumulates every record; memory grows with file size
return results
# Good: Stream processing
def process_good(filepath, output_file):
with open(filepath, 'r') as f_in:
with open(output_file, 'w') as f_out:
for line in f_in:
record = json.loads(line)
processed = transform(record)
f_out.write(json.dumps(processed) + '\n')
# Record immediately discarded after writing
# Good: Generator
def process_generator(filepath):
with open(filepath, 'r') as f:
for line in f:
record = json.loads(line)
yield transform(record)
# Use generator
for result in process_generator('data.jsonl'):
handle_result(result) # Process one at a time
import time
import cProfile
import pstats
def benchmark_processing(filepath):
"""Benchmark different processing methods"""
# Method 1: Load all
start = time.time()
with open(filepath, 'r') as f:
data = [json.loads(line) for line in f]
method1_time = time.time() - start
# Method 2: Stream
start = time.time()
count = 0
for line in open(filepath, 'r'):
record = json.loads(line)
count += 1
method2_time = time.time() - start
# Method 3: Streaming with orjson (faster JSON parser)
import orjson
start = time.time()
count = 0
for line in open(filepath, 'rb'):
record = orjson.loads(line)
count += 1
method3_time = time.time() - start
print(f"Load all: {method1_time:.2f}s")
print(f"Stream (json): {method2_time:.2f}s")
print(f"Stream (orjson): {method3_time:.2f}s")
print(f"orjson is {method2_time / method3_time:.1f}x faster")
# Profile specific function
def profile_function(filepath):
"""Profile with cProfile"""
def process_file():
for line in open(filepath, 'r'):
record = json.loads(line)
# Process...
profiler = cProfile.Profile()
profiler.enable()
process_file()
profiler.disable()
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(20) # Top 20 functions
benchmark_processing('large_file.jsonl')
profile_function('large_file.jsonl')
# Install: pip install orjson ujson
import orjson # Fastest
import ujson # Fast
import json # Standard library
# Benchmark
import time
# Standard json
start = time.time()
for line in open('data.jsonl', 'r'):
json.loads(line)
json_time = time.time() - start
# orjson (binary mode)
start = time.time()
for line in open('data.jsonl', 'rb'):
orjson.loads(line)
orjson_time = time.time() - start
print(f"json: {json_time:.2f}s")
print(f"orjson: {orjson_time:.2f}s ({json_time/orjson_time:.1f}x faster)")
from multiprocessing import Pool
import json
def process_chunk(lines):
return [json.loads(line) for line in lines]
def parallel_process(filepath, num_workers=4):
with open(filepath, 'r') as f:
        lines = f.readlines()  # loads the entire file into memory; see the streaming sketch below
# Split into chunks
    chunk_size = max(1, len(lines) // num_workers)  # avoid a zero chunk size for tiny files
chunks = [lines[i:i+chunk_size] for i in range(0, len(lines), chunk_size)]
# Process in parallel
with Pool(num_workers) as pool:
results = pool.map(process_chunk, chunks)
return [item for sublist in results for item in sublist]
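readlines() above pulls the whole file into memory before splitting it. When that is not an option, batches of lines can be streamed to the worker pool instead; a sketch under that assumption (parse_batch and stream_parallel are our names):
from itertools import islice
from multiprocessing import Pool
import json

def parse_batch(lines):
    # Must live at module level so worker processes can unpickle and call it
    return [json.loads(line) for line in lines if line.strip()]

def stream_parallel(filepath, batch_size=10000, num_workers=4):
    """Yield parsed records, parsing batches of lines in worker processes."""
    def batches(f):
        while True:
            batch = list(islice(f, batch_size))
            if not batch:
                return
            yield batch
    with open(filepath, 'r', encoding='utf-8') as f, Pool(num_workers) as pool:
        for parsed in pool.imap(parse_batch, batches(f)):
            yield from parsed

# Usage (run under `if __name__ == '__main__':` on spawn-based platforms)
# for record in stream_parallel('large_file.jsonl'):
#     process(record)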
# Default buffering (8KB)
with open('data.jsonl', 'r') as f:
for line in f:
process(line)
# Larger buffer (1MB)
with open('data.jsonl', 'r', buffering=1024*1024) as f:
for line in f:
process(line)
# Read in chunks (a chunk can end mid-record; see the line-boundary sketch below)
def read_chunks(filepath, chunk_size=1024*1024):
with open(filepath, 'r') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
yield chunk
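Because a raw chunk can end in the middle of a record, carry the partial last line over to the next chunk before parsing. A sketch (iter_records_from_chunks is our name):
import json

def iter_records_from_chunks(filepath, chunk_size=1024*1024):
    """Parse JSONL from fixed-size chunks, buffering partial lines across boundaries."""
    remainder = ''
    with open(filepath, 'r', encoding='utf-8') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            lines = (remainder + chunk).split('\n')
            remainder = lines.pop()  # last element may be an incomplete line
            for line in lines:
                if line.strip():
                    yield json.loads(line)
    if remainder.strip():
        yield json.loads(remainder)  # final record without a trailing newline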
import json
from collections import Counter, defaultdict
class DataQualityAnalyzer:
"""Analyze JSONL data quality"""
def __init__(self, filepath):
self.filepath = filepath
self.stats = {
'total_records': 0,
'unique_keys': set(),
'missing_keys': defaultdict(int),
'null_values': defaultdict(int),
'empty_strings': defaultdict(int),
'data_types': defaultdict(Counter),
'duplicates': 0
}
self.seen_records = set()
def analyze(self):
"""Run full analysis"""
with open(self.filepath, 'r') as f:
for line in f:
try:
record = json.loads(line)
self.analyze_record(record)
except json.JSONDecodeError:
continue
return self.get_report()
def analyze_record(self, record):
"""Analyze single record"""
self.stats['total_records'] += 1
# Check for duplicates
        record_hash = json.dumps(record, sort_keys=True)  # canonical form; hash it to save memory on huge files
if record_hash in self.seen_records:
self.stats['duplicates'] += 1
self.seen_records.add(record_hash)
# Collect keys
self.stats['unique_keys'].update(record.keys())
# Check each field
for key, value in record.items():
# Data type distribution
self.stats['data_types'][key][type(value).__name__] += 1
# Null values
if value is None:
self.stats['null_values'][key] += 1
# Empty strings
if value == '':
self.stats['empty_strings'][key] += 1
def get_report(self):
"""Generate quality report"""
report = {
'total_records': self.stats['total_records'],
'unique_keys': len(self.stats['unique_keys']),
'duplicates': self.stats['duplicates'],
'keys': list(self.stats['unique_keys']),
'quality_issues': []
}
# Find keys with high null rate
for key, null_count in self.stats['null_values'].items():
null_rate = null_count / self.stats['total_records']
if null_rate > 0.1: # >10% null
report['quality_issues'].append({
'key': key,
'issue': 'high_null_rate',
'rate': f"{null_rate*100:.1f}%"
})
# Find keys with inconsistent types
for key, type_counts in self.stats['data_types'].items():
if len(type_counts) > 1:
report['quality_issues'].append({
'key': key,
'issue': 'inconsistent_types',
'types': dict(type_counts)
})
return report
def print_report(self):
"""Print human-readable report"""
report = self.get_report()
print(f"\nData Quality Report")
print("=" * 60)
print(f"Total records: {report['total_records']:,}")
print(f"Unique keys: {report['unique_keys']}")
print(f"Duplicates: {report['duplicates']}")
if report['quality_issues']:
print(f"\nQuality Issues ({len(report['quality_issues'])}):")
for issue in report['quality_issues']:
print(f" {issue['key']}: {issue['issue']}")
if 'rate' in issue:
print(f" Rate: {issue['rate']}")
if 'types' in issue:
print(f" Types: {issue['types']}")
# Usage
analyzer = DataQualityAnalyzer('data.jsonl')
analyzer.analyze()
analyzer.print_report()
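The analyzer only counts duplicates. A minimal sketch that removes them while keeping the first occurrence, comparing records in key-order-insensitive form (dedupe_jsonl is our name):
import json

def dedupe_jsonl(input_file, output_file):
    """Write each distinct record once, comparing canonical (sorted-key) JSON."""
    seen = set()
    kept = dropped = 0
    with open(input_file, 'r', encoding='utf-8') as f_in, \
         open(output_file, 'w', encoding='utf-8') as f_out:
        for line in f_in:
            if not line.strip():
                continue
            record = json.loads(line)
            key = json.dumps(record, sort_keys=True)
            if key in seen:
                dropped += 1
                continue
            seen.add(key)
            f_out.write(json.dumps(record) + '\n')
            kept += 1
    print(f"Kept {kept}, dropped {dropped} duplicates")

dedupe_jsonl('data.jsonl', 'deduped.jsonl')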
# Count records
wc -l data.jsonl
# Check file encoding
file -i data.jsonl
# Find records matching pattern
grep -n '"status": "error"' data.jsonl
# Extract specific field from all records
cat data.jsonl | jq -r '.email'
# Count unique values for a field
cat data.jsonl | jq -r '.category' | sort | uniq -c
# Find records with missing field
cat data.jsonl | jq 'select(.email == null)'
# Pretty print first record
head -n 1 data.jsonl | jq .
# Check for duplicate records
sort data.jsonl | uniq -d
# Split large file into chunks
split -l 1000000 data.jsonl chunk_
# Merge multiple JSONL files
cat file1.jsonl file2.jsonl > merged.jsonl
# Random sample of records
shuf -n 100 data.jsonl > sample.jsonl
#!/usr/bin/env python3
"""
JSONL Debug Utility
Usage: python jsonl_debug.py <file> [--validate] [--stats] [--fix OUTPUT] [--encoding]
"""
import json
import sys
import argparse
def main():
parser = argparse.ArgumentParser(description='JSONL debugging utility')
parser.add_argument('file', help='JSONL file to analyze')
parser.add_argument('--validate', action='store_true', help='Validate JSON syntax')
parser.add_argument('--stats', action='store_true', help='Show statistics')
parser.add_argument('--fix', help='Attempt to fix and save to file')
parser.add_argument('--encoding', action='store_true', help='Check encoding')
args = parser.parse_args()
    # validate_file, show_stats, fix_file and check_encoding are stubs here;
    # wire them to the helpers shown earlier (JSONLValidator, auto_fix_jsonl, detect_encoding)
    if args.validate:
validate_file(args.file)
if args.stats:
show_stats(args.file)
if args.fix:
fix_file(args.file, args.fix)
if args.encoding:
check_encoding(args.file)
if __name__ == '__main__':
main()
Problem: Cannot parse JSONL file
1. Check file encoding
   file -i data.jsonl
   hexdump -C data.jsonl | head -n 1
2. Validate JSON syntax
   cat data.jsonl | jq -c . > /dev/null
3. Check for empty/whitespace lines
   grep -n '^$' data.jsonl
   sed '/^$/d' data.jsonl > cleaned.jsonl
4. Memory or performance issues
   Switch to streaming, a faster parser (orjson), or parallel processing as shown in the performance sections above.
✓ If all steps pass: File is valid JSONL
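The same checklist can be automated as a first pass; a rough sketch that samples the file and reports the most likely problem category (quick_triage is our name):
import json

def quick_triage(filepath, sample_lines=1000):
    """Run the checklist above on a sample of the file and report the likely issue."""
    with open(filepath, 'rb') as f:
        if f.read(3) == b'\xef\xbb\xbf':
            return "UTF-8 BOM detected: re-read with encoding='utf-8-sig'"
    empty = parse_errors = 0
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            for i, line in enumerate(f, 1):
                if i > sample_lines:
                    break
                if not line.strip():
                    empty += 1
                    continue
                try:
                    json.loads(line)
                except json.JSONDecodeError:
                    parse_errors += 1
    except UnicodeDecodeError:
        return "Not valid UTF-8: detect the encoding and convert (see fix_encoding above)"
    if parse_errors:
        return f"{parse_errors} parse errors in the first {sample_lines} lines: run the validator"
    if empty:
        return f"{empty} empty lines: remove them with sed '/^$/d'"
    return "Sample looks like valid JSONL; if processing is slow, switch to streaming or orjson"

print(quick_triage('data.jsonl'))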