
Streams Configuration

A stream defines how your data is stored, structured, and optionally enriched. This guide covers everything you need to know about creating and configuring streams.

What is a Stream?

A stream tells Enrich.sh:

  • Where to store your data (path in R2 storage)
  • How to structure your data (field definitions + schema mode)
  • What enrichments to apply (templates)
  • What to do when data doesn't match (reject, evolve, or flex)

You can create multiple streams per account for different data types (e.g., events, logs, purchases).

Creating a Stream

Dashboard UI

  1. Navigate to dashboard.enrich.sh
  2. Go to Streams → Create New Stream
  3. Fill in the configuration form
  4. Click Create

API

bash
curl -X POST https://enrich.sh/streams \
  -H "Authorization: Bearer sk_live_your_key" \
  -H "Content-Type: application/json" \
  -d '{
    "stream_id": "events",
    "schema_mode": "evolve",
    "fields": {
      "ts": { "name": "timestamp", "type": "int64" },
      "url": { "name": "page_url", "type": "string" },
      "data": { "name": "payload", "type": "json" }
    }
  }'
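
Prefer scripting over curl? The same call works from any HTTP client. The sketch below uses Python's requests library against the /streams and /ingest endpoints shown in this guide; the key, field names, and test event are placeholders to adapt.

python
import requests

API = "https://enrich.sh"
HEADERS = {
    "Authorization": "Bearer sk_live_your_key",
    "Content-Type": "application/json",
}

# Create the stream with the same configuration as the curl example above
stream = {
    "stream_id": "events",
    "schema_mode": "evolve",
    "fields": {
        "ts": {"name": "timestamp", "type": "int64"},
        "url": {"name": "page_url", "type": "string"},
        "data": {"name": "payload", "type": "json"},
    },
}
requests.post(f"{API}/streams", json=stream, headers=HEADERS).raise_for_status()

# Send a test event (the /ingest endpoint is covered later in this guide)
event = {
    "stream_id": "events",
    "data": [{"ts": 1738776000, "url": "/home", "data": {"ref": "ad"}}],
}
requests.post(f"{API}/ingest", json=event, headers=HEADERS).raise_for_status()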

Configuration Options

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| stream_id | string | Yes | (none) | Unique identifier (alphanumeric, _, -) |
| fields | object | No | null | Field type definitions (see below) |
| template | string | No | null | Enrichment template (e.g., clickstream) |
| schema_mode | string | No | flex | Schema enforcement mode (see below) |
| webhook_url | string | No | null | URL to forward events on flush |

Schema Modes

Every stream has a schema mode that controls how incoming data is validated against your field definitions.

flex (Default)

Accept everything. Map fields you've defined, null the rest.

| Behavior | Description |
| --- | --- |
| Missing fields | Stored as null |
| Extra fields | Silently dropped |
| Type mismatch | Coerced if possible, null otherwise |
| New columns | Ignored |

Best for: Prototyping, schemaless sources, getting started quickly.

bash
curl -X POST https://enrich.sh/streams \
  -H "Authorization: Bearer sk_live_your_key" \
  -d '{ "stream_id": "raw_logs", "schema_mode": "flex" }'

evolve

Accept everything, but detect and alert when the incoming data doesn't match your schema. New fields are automatically added to the schema.

| Behavior | Description |
| --- | --- |
| Missing fields | Stored as null, alert generated |
| Extra fields | Auto-added to schema, alert generated |
| Type mismatch | Coerced, alert if type changed |
| New columns | Accepted + schema updated |

Best for: SaaS API data, ERP exports, any source that changes over time.

bash
curl -X POST https://enrich.sh/streams \
  -H "Authorization: Bearer sk_live_your_key" \
  -d '{
    "stream_id": "erp_events",
    "schema_mode": "evolve",
    "fields": {
      "order_id": { "type": "string" },
      "amount": { "type": "float64" },
      "currency": { "type": "string" }
    }
  }'

TIP

When evolve detects a schema change, it logs the event to the Observability page: new fields, missing fields, and type changes are all surfaced automatically.

strict

Reject events that don't exactly match the defined schema. Rejected events go to the Dead Letter Queue so nothing is lost.

| Behavior | Description |
| --- | --- |
| Missing fields | Event rejected → DLQ |
| Extra fields | Event rejected → DLQ |
| Type mismatch | Event rejected → DLQ |
| New columns | Not accepted |

Best for: Financial data, compliance, data contracts, any source where schema must be exact.

bash
curl -X POST https://enrich.sh/streams \
  -H "Authorization: Bearer sk_live_your_key" \
  -d '{
    "stream_id": "transactions",
    "schema_mode": "strict",
    "fields": {
      "txn_id": { "type": "string" },
      "amount": { "type": "float64" },
      "currency": { "type": "string" },
      "merchant_id": { "type": "string" }
    }
  }'

WARNING

strict mode requires field definitions. If you set strict without defining fields, every event will be rejected.

Choosing the Right Mode

| Scenario | Recommended Mode |
| --- | --- |
| Getting started / testing | flex |
| Integrating a SaaS API | evolve |
| Connecting an ERP system | evolve |
| Financial transactions | strict |
| Product analytics | flex or template |
| ML inference logging | evolve |

Schema Change Alerts

When evolve mode detects a schema change, Enrich.sh:

  1. Logs the change in the Observability UI
  2. Surfaces it on the stream's detail page
  3. Includes it in the /errors API response

Types of Schema Changes Detected

| Change | Example | Alert Level |
| --- | --- | --- |
| New field | country field appeared | ℹ️ Info |
| Missing field | email was always present, now missing | ⚠️ Warning |
| Type change | amount was int64, now receiving string | 🔴 Error |

Viewing Alerts

Dashboard: Observability → Schema Events tab

API:

bash
curl "https://enrich.sh/streams/events/schema-events?days=7" \
  -H "Authorization: Bearer sk_live_your_key"

Response:

json
{
  "schema_events": [
    {
      "type": "new_field",
      "field": "country",
      "detected_type": "string",
      "detected_at": "2026-02-15T10:30:00Z",
      "event_count": 42
    },
    {
      "type": "type_change",
      "field": "amount",
      "previous_type": "int64",
      "detected_type": "string",
      "detected_at": "2026-02-15T11:00:00Z",
      "event_count": 3
    }
  ]
}
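
Because the response is plain JSON, it is easy to wire into your own alerting. The Python sketch below polls the schema-events endpoint shown above and prints one line per change; the severity mapping follows the alert levels table, the missing_field type name is an assumption, and the print calls stand in for whatever notification channel you use.

python
import requests

API = "https://enrich.sh"
HEADERS = {"Authorization": "Bearer sk_live_your_key"}

# Fetch the last 7 days of schema events for the "events" stream
resp = requests.get(f"{API}/streams/events/schema-events",
                    params={"days": 7}, headers=HEADERS)
resp.raise_for_status()

for ev in resp.json().get("schema_events", []):
    if ev["type"] == "type_change":
        # Type changes are the most likely to break downstream queries
        print(f"ERROR: {ev['field']} changed {ev['previous_type']} -> "
              f"{ev['detected_type']} ({ev['event_count']} events)")
    elif ev["type"] == "missing_field":  # assumed type name, see table above
        print(f"WARNING: {ev['field']} missing from recent events")
    else:  # new_field
        print(f"INFO: new field {ev['field']} detected as {ev['detected_type']}")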

Supported Data Types

| Type | Description | Example Input | Stored As |
| --- | --- | --- | --- |
| string | Text data (default) | "hello", 123 | "hello", "123" |
| int32 | 32-bit integer | "42", 42 | 42 |
| int64 | 64-bit integer (timestamps) | 1738776000 | 1738776000 |
| float32 | 32-bit float | "3.14", 3.14 | 3.14 |
| float64 | 64-bit float (precision) | 99.99 | 99.99 |
| boolean | True/false | 1, "true", true | true |
| json | Nested objects/arrays | {"a": 1}, [1,2,3] | "{\"a\":1}" |

Type Coercion

Enrich.sh automatically converts incoming data to the specified type:

javascript
// Your input
{ "ts": "1738776000", "price": "99.99", "active": 1 }

// With field config:
{
  "ts": { "type": "int64" },
  "price": { "type": "float64" },
  "active": { "type": "boolean" }
}

// Stored as:
{ "ts": 1738776000, "price": 99.99, "active": true }

Working with Nested Objects

The json Type

Nested objects and arrays are stored as JSON strings in Parquet. This works directly with DuckDB's JSON functions.

bash
curl -X POST https://enrich.sh/ingest \
  -H "Authorization: Bearer sk_live_your_key" \
  -H "Content-Type: application/json" \
  -d '{
    "stream_id": "user_events",
    "data": [{
      "event": "purchase",
      "ts": 1738776000,
      "user": {
        "id": "user_123",
        "email": "[email protected]",
        "profile": { "age": 28, "country": "US" }
      },
      "items": [
        { "sku": "ABC123", "qty": 2, "price": 29.99 },
        { "sku": "XYZ789", "qty": 1, "price": 49.99 }
      ]
    }]
  }'

Querying Nested JSON with DuckDB

sql
-- Extract nested fields
SELECT
  timestamp,
  event,
  json_extract_string(user, '$.id') AS user_id,
  json_extract_string(user, '$.profile.country') AS country,
  json_array_length(items) AS item_count
FROM read_parquet('s3://bucket/customer/user_events/2026/02/**/*.parquet');

-- Expand arrays
SELECT
  timestamp,
  unnest(json_extract(items, '$[*]')) AS item
FROM read_parquet('s3://bucket/customer/user_events/2026/02/**/*.parquet');

Field Renaming

Map short field names to descriptive column names:

json
{
  "fields": {
    "ts": { "name": "timestamp", "type": "int64" },
    "u": { "name": "url", "type": "string" },
    "r": { "name": "referrer", "type": "string" },
    "d": { "name": "device_info", "type": "json" }
  }
}

Input: { "ts": 1738776000, "u": "/home", "r": "/login", "d": {"os": "iOS"} }
Output columns: timestamp, url, referrer, device_info

This is useful when your client sends abbreviated field names to minimize payload size.


Webhook Forwarding

Forward events to an external URL on every flush. The batch is already in memory, so nothing is re-read from storage.

bash
curl -X POST https://enrich.sh/streams \
  -H "Authorization: Bearer sk_live_your_key" \
  -H "Content-Type: application/json" \
  -d '{
    "stream_id": "events",
    "webhook_url": "https://your-api.com/webhook",
    "fields": {
      "event": { "type": "string" },
      "user_id": { "type": "string" }
    }
  }'

| Webhook Behavior | Description |
| --- | --- |
| Trigger | On every flush (same batch as R2 write) |
| Method | POST with JSON body |
| Failure | Data is safe in R2 regardless; the webhook is fire-and-forget |
| Retries | Best-effort, no guarantees |

INFO

Webhooks are useful for real-time alerting, Slack notifications, or pushing to systems that can't read from S3/R2 directly.
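
On the receiving side, any endpoint that accepts a JSON POST will do. Below is a stdlib-only Python sketch; the exact payload shape of the flush (a JSON array of events vs. a wrapper object) is an assumption here, so check a real delivery before relying on it.

python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Read and parse the JSON body sent on each flush
        length = int(self.headers.get("Content-Length", 0))
        body = json.loads(self.rfile.read(length) or b"[]")
        events = body if isinstance(body, list) else body.get("data", [])
        print(f"received {len(events)} events")

        # Acknowledge quickly; delivery is fire-and-forget, so do heavy work async
        self.send_response(200)
        self.end_headers()

if __name__ == "__main__":
    HTTPServer(("0.0.0.0", 8080), WebhookHandler).serve_forever()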


Metadata Columns

Every event automatically includes these metadata columns (prefixed with _):

| Column | Type | Description |
| --- | --- | --- |
| _ingested_at | int64 | Server timestamp (Unix ms) |
| _client_ip | string | Client IP address |
| _country | string | Country code (Cloudflare) |
| _city | string | City name |
| _region | string | Region / state |
| _isp | string | ISP / ASN organization |

These are added automatically — you don't include them in your events.
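
Because the metadata lands in the same Parquet files as your own fields, it can be queried like any other column. A small DuckDB sketch in Python, assuming the R2/S3 settings and bucket layout from the Connecting Your Warehouse section below:

python
import duckdb

conn = duckdb.connect()

# Same R2/S3 settings as in "Connecting Your Warehouse" below
conn.execute("SET s3_region = 'auto'")
conn.execute("SET s3_endpoint = '{account_id}.r2.cloudflarestorage.com'")
conn.execute("SET s3_access_key_id = '{access_key}'")
conn.execute("SET s3_secret_access_key = '{secret_key}'")

# Event counts by country, using only the auto-added metadata columns
df = conn.execute("""
    SELECT _country, COUNT(*) AS events, MIN(_ingested_at) AS first_seen_ms
    FROM read_parquet('s3://enrich-{customer_id}/events/2026/02/**/*.parquet')
    GROUP BY _country
    ORDER BY events DESC
""").fetchdf()
print(df)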


Connecting Your Warehouse

Your data is stored in S3-compatible R2 storage. Connect directly from any warehouse.

Dashboard → Connect

Go to Dashboard → Stream → Connect to get:

  • S3 credentials (endpoint, bucket, access key, secret)
  • Copy-paste SQL for ClickHouse, BigQuery, DuckDB, Snowflake
  • Credentials are scoped read-only to your bucket

Direct S3 Access

| Property | Value |
| --- | --- |
| S3 Endpoint | {account_id}.r2.cloudflarestorage.com |
| Bucket | enrich-{customer_id} |
| Access | Scoped read-only to your data |
| Glob patterns | s3://bucket/events/2026/**/*.parquet |

ClickHouse

sql
SELECT *
FROM s3(
  'https://{account_id}.r2.cloudflarestorage.com/enrich-{customer_id}/events/2026/02/**/*.parquet',
  '{access_key}',
  '{secret_key}',
  'Parquet'
);

DuckDB

sql
SET s3_region = 'auto';
SET s3_endpoint = '{account_id}.r2.cloudflarestorage.com';
SET s3_access_key_id = '{access_key}';
SET s3_secret_access_key = '{secret_key}';

SELECT * FROM read_parquet('s3://enrich-{customer_id}/events/2026/02/**/*.parquet');

BigQuery

BigQuery external tables read from Google Cloud Storage, so mirror your R2 data into a GCS bucket first. The example below assumes such a mirror.

sql
CREATE EXTERNAL TABLE `project.dataset.events`
OPTIONS (
  format = 'PARQUET',
  uris = ['gs://your-gcs-mirror/events/2026/02/*.parquet']
);

Python + DuckDB

python
import duckdb

conn = duckdb.connect()
conn.execute("""
    SET s3_region = 'auto';
    SET s3_endpoint = '{account_id}.r2.cloudflarestorage.com';
    SET s3_access_key_id = '{access_key}';
    SET s3_secret_access_key = '{secret_key}';
""")

df = conn.execute("""
    SELECT event, COUNT(*) as count
    FROM read_parquet('s3://enrich-{customer_id}/events/2026/02/**/*.parquet')
    GROUP BY event
    ORDER BY count DESC
""").fetchdf()

Updating a Stream

Update configuration without losing data:

bash
curl -X PUT https://enrich.sh/streams/events \
  -H "Authorization: Bearer sk_live_your_key" \
  -d '{
    "schema_mode": "evolve",
    "fields": {
      "ts": { "name": "timestamp", "type": "int64" },
      "url": { "type": "string" },
      "new_field": { "type": "json" }
    }
  }'

WARNING

Schema Evolution: Adding new fields is safe. Removing or renaming fields may cause issues with historical data queries.


Stream Replay

Re-send historical events from a stream to a webhook URL. Useful for ML model retraining, backfilling downstream systems, or disaster recovery.

bash
curl -X POST https://enrich.sh/streams/events/replay \
  -H "Authorization: Bearer sk_live_your_key" \
  -H "Content-Type: application/json" \
  -d '{
    "from": "2026-02-01",
    "to": "2026-02-14",
    "webhook_url": "https://your-api.com/replay-target"
  }'

| Parameter | Type | Description |
| --- | --- | --- |
| from | string | Start date (YYYY-MM-DD) |
| to | string | End date (YYYY-MM-DD) |
| webhook_url | string | Target URL for replayed events |
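
For large backfills you may prefer several smaller date windows over one big range. The Python sketch below walks the range week by week using the endpoint and parameters documented above; the weekly chunking itself is purely illustrative.

python
from datetime import date, timedelta
import requests

API = "https://enrich.sh"
HEADERS = {"Authorization": "Bearer sk_live_your_key"}

def replay_in_chunks(stream_id, start, end, webhook_url, days=7):
    """Issue one replay request per window of `days` days between start and end."""
    cur = start
    while cur <= end:
        chunk_end = min(cur + timedelta(days=days - 1), end)
        resp = requests.post(
            f"{API}/streams/{stream_id}/replay",
            headers=HEADERS,
            json={
                "from": cur.isoformat(),   # YYYY-MM-DD
                "to": chunk_end.isoformat(),
                "webhook_url": webhook_url,
            },
        )
        resp.raise_for_status()
        cur = chunk_end + timedelta(days=1)

replay_in_chunks("events", date(2026, 2, 1), date(2026, 2, 14),
                 "https://your-api.com/replay-target")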

Deleting a Stream

bash
curl -X DELETE https://enrich.sh/streams/events \
  -H "Authorization: Bearer sk_live_your_key"

WARNING

This deletes the stream configuration only. Data already stored in R2 is not deleted.


Examples

Simple Event Tracking

json
{
  "stream_id": "page_views",
  "schema_mode": "flex",
  "fields": {
    "url": { "type": "string" },
    "referrer": { "type": "string" },
    "session_id": { "type": "string" },
    "ts": { "name": "timestamp", "type": "int64" }
  }
}

E-commerce with Strict Schema

json
{
  "stream_id": "transactions",
  "schema_mode": "strict",
  "fields": {
    "order_id": { "type": "string" },
    "user_id": { "type": "string" },
    "amount": { "type": "float64" },
    "currency": { "type": "string" },
    "items": { "type": "json" },
    "ts": { "name": "purchased_at", "type": "int64" }
  }
}

ERP Integration with Evolve

json
{
  "stream_id": "erp_orders",
  "schema_mode": "evolve",
  "fields": {
    "order_number": { "type": "string" },
    "customer_id": { "type": "string" },
    "total": { "type": "float64" },
    "line_items": { "type": "json" }
  }
}

IoT Sensor Data

json
{
  "stream_id": "sensors",
  "schema_mode": "evolve",
  "fields": {
    "device_id": { "type": "string" },
    "reading": { "type": "float64" },
    "unit": { "type": "string" },
    "location": { "type": "json" },
    "ts": { "name": "recorded_at", "type": "int64" }
  }
}
