Streams Configuration
A stream defines how your data is stored, structured, and optionally enriched. This guide covers everything you need to know about creating and configuring streams.
What is a Stream?
A stream tells Enrich.sh:
- Where to store your data (path in R2 storage)
- How to structure your data (field definitions + schema mode)
- What enrichments to apply (templates)
- What to do when data doesn't match (reject, evolve, or flex)
You can create multiple streams per account for different data types (e.g., events, logs, purchases).
Creating a Stream
Dashboard UI
- Navigate to dashboard.enrich.sh
- Go to Streams → Create New Stream
- Fill in the configuration form
- Click Create
API
curl -X POST https://enrich.sh/streams \
-H "Authorization: Bearer sk_live_your_key" \
-H "Content-Type: application/json" \
-d '{
"stream_id": "events",
"schema_mode": "evolve",
"fields": {
"ts": { "name": "timestamp", "type": "int64" },
"url": { "name": "page_url", "type": "string" },
"data": { "name": "payload", "type": "json" }
}
}'
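The same request from code, as a minimal Python sketch using the `requests` library; the endpoint, headers, and body mirror the curl example above, and the API key is a placeholder.

```python
# Minimal sketch: create a stream via the API from Python.
# Mirrors the curl example above; sk_live_your_key is a placeholder.
import requests

resp = requests.post(
    "https://enrich.sh/streams",
    headers={"Authorization": "Bearer sk_live_your_key"},
    json={
        "stream_id": "events",
        "schema_mode": "evolve",
        "fields": {
            "ts": {"name": "timestamp", "type": "int64"},
            "url": {"name": "page_url", "type": "string"},
            "data": {"name": "payload", "type": "json"},
        },
    },
)
resp.raise_for_status()
print(resp.json())
```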
Configuration Options
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| stream_id | string | ✅ | — | Unique identifier (alphanumeric, _, -) |
| fields | object | ❌ | null | Field type definitions (see below) |
| template | string | ❌ | null | Enrichment template (e.g., clickstream) |
| schema_mode | string | ❌ | flex | Schema enforcement mode (see below) |
| webhook_url | string | ❌ | null | URL to forward events on flush |
Schema Modes
Every stream has a schema mode that controls how incoming data is validated against your field definitions.
flex (Default)
Accept everything. Fields you've defined are mapped; defined fields that are missing are stored as null, and anything extra is dropped.
| Behavior | Description |
|---|---|
| Missing fields | Stored as null |
| Extra fields | Silently dropped |
| Type mismatch | Coerced if possible, null otherwise |
| New columns | Ignored |
Best for: Prototyping, schemaless sources, getting started quickly.
curl -X POST https://enrich.sh/streams \
-H "Authorization: Bearer sk_live_your_key" \
-d '{ "stream_id": "raw_logs", "schema_mode": "flex" }'evolve
evolve
Accept everything, but detect and alert when the incoming data doesn't match your schema. New fields are automatically added to the schema.
| Behavior | Description |
|---|---|
| Missing fields | Stored as null, alert generated |
| Extra fields | Auto-added to schema, alert generated |
| Type mismatch | Coerced, alert if type changed |
| New columns | Accepted + schema updated |
Best for: SaaS API data, ERP exports, any source that changes over time.
curl -X POST https://enrich.sh/streams \
-H "Authorization: Bearer sk_live_your_key" \
-d '{
"stream_id": "erp_events",
"schema_mode": "evolve",
"fields": {
"order_id": { "type": "string" },
"amount": { "type": "float64" },
"currency": { "type": "string" }
}
}'
TIP
When evolve detects a schema change, it logs the event to the Observability page: new fields, missing fields, and type changes are all surfaced automatically.
strict
Reject events that don't exactly match the defined schema. Rejected events go to the Dead Letter Queue so nothing is lost.
| Behavior | Description |
|---|---|
| Missing fields | Event rejected → DLQ |
| Extra fields | Event rejected → DLQ |
| Type mismatch | Event rejected → DLQ |
| New columns | Not accepted |
Best for: Financial data, compliance, data contracts, any source where schema must be exact.
curl -X POST https://enrich.sh/streams \
-H "Authorization: Bearer sk_live_your_key" \
-d '{
"stream_id": "transactions",
"schema_mode": "strict",
"fields": {
"txn_id": { "type": "string" },
"amount": { "type": "float64" },
"currency": { "type": "string" },
"merchant_id": { "type": "string" }
}
}'
WARNING
strict mode requires field definitions. If you set strict without defining fields, every event will be rejected.
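Because non-conforming events go straight to the DLQ, it can be worth validating payloads client-side before sending them to a strict stream. The sketch below is illustrative only: the `REQUIRED_FIELDS` mapping and `validate` helper are not part of the Enrich.sh API, and the checks only approximate the server's strict-mode rules.

```python
# Illustrative client-side pre-check for a strict-mode stream.
# REQUIRED_FIELDS mirrors the field definitions in the curl example above;
# this is not the server's validation logic, just a way to catch obvious
# mismatches before an event is rejected to the DLQ.
REQUIRED_FIELDS = {
    "txn_id": str,
    "amount": float,
    "currency": str,
    "merchant_id": str,
}

def validate(event: dict) -> list[str]:
    problems = []
    for field, expected in REQUIRED_FIELDS.items():
        if field not in event:
            problems.append(f"missing field: {field}")
        elif not isinstance(event[field], expected):
            problems.append(f"type mismatch on {field}: got {type(event[field]).__name__}")
    for field in event:
        if field not in REQUIRED_FIELDS:
            problems.append(f"extra field: {field}")
    return problems

print(validate({"txn_id": "t_1", "amount": "99.99", "currency": "USD"}))
# -> ['type mismatch on amount: got str', 'missing field: merchant_id']
```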
Choosing the Right Mode
| Scenario | Recommended Mode |
|---|---|
| Getting started / testing | flex |
| Integrating a SaaS API | evolve |
| Connecting an ERP system | evolve |
| Financial transactions | strict |
| Product analytics | flex or template |
| ML inference logging | evolve |
Schema Change Alerts
When evolve mode detects a schema change, Enrich.sh:
- Logs the change in the Observability UI
- Surfaces it on the stream's detail page
- Includes it in the /errors API response
Types of Schema Changes Detected
| Change | Example | Alert Level |
|---|---|---|
| New field | country field appeared | ℹ️ Info |
| Missing field | email was always present, now missing | ⚠️ Warning |
| Type change | amount was int64, now receiving string | 🔴 Error |
Viewing Alerts
Dashboard: Observability → Schema Events tab
API:
curl "https://enrich.sh/streams/events/schema-events?days=7" \
-H "Authorization: Bearer sk_live_your_key"Response:
{
"schema_events": [
{
"type": "new_field",
"field": "country",
"detected_type": "string",
"detected_at": "2026-02-15T10:30:00Z",
"event_count": 42
},
{
"type": "type_change",
"field": "amount",
"previous_type": "int64",
"detected_type": "string",
"detected_at": "2026-02-15T11:00:00Z",
"event_count": 3
}
]
}
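The same feed can be consumed programmatically, for example to raise an alert whenever a type change appears. A minimal Python sketch against the schema-events endpoint shown above; `notify` is a placeholder for whatever notifier you use.

```python
# Sketch: poll the schema-events endpoint and flag type changes.
# Uses the endpoint and response shape shown above; notify() is a placeholder.
import requests

def notify(message: str) -> None:
    print(f"ALERT: {message}")  # swap in Slack, PagerDuty, etc.

resp = requests.get(
    "https://enrich.sh/streams/events/schema-events",
    params={"days": 7},
    headers={"Authorization": "Bearer sk_live_your_key"},
)
resp.raise_for_status()

for event in resp.json()["schema_events"]:
    if event["type"] == "type_change":
        notify(
            f"{event['field']} changed from {event['previous_type']} "
            f"to {event['detected_type']} ({event['event_count']} events)"
        )
```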
Supported Data Types
| Type | Description | Example Input | Stored As |
|---|---|---|---|
| string | Text data (default) | "hello", 123 | "hello", "123" |
| int32 | 32-bit integer | "42", 42 | 42 |
| int64 | 64-bit integer (timestamps) | 1738776000 | 1738776000 |
| float32 | 32-bit float | "3.14", 3.14 | 3.14 |
| float64 | 64-bit float (precision) | 99.99 | 99.99 |
| boolean | True/false | 1, "true", true | true |
| json | Nested objects/arrays | {"a": 1}, [1,2,3] | "{\"a\":1}" |
Type Coercion
Enrich.sh automatically converts incoming data to the specified type:
// Your input
{ "ts": "1738776000", "price": "99.99", "active": 1 }
// With field config:
{
"ts": { "type": "int64" },
"price": { "type": "float64" },
"active": { "type": "boolean" }
}
// Stored as:
{ "ts": 1738776000, "price": 99.99, "active": true }Working with Nested Objects
Working with Nested Objects
The json Type
Nested objects and arrays are stored as JSON strings in Parquet. This works directly with DuckDB's JSON functions.
curl -X POST https://enrich.sh/ingest \
-H "Authorization: Bearer sk_live_your_key" \
-H "Content-Type: application/json" \
-d '{
"stream_id": "user_events",
"data": [{
"event": "purchase",
"ts": 1738776000,
"user": {
"id": "user_123",
"email": "[email protected]",
"profile": { "age": 28, "country": "US" }
},
"items": [
{ "sku": "ABC123", "qty": 2, "price": 29.99 },
{ "sku": "XYZ789", "qty": 1, "price": 49.99 }
]
}]
}'
Querying Nested JSON with DuckDB
-- Extract nested fields
SELECT
timestamp,
event,
json_extract_string(user, '$.id') AS user_id,
json_extract_string(user, '$.profile.country') AS country,
json_array_length(items) AS item_count
FROM read_parquet('s3://bucket/customer/user_events/2026/02/**/*.parquet');
-- Expand arrays
SELECT
timestamp,
unnest(json_extract(items, '$[*]')) AS item
FROM read_parquet('s3://bucket/customer/user_events/2026/02/**/*.parquet');
Field Renaming
Map short field names to descriptive column names:
{
"fields": {
"ts": { "name": "timestamp", "type": "int64" },
"u": { "name": "url", "type": "string" },
"r": { "name": "referrer", "type": "string" },
"d": { "name": "device_info", "type": "json" }
}
}
Input: { "ts": 1738776000, "u": "/home", "r": "/login", "d": {"os": "iOS"} }
Output columns: timestamp, url, referrer, device_info
This is useful when your client sends abbreviated field names to minimize payload size.
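For example, a client only ever sends the short keys; the descriptive names appear on the query side. A minimal Python sketch using the /ingest endpoint documented above, assuming a hypothetical stream `clicks` configured with the field mapping shown.

```python
# Sketch: a client sends abbreviated keys; the stream's field mapping
# renames them to timestamp, url, referrer, device_info in storage.
import time
import requests

resp = requests.post(
    "https://enrich.sh/ingest",
    headers={"Authorization": "Bearer sk_live_your_key"},
    json={
        "stream_id": "clicks",  # hypothetical stream using the mapping above
        "data": [{
            "ts": int(time.time()),
            "u": "/home",
            "r": "/login",
            "d": {"os": "iOS"},
        }],
    },
)
resp.raise_for_status()
```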
Webhook Forwarding
Forward events to an external URL on every flush. The batch is forwarded from the same in-memory buffer that is written to R2, so nothing is re-read from storage.
curl -X POST https://enrich.sh/streams \
-H "Authorization: Bearer sk_live_your_key" \
-H "Content-Type: application/json" \
-d '{
"stream_id": "events",
"webhook_url": "https://your-api.com/webhook",
"fields": {
"event": { "type": "string" },
"user_id": { "type": "string" }
}
}'
| Webhook Behavior | Description |
|---|---|
| Trigger | On every flush (same batch as R2 write) |
| Method | POST with JSON body |
| Failure | Data is safe in R2 regardless — webhook is fire-and-forget |
| Retries | Best-effort, no guarantees |
INFO
Webhooks are useful for real-time alerting, Slack notifications, or pushing to systems that can't read from S3/R2 directly.
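On the receiving end, the webhook is a plain POST with a JSON body, so a small HTTP handler is enough. The sketch below uses only Python's standard library and simply parses and logs whatever JSON arrives, since the exact payload shape isn't specified here.

```python
# Minimal webhook receiver sketch (Python standard library only).
# Enrich.sh POSTs a JSON body on each flush; this handler parses and logs it.
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        payload = json.loads(self.rfile.read(length) or b"null")
        print(f"flush received: {payload}")
        self.send_response(200)
        self.end_headers()

if __name__ == "__main__":
    HTTPServer(("0.0.0.0", 8080), WebhookHandler).serve_forever()
```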
Metadata Columns
Every event automatically includes these metadata columns (prefixed with _):
| Column | Type | Description |
|---|---|---|
| _ingested_at | int64 | Server timestamp (Unix ms) |
| _client_ip | string | Client IP address |
| _country | string | Country code (Cloudflare) |
| _city | string | City name |
| _region | string | Region / state |
| _isp | string | ISP / ASN organization |
These are added automatically — you don't include them in your events.
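Because these columns land in the same Parquet files as your event data, they can be used directly in queries, for example a per-country breakdown. A sketch with Python and DuckDB, following the connection pattern from the warehouse section below; the bucket path and credentials are placeholders.

```python
# Sketch: group events by the automatically added _country column.
# Connection settings follow the DuckDB example in "Connecting Your Warehouse".
import duckdb

conn = duckdb.connect()
conn.execute("SET s3_region = 'auto'")
conn.execute("SET s3_endpoint = '{account_id}.r2.cloudflarestorage.com'")
conn.execute("SET s3_access_key_id = '{access_key}'")
conn.execute("SET s3_secret_access_key = '{secret_key}'")

df = conn.execute("""
    SELECT _country, COUNT(*) AS events
    FROM read_parquet('s3://enrich-{customer_id}/events/2026/02/**/*.parquet')
    GROUP BY _country
    ORDER BY events DESC
""").fetchdf()
print(df)
```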
Connecting Your Warehouse
Your data is stored in S3-compatible R2 storage. Connect directly from any warehouse.
Dashboard → Connect
Go to Dashboard → Stream → Connect to get:
- S3 credentials (endpoint, bucket, access key, secret)
- Copy-paste SQL for ClickHouse, BigQuery, DuckDB, Snowflake
- Credentials are scoped read-only to your bucket
Direct S3 Access
| Property | Value |
|---|---|
| S3 Endpoint | {account_id}.r2.cloudflarestorage.com |
| Bucket | enrich-{customer_id} |
| Access | Scoped read-only to your data |
| Glob patterns | s3://bucket/events/2026/**/*.parquet ✅ |
ClickHouse
SELECT *
FROM s3(
'https://{account_id}.r2.cloudflarestorage.com/enrich-{customer_id}/events/2026/02/**/*.parquet',
'{access_key}',
'{secret_key}',
'Parquet'
);
DuckDB
SET s3_region = 'auto';
SET s3_endpoint = '{account_id}.r2.cloudflarestorage.com';
SET s3_access_key_id = '{access_key}';
SET s3_secret_access_key = '{secret_key}';
SELECT * FROM read_parquet('s3://enrich-{customer_id}/events/2026/02/**/*.parquet');
BigQuery
CREATE EXTERNAL TABLE `project.dataset.events`
OPTIONS (
format = 'PARQUET',
uris = ['gs://your-gcs-mirror/events/2026/02/*.parquet']
);
Python + DuckDB
import duckdb
conn = duckdb.connect()
conn.execute("""
SET s3_region = 'auto';
SET s3_endpoint = '{account_id}.r2.cloudflarestorage.com';
SET s3_access_key_id = '{access_key}';
SET s3_secret_access_key = '{secret_key}';
""")
df = conn.execute("""
SELECT event, COUNT(*) as count
FROM read_parquet('s3://enrich-{customer_id}/events/2026/02/**/*.parquet')
GROUP BY event
ORDER BY count DESC
""").fetchdf()Updating a Stream
Update configuration without losing data:
curl -X PUT https://enrich.sh/streams/events \
-H "Authorization: Bearer sk_live_your_key" \
-d '{
"schema_mode": "evolve",
"fields": {
"ts": { "name": "timestamp", "type": "int64" },
"url": { "type": "string" },
"new_field": { "type": "json" }
}
}'
WARNING
Schema Evolution: Adding new fields is safe. Removing or renaming fields may cause issues with historical data queries.
Stream Replay
Re-send historical events from a stream to a webhook URL. Useful for ML model retraining, backfilling downstream systems, or disaster recovery.
curl -X POST https://enrich.sh/streams/events/replay \
-H "Authorization: Bearer sk_live_your_key" \
-H "Content-Type: application/json" \
-d '{
"from": "2026-02-01",
"to": "2026-02-14",
"webhook_url": "https://your-api.com/replay-target"
}'
| Parameter | Type | Description |
|---|---|---|
| from | string | Start date (YYYY-MM-DD) |
| to | string | End date (YYYY-MM-DD) |
| webhook_url | string | Target URL for replayed events |
Deleting a Stream
curl -X DELETE https://enrich.sh/streams/events \
-H "Authorization: Bearer sk_live_your_key"WARNING
This deletes the stream configuration only. Data already stored in R2 is not deleted.
Examples
Simple Event Tracking
{
"stream_id": "page_views",
"schema_mode": "flex",
"fields": {
"url": { "type": "string" },
"referrer": { "type": "string" },
"session_id": { "type": "string" },
"ts": { "name": "timestamp", "type": "int64" }
}
}
E-commerce with Strict Schema
{
"stream_id": "transactions",
"schema_mode": "strict",
"fields": {
"order_id": { "type": "string" },
"user_id": { "type": "string" },
"amount": { "type": "float64" },
"currency": { "type": "string" },
"items": { "type": "json" },
"ts": { "name": "purchased_at", "type": "int64" }
}
}
ERP Integration with Evolve
{
"stream_id": "erp_orders",
"schema_mode": "evolve",
"fields": {
"order_number": { "type": "string" },
"customer_id": { "type": "string" },
"total": { "type": "float64" },
"line_items": { "type": "json" }
}
}
IoT Sensor Data
{
"stream_id": "sensors",
"schema_mode": "evolve",
"fields": {
"device_id": { "type": "string" },
"reading": { "type": "float64" },
"unit": { "type": "string" },
"location": { "type": "json" },
"ts": { "name": "recorded_at", "type": "int64" }
}
}