Power your ETL workflows, data warehouses, and analytics at scale with JSONL - the format trusted by BigQuery, Snowflake, and modern data platforms
JSONL is the preferred format for loading data into modern data warehouses. Its line-based structure allows for parallel loading, schema-on-read flexibility, and efficient incremental updates.
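Because every record occupies exactly one line, a large export can be split on line boundaries and handed to several workers or load jobs at once. A minimal sketch of that idea, assuming a local file path and an arbitrary chunk size (both illustrative):

# Split a JSONL file into fixed-size chunks that can be loaded in parallel
# (file path and chunk size are illustrative)
def split_jsonl(path, lines_per_chunk=100_000):
    """Split a JSONL file into chunk files; returns the chunk paths."""
    chunk_paths = []
    chunk = []
    index = 0

    def flush():
        nonlocal chunk, index
        if not chunk:
            return
        chunk_path = f"{path}.part{index:04d}"
        with open(chunk_path, 'w') as out:
            out.writelines(chunk)  # each chunk is itself valid JSONL
        chunk_paths.append(chunk_path)
        chunk = []
        index += 1

    with open(path, 'r') as f:
        for line in f:
            chunk.append(line)
            if len(chunk) >= lines_per_chunk:
                flush()
    flush()
    return chunk_paths

Each chunk file is valid JSONL on its own, so separate loader processes or separate warehouse bulk-load jobs can consume the chunks independently.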
# Python: Extract from PostgreSQL to JSONL
import psycopg2
import json
from datetime import datetime

last_sync = datetime(2025, 1, 1)  # illustrative; in practice read from a sync-state store

conn = psycopg2.connect("dbname=mydb user=postgres")
cursor = conn.cursor()

# Query with cursor for memory efficiency
cursor.execute("SELECT * FROM users WHERE created_at > %s", (last_sync,))

with open(f"users_{datetime.now().strftime('%Y%m%d')}.jsonl", 'w') as f:
    while True:
        rows = cursor.fetchmany(1000)  # Batch processing
        if not rows:
            break
        for row in rows:
            record = {
                'id': row[0],
                'email': row[1],
                'name': row[2],
                'created_at': row[3].isoformat()
            }
            f.write(json.dumps(record) + '\n')

cursor.close()
conn.close()
# Extract from MongoDB
from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
db = client['myapp']

with open('orders.jsonl', 'w') as f:
    for doc in db.orders.find({'status': 'completed'}):
        doc['_id'] = str(doc['_id'])  # Convert ObjectId to string for JSON
        f.write(json.dumps(doc) + '\n')
# Extract from REST API
import requests

url = "https://api.example.com/events"
params = {'since': '2025-01-01', 'limit': 1000}

with open('api_events.jsonl', 'w') as f:
    while True:
        response = requests.get(url, params=params)
        response.raise_for_status()
        data = response.json()
        for item in data['items']:
            f.write(json.dumps(item) + '\n')
        if not data.get('next_page'):
            break
        params['page'] = data['next_page']
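The Snowflake examples later on this page load gzip-compressed extracts such as users.jsonl.gz; a minimal sketch of compressing a finished extract before uploading it (filenames are illustrative):

# Gzip-compress an extracted JSONL file before staging/upload (filenames illustrative)
import gzip
import shutil

with open('users.jsonl', 'rb') as src, gzip.open('users.jsonl.gz', 'wb') as dst:
    shutil.copyfileobj(src, dst)  # streams in blocks, so large extracts are never read fully into memory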
# Python: Transform JSONL with pandas
import pandas as pd
import json

# Read JSONL into DataFrame
records = []
with open('users.jsonl', 'r') as f:
    for line in f:
        records.append(json.loads(line))
df = pd.DataFrame(records)

# Transformations
df['created_date'] = pd.to_datetime(df['created_at']).dt.date
df['email_domain'] = df['email'].str.split('@').str[1]
df['name_length'] = df['name'].str.len()

# Enrich with external data (companies_df: a lookup DataFrame with a 'domain' column)
df = df.merge(companies_df, left_on='email_domain', right_on='domain')

# Filter and clean
df = df[df['email'].str.contains('@')]  # Valid emails
df = df.drop_duplicates(subset=['email'])

# Write transformed data (default=str serializes the date objects created above)
with open('users_transformed.jsonl', 'w') as f:
    for record in df.to_dict('records'):
        f.write(json.dumps(record, default=str) + '\n')
# Streaming transformation for large files
def transform_record(record):
    """Transform an individual record."""
    record['email'] = record['email'].lower()
    record['created_year'] = record['created_at'][:4]
    return record

with open('input.jsonl', 'r') as infile, \
     open('output.jsonl', 'w') as outfile:
    for line in infile:
        record = json.loads(line)
        transformed = transform_record(record)
        outfile.write(json.dumps(transformed) + '\n')
# Load to PostgreSQL
import psycopg2
import json

conn = psycopg2.connect("dbname=warehouse")
cursor = conn.cursor()

with open('users_transformed.jsonl', 'r') as f:
    for line in f:
        record = json.loads(line)
        cursor.execute(
            """
            INSERT INTO users (id, email, name, created_at)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT (id) DO UPDATE
            SET email = EXCLUDED.email,
                name = EXCLUDED.name,
                updated_at = NOW()
            """,
            (record['id'], record['email'], record['name'], record['created_at'])
        )
conn.commit()

# Bulk load with COPY; 'users_raw' is an illustrative staging table
# with a single json/jsonb column named "data"
with open('users.jsonl', 'r') as f:
    cursor.copy_expert(
        """
        COPY users_raw (data)
        FROM STDIN
        """,
        f
    )
conn.commit()

cursor.close()
conn.close()
# Python: Load JSONL to BigQuery
from google.cloud import bigquery

client = bigquery.Client()
table_id = "project.dataset.users"

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    autodetect=True,  # Auto-detect schema
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)

# Load from local file
with open("users.jsonl", "rb") as source_file:
    job = client.load_table_from_file(source_file, table_id, job_config=job_config)
job.result()  # Wait for completion
print(f"Loaded {job.output_rows} rows")

# Load from Google Cloud Storage
uri = "gs://my-bucket/data/*.jsonl"
job = client.load_table_from_uri(uri, table_id, job_config=job_config)
job.result()

# Streaming inserts (real-time)
rows_to_insert = [
{"id": 1, "name": "Alice", "email": "[email protected]"},
{"id": 2, "name": "Bob", "email": "[email protected]"}
]
errors = client.insert_rows_json(table_id, rows_to_insert)
if errors:
    print(f"Errors: {errors}")

# Query nested JSON
query = """
    SELECT
        user_id,
        JSON_VALUE(metadata, '$.country') AS country,
        JSON_VALUE(metadata, '$.city') AS city
    FROM `project.dataset.events`
    WHERE DATE(timestamp) = CURRENT_DATE()
"""
results = client.query(query)
for row in results:
    print(f"{row.user_id}: {row.country}, {row.city}")
-- Create table with schema
CREATE OR REPLACE TABLE `project.dataset.users` (
  id INT64,
  email STRING,
  name STRING,
  metadata JSON,  -- Native JSON type
  tags ARRAY<STRING>,
  address STRUCT<
    street STRING,
    city STRING,
    country STRING
  >,
  created_at TIMESTAMP
)
PARTITION BY DATE(created_at)
CLUSTER BY email;

-- Query JSON fields
SELECT
  id,
  JSON_EXTRACT_SCALAR(metadata, '$.signup_source') AS signup_source,
  JSON_EXTRACT_ARRAY(metadata, '$.interests') AS interests
FROM `project.dataset.users`
WHERE JSON_EXTRACT_SCALAR(metadata, '$.plan') = 'premium';

-- Flatten nested arrays
SELECT
  id,
  tag
FROM `project.dataset.users`,
  UNNEST(tags) AS tag;
-- Create file format for JSONL
CREATE OR REPLACE FILE FORMAT jsonl_format
  TYPE = 'JSON'
  STRIP_OUTER_ARRAY = FALSE
  COMPRESSION = 'GZIP';

-- Create stage for S3
CREATE OR REPLACE STAGE s3_stage
  URL = 's3://my-bucket/data/'
  CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')
  FILE_FORMAT = (FORMAT_NAME = 'jsonl_format');

-- Create target table
CREATE OR REPLACE TABLE users (
  raw_data VARIANT,  -- VARIANT holds the semi-structured JSON document
  loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- Load data from stage
COPY INTO users (raw_data)
FROM @s3_stage/users.jsonl.gz
FILE_FORMAT = (FORMAT_NAME = 'jsonl_format')
ON_ERROR = 'CONTINUE';

-- Create view with parsed columns
CREATE OR REPLACE VIEW users_parsed AS
SELECT
  raw_data:id::NUMBER AS id,
  raw_data:email::STRING AS email,
  raw_data:name::STRING AS name,
  raw_data:metadata::VARIANT AS metadata,
  raw_data:created_at::TIMESTAMP AS created_at,
  loaded_at
FROM users;

-- Query nested JSON
SELECT
  id,
  email,
  metadata:country::STRING AS country,
  metadata:preferences[0]::STRING AS first_preference
FROM users_parsed
WHERE metadata:plan::STRING = 'premium';
import snowflake.connector
import json

conn = snowflake.connector.connect(
    user='myuser',
    password='mypassword',
    account='myaccount',
    warehouse='COMPUTE_WH',
    database='MYDB',
    schema='PUBLIC'
)
cursor = conn.cursor()

# Stage local file (PUT gzip-compresses it by default, hence the .gz suffix below)
cursor.execute("PUT file://users.jsonl @~")

# Load into table
cursor.execute("""
    COPY INTO users (raw_data)
    FROM @~/users.jsonl.gz
    FILE_FORMAT = (FORMAT_NAME = 'jsonl_format')
""")

# Query data
cursor.execute("""
    SELECT raw_data:email::STRING AS email
    FROM users
    WHERE raw_data:created_at::DATE = CURRENT_DATE()
""")
for row in cursor:
    print(row[0])

cursor.close()
conn.close()
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'etl_to_bigquery',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2025, 1, 1),
    catchup=False
)

def extract_data(**context):
    """Extract data to JSONL"""
    import json
    import psycopg2
    conn = psycopg2.connect("postgresql://...")
    cursor = conn.cursor()
    execution_date = context['execution_date']
    filepath = f"/tmp/data_{execution_date.strftime('%Y%m%d')}.jsonl"
    cursor.execute("SELECT * FROM users WHERE updated_at::DATE = %s", (execution_date.date(),))
    with open(filepath, 'w') as f:
        for row in cursor:
            record = {'id': row[0], 'email': row[1], 'name': row[2]}
            f.write(json.dumps(record) + '\n')
    cursor.close()
    conn.close()
    return filepath

def transform_data(**context):
    """Transform JSONL data"""
    import json
    filepath = context['task_instance'].xcom_pull(task_ids='extract')
    output_path = filepath.replace('.jsonl', '_transformed.jsonl')
    with open(filepath, 'r') as infile, open(output_path, 'w') as outfile:
        for line in infile:
            record = json.loads(line)
            record['email'] = record['email'].lower()
            record['processed_at'] = datetime.utcnow().isoformat()
            outfile.write(json.dumps(record) + '\n')
    return output_path

extract = PythonOperator(
    task_id='extract',
    python_callable=extract_data,
    dag=dag
)

transform = PythonOperator(
    task_id='transform',
    python_callable=transform_data,
    dag=dag
)

upload_to_gcs = LocalFilesystemToGCSOperator(
    task_id='upload_to_gcs',
    src="{{ task_instance.xcom_pull(task_ids='transform') }}",
    dst="data/{{ ds }}/users.jsonl",
    bucket='my-data-bucket',
    dag=dag
)

load_to_bq = BigQueryInsertJobOperator(
    task_id='load_to_bigquery',
    configuration={
        "load": {
            "sourceUris": ["gs://my-data-bucket/data/{{ ds }}/users.jsonl"],
            "destinationTable": {
                "projectId": "my-project",
                "datasetId": "analytics",
                "tableId": "users"
            },
            "sourceFormat": "NEWLINE_DELIMITED_JSON",
            "writeDisposition": "WRITE_APPEND",
            "autodetect": True
        }
    },
    dag=dag
)

extract >> transform >> upload_to_gcs >> load_to_bq
-- models/staging/stg_events.sql
-- Parse JSONL loaded into the raw table
{{
    config(
        materialized='incremental',
        unique_key='event_id'
    )
}}

SELECT
    raw_data:event_id::STRING AS event_id,
    raw_data:user_id::NUMBER AS user_id,
    raw_data:event_type::STRING AS event_type,
    raw_data:timestamp::TIMESTAMP AS event_timestamp,
    raw_data:properties::VARIANT AS properties,
    CURRENT_TIMESTAMP() AS dbt_loaded_at
FROM {{ source('raw', 'events') }}
{% if is_incremental() %}
WHERE raw_data:timestamp::TIMESTAMP > (SELECT MAX(event_timestamp) FROM {{ this }})
{% endif %}

-- models/marts/fct_user_events.sql
-- Aggregate user events
SELECT
    user_id,
    DATE(event_timestamp) AS event_date,
    event_type,
    COUNT(*) AS event_count,
    MIN(event_timestamp) AS first_event_at,
    MAX(event_timestamp) AS last_event_at
FROM {{ ref('stg_events') }}
GROUP BY 1, 2, 3
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, split, explode, count, avg, desc

spark = SparkSession.builder.appName("JSONLProcessor").getOrCreate()

# Read JSONL (Spark's JSON reader expects one JSON object per line)
df = spark.read.json("s3://bucket/data/*.jsonl")

# Schema inference
df.printSchema()

# Transformations
transformed_df = df \
    .withColumn("created_date", to_date(col("created_at"))) \
    .withColumn("email_domain", split(col("email"), "@").getItem(1)) \
    .filter(col("status") == "active")

# Nested JSON operations
df_exploded = df \
    .select(
        col("id"),
        explode(col("tags")).alias("tag")
    )

# Aggregations (email_domain only exists on transformed_df)
summary = transformed_df \
    .groupBy("email_domain") \
    .agg(
        count("*").alias("user_count"),
        avg("age").alias("avg_age")
    ) \
    .orderBy(desc("user_count"))

# Write to JSONL (partitioned)
transformed_df.write \
    .mode("overwrite") \
    .partitionBy("created_date") \
    .json("s3://bucket/output/")

# Write to Parquet for efficiency
transformed_df.write \
    .mode("overwrite") \
    .partitionBy("created_date") \
    .parquet("s3://bucket/parquet/")
# Python: Incremental extract based on timestamps
import json
from datetime import datetime

def incremental_extract(last_sync_time):
    """Extract only records modified since last sync"""
    import psycopg2
    conn = psycopg2.connect("postgresql://...")
    cursor = conn.cursor()
    cursor.execute("""
        SELECT * FROM users
        WHERE updated_at > %s
        ORDER BY updated_at
    """, (last_sync_time,))
    filename = f"incremental_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jsonl"
    with open(filename, 'w') as f:
        for row in cursor:
            record = {
                'id': row[0],
                'email': row[1],
                'updated_at': row[2].isoformat()
            }
            f.write(json.dumps(record) + '\n')
    cursor.close()
    conn.close()
    return filename

# Track last sync
with open('last_sync.txt', 'r') as f:
    last_sync = datetime.fromisoformat(f.read().strip())

extract_file = incremental_extract(last_sync)

# Update last sync timestamp
with open('last_sync.txt', 'w') as f:
    f.write(datetime.utcnow().isoformat())
-- BigQuery MERGE for upserts
MERGE `project.dataset.users` T
USING (
  SELECT * FROM EXTERNAL_QUERY(
    "projects/project/locations/us/connections/postgres",
    "SELECT * FROM users WHERE updated_at > CURRENT_DATE - 1"
  )
) S
ON T.id = S.id
WHEN MATCHED THEN
  UPDATE SET
    email = S.email,
    name = S.name,
    updated_at = S.updated_at
WHEN NOT MATCHED THEN
  INSERT (id, email, name, created_at, updated_at)
  VALUES (S.id, S.email, S.name, S.created_at, S.updated_at);