Build high-performance REST APIs, streaming endpoints, and webhooks with JSONL, a format that scales from startup to enterprise
JSONL enables streaming responses that start delivering data immediately, reducing time-to-first-byte and improving perceived performance. Instead of buffering entire responses, clients can process records as they arrive.
Leading API providers use JSONL for bulk exports and streaming endpoints.
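On the wire, a JSONL response is simply one JSON object per line, so a client can act on each record as soon as the line arrives rather than waiting for the whole payload. A few illustrative records:
{"id": 1, "email": "ada@example.com", "name": "Ada Lovelace"}
{"id": 2, "email": "grace@example.com", "name": "Grace Hopper"}
{"id": 3, "email": "alan@example.com", "name": "Alan Turing"}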
Stream database results as JSONL for efficient data delivery without loading everything into memory.
const express = require('express');
const { Pool } = require('pg');
const Cursor = require('pg-cursor');             // for batched cursor reads below
const QueryStream = require('pg-query-stream');  // for the stream pipeline example below
const app = express();
const pool = new Pool({
host: 'localhost',
database: 'myapp',
user: 'postgres',
password: 'secret'
});
// JSONL endpoint with streaming
app.get('/api/users.jsonl', async (req, res) => {
// Set headers for JSONL
res.setHeader('Content-Type', 'application/x-ndjson');
res.setHeader('Content-Disposition', 'attachment; filename="users.jsonl"');
try {
// Query with cursor for streaming
const client = await pool.connect();
const query = 'SELECT * FROM users ORDER BY created_at DESC';
const cursor = client.query(new Cursor(query));
    // Stream rows as JSONL in batches of 100
    try {
      let rows;
      while ((rows = await cursor.read(100)).length > 0) {
        for (const row of rows) {
          res.write(JSON.stringify(row) + '\n');
        }
      }
      res.end();
    } finally {
      // Always release the connection, even if the stream fails mid-way
      await cursor.close();
      client.release();
    }
} catch (error) {
console.error('Stream error:', error);
res.status(500).end();
}
});
// Alternative: Stream from array/generator
app.get('/api/events.jsonl', async (req, res) => {
res.setHeader('Content-Type', 'application/x-ndjson');
const eventStream = getEventsStream(); // Returns async generator
for await (const event of eventStream) {
if (!res.write(JSON.stringify(event) + '\n')) {
// Client disconnected or buffer full
await new Promise(resolve => res.once('drain', resolve));
}
}
res.end();
});
// Transform stream pipeline
const { Transform, pipeline } = require('stream');
class JSONLTransform extends Transform {
constructor() {
super({ objectMode: true });
}
_transform(chunk, encoding, callback) {
try {
this.push(JSON.stringify(chunk) + '\n');
callback();
} catch (error) {
callback(error);
}
}
}
app.get('/api/products.jsonl', async (req, res) => {
  res.setHeader('Content-Type', 'application/x-ndjson');
  // Query streams need a dedicated client; they cannot be passed to pool.query()
  const client = await pool.connect();
  const dbStream = client.query(new QueryStream('SELECT * FROM products'));
  const jsonlStream = new JSONLTransform();
  pipeline(dbStream, jsonlStream, res, (error) => {
    client.release();
    if (error) {
      console.error('Pipeline error:', error);
      res.end();
    }
  });
});
app.listen(3000, () => {
console.log('API server running on port 3000');
});
Use StreamingResponse for memory-efficient JSONL endpoints in FastAPI.
from fastapi import FastAPI, Query
from fastapi.responses import StreamingResponse
import asyncpg
import json
from typing import AsyncGenerator
app = FastAPI()
# Database connection pool
DATABASE_URL = "postgresql://user:password@localhost/myapp"
pool = None
@app.on_event("startup")
async def startup():
global pool
pool = await asyncpg.create_pool(DATABASE_URL)
@app.on_event("shutdown")
async def shutdown():
await pool.close()
async def generate_users_jsonl() -> AsyncGenerator[str, None]:
    """Stream users as JSONL"""
    async with pool.acquire() as conn:
        # Use a cursor for streaming large datasets; asyncpg cursors
        # can only be iterated inside a transaction
        async with conn.transaction():
            async for record in conn.cursor('SELECT * FROM users ORDER BY id'):
                # default=str serializes datetimes and other non-JSON types
                yield json.dumps(dict(record), default=str) + '\n'
@app.get("/api/users.jsonl")
async def get_users_jsonl():
"""Stream all users as JSONL"""
return StreamingResponse(
generate_users_jsonl(),
media_type="application/x-ndjson",
headers={
"Content-Disposition": "attachment; filename=users.jsonl"
}
)
@app.get("/api/search.jsonl")
async def search_jsonl(
q: str = Query(..., description="Search query"),
limit: int = Query(1000, le=10000)
):
"""Search and stream results as JSONL"""
    async def generate_results():
        async with pool.acquire() as conn:
            query = """
                SELECT * FROM products
                WHERE name ILIKE $1 OR description ILIKE $1
                LIMIT $2
            """
            # Cursor iteration requires an open transaction in asyncpg
            async with conn.transaction():
                async for record in conn.cursor(query, f'%{q}%', limit):
                    yield json.dumps(dict(record), default=str) + '\n'
return StreamingResponse(
generate_results(),
media_type="application/x-ndjson"
)
# Generator with filtering and transformation
@app.get("/api/analytics.jsonl")
async def analytics_jsonl(
start_date: str,
end_date: str,
metric: str = "all"
):
"""Stream analytics data with filtering"""
    async def generate_analytics():
        async with pool.acquire() as conn:
            query = """
                SELECT
                    date,
                    metric_name,
                    metric_value,
                    metadata
                FROM analytics
                WHERE date BETWEEN $1 AND $2
                ORDER BY date
            """
            # Cursor iteration requires an open transaction in asyncpg
            async with conn.transaction():
                async for record in conn.cursor(query, start_date, end_date):
                    data = dict(record)
                    # Filter by metric if specified
                    if metric != "all" and data['metric_name'] != metric:
                        continue
                    # Decode the metadata JSON column
                    if data['metadata']:
                        data['metadata'] = json.loads(data['metadata'])
                    yield json.dumps(data, default=str) + '\n'
return StreamingResponse(
generate_analytics(),
media_type="application/x-ndjson",
headers={
"Content-Disposition": f"attachment; filename=analytics_{start_date}_{end_date}.jsonl"
}
)
# Error handling in streams
@app.get("/api/export.jsonl")
async def export_jsonl():
"""Export with error handling"""
    async def generate_with_errors():
        try:
            async with pool.acquire() as conn:
                async with conn.transaction():
                    async for record in conn.cursor('SELECT * FROM large_table'):
                        try:
                            yield json.dumps(dict(record), default=str) + '\n'
                        except Exception as e:
                            # Log the error but keep the stream going
                            error_record = {
                                "error": str(e),
                                "record_id": record.get('id')
                            }
                            yield json.dumps(error_record) + '\n'
        except Exception as e:
            # Stream-level error: emit it as a final JSONL record
            error = {"fatal_error": str(e)}
            yield json.dumps(error) + '\n'
return StreamingResponse(
generate_with_errors(),
media_type="application/x-ndjson"
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
High-performance streaming with Go's built-in HTTP server and channels.
package main
import (
"database/sql"
"encoding/json"
"fmt"
"log"
"net/http"
"time"
_ "github.com/lib/pq"
)
type User struct {
ID int `json:"id"`
Email string `json:"email"`
Name string `json:"name"`
CreatedAt time.Time `json:"created_at"`
}
var db *sql.DB
func init() {
var err error
db, err = sql.Open("postgres",
"host=localhost user=postgres password=secret dbname=myapp sslmode=disable")
if err != nil {
log.Fatal(err)
}
}
// Stream users as JSONL
func usersJSONLHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/x-ndjson")
w.Header().Set("Content-Disposition", "attachment; filename=users.jsonl")
// Query with rows streaming
rows, err := db.Query("SELECT id, email, name, created_at FROM users ORDER BY id")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer rows.Close()
encoder := json.NewEncoder(w)
for rows.Next() {
var user User
err := rows.Scan(&user.ID, &user.Email, &user.Name, &user.CreatedAt)
if err != nil {
log.Printf("Scan error: %v", err)
continue
}
// Encode directly to response writer
if err := encoder.Encode(user); err != nil {
log.Printf("Encode error: %v", err)
return
}
// Flush to send data immediately
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
}
if err := rows.Err(); err != nil {
log.Printf("Rows iteration error: %v", err)
}
}
// Stream with cancellation support
func streamWithContext(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/x-ndjson")
// Create channel for results
results := make(chan interface{}, 100)
errors := make(chan error, 1)
// Start background goroutine to fetch data
go func() {
defer close(results)
defer close(errors)
rows, err := db.QueryContext(r.Context(), "SELECT * FROM large_table")
if err != nil {
errors <- err
return
}
defer rows.Close()
for rows.Next() {
var record map[string]interface{}
// Scan into map...
select {
case results <- record:
case <-r.Context().Done():
// Client disconnected
return
}
}
if err := rows.Err(); err != nil {
errors <- err
}
}()
// Stream results to client
encoder := json.NewEncoder(w)
for {
select {
case record, ok := <-results:
if !ok {
return // Channel closed, done
}
if err := encoder.Encode(record); err != nil {
log.Printf("Encode error: %v", err)
return
}
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
case err := <-errors:
if err != nil {
log.Printf("Query error: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
case <-r.Context().Done():
log.Println("Client disconnected")
return
}
}
}
// Batch streaming for better performance
func batchStreamHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/x-ndjson")
const batchSize = 1000
rows, err := db.Query("SELECT * FROM events ORDER BY timestamp")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer rows.Close()
encoder := json.NewEncoder(w)
count := 0
for rows.Next() {
var event map[string]interface{}
// Scan event...
if err := encoder.Encode(event); err != nil {
log.Printf("Encode error: %v", err)
return
}
count++
// Flush every N records
if count%batchSize == 0 {
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
}
}
// Final flush
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
}
func main() {
http.HandleFunc("/api/users.jsonl", usersJSONLHandler)
http.HandleFunc("/api/stream.jsonl", streamWithContext)
http.HandleFunc("/api/batch.jsonl", batchStreamHandler)
log.Println("Server starting on :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
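Rails can stream JSONL by assigning an Enumerator to the response body, flushing each batch of ActiveRecord records as it is generated.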
class Api::UsersController < ApplicationController
# Stream users as JSONL
def index_jsonl
headers['Content-Type'] = 'application/x-ndjson'
headers['Content-Disposition'] = 'attachment; filename="users.jsonl"'
# Disable layout and template rendering
self.response_body = Enumerator.new do |yielder|
User.find_each(batch_size: 1000) do |user|
yielder << user.to_json + "\n"
end
end
end
# Stream with filtering
def search_jsonl
query = params[:q]
headers['Content-Type'] = 'application/x-ndjson'
self.response_body = Enumerator.new do |yielder|
User.where("name ILIKE ?", "%#{query}%").find_each do |user|
yielder << user.to_json + "\n"
end
end
end
# Export with associations
def export_jsonl
headers['Content-Type'] = 'application/x-ndjson'
self.response_body = Enumerator.new do |yielder|
User.includes(:posts, :comments).find_each do |user|
record = {
id: user.id,
name: user.name,
email: user.email,
          posts_count: user.posts.size,        # .size uses the preloaded association (no extra COUNT query)
          comments_count: user.comments.size,
          recent_posts: user.posts.first(5).map(&:attributes)
}
yielder << record.to_json + "\n"
end
end
end
end
# routes.rb
Rails.application.routes.draw do
namespace :api do
get 'users.jsonl', to: 'users#index_jsonl'
get 'search.jsonl', to: 'users#search_jsonl'
get 'export.jsonl', to: 'users#export_jsonl'
end
end
Use HTTP chunked encoding to stream JSONL without knowing content length upfront.
// Node.js chunked streaming
app.get('/api/live-stream.jsonl', (req, res) => {
res.setHeader('Content-Type', 'application/x-ndjson');
res.setHeader('Transfer-Encoding', 'chunked');
res.setHeader('X-Content-Type-Options', 'nosniff');
let count = 0;
const maxRecords = 10000;
const interval = setInterval(() => {
const record = {
id: count,
timestamp: new Date().toISOString(),
value: Math.random() * 100
};
res.write(JSON.stringify(record) + '\n');
count++;
if (count >= maxRecords) {
clearInterval(interval);
res.end();
}
}, 10); // Send record every 10ms
});
// Client consumption
async function consumeStream(url) {
const response = await fetch(url);
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
// Process complete lines
buffer = lines.pop(); // Keep incomplete line in buffer
for (const line of lines) {
if (line.trim()) {
const record = JSON.parse(line);
processRecord(record);
}
}
}
// Process final buffer
if (buffer.trim()) {
const record = JSON.parse(buffer);
processRecord(record);
}
}
function processRecord(record) {
console.log('Received:', record);
// Update UI, store in database, etc.
}
# Python: Infinite event stream
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json
import random
from datetime import datetime
app = FastAPI()
async def event_generator():
"""Generate events indefinitely"""
event_id = 0
while True:
event = {
"id": event_id,
"type": "heartbeat",
"timestamp": datetime.utcnow().isoformat(),
"data": {
"cpu_usage": random.random() * 100,
"memory_usage": random.random() * 100
}
}
yield json.dumps(event) + '\n'
event_id += 1
await asyncio.sleep(1) # Wait 1 second between events
@app.get("/api/events/stream.jsonl")
async def event_stream():
"""Infinite JSONL event stream"""
return StreamingResponse(
event_generator(),
media_type="application/x-ndjson",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no" # Disable nginx buffering
}
)
# Client with reconnection
import requests
import json
import time
def consume_infinite_stream(url, reconnect_delay=5):
"""Consume infinite stream with auto-reconnect"""
while True:
try:
with requests.get(url, stream=True, timeout=30) as response:
response.raise_for_status()
for line in response.iter_lines():
if line:
event = json.loads(line)
process_event(event)
except requests.exceptions.RequestException as e:
print(f"Stream error: {e}")
print(f"Reconnecting in {reconnect_delay} seconds...")
time.sleep(reconnect_delay)
continue
except KeyboardInterrupt:
print("Stream stopped by user")
break
def process_event(event):
print(f"Event {event['id']}: {event['type']}")
# Handle event...
// Node.js with gzip compression
const express = require('express');
const zlib = require('zlib');
const app = express();
app.get('/api/compressed.jsonl', (req, res) => {
res.setHeader('Content-Type', 'application/x-ndjson');
res.setHeader('Content-Encoding', 'gzip');
// Create gzip stream
const gzip = zlib.createGzip();
gzip.pipe(res);
// Stream data through gzip
const interval = setInterval(() => {
const record = {
id: Math.random(),
data: 'x'.repeat(1000) // Large payload
};
gzip.write(JSON.stringify(record) + '\n');
}, 100);
// Cleanup on client disconnect
req.on('close', () => {
clearInterval(interval);
gzip.end();
});
// End after 10 seconds
setTimeout(() => {
clearInterval(interval);
gzip.end();
}, 10000);
});
// Client decompression
const zlib = require('zlib');
const https = require('https');
https.get('https://api.example.com/compressed.jsonl', (res) => {
const gunzip = zlib.createGunzip();
res.pipe(gunzip);
let buffer = '';
gunzip.on('data', (chunk) => {
buffer += chunk.toString();
const lines = buffer.split('\n');
buffer = lines.pop();
lines.forEach(line => {
if (line.trim()) {
const record = JSON.parse(line);
console.log(record);
}
});
});
gunzip.on('end', () => {
if (buffer.trim()) {
const record = JSON.parse(buffer);
console.log(record);
}
});
});
// Multi-table export with relationships
app.get('/api/export/full.jsonl', async (req, res) => {
res.setHeader('Content-Type', 'application/x-ndjson');
res.setHeader('Content-Disposition', 'attachment; filename="full_export.jsonl"');
const tables = ['users', 'posts', 'comments', 'likes'];
for (const table of tables) {
    const { rows } = await db.query(`SELECT * FROM ${table}`); // tables come from the fixed list above
    for (const row of rows) {
const record = {
_table: table,
_exported_at: new Date().toISOString(),
...row
};
res.write(JSON.stringify(record) + '\n');
}
}
res.end();
});
// Export with progress tracking
app.get('/api/export/users.jsonl', async (req, res) => {
res.setHeader('Content-Type', 'application/x-ndjson');
const totalCount = await db.query('SELECT COUNT(*) FROM users');
let exported = 0;
// Send metadata as first line
res.write(JSON.stringify({
_metadata: true,
total_records: totalCount.rows[0].count,
started_at: new Date().toISOString()
}) + '\n');
  // db must be a dedicated pg client here; query streams cannot go through pool.query()
  const stream = db.query(new QueryStream('SELECT * FROM users'));
stream.on('data', (row) => {
res.write(JSON.stringify(row) + '\n');
exported++;
// Send progress updates every 1000 records
if (exported % 1000 === 0) {
res.write(JSON.stringify({
_progress: true,
exported,
percent: (exported / totalCount.rows[0].count * 100).toFixed(2)
}) + '\n');
}
});
stream.on('end', () => {
res.write(JSON.stringify({
_metadata: true,
completed_at: new Date().toISOString(),
total_exported: exported
}) + '\n');
res.end();
});
});
// Incremental exports (only changed records)
app.get('/api/export/incremental.jsonl', async (req, res) => {
const since = req.query.since; // ISO timestamp
res.setHeader('Content-Type', 'application/x-ndjson');
const query = `
SELECT *
FROM users
WHERE updated_at > $1
ORDER BY updated_at ASC
`;
  const { rows } = await db.query(query, [since]);
  for (const row of rows) {
res.write(JSON.stringify(row) + '\n');
}
res.end();
});
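The job-based export flow below depends on a few pieces that sit outside the snippet. A minimal setup sketch, assuming the uuid package and Bull as the queue (any queue with add/process semantics would do):
// Assumed setup for the async export example (not shown in the original snippet)
const fs = require('fs');
const path = require('path');
const zlib = require('zlib');
const { v4: uuidv4 } = require('uuid');
const Queue = require('bull');            // Redis-backed job queue with .add()/.process()
const exportQueue = new Queue('exports'); // connects to redis://127.0.0.1:6379 by default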
// Create async export job
app.post('/api/exports', async (req, res) => {
const { table, filters } = req.body;
const exportJob = {
id: uuidv4(),
table,
filters,
status: 'queued',
created_at: new Date(),
download_url: null
};
await db.query(
    'INSERT INTO export_jobs (id, "table", filters, status) VALUES ($1, $2, $3, $4)', // "table" must be quoted: it is a reserved word
[exportJob.id, table, JSON.stringify(filters), 'queued']
);
// Queue background job
exportQueue.add('generate-export', exportJob);
res.status(202).json({
export_id: exportJob.id,
status: 'queued',
status_url: `/api/exports/${exportJob.id}`
});
});
// Check export status
app.get('/api/exports/:id', async (req, res) => {
const job = await db.query(
'SELECT * FROM export_jobs WHERE id = $1',
[req.params.id]
);
if (!job.rows.length) {
return res.status(404).json({ error: 'Export not found' });
}
const exportJob = job.rows[0];
res.json({
id: exportJob.id,
status: exportJob.status,
progress: exportJob.progress,
download_url: exportJob.download_url,
expires_at: exportJob.expires_at
});
});
// Background worker
exportQueue.process('generate-export', async (job) => {
const { id, table, filters } = job.data;
const filename = `export_${id}.jsonl.gz`;
const filepath = path.join('/tmp/exports', filename);
// Update status
await db.query(
'UPDATE export_jobs SET status = $1 WHERE id = $2',
['processing', id]
);
// Generate export
  const writeStream = fs.createWriteStream(filepath);
  const gzip = zlib.createGzip();
  gzip.pipe(writeStream); // gzip output flows into the file, not the reverse
let query = `SELECT * FROM ${table}`;
if (filters) {
// Apply filters...
}
  const { rows } = await db.query(query);
  let processed = 0;
  for (const row of rows) {
gzip.write(JSON.stringify(row) + '\n');
processed++;
if (processed % 1000 === 0) {
await db.query(
'UPDATE export_jobs SET progress = $1 WHERE id = $2',
[processed, id]
);
}
}
  gzip.end();
  // Wait for the compressed file to finish writing before uploading
  await new Promise((resolve, reject) => {
    writeStream.on('finish', resolve).on('error', reject);
  });
  // Upload to S3
  const s3Url = await uploadToS3(filepath, filename);
// Update job with download URL
await db.query(`
UPDATE export_jobs
SET status = $1, download_url = $2, expires_at = $3
WHERE id = $4
`, ['completed', s3Url, new Date(Date.now() + 24 * 60 * 60 * 1000), id]);
// Cleanup local file
fs.unlinkSync(filepath);
});
Send multiple events in a single webhook call using JSONL for efficiency.
// Batch webhook delivery
class WebhookService {
constructor() {
this.queue = [];
this.batchSize = 100;
this.batchInterval = 5000; // 5 seconds
setInterval(() => this.flush(), this.batchInterval);
}
addEvent(webhookUrl, event) {
this.queue.push({ webhookUrl, event });
if (this.queue.length >= this.batchSize) {
this.flush();
}
}
async flush() {
if (this.queue.length === 0) return;
// Group events by webhook URL
const grouped = {};
for (const item of this.queue) {
if (!grouped[item.webhookUrl]) {
grouped[item.webhookUrl] = [];
}
grouped[item.webhookUrl].push(item.event);
}
this.queue = [];
// Send batches
for (const [url, events] of Object.entries(grouped)) {
await this.sendBatch(url, events);
}
}
async sendBatch(url, events) {
// Convert to JSONL
const payload = events.map(e => JSON.stringify(e)).join('\n');
try {
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/x-ndjson',
'X-Webhook-Signature': this.generateSignature(payload),
'X-Event-Count': events.length.toString()
},
body: payload
});
if (!response.ok) {
throw new Error(`Webhook failed: ${response.status}`);
}
console.log(`Sent ${events.length} events to ${url}`);
} catch (error) {
console.error('Webhook delivery error:', error);
// Retry logic here...
}
}
generateSignature(payload) {
const crypto = require('crypto');
const secret = process.env.WEBHOOK_SECRET;
return crypto.createHmac('sha256', secret).update(payload).digest('hex');
}
}
// Usage
const webhookService = new WebhookService();
app.post('/api/orders', async (req, res) => {
const order = await createOrder(req.body);
// Queue webhook event
webhookService.addEvent(
'https://customer.com/webhooks/orders',
{
event_type: 'order.created',
order_id: order.id,
timestamp: new Date().toISOString(),
data: order
}
);
res.json(order);
});
// Webhook receiver (parse the raw JSONL body as plain text)
app.post('/webhooks/receive', express.text({ type: 'application/x-ndjson' }), (req, res) => {
const signature = req.headers['x-webhook-signature'];
const eventCount = parseInt(req.headers['x-event-count']);
// Verify signature
if (!verifySignature(req.body, signature)) {
return res.status(401).json({ error: 'Invalid signature' });
}
// Parse JSONL
const lines = req.body.split('\n');
const events = lines.filter(line => line.trim()).map(line => JSON.parse(line));
console.log(`Received ${events.length} events`);
// Process each event
for (const event of events) {
processWebhookEvent(event);
}
res.status(200).json({ processed: events.length });
});
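The receiver above calls a verifySignature helper that is not shown. A minimal sketch, assuming it mirrors the HMAC-SHA256 scheme of generateSignature and shares the same WEBHOOK_SECRET:
function verifySignature(payload, signature) {
  const crypto = require('crypto');
  const expected = crypto
    .createHmac('sha256', process.env.WEBHOOK_SECRET)
    .update(payload)
    .digest('hex');
  // timingSafeEqual needs equal-length buffers and avoids timing side channels
  return Boolean(signature) &&
    signature.length === expected.length &&
    crypto.timingSafeEqual(Buffer.from(signature), Buffer.from(expected));
}
For transient delivery failures, wrap sends in retry logic with exponential backoff: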
class ReliableWebhookService {
async sendWithRetry(url, events, maxRetries = 3) {
let attempt = 0;
let backoff = 1000; // Start with 1 second
while (attempt < maxRetries) {
try {
const payload = events.map(e => JSON.stringify(e)).join('\n');
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/x-ndjson',
'X-Attempt': (attempt + 1).toString()
},
          body: payload,
          // fetch has no timeout option; use an AbortSignal instead
          signal: AbortSignal.timeout(30000)
});
if (response.ok) {
return { success: true, attempt: attempt + 1 };
}
// 4xx errors: don't retry
if (response.status >= 400 && response.status < 500) {
return { success: false, error: 'Client error', status: response.status };
}
// 5xx errors: retry
throw new Error(`Server error: ${response.status}`);
} catch (error) {
attempt++;
if (attempt >= maxRetries) {
return { success: false, error: error.message, attempts: attempt };
}
// Exponential backoff
await new Promise(resolve => setTimeout(resolve, backoff));
backoff *= 2;
}
}
}
}
// Cursor pagination with JSONL
app.get('/api/posts.jsonl', async (req, res) => {
const limit = parseInt(req.query.limit) || 100;
const cursor = req.query.cursor;
res.setHeader('Content-Type', 'application/x-ndjson');
let query = 'SELECT * FROM posts ORDER BY id ASC LIMIT $1';
let params = [limit + 1]; // Fetch one extra to determine if there's more
if (cursor) {
query = 'SELECT * FROM posts WHERE id > $1 ORDER BY id ASC LIMIT $2';
params = [cursor, limit + 1];
}
  const { rows } = await db.query(query, params);
const hasMore = rows.length > limit;
const records = hasMore ? rows.slice(0, -1) : rows;
// Write metadata as first line
const metadata = {
_metadata: true,
count: records.length,
has_more: hasMore,
next_cursor: hasMore ? records[records.length - 1].id : null,
next_url: hasMore
? `/api/posts.jsonl?cursor=${records[records.length - 1].id}&limit=${limit}`
: null
};
res.write(JSON.stringify(metadata) + '\n');
// Write records
for (const record of records) {
res.write(JSON.stringify(record) + '\n');
}
res.end();
});
// Client that auto-fetches all pages
async function fetchAllPages(baseUrl) {
let url = baseUrl;
let allRecords = [];
while (url) {
const response = await fetch(url);
const text = await response.text();
const lines = text.trim().split('\n');
let metadata = null;
const records = [];
for (const line of lines) {
const data = JSON.parse(line);
if (data._metadata) {
metadata = data;
} else {
records.push(data);
}
}
allRecords = allRecords.concat(records);
console.log(`Fetched ${records.length} records, total: ${allRecords.length}`);
    url = metadata ? metadata.next_url : null;
}
return allRecords;
}
// Keyset pagination for better performance on large datasets
app.get('/api/events.jsonl', async (req, res) => {
const limit = 1000;
const lastId = req.query.last_id;
const lastTimestamp = req.query.last_timestamp;
res.setHeader('Content-Type', 'application/x-ndjson');
let query, params;
if (lastId && lastTimestamp) {
query = `
SELECT * FROM events
WHERE (timestamp, id) > ($1, $2)
ORDER BY timestamp ASC, id ASC
LIMIT $3
`;
params = [lastTimestamp, lastId, limit];
} else {
query = `
SELECT * FROM events
ORDER BY timestamp ASC, id ASC
LIMIT $1
`;
params = [limit];
}
  const { rows } = await db.query(query, params);
if (rows.length > 0) {
const lastRow = rows[rows.length - 1];
const nextUrl = `/api/events.jsonl?last_id=${lastRow.id}&last_timestamp=${lastRow.timestamp}`;
res.write(JSON.stringify({
_metadata: true,
count: rows.length,
next_url: rows.length === limit ? nextUrl : null
}) + '\n');
}
for (const row of rows) {
res.write(JSON.stringify(row) + '\n');
}
res.end();
});
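JSONL also works as a lightweight transport for GraphQL subscription events, streamed over a plain HTTP response: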
const { ApolloServer, gql } = require('apollo-server-express');
const { PubSub } = require('graphql-subscriptions'); // PubSub is provided by graphql-subscriptions
const express = require('express');
const pubsub = new PubSub();
const typeDefs = gql`
type Message {
id: ID!
text: String!
user: String!
timestamp: String!
}
type Query {
messages: [Message!]!
}
type Mutation {
sendMessage(text: String!, user: String!): Message!
}
`;
const resolvers = {
  Query: {
    // Placeholder so the Query type resolves; a real app would read from storage
    messages: () => []
  },
  Mutation: {
sendMessage: (_, { text, user }) => {
const message = {
id: Date.now().toString(),
text,
user,
timestamp: new Date().toISOString()
};
pubsub.publish('NEW_MESSAGE', { newMessage: message });
return message;
}
}
};
const app = express();
// JSONL subscription endpoint
app.get('/graphql/subscribe/messages.jsonl', (req, res) => {
res.setHeader('Content-Type', 'application/x-ndjson');
res.setHeader('Cache-Control', 'no-cache');
  const iterator = pubsub.asyncIterator(['NEW_MESSAGE']);
  (async () => {
    for await (const { newMessage } of iterator) {
      res.write(JSON.stringify(newMessage) + '\n');
    }
  })();
  req.on('close', () => {
    // End the async iterator so the PubSub subscription is cleaned up
    if (typeof iterator.return === 'function') iterator.return();
  });
});
const rateLimit = require('express-rate-limit');
// Rate limit JSONL endpoints
const jsonlLimiter = rateLimit({
windowMs: 15 * 60 * 1000, // 15 minutes
max: 100, // Limit each IP to 100 requests per window
message: JSON.stringify({
error: 'Too many requests',
retry_after: 900
}),
standardHeaders: true,
legacyHeaders: false,
});
app.get('/api/export.jsonl', jsonlLimiter, async (req, res) => {
// Stream export...
});
// Throttle stream output
class ThrottledStream {
constructor(recordsPerSecond = 1000) {
this.recordsPerSecond = recordsPerSecond;
this.interval = 1000 / recordsPerSecond;
this.lastSent = 0;
}
async write(res, data) {
const now = Date.now();
const elapsed = now - this.lastSent;
if (elapsed < this.interval) {
await new Promise(resolve => setTimeout(resolve, this.interval - elapsed));
}
res.write(data);
this.lastSent = Date.now();
}
}
app.get('/api/throttled.jsonl', async (req, res) => {
res.setHeader('Content-Type', 'application/x-ndjson');
const throttle = new ThrottledStream(100); // 100 records/sec
for (const record of await getRecords()) {
await throttle.write(res, JSON.stringify(record) + '\n');
}
res.end();
});