# SDK Reference
Official JavaScript / TypeScript client for Enrich.sh. Zero dependencies, < 4 KB.
## Installation

```bash
npm install enrich.sh
```

Works in Node.js 18+, browsers, and edge runtimes (Deno, Bun, Workers).
## Quick Start

```ts
import { Enrich } from 'enrich.sh'

const enrich = new Enrich('sk_live_your_api_key')

// Create a stream
await enrich.createStream('page_views', { schema_mode: 'evolve' })

// Buffer events — the SDK batches and flushes automatically
enrich.track('page_views', {
  url: window.location.href,
  user_id: 'user_123',
})

// Query your data as Parquet
const urls = await enrich.query('page_views', { days: 7 })
```

## Constructor
```ts
new Enrich(apiKey: string, options?: EnrichOptions)
```

| Parameter | Type | Required | Description |
|---|---|---|---|
| `apiKey` | `string` | ✅ | Your API key (`sk_live_*` or `sk_test_*`) |
| `options` | `EnrichOptions` | ❌ | Configuration options |
### EnrichOptions

| Option | Type | Default | Description |
|---|---|---|---|
| `baseUrl` | `string` | `https://enrich.sh` | API base URL (use for custom domains) |
```ts
const enrich = new Enrich('sk_live_your_key')
```

```ts
// Custom domain
const enrich = new Enrich('sk_live_your_key', {
  baseUrl: 'https://data.yourdomain.com',
})
```

> **TIP**
> Batching, flush timing, and retries are managed automatically. No tuning needed.
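The exact thresholds are internal to the SDK, but the general shape of an automatic batcher is easy to picture. A minimal sketch with made-up limits (illustrative only, not the SDK's actual internals):

```javascript
// Illustrative size/time-based batcher (not the SDK's real code).
// Flushes when either maxEvents is reached or maxWaitMs elapses.
class MicroBatcher {
  constructor(send, maxEvents = 50, maxWaitMs = 1000) {
    this.send = send           // callback that delivers one batch
    this.maxEvents = maxEvents // flush when this many events are buffered
    this.maxWaitMs = maxWaitMs // ...or after this much time has passed
    this.buffer = []
    this.timer = null
  }

  track(event) {
    this.buffer.push(event)
    if (this.buffer.length >= this.maxEvents) {
      this.flush() // size threshold reached: flush immediately
    } else if (!this.timer) {
      this.timer = setTimeout(() => this.flush(), this.maxWaitMs)
    }
  }

  flush() {
    if (this.timer) { clearTimeout(this.timer); this.timer = null }
    if (this.buffer.length === 0) return
    const batch = this.buffer
    this.buffer = []
    this.send(batch)
  }
}
```

A real implementation also retries failed sends and caps memory; the SDK handles both for you.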
## Ingestion

### track(streamId, event)
Buffer a single event for automatic batched delivery. Events are flushed in optimized batches — you don't need to manage timing.
```ts
track(streamId: string, event: Record<string, unknown>): void
```

```ts
enrich.track('events', {
  event: 'page_view',
  url: '/pricing',
  user_id: 'user_123',
})
```

> **INFO**
> A `_ts` field (Unix millisecond timestamp) is automatically added to each tracked event.
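The stamping behaves roughly like the sketch below (an illustration of the documented behavior, not SDK source; `withTimestamp` is a hypothetical helper):

```javascript
// Illustrates the documented _ts stamping: a Unix millisecond timestamp
// is attached alongside the caller's own fields.
function withTimestamp(event) {
  return { ...event, _ts: Date.now() }
}

const stamped = withTimestamp({ event: 'page_view', url: '/pricing' })
// stamped._ts is a Unix millisecond timestamp; the other fields are unchanged
```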
### ingest(streamId, data)
Send events immediately without buffering. Use for server-side workflows.
```ts
ingest(streamId: string, data: object | object[]): Promise<IngestResponse>
```

Returns: `{ accepted: number, buffered: number }`
```ts
// Single event
await enrich.ingest('events', { event: 'signup', user_id: 'u_123' })

// Batch (recommended for throughput)
await enrich.ingest('events', [
  { event: 'page_view', url: '/home' },
  { event: 'page_view', url: '/pricing' },
])
```

### flush(streamId?)
Force-flush buffered events. The SDK auto-flushes — use this before shutdown.
```ts
await enrich.flush('events') // one stream
await enrich.flush() // all streams
```

### beacon(streamId?)

Best-effort flush that survives page unload / tab close. Fire-and-forget — no Promise returned.

```ts
window.addEventListener('beforeunload', () => enrich.beacon())
```

> **INFO**
> Uses `fetch(..., { keepalive: true })` with an Authorization header — your API key never appears in the URL.
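In other words, the key travels in a header even on unload. A sketch of what such a request could look like (the `/v1/ingest/...` path and the `buildBeaconRequest` helper are assumptions for illustration, not the SDK's documented endpoint):

```javascript
// Sketch of a keepalive flush request. The URL path here is a hypothetical
// example, not the SDK's documented endpoint.
function buildBeaconRequest(apiKey, streamId, events) {
  return [
    `https://enrich.sh/v1/ingest/${streamId}`,
    {
      method: 'POST',
      keepalive: true, // lets the browser finish the request after unload
      headers: {
        Authorization: `Bearer ${apiKey}`, // key in a header, not the URL
        'Content-Type': 'application/json',
      },
      body: JSON.stringify(events),
    },
  ]
}
```

Because `keepalive: true` is set, the browser may complete the request after the page is gone; note that most browsers cap keepalive payloads at 64 KB.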
### destroy()
Flush all buffers and stop timers. Call before process exit.
```ts
process.on('SIGTERM', async () => {
  await enrich.destroy()
  process.exit(0)
})
```

## Streams
### listStreams()
List all streams for your account.
```ts
const streams = await enrich.listStreams()
// → [{ stream_id: 'clicks', schema_mode: 'evolve', created_at: '...' }, ...]
```

### createStream(streamId, options?)
Create a new stream.
| Option | Type | Default | Description |
|---|---|---|---|
| `schema_mode` | `string` | `flex` | `flex`, `evolve`, or `strict` |
| `fields` | `object` | — | Field type definitions |
| `template` | `string` | — | Enrichment template id |
| `dedicated_do` | `boolean` | `false` | Use a dedicated node |
```ts
await enrich.createStream('purchases', {
  schema_mode: 'strict',
  fields: {
    amount: { type: 'float64' },
    currency: { type: 'string' },
    user_id: { type: 'int64' },
    ts: { name: 'timestamp', type: 'timestamp' },
  },
})
```

### getStream(streamId)
Get a stream by ID.
```ts
const stream = await enrich.getStream('purchases')
```

### updateStream(streamId, updates)
Update stream configuration.
```ts
await enrich.updateStream('purchases', {
  schema_mode: 'evolve',
  webhook_url: 'https://your-api.com/webhook',
})
```

### deleteStream(streamId)
Delete a stream. Data already stored is not deleted.
```ts
await enrich.deleteStream('old-stream')
```

## Query
### query(streamId, params?)
Get presigned URLs for stored Parquet files. Pass directly to DuckDB.
| Parameter | Type | Description |
|---|---|---|
| `date` | `string` | Single day — `YYYY-MM-DD` |
| `start` | `string` | Range start — `YYYY-MM-DD` |
| `end` | `string` | Range end — `YYYY-MM-DD` |
| `days` | `number` | Last N days |
```ts
const urls = await enrich.query('events', { days: 7 })

// Use with DuckDB
await conn.query(`SELECT * FROM read_parquet(${JSON.stringify(urls)})`)
```

### queryDetailed(streamId, params?)
Same as query() but returns full metadata.
```ts
interface QueryDetailedResponse {
  success: boolean
  stream_id: string
  file_count: number
  urls: string[]
  files: FileInfo[]
  expires_at: string
}
```

```ts
const result = await enrich.queryDetailed('events', { days: 7 })
console.log(`${result.file_count} files, expires ${result.expires_at}`)
```

## Monitoring
### usage(params?)
Get usage statistics.
```ts
const stats = await enrich.usage({ start: '2026-02-01', end: '2026-02-28' })
// → { usage: [...], totals: { event_count, bytes_stored, file_count } }
```

### errors(params?)
Get recent errors.
```ts
const errs = await enrich.errors({ limit: 20, stream_id: 'purchases' })
// → { errors_24h, by_type, recent: [...] }
```

### dlq(streamId, params?)
Get Dead Letter Queue events (rejected by strict mode).
```ts
const rejected = await enrich.dlq('purchases', { days: 7 })
// → { dlq_count, events: [{ rejected_at, reason, field, original }] }
```

### schemaEvents(streamId?, params?)
Get schema change events (detected by evolve mode).
```ts
const changes = await enrich.schemaEvents('purchases')
// → { events: [{ type: 'new_field', field: 'discount_code', ... }] }
```

### templates()
List available enrichment templates.
```ts
const templates = await enrich.templates()
// → [{ id: 'clickstream', name: 'clickstream', fields: [...] }, ...]
```

## Connect
### connect()
Get S3-compatible credentials for direct warehouse access. Returns endpoint, bucket, keys, and ready-to-use SQL.
```ts
const creds = await enrich.connect()
console.log(creds.sql.duckdb) // Ready-to-paste DuckDB SQL
```

```ts
interface ConnectResponse {
  endpoint: string
  bucket: string
  access_key_id: string
  secret_access_key: string
  region: string
  sql: { duckdb: string, clickhouse: string }
}
```

## Examples
### Browser — Clickstream
```ts
import { Enrich } from 'enrich.sh'

const enrich = new Enrich('sk_live_your_key')

document.addEventListener('click', (e) => {
  enrich.track('clicks', {
    tag: e.target.tagName,
    id: e.target.id,
    path: location.pathname,
  })
})

window.addEventListener('beforeunload', () => enrich.beacon())
```

### Node.js — Express Middleware
```ts
import { Enrich } from 'enrich.sh'

const enrich = new Enrich('sk_live_your_key')

app.use((req, res, next) => {
  const start = Date.now()
  res.on('finish', () => {
    enrich.track('api_logs', {
      method: req.method,
      path: req.path,
      status: res.statusCode,
      duration_ms: Date.now() - start,
    })
  })
  next()
})

process.on('SIGTERM', async () => {
  await enrich.destroy()
  process.exit(0)
})
```

### Full Pipeline: Create → Ingest → Query
```ts
import { Enrich } from 'enrich.sh'

const enrich = new Enrich('sk_live_your_key')

// 1. Create a stream with types
await enrich.createStream('transactions', {
  schema_mode: 'evolve',
  fields: {
    amount: { type: 'float64' },
    currency: { type: 'string' },
    ts: { type: 'timestamp' },
  },
})

// 2. Send events
await enrich.ingest('transactions', [
  { amount: 99.99, currency: 'USD', ts: Date.now() },
  { amount: 49.00, currency: 'EUR', ts: Date.now() },
])

// 3. Query with DuckDB
const urls = await enrich.query('transactions', { days: 30 })
const result = await conn.query(`
  SELECT currency, SUM(amount) as total
  FROM read_parquet(${JSON.stringify(urls)})
  GROUP BY currency
`)
```

### Batch Import (CSV → Enrich)
```ts
import { Enrich } from 'enrich.sh'
import { parse } from 'csv-parse/sync'
import { readFileSync } from 'fs'

const enrich = new Enrich('sk_live_your_key')
const records = parse(readFileSync('data.csv'), { columns: true })

// Send in batches of 100
for (let i = 0; i < records.length; i += 100) {
  await enrich.ingest('imports', records.slice(i, i + 100))
}
```

## Specs
| Spec | Value |
|---|---|
| Package size | < 4 KB |
| Dependencies | 0 |
| Runtime | Node 18+, Browsers, Edge |
| Transport | HTTPS |
| Auth | Bearer token (sk_live_* / sk_test_*) |
| Output format | Apache Parquet |
| Max request size | 1 MB |
| Signed URL TTL | 24 hours |
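Given the 1 MB request cap above, large imports with wide rows may need batches split by serialized size rather than a fixed row count. A sketch (the byte budget and the `chunkByBytes` helper are illustrative choices, not SDK constants):

```javascript
// Split rows into batches whose JSON payload stays under a byte budget.
// The 1 MB cap comes from the specs table; the default margin is arbitrary.
function chunkByBytes(rows, maxBytes = 900 * 1024) {
  const encoder = new TextEncoder()
  const batches = []
  let current = []
  let size = 2 // accounts for the surrounding "[]"
  for (const row of rows) {
    const rowBytes = encoder.encode(JSON.stringify(row)).length + 1 // +1 for comma
    if (current.length > 0 && size + rowBytes > maxBytes) {
      batches.push(current) // budget reached: start a new batch
      current = []
      size = 2
    }
    current.push(row)
    size += rowBytes
  }
  if (current.length > 0) batches.push(current)
  return batches
}
```

Each resulting batch can then be passed to `enrich.ingest()` as in the batch-import example above.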
