JSONL for Analytics Pipelines

Power your ETL workflows, data warehouses, and analytics at scale with JSONL, a format with native support in BigQuery, Snowflake, and other modern data platforms

Why JSONL for Analytics?

Perfect for Data Warehouses

JSONL is the preferred format for loading data into modern data warehouses. Because each line is a self-contained JSON record, files can be split and loaded in parallel, schemas can evolve without rewriting existing data (schema-on-read), and incremental updates append cleanly; a minimal sharding sketch follows the list below.

  • Parallel data loading
  • Schema evolution support
  • Incremental updates
  • Nested data structures
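
As a concrete illustration of the line-based advantage, the sketch below splits one large JSONL export into fixed-size shards that parallel workers or load jobs can pick up independently; the file names and shard size are placeholders.

# Python: split a JSONL export into shards for parallel loading (illustrative)
import itertools

SHARD_SIZE = 100_000  # records per shard; tune to your loader

def shard_jsonl(path, prefix):
    """Split path into prefix_0000.jsonl, prefix_0001.jsonl, ... without parsing JSON."""
    with open(path, 'r') as infile:
        for shard_no in itertools.count():
            lines = list(itertools.islice(infile, SHARD_SIZE))
            if not lines:
                break
            with open(f"{prefix}_{shard_no:04d}.jsonl", 'w') as out:
                out.writelines(lines)

shard_jsonl('users.jsonl', 'users_shard')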

Platform Support

  • BigQuery: Native JSONL import
  • Snowflake: JSONL bulk loading
  • Redshift: COPY from S3 JSONL (see the sketch after this list)
  • Athena: Query JSONL directly
  • Databricks: Delta Lake ingestion
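
As a hedged sketch of the Redshift path (which is not detailed below), a bulk load of JSONL from S3 typically looks like this, issued through any PostgreSQL-compatible driver; the endpoint, credentials, bucket, and IAM role are placeholders.

# Python: Redshift COPY of JSONL from S3 (sketch; identifiers are placeholders)
import psycopg2

conn = psycopg2.connect(
    host='my-cluster.abc123.us-east-1.redshift.amazonaws.com',
    port=5439, dbname='analytics', user='loader', password='...'
)
cursor = conn.cursor()

# COPY reads one JSON object per line; 'auto' maps keys to column names
cursor.execute("""
    COPY users
    FROM 's3://my-bucket/data/users.jsonl.gz'
    IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-loader'
    FORMAT AS JSON 'auto'
    GZIP
""")
conn.commit()
cursor.close()
conn.close()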

ETL Workflows with JSONL

Extract - Pull Data to JSONL

# Python: Extract from PostgreSQL to JSONL
import psycopg2
import json
from datetime import datetime

conn = psycopg2.connect("dbname=mydb user=postgres")
cursor = conn.cursor(name='extract_users')  # Server-side cursor so rows stream in batches

# last_sync would normally come from a sync-state store; a fixed date is a placeholder here
last_sync = datetime(2025, 1, 1)
cursor.execute("SELECT * FROM users WHERE created_at > %s", (last_sync,))

with open(f"users_{datetime.now().strftime('%Y%m%d')}.jsonl", 'w') as f:
    while True:
        rows = cursor.fetchmany(1000)  # Batch processing
        if not rows:
            break

        for row in rows:
            record = {
                'id': row[0],
                'email': row[1],
                'name': row[2],
                'created_at': row[3].isoformat()
            }
            f.write(json.dumps(record) + '\n')

cursor.close()
conn.close()

# Extract from MongoDB
from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
db = client['myapp']

with open('orders.jsonl', 'w') as f:
    for doc in db.orders.find({'status': 'completed'}):
        doc['_id'] = str(doc['_id'])  # Convert ObjectId
        f.write(json.dumps(doc, default=str) + '\n')  # default=str handles datetime fields

# Extract from REST API
import requests

url = "https://api.example.com/events"
params = {'since': '2025-01-01', 'limit': 1000}

with open('api_events.jsonl', 'w') as f:
    while True:
        response = requests.get(url, params=params)
        response.raise_for_status()  # Fail fast on HTTP errors
        data = response.json()

        for item in data['items']:
            f.write(json.dumps(item) + '\n')

        if not data.get('next_page'):
            break
        params['page'] = data['next_page']

Transform - Process JSONL Data

# Python: Transform JSONL with pandas
import pandas as pd
import json

# Read JSONL into DataFrame
records = []
with open('users.jsonl', 'r') as f:
    for line in f:
        records.append(json.loads(line))

df = pd.DataFrame(records)

# Transformations
df['created_date'] = pd.to_datetime(df['created_at']).dt.date
df['email_domain'] = df['email'].str.split('@').str[1]
df['name_length'] = df['name'].str.len()

# Enrich with external data (companies_df is assumed loaded elsewhere, keyed by domain)
df = df.merge(companies_df, left_on='email_domain', right_on='domain')

# Filter and clean
df = df[df['email'].str.contains('@')]  # Valid emails
df = df.drop_duplicates(subset=['email'])

# Write transformed data (default=str serializes the date objects created above)
with open('users_transformed.jsonl', 'w') as f:
    for record in df.to_dict('records'):
        f.write(json.dumps(record, default=str) + '\n')

# Streaming transformation for large files
def transform_record(record):
    """Transform individual record"""
    record['email'] = record['email'].lower()
    record['created_year'] = record['created_at'][:4]
    return record

with open('input.jsonl', 'r') as infile, \
     open('output.jsonl', 'w') as outfile:
    for line in infile:
        record = json.loads(line)
        transformed = transform_record(record)
        outfile.write(json.dumps(transformed) + '\n')

Load - Import to Data Warehouse

# Load to PostgreSQL
import psycopg2
import json

conn = psycopg2.connect("dbname=warehouse")
cursor = conn.cursor()

with open('users_transformed.jsonl', 'r') as f:
    for line in f:
        record = json.loads(line)
        cursor.execute(
            """
            INSERT INTO users (id, email, name, created_at)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT (id) DO UPDATE
            SET email = EXCLUDED.email,
                name = EXCLUDED.name,
                updated_at = NOW()
            """,
            (record['id'], record['email'], record['name'], record['created_at'])
        )

conn.commit()
cursor.close()

# Bulk load raw lines with COPY into a single jsonb column
# (assumes a staging table such as: CREATE TABLE users_raw (data jsonb);
#  text-format COPY treats backslashes as escapes, so pre-escape them or use CSV format if they occur)
with open('users.jsonl', 'r') as f:
    cursor.copy_expert(
        "COPY users_raw (data) FROM STDIN",
        f
    )
conn.commit()

Google BigQuery

Loading JSONL into BigQuery

# Python: Load JSONL to BigQuery
from google.cloud import bigquery

client = bigquery.Client()
table_id = "project.dataset.users"

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    autodetect=True,  # Auto-detect schema
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)

# Load from local file
with open("users.jsonl", "rb") as source_file:
    job = client.load_table_from_file(source_file, table_id, job_config=job_config)

job.result()  # Wait for completion
print(f"Loaded {job.output_rows} rows")

# Load from Google Cloud Storage
uri = "gs://my-bucket/data/*.jsonl"
job = client.load_table_from_uri(uri, table_id, job_config=job_config)
job.result()

# Streaming inserts (real-time)
rows_to_insert = [
    {"id": 1, "name": "Alice", "email": "[email protected]"},
    {"id": 2, "name": "Bob", "email": "[email protected]"}
]

errors = client.insert_rows_json(table_id, rows_to_insert)
if errors:
    print(f"Errors: {errors}")

# Query nested JSON
query = """
SELECT
    user_id,
    JSON_VALUE(metadata, '$.country') as country,
    JSON_VALUE(metadata, '$.city') as city
FROM `project.dataset.events`
WHERE DATE(timestamp) = CURRENT_DATE()
"""

results = client.query(query)
for row in results:
    print(f"{row.user_id}: {row.country}, {row.city}")

Schema Design for JSON Data

-- Create table with schema
CREATE OR REPLACE TABLE `project.dataset.users` (
    id INT64,
    email STRING,
    name STRING,
    metadata JSON,  -- Native JSON type
    tags ARRAY<STRING>,
    address STRUCT<
        street STRING,
        city STRING,
        country STRING
    >,
    created_at TIMESTAMP
)
PARTITION BY DATE(created_at)
CLUSTER BY email;

-- Query JSON fields
SELECT
    id,
    JSON_EXTRACT_SCALAR(metadata, '$.signup_source') as signup_source,
    JSON_EXTRACT_ARRAY(metadata, '$.interests') as interests
FROM `project.dataset.users`
WHERE JSON_EXTRACT_SCALAR(metadata, '$.plan') = 'premium';

-- Flatten nested arrays
SELECT
    id,
    tag
FROM `project.dataset.users`,
UNNEST(tags) as tag;

Snowflake

Loading JSONL into Snowflake

-- Create file format for JSONL
CREATE OR REPLACE FILE FORMAT jsonl_format
    TYPE = 'JSON'
    STRIP_OUTER_ARRAY = FALSE
    COMPRESSION = 'GZIP';

-- Create stage for S3
CREATE OR REPLACE STAGE s3_stage
    URL = 's3://my-bucket/data/'
    CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')
    FILE_FORMAT = (FORMAT_NAME = 'jsonl_format');

-- Create target table
CREATE OR REPLACE TABLE users (
    raw_data VARIANT,  -- JSON data type
    loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- Load data from stage
COPY INTO users (raw_data)
FROM @s3_stage/users.jsonl.gz
FILE_FORMAT = (FORMAT_NAME = 'jsonl_format')
ON_ERROR = 'CONTINUE';

-- Create view with parsed columns
CREATE OR REPLACE VIEW users_parsed AS
SELECT
    raw_data:id::NUMBER as id,
    raw_data:email::STRING as email,
    raw_data:name::STRING as name,
    raw_data:metadata::VARIANT as metadata,
    raw_data:created_at::TIMESTAMP as created_at,
    loaded_at
FROM users;

-- Query nested JSON
SELECT
    id,
    email,
    metadata:country::STRING as country,
    metadata:preferences[0]::STRING as first_preference
FROM users_parsed
WHERE metadata:plan::STRING = 'premium';

Snowflake Python Connector

import snowflake.connector
import json

conn = snowflake.connector.connect(
    user='myuser',
    password='mypassword',
    account='myaccount',
    warehouse='COMPUTE_WH',
    database='MYDB',
    schema='PUBLIC'
)

cursor = conn.cursor()

# Stage local file (PUT auto-compresses by default, hence the .gz path below; absolute paths are safest)
cursor.execute("PUT file://users.jsonl @~")

# Load into table
cursor.execute("""
    COPY INTO users (raw_data)
    FROM @~/users.jsonl.gz
    FILE_FORMAT = (FORMAT_NAME = 'jsonl_format')
""")

# Query data
cursor.execute("""
    SELECT raw_data:email::STRING as email
    FROM users
    WHERE raw_data:created_at::DATE = CURRENT_DATE()
""")

for row in cursor:
    print(row[0])

cursor.close()
conn.close()

Apache Airflow Orchestration

ETL DAG with JSONL

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'etl_to_bigquery',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2025, 1, 1),
    catchup=False
)

def extract_data(**context):
    """Extract data to JSONL"""
    import json
    import psycopg2

    conn = psycopg2.connect("postgresql://...")
    cursor = conn.cursor()

    execution_date = context['execution_date']
    filepath = f"/tmp/data_{execution_date.strftime('%Y%m%d')}.jsonl"

    cursor.execute("SELECT * FROM users WHERE updated_at::DATE = %s", (execution_date.date(),))

    with open(filepath, 'w') as f:
        for row in cursor:
            record = {'id': row[0], 'email': row[1], 'name': row[2]}
            f.write(json.dumps(record) + '\n')

    return filepath

def transform_data(**context):
    """Transform JSONL data"""
    import json

    filepath = context['task_instance'].xcom_pull(task_ids='extract')
    output_path = filepath.replace('.jsonl', '_transformed.jsonl')

    with open(filepath, 'r') as infile, open(output_path, 'w') as outfile:
        for line in infile:
            record = json.loads(line)
            record['email'] = record['email'].lower()
            record['processed_at'] = datetime.utcnow().isoformat()
            outfile.write(json.dumps(record) + '\n')

    return output_path

extract = PythonOperator(
    task_id='extract',
    python_callable=extract_data,
    dag=dag
)

transform = PythonOperator(
    task_id='transform',
    python_callable=transform_data,
    dag=dag
)

upload_to_gcs = LocalFilesystemToGCSOperator(
    task_id='upload_to_gcs',
    src="{{ task_instance.xcom_pull(task_ids='transform') }}",
    dst="data/{{ ds }}/users.jsonl",
    bucket='my-data-bucket',
    dag=dag
)

load_to_bq = BigQueryInsertJobOperator(
    task_id='load_to_bigquery',
    configuration={
        "load": {
            "sourceUris": ["gs://my-data-bucket/data/{{ ds }}/users.jsonl"],
            "destinationTable": {
                "projectId": "my-project",
                "datasetId": "analytics",
                "tableId": "users"
            },
            "sourceFormat": "NEWLINE_DELIMITED_JSON",
            "writeDisposition": "WRITE_APPEND",
            "autodetect": True
        }
    },
    dag=dag
)

extract >> transform >> upload_to_gcs >> load_to_bq

dbt (Data Build Tool)

Transforming JSON in dbt

-- models/staging/stg_events.sql
-- Parse JSONL loaded into raw table

{{
    config(
        materialized='incremental',
        unique_key='event_id'
    )
}}

SELECT
    raw_data:event_id::STRING as event_id,
    raw_data:user_id::NUMBER as user_id,
    raw_data:event_type::STRING as event_type,
    raw_data:timestamp::TIMESTAMP as event_timestamp,
    raw_data:properties::VARIANT as properties,
    CURRENT_TIMESTAMP() as dbt_loaded_at
FROM {{ source('raw', 'events') }}

{% if is_incremental() %}
    WHERE raw_data:timestamp::TIMESTAMP > (SELECT MAX(event_timestamp) FROM {{ this }})
{% endif %}

-- models/marts/fct_user_events.sql
-- Aggregate user events

SELECT
    user_id,
    DATE(event_timestamp) as event_date,
    event_type,
    COUNT(*) as event_count,
    MIN(event_timestamp) as first_event_at,
    MAX(event_timestamp) as last_event_at
FROM {{ ref('stg_events') }}
GROUP BY 1, 2, 3

Apache Spark

Processing JSONL with PySpark

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("JSONLProcessor").getOrCreate()

# Read JSONL
df = spark.read.json("s3://bucket/data/*.jsonl")

# Schema inference
df.printSchema()

# Transformations
transformed_df = df \
    .withColumn("created_date", to_date(col("created_at"))) \
    .withColumn("email_domain", split(col("email"), "@").getItem(1)) \
    .filter(col("status") == "active")

# Nested JSON operations
df_exploded = df \
    .select(
        col("id"),
        explode(col("tags")).alias("tag")
    )

# Aggregations (email_domain exists on transformed_df, not on the raw df)
summary = transformed_df \
    .groupBy("email_domain") \
    .agg(
        count("*").alias("user_count"),
        avg("age").alias("avg_age")
    ) \
    .orderBy(desc("user_count"))

# Write to JSONL (partitioned)
transformed_df.write \
    .mode("overwrite") \
    .partitionBy("created_date") \
    .json("s3://bucket/output/")

# Write to Parquet for efficiency
transformed_df.write \
    .mode("overwrite") \
    .partitionBy("created_date") \
    .parquet("s3://bucket/parquet/")

Incremental Loading Patterns

Timestamp-Based Incremental

# Python: Incremental extract based on timestamps
import json
from datetime import datetime, timedelta

def incremental_extract(last_sync_time):
    """Extract only records modified since last sync"""
    import psycopg2

    conn = psycopg2.connect("postgresql://...")
    cursor = conn.cursor()

    cursor.execute("""
        SELECT * FROM users
        WHERE updated_at > %s
        ORDER BY updated_at
    """, (last_sync_time,))

    filename = f"incremental_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jsonl"

    with open(filename, 'w') as f:
        for row in cursor:
            record = {
                'id': row[0],
                'email': row[1],
                'updated_at': row[2].isoformat()
            }
            f.write(json.dumps(record) + '\n')

    cursor.close()
    return filename

# Track last sync
with open('last_sync.txt', 'r') as f:
    last_sync = datetime.fromisoformat(f.read().strip())

extract_file = incremental_extract(last_sync)

# Update the sync watermark; persisting MAX(updated_at) from the extracted rows
# is safer than wall-clock time if writes can happen during the extract
with open('last_sync.txt', 'w') as f:
    f.write(datetime.utcnow().isoformat())

Upsert Pattern

-- BigQuery MERGE for upserts
MERGE `project.dataset.users` T
USING (
    SELECT * FROM EXTERNAL_QUERY(
        "projects/project/locations/us/connections/postgres",
        "SELECT * FROM users WHERE updated_at > CURRENT_DATE - 1"
    )
) S
ON T.id = S.id
WHEN MATCHED THEN
    UPDATE SET
        email = S.email,
        name = S.name,
        updated_at = S.updated_at
WHEN NOT MATCHED THEN
    INSERT (id, email, name, created_at, updated_at)
    VALUES (S.id, S.email, S.name, S.created_at, S.updated_at);

Best Practices

Data Quality

  • Validate JSON syntax before loading (a minimal gate is sketched after this list)
  • Implement schema validation
  • Handle nulls and missing fields
  • Deduplicate records
  • Add data quality checks in pipelines
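
A minimal pre-load quality gate that combines syntax validation, required-field checks, and deduplication (the field names are illustrative, not a fixed schema):

# Python: validate and deduplicate a JSONL file before loading (illustrative fields)
import json

REQUIRED_FIELDS = {'id', 'email'}

def quality_gate(in_path, out_path, reject_path):
    """Write valid, unique records to out_path and everything else to reject_path."""
    seen_ids = set()
    kept = rejected = 0
    with open(in_path) as infile, \
         open(out_path, 'w') as good, \
         open(reject_path, 'w') as bad:
        for line in infile:
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                bad.write(line)
                rejected += 1
                continue
            # Reject records missing required fields or repeating an id
            if not REQUIRED_FIELDS.issubset(record) or record.get('id') in seen_ids:
                bad.write(line)
                rejected += 1
                continue
            seen_ids.add(record['id'])
            good.write(json.dumps(record) + '\n')
            kept += 1
    return kept, rejected

print(quality_gate('users.jsonl', 'users_clean.jsonl', 'users_rejects.jsonl'))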

Performance

  • Partition data by date for faster queries
  • Compress JSONL files with gzip or snappy (see the gzip sketch after this list)
  • Use columnar formats (Parquet) for analytics
  • Implement incremental loads
  • Cluster tables on frequently queried columns
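
Compression is usually a one-line change, since gzipped JSONL can still be read line by line; a small sketch:

# Python: write and stream-read gzip-compressed JSONL
import gzip
import json

# Write: gzip.open in text mode behaves like a normal file handle
with gzip.open('events.jsonl.gz', 'wt', encoding='utf-8') as f:
    for event in [{'id': 1, 'type': 'click'}, {'id': 2, 'type': 'view'}]:
        f.write(json.dumps(event) + '\n')

# Read: iterate line by line without decompressing to disk first
with gzip.open('events.jsonl.gz', 'rt', encoding='utf-8') as f:
    for line in f:
        record = json.loads(line)
        print(record['type'])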

Reliability

  • Implement idempotent pipelines
  • Add retry logic with exponential backoff (sketch after this list)
  • Monitor pipeline failures
  • Maintain data lineage
  • Version your data and schemas
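
A sketch of retry with exponential backoff around a flaky load step; load_batch is a stand-in for whatever your pipeline actually calls, and the attempt counts and delays are illustrative.

# Python: retry a step with exponential backoff (load_batch is a placeholder)
import random
import time

def with_retries(func, max_attempts=5, base_delay=1.0):
    """Call func(); on failure wait base_delay * 2**attempt plus jitter, then retry."""
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception as exc:
            if attempt == max_attempts - 1:
                raise  # Out of attempts, surface the error to the scheduler
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed ({exc}); retrying in {delay:.1f}s")
            time.sleep(delay)

# Usage: wrap an idempotent load so re-runs are safe
# with_retries(lambda: load_batch('users_transformed.jsonl'))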

Cost Optimization

  • Use data lifecycle policies
  • Archive old data to cold storage (see the lifecycle sketch after this list)
  • Optimize query patterns
  • Monitor storage costs
  • Use appropriate compression
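
If exports land in Google Cloud Storage, lifecycle rules can handle archiving and expiry automatically; a sketch, assuming the google-cloud-storage client and a placeholder bucket name and thresholds:

# Python: GCS lifecycle rules for JSONL exports (bucket and ages are placeholders)
from google.cloud import storage

client = storage.Client()
bucket = client.get_bucket('my-data-bucket')

# Move objects to Coldline after 90 days, delete them after two years
bucket.add_lifecycle_set_storage_class_rule('COLDLINE', age=90)
bucket.add_lifecycle_delete_rule(age=730)
bucket.patch()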

Related Resources