JSONL for API Development

Build high-performance REST APIs, streaming endpoints, and webhooks with JSONL, a format that scales from startup to enterprise.

Why JSONL for APIs?

Performance Advantages

JSONL enables streaming responses that start delivering data immediately, reducing time-to-first-byte and improving perceived performance. Instead of buffering entire responses, clients can process records as they arrive.

  • Stream large datasets without memory limits
  • Start processing before download completes
  • Lower memory footprint on server and client
  • Graceful handling of network interruptions
  • Resume from the last successfully received record (see the sketch below)
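
The last point deserves a concrete sketch. Assuming the endpoint accepts a hypothetical since_id query parameter and the caller supplies a handleRecord callback (both illustrative, not part of any particular API), a client can remember the id of each record it processes and reconnect from that point after an interruption:

async function consumeWithResume(baseUrl) {
  let lastId = null;

  while (true) {
    const url = lastId ? `${baseUrl}?since_id=${lastId}` : baseUrl;

    try {
      const response = await fetch(url);
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) return; // Stream finished normally

        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop(); // Keep the incomplete trailing line

        for (const line of lines) {
          if (!line.trim()) continue;
          const record = JSON.parse(line);
          lastId = record.id; // Track progress as each record arrives
          handleRecord(record);
        }
      }
    } catch (error) {
      // Network interruption: reconnect and resume from lastId
      console.error('Stream interrupted, resuming:', error.message);
      await new Promise(resolve => setTimeout(resolve, 1000));
    }
  }
}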

Industry Use Cases

Leading API providers use JSONL for bulk exports and streaming endpoints:

  • Stripe: Export transactions and customer data
  • GitHub: Repository event streams
  • Twitter/X: Tweet stream API
  • Slack: Message history exports
  • OpenAI: Fine-tuning datasets and Batch API requests

REST APIs with JSONL

Node.js Express API

Stream database results as JSONL for efficient data delivery without loading everything into memory.

const express = require('express');
const { Pool } = require('pg');
const Cursor = require('pg-cursor');
const QueryStream = require('pg-query-stream');
const app = express();

const pool = new Pool({
  host: 'localhost',
  database: 'myapp',
  user: 'postgres',
  password: 'secret'
});

// JSONL endpoint with streaming
app.get('/api/users.jsonl', async (req, res) => {
  // Set headers for JSONL
  res.setHeader('Content-Type', 'application/x-ndjson');
  res.setHeader('Content-Disposition', 'attachment; filename="users.jsonl"');

  let client;
  try {
    // Query with a cursor for streaming
    client = await pool.connect();
    const query = 'SELECT * FROM users ORDER BY created_at DESC';
    const cursor = client.query(new Cursor(query));

    // Stream rows as JSONL
    const readRows = async () => {
      const rows = await cursor.read(100); // Batch of 100

      if (rows.length === 0) {
        res.end();
        cursor.close();
        client.release();
        return;
      }

      // Write each row as JSONL
      for (const row of rows) {
        res.write(JSON.stringify(row) + '\n');
      }

      // Continue with the next batch; return so errors reach the catch block
      return readRows();
    };

    await readRows();

  } catch (error) {
    console.error('Stream error:', error);
    if (client) client.release();
    // Once streaming has begun the headers are already sent; just end the response
    if (!res.headersSent) res.status(500);
    res.end();
  }
});

// Alternative: Stream from array/generator
app.get('/api/events.jsonl', async (req, res) => {
  res.setHeader('Content-Type', 'application/x-ndjson');

  const eventStream = getEventsStream(); // Returns async generator

  for await (const event of eventStream) {
    if (!res.write(JSON.stringify(event) + '\n')) {
      // Client disconnected or buffer full
      await new Promise(resolve => res.once('drain', resolve));
    }
  }

  res.end();
});

// Transform stream pipeline
const { Transform, pipeline } = require('stream');

class JSONLTransform extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  _transform(chunk, encoding, callback) {
    try {
      this.push(JSON.stringify(chunk) + '\n');
      callback();
    } catch (error) {
      callback(error);
    }
  }
}

app.get('/api/products.jsonl', async (req, res) => {
  res.setHeader('Content-Type', 'application/x-ndjson');

  // QueryStream must run on a dedicated client rather than pool.query()
  const client = await pool.connect();
  const dbStream = client.query(new QueryStream('SELECT * FROM products'));
  const jsonlStream = new JSONLTransform();

  pipeline(dbStream, jsonlStream, res, (error) => {
    client.release();
    if (error) {
      console.error('Pipeline error:', error);
      res.end();
    }
  });
});

app.listen(3000, () => {
  console.log('API server running on port 3000');
});

Python FastAPI

Use StreamingResponse for memory-efficient JSONL endpoints in FastAPI.

from fastapi import FastAPI, Query
from fastapi.responses import StreamingResponse
import asyncpg
import json
from typing import AsyncGenerator

app = FastAPI()

# Database connection pool
DATABASE_URL = "postgresql://user:password@localhost/myapp"
pool = None

@app.on_event("startup")
async def startup():
    global pool
    pool = await asyncpg.create_pool(DATABASE_URL)

@app.on_event("shutdown")
async def shutdown():
    await pool.close()

async def generate_users_jsonl() -> AsyncGenerator[str, None]:
    """Stream users as JSONL"""
    async with pool.acquire() as conn:
        # Use cursor for streaming large datasets
        async with conn.transaction():
            async for record in conn.cursor('SELECT * FROM users ORDER BY id'):
                user = dict(record)
                # default=str serializes dates/timestamps that json can't handle natively
                yield json.dumps(user, default=str) + '\n'

@app.get("/api/users.jsonl")
async def get_users_jsonl():
    """Stream all users as JSONL"""
    return StreamingResponse(
        generate_users_jsonl(),
        media_type="application/x-ndjson",
        headers={
            "Content-Disposition": "attachment; filename=users.jsonl"
        }
    )

@app.get("/api/search.jsonl")
async def search_jsonl(
    q: str = Query(..., description="Search query"),
    limit: int = Query(1000, le=10000)
):
    """Search and stream results as JSONL"""

    async def generate_results():
        async with pool.acquire() as conn:
            query = """
                SELECT * FROM products
                WHERE name ILIKE $1 OR description ILIKE $1
                LIMIT $2
            """
            # asyncpg cursors must be used inside a transaction
            async with conn.transaction():
                async for record in conn.cursor(query, f'%{q}%', limit):
                    yield json.dumps(dict(record)) + '\n'

    return StreamingResponse(
        generate_results(),
        media_type="application/x-ndjson"
    )

# Generator with filtering and transformation
@app.get("/api/analytics.jsonl")
async def analytics_jsonl(
    start_date: str,
    end_date: str,
    metric: str = "all"
):
    """Stream analytics data with filtering"""

    async def generate_analytics():
        async with pool.acquire() as conn:
            query = """
                SELECT
                    date,
                    metric_name,
                    metric_value,
                    metadata
                FROM analytics
                WHERE date BETWEEN $1 AND $2
                ORDER BY date
            """

            # asyncpg cursors must be used inside a transaction
            async with conn.transaction():
                async for record in conn.cursor(query, start_date, end_date):
                    data = dict(record)

                    # Filter by metric if specified
                    if metric != "all" and data['metric_name'] != metric:
                        continue

                    # Parse the metadata JSON column
                    if data['metadata']:
                        data['metadata'] = json.loads(data['metadata'])

                    # default=str serializes the date column
                    yield json.dumps(data, default=str) + '\n'

    return StreamingResponse(
        generate_analytics(),
        media_type="application/x-ndjson",
        headers={
            "Content-Disposition": f"attachment; filename=analytics_{start_date}_{end_date}.jsonl"
        }
    )

# Error handling in streams
@app.get("/api/export.jsonl")
async def export_jsonl():
    """Export with error handling"""

    async def generate_with_errors():
        try:
            async with pool.acquire() as conn:
                # asyncpg cursors must be used inside a transaction
                async with conn.transaction():
                    async for record in conn.cursor('SELECT * FROM large_table'):
                        try:
                            yield json.dumps(dict(record)) + '\n'
                        except Exception as e:
                            # Log the error but continue the stream
                            error_record = {
                                "error": str(e),
                                "record_id": record.get('id')
                            }
                            yield json.dumps(error_record) + '\n'

        except Exception as e:
            # Stream-level error
            error = {"fatal_error": str(e)}
            yield json.dumps(error) + '\n'

    return StreamingResponse(
        generate_with_errors(),
        media_type="application/x-ndjson"
    )

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Go HTTP API

High-performance streaming with Go's built-in HTTP server and channels.

package main

import (
    "database/sql"
    "encoding/json"
    "log"
    "net/http"
    "time"

    _ "github.com/lib/pq"
)

type User struct {
    ID        int       `json:"id"`
    Email     string    `json:"email"`
    Name      string    `json:"name"`
    CreatedAt time.Time `json:"created_at"`
}

var db *sql.DB

func init() {
    var err error
    db, err = sql.Open("postgres",
        "host=localhost user=postgres password=secret dbname=myapp sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
}

// Stream users as JSONL
func usersJSONLHandler(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "application/x-ndjson")
    w.Header().Set("Content-Disposition", "attachment; filename=users.jsonl")

    // Query with rows streaming
    rows, err := db.Query("SELECT id, email, name, created_at FROM users ORDER BY id")
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    defer rows.Close()

    encoder := json.NewEncoder(w)

    for rows.Next() {
        var user User
        err := rows.Scan(&user.ID, &user.Email, &user.Name, &user.CreatedAt)
        if err != nil {
            log.Printf("Scan error: %v", err)
            continue
        }

        // Encode directly to response writer
        if err := encoder.Encode(user); err != nil {
            log.Printf("Encode error: %v", err)
            return
        }

        // Flush to send data immediately
        if flusher, ok := w.(http.Flusher); ok {
            flusher.Flush()
        }
    }

    if err := rows.Err(); err != nil {
        log.Printf("Rows iteration error: %v", err)
    }
}

// Stream with cancellation support
func streamWithContext(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "application/x-ndjson")

    // Create channel for results
    results := make(chan interface{}, 100)
    errors := make(chan error, 1)

    // Start background goroutine to fetch data
    go func() {
        // Close only results; errors stays open so its case fires
        // only when an error is actually sent
        defer close(results)

        rows, err := db.QueryContext(r.Context(), "SELECT * FROM large_table")
        if err != nil {
            errors <- err
            return
        }
        defer rows.Close()

        for rows.Next() {
            var record map[string]interface{}
            // Scan into map...

            select {
            case results <- record:
            case <-r.Context().Done():
                // Client disconnected
                return
            }
        }

        if err := rows.Err(); err != nil {
            errors <- err
        }
    }()

    // Stream results to client
    encoder := json.NewEncoder(w)

    for {
        select {
        case record, ok := <-results:
            if !ok {
                return // Channel closed, done
            }

            if err := encoder.Encode(record); err != nil {
                log.Printf("Encode error: %v", err)
                return
            }

            if flusher, ok := w.(http.Flusher); ok {
                flusher.Flush()
            }

        case err := <-errors:
            // Headers are already sent, so just log and stop the stream
            log.Printf("Query error: %v", err)
            return

        case <-r.Context().Done():
            log.Println("Client disconnected")
            return
        }
    }
}

// Batch streaming for better performance
func batchStreamHandler(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "application/x-ndjson")

    const batchSize = 1000

    rows, err := db.Query("SELECT * FROM events ORDER BY timestamp")
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    defer rows.Close()

    encoder := json.NewEncoder(w)
    count := 0

    for rows.Next() {
        var event map[string]interface{}
        // Scan event...

        if err := encoder.Encode(event); err != nil {
            log.Printf("Encode error: %v", err)
            return
        }

        count++

        // Flush every N records
        if count%batchSize == 0 {
            if flusher, ok := w.(http.Flusher); ok {
                flusher.Flush()
            }
        }
    }

    // Final flush
    if flusher, ok := w.(http.Flusher); ok {
        flusher.Flush()
    }
}

func main() {
    http.HandleFunc("/api/users.jsonl", usersJSONLHandler)
    http.HandleFunc("/api/stream.jsonl", streamWithContext)
    http.HandleFunc("/api/batch.jsonl", batchStreamHandler)

    log.Println("Server starting on :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}

Ruby on Rails API

Stream ActiveRecord records in batches by assigning a lazy Enumerator as the response body.

class Api::UsersController < ApplicationController
  # Stream users as JSONL
  def index_jsonl
    headers['Content-Type'] = 'application/x-ndjson'
    headers['Content-Disposition'] = 'attachment; filename="users.jsonl"'

    # Disable layout and template rendering
    self.response_body = Enumerator.new do |yielder|
      User.find_each(batch_size: 1000) do |user|
        yielder << user.to_json + "\n"
      end
    end
  end

  # Stream with filtering
  def search_jsonl
    query = params[:q]
    headers['Content-Type'] = 'application/x-ndjson'

    self.response_body = Enumerator.new do |yielder|
      User.where("name ILIKE ?", "%#{query}%").find_each do |user|
        yielder << user.to_json + "\n"
      end
    end
  end

  # Export with associations
  def export_jsonl
    headers['Content-Type'] = 'application/x-ndjson'

    self.response_body = Enumerator.new do |yielder|
      User.includes(:posts, :comments).find_each do |user|
        record = {
          id: user.id,
          name: user.name,
          email: user.email,
          posts_count: user.posts.size,       # .size uses the preloaded association, no extra query
          comments_count: user.comments.size,
          recent_posts: user.posts.first(5).map(&:attributes)
        }
        yielder << record.to_json + "\n"
      end
    end
  end
end

# routes.rb
Rails.application.routes.draw do
  namespace :api do
    get 'users.jsonl', to: 'users#index_jsonl'
    get 'search.jsonl', to: 'users#search_jsonl'
    get 'export.jsonl', to: 'users#export_jsonl'
  end
end

Streaming Response Patterns

Chunked Transfer Encoding

Use HTTP chunked encoding to stream JSONL without knowing content length upfront.

// Node.js chunked streaming
app.get('/api/live-stream.jsonl', (req, res) => {
  res.setHeader('Content-Type', 'application/x-ndjson');
  res.setHeader('Transfer-Encoding', 'chunked'); // Node applies this automatically when no Content-Length is set
  res.setHeader('X-Content-Type-Options', 'nosniff');

  let count = 0;
  const maxRecords = 10000;

  const interval = setInterval(() => {
    const record = {
      id: count,
      timestamp: new Date().toISOString(),
      value: Math.random() * 100
    };

    res.write(JSON.stringify(record) + '\n');
    count++;

    if (count >= maxRecords) {
      clearInterval(interval);
      res.end();
    }
  }, 10); // Send a record every 10ms

  // Stop the interval if the client disconnects mid-stream
  req.on('close', () => clearInterval(interval));
});

// Client consumption
async function consumeStream(url) {
  const response = await fetch(url);
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();

    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');

    // Process complete lines
    buffer = lines.pop(); // Keep incomplete line in buffer

    for (const line of lines) {
      if (line.trim()) {
        const record = JSON.parse(line);
        processRecord(record);
      }
    }
  }

  // Process final buffer
  if (buffer.trim()) {
    const record = JSON.parse(buffer);
    processRecord(record);
  }
}

function processRecord(record) {
  console.log('Received:', record);
  // Update UI, store in database, etc.
}

Infinite Streams

// Python: Infinite event stream
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json
import random
from datetime import datetime

app = FastAPI()

async def event_generator():
    """Generate events indefinitely"""
    event_id = 0

    while True:
        event = {
            "id": event_id,
            "type": "heartbeat",
            "timestamp": datetime.utcnow().isoformat(),
            "data": {
                "cpu_usage": random.random() * 100,
                "memory_usage": random.random() * 100
            }
        }

        yield json.dumps(event) + '\n'
        event_id += 1

        await asyncio.sleep(1)  # Wait 1 second between events

@app.get("/api/events/stream.jsonl")
async def event_stream():
    """Infinite JSONL event stream"""
    return StreamingResponse(
        event_generator(),
        media_type="application/x-ndjson",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no"  # Disable nginx buffering
        }
    )

# Client with reconnection
import requests
import json
import time

def consume_infinite_stream(url, reconnect_delay=5):
    """Consume infinite stream with auto-reconnect"""

    while True:
        try:
            with requests.get(url, stream=True, timeout=30) as response:
                response.raise_for_status()

                for line in response.iter_lines():
                    if line:
                        event = json.loads(line)
                        process_event(event)

        except requests.exceptions.RequestException as e:
            print(f"Stream error: {e}")
            print(f"Reconnecting in {reconnect_delay} seconds...")
            time.sleep(reconnect_delay)
            continue

        except KeyboardInterrupt:
            print("Stream stopped by user")
            break

def process_event(event):
    print(f"Event {event['id']}: {event['type']}")
    # Handle event...

Compressed Streaming

// Node.js with gzip compression
const express = require('express');
const zlib = require('zlib');
const app = express();

app.get('/api/compressed.jsonl', (req, res) => {
  res.setHeader('Content-Type', 'application/x-ndjson');
  res.setHeader('Content-Encoding', 'gzip');

  // Create gzip stream
  const gzip = zlib.createGzip();
  gzip.pipe(res);

  // Stream data through gzip
  const interval = setInterval(() => {
    const record = {
      id: Math.random(),
      data: 'x'.repeat(1000) // Large payload
    };

    gzip.write(JSON.stringify(record) + '\n');
  }, 100);

  // Cleanup on client disconnect
  req.on('close', () => {
    clearInterval(interval);
    gzip.end();
  });

  // End after 10 seconds
  setTimeout(() => {
    clearInterval(interval);
    gzip.end();
  }, 10000);
});

// Client decompression
const zlib = require('zlib');
const https = require('https');

https.get('https://api.example.com/compressed.jsonl', (res) => {
  const gunzip = zlib.createGunzip();
  res.pipe(gunzip);

  let buffer = '';

  gunzip.on('data', (chunk) => {
    buffer += chunk.toString();
    const lines = buffer.split('\n');
    buffer = lines.pop();

    lines.forEach(line => {
      if (line.trim()) {
        const record = JSON.parse(line);
        console.log(record);
      }
    });
  });

  gunzip.on('end', () => {
    if (buffer.trim()) {
      const record = JSON.parse(buffer);
      console.log(record);
    }
  });
});

Bulk Data Exports

Database Export Endpoints

// Multi-table export with relationships
app.get('/api/export/full.jsonl', async (req, res) => {
  res.setHeader('Content-Type', 'application/x-ndjson');
  res.setHeader('Content-Disposition', 'attachment; filename="full_export.jsonl"');

  const tables = ['users', 'posts', 'comments', 'likes'];

  for (const table of tables) {
    const { rows } = await db.query(`SELECT * FROM ${table}`);

    for (const row of rows) {
      const record = {
        _table: table,
        _exported_at: new Date().toISOString(),
        ...row
      };
      res.write(JSON.stringify(record) + '\n');
    }
  }

  res.end();
});

// Export with progress tracking
app.get('/api/export/users.jsonl', async (req, res) => {
  res.setHeader('Content-Type', 'application/x-ndjson');

  const totalCount = await db.query('SELECT COUNT(*) FROM users');
  let exported = 0;

  // Send metadata as first line
  res.write(JSON.stringify({
    _metadata: true,
    total_records: totalCount.rows[0].count,
    started_at: new Date().toISOString()
  }) + '\n');

  // As in the Express example, QueryStream needs a dedicated client rather than pool.query()
  const stream = db.query(new QueryStream('SELECT * FROM users'));

  stream.on('data', (row) => {
    res.write(JSON.stringify(row) + '\n');
    exported++;

    // Send progress updates every 1000 records
    if (exported % 1000 === 0) {
      res.write(JSON.stringify({
        _progress: true,
        exported,
        percent: (exported / totalCount.rows[0].count * 100).toFixed(2)
      }) + '\n');
    }
  });

  stream.on('end', () => {
    res.write(JSON.stringify({
      _metadata: true,
      completed_at: new Date().toISOString(),
      total_exported: exported
    }) + '\n');
    res.end();
  });
});

// Incremental exports (only changed records)
app.get('/api/export/incremental.jsonl', async (req, res) => {
  const since = req.query.since; // ISO timestamp
  if (!since) {
    return res.status(400).json({ error: 'Missing required "since" query parameter' });
  }

  res.setHeader('Content-Type', 'application/x-ndjson');

  const query = `
    SELECT *
    FROM users
    WHERE updated_at > $1
    ORDER BY updated_at ASC
  `;

  const { rows } = await db.query(query, [since]);

  for (const row of rows) {
    res.write(JSON.stringify(row) + '\n');
  }

  res.end();
});

Async Export Jobs

// Create async export job
app.post('/api/exports', async (req, res) => {
  const { table, filters } = req.body;

  const exportJob = {
    id: uuidv4(),
    table,
    filters,
    status: 'queued',
    created_at: new Date(),
    download_url: null
  };

  // "table" is a reserved word in Postgres, so the column name must be quoted
  await db.query(
    'INSERT INTO export_jobs (id, "table", filters, status) VALUES ($1, $2, $3, $4)',
    [exportJob.id, table, JSON.stringify(filters), 'queued']
  );

  // Queue background job
  exportQueue.add('generate-export', exportJob);

  res.status(202).json({
    export_id: exportJob.id,
    status: 'queued',
    status_url: `/api/exports/${exportJob.id}`
  });
});

// Check export status
app.get('/api/exports/:id', async (req, res) => {
  const job = await db.query(
    'SELECT * FROM export_jobs WHERE id = $1',
    [req.params.id]
  );

  if (!job.rows.length) {
    return res.status(404).json({ error: 'Export not found' });
  }

  const exportJob = job.rows[0];

  res.json({
    id: exportJob.id,
    status: exportJob.status,
    progress: exportJob.progress,
    download_url: exportJob.download_url,
    expires_at: exportJob.expires_at
  });
});

// Background worker
exportQueue.process('generate-export', async (job) => {
  const { id, table, filters } = job.data;

  const filename = `export_${id}.jsonl.gz`;
  const filepath = path.join('/tmp/exports', filename);

  // Update status
  await db.query(
    'UPDATE export_jobs SET status = $1 WHERE id = $2',
    ['processing', id]
  );

  // Generate export: write JSONL into gzip, and pipe gzip's output to the file
  const writeStream = fs.createWriteStream(filepath);
  const gzip = zlib.createGzip();
  gzip.pipe(writeStream);

  let query = `SELECT * FROM ${table}`;
  if (filters) {
    // Apply filters...
  }

  const { rows } = await db.query(query);
  let processed = 0;

  for (const row of rows) {
    gzip.write(JSON.stringify(row) + '\n');
    processed++;

    if (processed % 1000 === 0) {
      await db.query(
        'UPDATE export_jobs SET progress = $1 WHERE id = $2',
        [processed, id]
      );
    }
  }

  gzip.end();

  // Wait until the compressed file is fully flushed to disk before uploading
  await new Promise((resolve, reject) => {
    writeStream.on('finish', resolve);
    writeStream.on('error', reject);
  });

  // Upload to S3
  const s3Url = await uploadToS3(filepath, filename);

  // Update job with download URL
  await db.query(`
    UPDATE export_jobs
    SET status = $1, download_url = $2, expires_at = $3
    WHERE id = $4
  `, ['completed', s3Url, new Date(Date.now() + 24 * 60 * 60 * 1000), id]);

  // Cleanup local file
  fs.unlinkSync(filepath);
});

Webhook Payloads

JSONL Webhook Format

Send multiple events in a single webhook call using JSONL for efficiency.

// Batch webhook delivery
class WebhookService {
  constructor() {
    this.queue = [];
    this.batchSize = 100;
    this.batchInterval = 5000; // 5 seconds

    setInterval(() => this.flush(), this.batchInterval);
  }

  addEvent(webhookUrl, event) {
    this.queue.push({ webhookUrl, event });

    if (this.queue.length >= this.batchSize) {
      this.flush();
    }
  }

  async flush() {
    if (this.queue.length === 0) return;

    // Group events by webhook URL
    const grouped = {};

    for (const item of this.queue) {
      if (!grouped[item.webhookUrl]) {
        grouped[item.webhookUrl] = [];
      }
      grouped[item.webhookUrl].push(item.event);
    }

    this.queue = [];

    // Send batches
    for (const [url, events] of Object.entries(grouped)) {
      await this.sendBatch(url, events);
    }
  }

  async sendBatch(url, events) {
    // Convert to JSONL
    const payload = events.map(e => JSON.stringify(e)).join('\n');

    try {
      const response = await fetch(url, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/x-ndjson',
          'X-Webhook-Signature': this.generateSignature(payload),
          'X-Event-Count': events.length.toString()
        },
        body: payload
      });

      if (!response.ok) {
        throw new Error(`Webhook failed: ${response.status}`);
      }

      console.log(`Sent ${events.length} events to ${url}`);

    } catch (error) {
      console.error('Webhook delivery error:', error);
      // Retry logic here...
    }
  }

  generateSignature(payload) {
    const crypto = require('crypto');
    const secret = process.env.WEBHOOK_SECRET;
    return crypto.createHmac('sha256', secret).update(payload).digest('hex');
  }
}

// Usage
const webhookService = new WebhookService();

app.post('/api/orders', async (req, res) => {
  const order = await createOrder(req.body);

  // Queue webhook event
  webhookService.addEvent(
    'https://customer.com/webhooks/orders',
    {
      event_type: 'order.created',
      order_id: order.id,
      timestamp: new Date().toISOString(),
      data: order
    }
  );

  res.json(order);
});

// Webhook receiver (parse NDJSON bodies as raw text so the signature matches)
app.use(express.text({ type: 'application/x-ndjson' }));

app.post('/webhooks/receive', (req, res) => {
  const signature = req.headers['x-webhook-signature'];
  const eventCount = parseInt(req.headers['x-event-count']);

  // Verify signature
  if (!verifySignature(req.body, signature)) {
    return res.status(401).json({ error: 'Invalid signature' });
  }

  // Parse JSONL
  const lines = req.body.split('\n');
  const events = lines.filter(line => line.trim()).map(line => JSON.parse(line));

  console.log(`Received ${events.length} events`);

  // Process each event
  for (const event of events) {
    processWebhookEvent(event);
  }

  res.status(200).json({ processed: events.length });
});
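
The receiver calls a verifySignature helper that is not shown above. A minimal sketch, assuming the same shared secret and hex-encoded HMAC-SHA256 scheme used by generateSignature on the sending side, compares signatures in constant time:

const crypto = require('crypto');

function verifySignature(payload, signature) {
  const secret = process.env.WEBHOOK_SECRET;
  const expected = crypto
    .createHmac('sha256', secret)
    .update(payload)
    .digest('hex');

  const expectedBuf = Buffer.from(expected, 'hex');
  const signatureBuf = Buffer.from(signature || '', 'hex');

  // timingSafeEqual throws on length mismatch, so check lengths first
  return signatureBuf.length === expectedBuf.length &&
    crypto.timingSafeEqual(signatureBuf, expectedBuf);
}

Because the signature covers the raw JSONL payload, the receiver must read the body as unparsed text (as with the express.text middleware above) before verifying it.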

Retry Logic

class ReliableWebhookService {
  async sendWithRetry(url, events, maxRetries = 3) {
    let attempt = 0;
    let backoff = 1000; // Start with 1 second

    while (attempt < maxRetries) {
      try {
        const payload = events.map(e => JSON.stringify(e)).join('\n');

        const response = await fetch(url, {
          method: 'POST',
          headers: {
            'Content-Type': 'application/x-ndjson',
            'X-Attempt': (attempt + 1).toString()
          },
          body: payload,
          // fetch has no timeout option; use an abort signal instead
          signal: AbortSignal.timeout(30000)
        });

        if (response.ok) {
          return { success: true, attempt: attempt + 1 };
        }

        // 4xx errors (other than 429) won't succeed on retry
        if (response.status >= 400 && response.status < 500 && response.status !== 429) {
          return { success: false, error: 'Client error', status: response.status };
        }

        // 5xx errors: retry
        throw new Error(`Server error: ${response.status}`);

      } catch (error) {
        attempt++;
        if (attempt >= maxRetries) {
          return { success: false, error: error.message, attempts: attempt };
        }

        // Exponential backoff
        await new Promise(resolve => setTimeout(resolve, backoff));
        backoff *= 2;
      }
    }
  }
}

Pagination Strategies

Cursor-Based Pagination

// Cursor pagination with JSONL
app.get('/api/posts.jsonl', async (req, res) => {
  const limit = parseInt(req.query.limit) || 100;
  const cursor = req.query.cursor;

  res.setHeader('Content-Type', 'application/x-ndjson');

  let query = 'SELECT * FROM posts ORDER BY id ASC LIMIT $1';
  let params = [limit + 1]; // Fetch one extra to determine if there's more

  if (cursor) {
    query = 'SELECT * FROM posts WHERE id > $1 ORDER BY id ASC LIMIT $2';
    params = [cursor, limit + 1];
  }

  const { rows } = await db.query(query, params);
  const hasMore = rows.length > limit;
  const records = hasMore ? rows.slice(0, -1) : rows;

  // Write metadata as first line
  const metadata = {
    _metadata: true,
    count: records.length,
    has_more: hasMore,
    next_cursor: hasMore ? records[records.length - 1].id : null,
    next_url: hasMore
      ? `/api/posts.jsonl?cursor=${records[records.length - 1].id}&limit=${limit}`
      : null
  };

  res.write(JSON.stringify(metadata) + '\n');

  // Write records
  for (const record of records) {
    res.write(JSON.stringify(record) + '\n');
  }

  res.end();
});

// Client that auto-fetches all pages
async function fetchAllPages(baseUrl) {
  let url = baseUrl;
  let allRecords = [];

  while (url) {
    const response = await fetch(url);
    const text = await response.text();
    const lines = text.trim().split('\n');

    let metadata = null;
    const records = [];

    for (const line of lines) {
      const data = JSON.parse(line);
      if (data._metadata) {
        metadata = data;
      } else {
        records.push(data);
      }
    }

    allRecords = allRecords.concat(records);
    console.log(`Fetched ${records.length} records, total: ${allRecords.length}`);

    url = metadata ? metadata.next_url : null; // Stop if no metadata line was present
  }

  return allRecords;
}

Keyset Pagination

// Keyset pagination for better performance on large datasets
app.get('/api/events.jsonl', async (req, res) => {
  const limit = 1000;
  const lastId = req.query.last_id;
  const lastTimestamp = req.query.last_timestamp;

  res.setHeader('Content-Type', 'application/x-ndjson');

  let query, params;

  if (lastId && lastTimestamp) {
    query = `
      SELECT * FROM events
      WHERE (timestamp, id) > ($1, $2)
      ORDER BY timestamp ASC, id ASC
      LIMIT $3
    `;
    params = [lastTimestamp, lastId, limit];
  } else {
    query = `
      SELECT * FROM events
      ORDER BY timestamp ASC, id ASC
      LIMIT $1
    `;
    params = [limit];
  }

  const { rows } = await db.query(query, params);

  if (rows.length > 0) {
    const lastRow = rows[rows.length - 1];
    const nextUrl = `/api/events.jsonl?last_id=${lastRow.id}&last_timestamp=${lastRow.timestamp}`;

    res.write(JSON.stringify({
      _metadata: true,
      count: rows.length,
      next_url: rows.length === limit ? nextUrl : null
    }) + '\n');
  }

  for (const row of rows) {
    res.write(JSON.stringify(row) + '\n');
  }

  res.end();
});

GraphQL with JSONL

GraphQL Subscriptions as JSONL

const { ApolloServer, gql } = require('apollo-server-express');
const { PubSub } = require('graphql-subscriptions');
const express = require('express');

const pubsub = new PubSub();

const typeDefs = gql`
  type Message {
    id: ID!
    text: String!
    user: String!
    timestamp: String!
  }

  type Query {
    messages: [Message!]!
  }

  type Mutation {
    sendMessage(text: String!, user: String!): Message!
  }
`;

const resolvers = {
  Query: {
    messages: () => []  // In-memory placeholder so the schema is complete
  },
  Mutation: {
    sendMessage: (_, { text, user }) => {
      const message = {
        id: Date.now().toString(),
        text,
        user,
        timestamp: new Date().toISOString()
      };

      pubsub.publish('NEW_MESSAGE', { newMessage: message });
      return message;
    }
  }
};

const app = express();

// JSONL subscription endpoint
app.get('/graphql/subscribe/messages.jsonl', (req, res) => {
  res.setHeader('Content-Type', 'application/x-ndjson');
  res.setHeader('Cache-Control', 'no-cache');

  const iterator = pubsub.asyncIterator(['NEW_MESSAGE']);

  (async () => {
    for await (const { newMessage } of iterator) {
      res.write(JSON.stringify(newMessage) + '\n');
    }
  })();

  req.on('close', () => {
    // Stop the async iterator when the client disconnects
    if (typeof iterator.return === 'function') iterator.return();
  });
});

Rate Limiting & Throttling

Stream Rate Limiting

const rateLimit = require('express-rate-limit');

// Rate limit JSONL endpoints
const jsonlLimiter = rateLimit({
  windowMs: 15 * 60 * 1000, // 15 minutes
  max: 100, // Limit each IP to 100 requests per window
  message: JSON.stringify({
    error: 'Too many requests',
    retry_after: 900
  }),
  standardHeaders: true,
  legacyHeaders: false,
});

app.get('/api/export.jsonl', jsonlLimiter, async (req, res) => {
  // Stream export...
});

// Throttle stream output
class ThrottledStream {
  constructor(recordsPerSecond = 1000) {
    this.recordsPerSecond = recordsPerSecond;
    this.interval = 1000 / recordsPerSecond;
    this.lastSent = 0;
  }

  async write(res, data) {
    const now = Date.now();
    const elapsed = now - this.lastSent;

    if (elapsed < this.interval) {
      await new Promise(resolve => setTimeout(resolve, this.interval - elapsed));
    }

    res.write(data);
    this.lastSent = Date.now();
  }
}

app.get('/api/throttled.jsonl', async (req, res) => {
  res.setHeader('Content-Type', 'application/x-ndjson');

  const throttle = new ThrottledStream(100); // 100 records/sec

  for (const record of await getRecords()) {
    await throttle.write(res, JSON.stringify(record) + '\n');
  }

  res.end();
});

Best Practices

API Design

  • Use .jsonl file extension for clarity
  • Set Content-Type: application/x-ndjson header
  • Include metadata in first line for pagination info
  • Support compression (gzip) for large exports
  • Document expected response format in API docs

Performance

  • Stream from database without buffering in memory
  • Use database cursors for large result sets
  • Flush response buffer periodically
  • Implement backpressure handling
  • Monitor memory usage during streaming (see the sketch after this list)
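
The last point can be as simple as sampling process.memoryUsage() while a response is streaming. A minimal sketch (the helper name and interval are illustrative):

// Sample memory while a JSONL response is in flight
function monitorStreamMemory(res, intervalMs = 5000) {
  const timer = setInterval(() => {
    const { heapUsed, rss } = process.memoryUsage();
    console.log(
      `stream memory: heapUsed=${(heapUsed / 1048576).toFixed(1)}MB ` +
      `rss=${(rss / 1048576).toFixed(1)}MB`
    );
  }, intervalMs);

  // Stop sampling when the response finishes or the client disconnects
  res.on('finish', () => clearInterval(timer));
  res.on('close', () => clearInterval(timer));
}

// Usage: call monitorStreamMemory(res) at the top of a streaming handler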

Error Handling

  • Handle client disconnections gracefully
  • Log errors but continue stream when possible
  • Include error records in the stream with an _error flag (see the sketch after this list)
  • Set appropriate timeouts
  • Implement retry logic for failed webhooks
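
A short sketch tying the first three points together, reusing the getRecords() placeholder from the throttling example (the _error flag is a convention, not a fixed schema):

app.get('/api/safe-export.jsonl', async (req, res) => {
  res.setHeader('Content-Type', 'application/x-ndjson');

  // Handle client disconnections gracefully
  let clientGone = false;
  req.on('close', () => { clientGone = true; });

  for (const record of await getRecords()) {
    if (clientGone) return;

    try {
      res.write(JSON.stringify(record) + '\n');
    } catch (error) {
      // Log the error but keep the stream going, flagging the bad record
      console.error('Failed to serialize record:', error);
      res.write(JSON.stringify({ _error: true, message: error.message }) + '\n');
    }
  }

  res.end();
});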

Security

  • Require authentication for all endpoints
  • Implement rate limiting to prevent abuse
  • Sign webhook payloads with HMAC
  • Validate and sanitize query parameters
  • Use HTTPS for all API endpoints

Related Resources