Ingest pipeline
Traces a single rrweb event from browser capture to Postgres row, naming every module along the way. Read the high-level walkthrough first if you haven't.
Pipeline is synchronous Postgres writes, end to end. No queue, no worker, no Kafka. Simplicity over throughput — explained at the bottom of this page.
Stage 1 — Capture (browser-side)
When Dozor.init({ apiKey, endpoint }) runs, the SDK wires up rrweb
and binds a few listeners.
What rrweb captures
The recorder subscribes to:
- MutationObserver — every DOM node added, removed, or attribute changed
- mousemove / mousedown / mouseup / click / scroll — pointer and scroll positions
- input / focus / blur — form interactions (with masking applied before the event is buffered)
- console.log/warn/error/info/debug — when recordConsole: true (default)
Each capture produces an rrweb event:
type rrwebEvent = {
type: number; // discriminator: 2 = FullSnapshot, 3 = IncrementalSnapshot, 4 = Meta, ...
data: unknown; // type-specific payload
timestamp: number; // ms since epoch
};

Privacy applies at the source
Before an event lands in the buffer, the privacy layer runs:
- Element with data-dozor-mask → text content replaced with asterisks in the captured node tree
- Element with data-dozor-block → entire subtree replaced with a same-size empty placeholder
- <input>/<textarea>/<select> value → asterisks (when privacyMaskInputs: true)
The unmasked content never enters the buffer. There's nothing to redact later — masked stays masked.
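In spirit, the mask transform is plain character-for-character replacement applied to captured text before it is buffered. A minimal sketch (illustrative only; the real privacy layer applies this during rrweb's node serialisation, and the function name is ours):

```typescript
// Illustrative: replace every non-whitespace character with an asterisk,
// preserving string length so replay layout doesn't shift.
// The real SDK does this inside rrweb's serialisation hooks, before buffering.
function maskText(text: string): string {
  return text.replace(/\S/g, "*");
}
```

Because the transform runs before buffering, the masked string is the only version that ever exists in memory or on the wire.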
Buffer accumulation
Captured events go into an in-memory array. The buffer fills until one of these triggers:
| Trigger | When |
|---|---|
| Bootstrap (eager) | Immediately after start() returns — ships the initial Meta + FullSnapshot via the regular path. |
| Flush interval | Every flushInterval ms (default 60_000) |
| Buffer size cap | When events.length >= batchSize (default 2000) |
| Tab visibility change | visibilitychange → hidden |
| Page unload | beforeunload / pagehide |
| Manual release() | When transport was held |
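The size and interval triggers reduce to a simple predicate. A sketch, assuming the documented defaults (the `shouldFlush` name and `FlushState` shape are illustrative, not the SDK's actual internals):

```typescript
// Hypothetical sketch of the two timer/size flush triggers above.
// batchSize and flushInterval mirror the documented defaults.
interface FlushState {
  bufferedEvents: number; // events currently in the in-memory array
  lastFlushAt: number;    // ms since epoch of the previous flush
}

const DEFAULTS = { batchSize: 2000, flushInterval: 60_000 };

function shouldFlush(state: FlushState, now: number, opts = DEFAULTS): boolean {
  // Buffer size cap: events.length >= batchSize
  if (state.bufferedEvents >= opts.batchSize) return true;
  // Flush interval: flushInterval ms elapsed since the last flush
  if (now - state.lastFlushAt >= opts.flushInterval) return true;
  return false;
}
```

The lifecycle triggers (visibilitychange, beforeunload, release()) fire unconditionally rather than through this predicate.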
In-stream marker events
The SDK emits rrweb custom events (type=5) inline in the event
stream — no separate channel. Two tags are reserved:
- dozor:url — fires on SPA navigation (popstate, pushState, replaceState). Payload: { url, pathname }. Hash-only and query-only changes deliberately don't fire — they're not real navigations. Each url marker is followed by an rrweb FullSnapshot (type=2) so replay can render the new DOM.
- dozor:identity — fires when dozor.identify(userId, traits) is called mid-session. Payload: { userId, traits? }. Pre-start identify rides on the first batch's metadata.userIdentity instead.
The server extracts these tags into typed Marker rows on ingest.
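On the wire these markers follow rrweb's custom-event convention: type=5, with the tag and payload nested under data. A minimal builder, purely illustrative (the SDK emits these through rrweb itself; `makeDozorMarker` is not a real SDK function):

```typescript
// Illustrative only — shows the wire shape the server later scans for
// during marker extraction (Stage 4, Step 3).
type rrwebEvent = { type: number; data: unknown; timestamp: number };

function makeDozorMarker(
  tag: "url" | "identity",
  payload: Record<string, unknown>,
  timestamp = Date.now(),
): rrwebEvent {
  return { type: 5, data: { tag: `dozor:${tag}`, payload }, timestamp };
}
```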
Stage 2 — Transport (browser → server)
When a flush triggers, the SDK serialises the buffer to JSON and ships it.
Serialisation + compression
Buffer (array of events) → JSON.stringify → byte string
↓
┌─ regular flush ────────────────────┐ ┌─ keepalive flush (page unload) ─┐
│ if size > 1 KB: │ │ always: │
│ CompressionStream("gzip") → bytes│ │ fflate.gzipSync → bytes │
│ Content-Encoding: gzip │ │ Content-Encoding: gzip │
└────────────────────────────────────┘ └──────────────────────────────────┘

Two compression paths exist because they have incompatible
constraints. Regular flushes (the live page) use the
browser-native CompressionStream — async streaming, ratio-optimal,
no bundle cost. Keepalive flushes can't await the
CompressionStream drain inside beforeunload /
visibilitychange:hidden, so the SDK ships a sync gzip via fflate.
Without that, a CSS-heavy page's bootstrap (Meta + FullSnapshot,
200–400 KB raw uncompressed) would silently overshoot the browser's
~64 KB keepalive body cap and the request would never reach the
server.
Browsers without CompressionStream (older Safari, older Firefox)
fall back to uncompressed JSON for regular flushes; keepalive always
uses fflate so it stays compressed regardless.
POST request shape
POST /api/ingest HTTP/1.1
Host: your-dashboard.com
Content-Type: application/json
Content-Encoding: gzip # only if gzipped
X-Dozor-Public-Key: dp_a1b2c3...32hex
[gzipped JSON body]

The body is the IngestPayload shape:
{
sessionId: string; // UUID, stable per browser session
events: rrwebEvent[]; // ≤ 500 per batch (incl. `dozor:*` markers)
metadata?: SessionMetadata; // first batch + after `identify()`
}

metadata lands in the first batch and is re-shipped after a
mid-session identify() so the server learns the updated identity.
It carries url, referrer, userAgent, screenWidth,
screenHeight, language, plus the optional
userIdentity: { userId, traits }.
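The server validates this contract with a Zod schema (Stage 3); spelled out as a hand-rolled type guard, the required fields look like this (illustrative only, not the actual validation code):

```typescript
// Illustrative guard for the IngestPayload contract above. The real
// server uses ingestSchema (Zod); this just makes the shape explicit.
type rrwebEvent = { type: number; data: unknown; timestamp: number };

interface IngestPayload {
  sessionId: string;
  events: rrwebEvent[];
  metadata?: Record<string, unknown>;
}

function isIngestPayload(x: unknown): x is IngestPayload {
  if (typeof x !== "object" || x === null) return false;
  const p = x as Record<string, unknown>;
  return (
    typeof p.sessionId === "string" &&
    Array.isArray(p.events) &&
    p.events.length <= 500 && // ≤ 500 events per batch
    p.events.every(
      (e) => typeof e === "object" && e !== null && typeof (e as any).timestamp === "number",
    )
  );
}
```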
Retry semantics
| Server response | SDK behaviour |
|---|---|
| 204 | Success — proceed to next batch |
| 4xx (400, 401, etc.) | No retry. Buffer dropped, error logged in [dozor] |
| 5xx or network error | Retry with exponential backoff: 1s → 2s → 4s |
| All retries exhausted | Events re-queued to buffer, retried on the next flush cycle |
| Buffer reaches 10,000 events | Oldest dropped first (FIFO) |
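The backoff column is a plain doubling schedule, and the buffer cap is a FIFO trim. A sketch of both (function names are ours; the re-queue ordering shown — failed events ahead of newer buffered ones — is an assumption consistent with dropping oldest first):

```typescript
// Backoff per the table: attempt 0 → 1s, 1 → 2s, 2 → 4s.
function backoffDelayMs(attempt: number): number {
  return 1000 * 2 ** attempt;
}

// Re-queue failed events ahead of the current buffer (they're older),
// then trim from the front (oldest) once past the 10,000-event cap.
function requeue<T>(buffer: T[], failed: T[], cap = 10_000): T[] {
  const merged = [...failed, ...buffer];
  return merged.length > cap ? merged.slice(merged.length - cap) : merged;
}
```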
Page-unload flushes use fetch({ keepalive: true }) with a
sync-gzipped body — the browser ships the request even after
navigation, fire-and-forget. No retry on these. The eager bootstrap
flush after start() covers the same window with a regular
gzip + retry path, so unload-time keepalive typically carries only
the incremental tail.
Stage 3 — Receive (server-side)
The request arrives at src/app/api/ingest/route.ts.
withPublicKey HOF — auth + CORS
// `withPublicKey` is from src/app/api/_lib/with-public-key.ts (simplified usage shown)
export const POST = withPublicKey(async ({ project, req }) => {
// ... handler body
});

The HOF runs before the handler:
- Reads X-Dozor-Public-Key header
- Looks up Project by key @unique (single indexed query)
- If not found → 401 Unauthorized with structured { kind: "auth", ... }
- If found → calls handler with { project, req }
- Applies CORS headers to the response (success or failure)
- Catches HttpError / ZodError and serialises to JSON
The handler never sees auth code, never sees CORS code. It sees a
validated project and a req and produces a response.
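Shape-wise it is an ordinary function wrapper. A stripped-down sketch, with the Prisma lookup replaced by an injected stand-in and CORS/error mapping omitted (the real HOF in src/app/api/_lib/with-public-key.ts does not take a lookup parameter):

```typescript
// Minimal stand-in for the withPublicKey pattern: auth runs first,
// the handler only ever sees a resolved project.
interface Project { id: string; key: string }
type Handler = (ctx: { project: Project; req: Request }) => Promise<Response>;

function withPublicKey(
  lookup: (key: string) => Promise<Project | null>, // stand-in for the Prisma query
  handler: Handler,
) {
  return async (req: Request): Promise<Response> => {
    const key = req.headers.get("X-Dozor-Public-Key");
    const project = key ? await lookup(key) : null;
    if (!project) {
      // Structured auth error, mirroring { kind: "auth", ... }
      return new Response(JSON.stringify({ kind: "auth", message: "invalid key" }), {
        status: 401,
        headers: { "Content-Type": "application/json" },
      });
    }
    return handler({ project, req });
  };
}
```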
parseIngestBody — gzip + JSON
export async function parseIngestBody(req: Request): Promise<unknown> {
if (req.headers.get("Content-Encoding") === "gzip") {
const decoder = new DecompressionStream("gzip");
const decompressed = req.body!.pipeThrough(decoder);
const text = await new Response(decompressed).text();
return JSON.parse(text);
}
return req.json();
}

Returns unknown. The route then runs ingestSchema.parse(...) —
malformed payload throws ZodError, the HOF converts to 400.
Stage 4 — Persist (database writes)
Five sequential steps. Each one reads rows the previous wrote — no parallelism because the data dependencies are real.
Step 1 — Session upsert + identity link
// Simplified — full helper handles error paths + metadata first-batch logic
const session = await prisma.session.upsert({
where: { projectId_externalId: { projectId, externalId } },
create: { externalId, projectId, url, userAgent, ..., startedAt, endedAt, eventCount, duration },
update: hasEvents ? { endedAt: maxTs, eventCount: { increment: events.length } } : {},
});
if (metadata?.userIdentity) {
const trackedUser = await prisma.trackedUser.upsert({
where: { projectId_externalId: { projectId, externalId: userId } },
create: { externalId: userId, projectId, traits },
update: { traits },
});
await prisma.session.update({ where: { id: session.id }, data: { trackedUserId: trackedUser.id } });
}

Key invariants:
- Idempotent — same (projectId, externalId) upserts to the same row. Duplicate batches don't double-count.
- First batch creates the row with full metadata. Subsequent batches only bump endedAt + eventCount.
- Identity is one-way — once a session is linked to a tracked user, batches without userIdentity don't unlink. Re-identifying with a different userId does relink.
Step 2 — EventBatch insert
The events array is gzip-compressed as a JSON blob and stored in a
single EventBatch row:
const json = JSON.stringify(events);
const stream = new Blob([json]).stream().pipeThrough(new CompressionStream("gzip"));
const data = Buffer.from(await new Response(stream).arrayBuffer());
await prisma.eventBatch.create({
data: {
sessionId,
    firstTimestamp: BigInt(Math.min(...events.map((e) => e.timestamp))),
    lastTimestamp: BigInt(Math.max(...events.map((e) => e.timestamp))),
eventCount: events.length,
data,
},
});

One INSERT per ingest POST. No row-per-event, no slice aggregates.
Out-of-order or concurrent batches are non-issues — each lands as
its own row. Read paths sort by (sessionId, firstTimestamp ASC, id ASC) to produce a stable replay timeline.
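That ordering contract can be written as a comparator (illustrative only; the real read path sorts in SQL via Prisma's orderBy, not in JS):

```typescript
// Stable replay ordering: firstTimestamp ASC, then id ASC as a
// tie-breaker, so concurrent batches with equal timestamps still
// produce a deterministic timeline.
interface BatchRow { id: number; firstTimestamp: bigint }

function replayOrder(a: BatchRow, b: BatchRow): number {
  if (a.firstTimestamp !== b.firstTimestamp) {
    return a.firstTimestamp < b.firstTimestamp ? -1 : 1;
  }
  return a.id - b.id;
}
```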
Step 3 — Marker extraction
Scan the batch for rrweb custom events (type=5) whose data.tag
starts with dozor:. Each match produces a typed Marker row:
const rows = events
.filter((e) => e.type === 5 && e.data?.tag?.startsWith("dozor:"))
.map((e) => ({
sessionId,
timestamp: BigInt(e.timestamp),
kind: e.data.tag.slice("dozor:".length), // "url" | "identity"
data: e.data.payload,
}));
if (rows.length > 0) await prisma.marker.createMany({ data: rows });

Stats queries (page distribution, activity histogram, timeline periods) read this table directly — no decompression of event blobs needed.
Step 3 (alt) — Initial url marker on session creation
When upsertSessionAndLinkTrackedUser reports wasCreated: true
(first batch of a brand-new session), the route synthesises an
initial Marker(kind="url") from metadata.url:
await prisma.marker.create({
data: {
sessionId,
timestamp: BigInt(session.startedAt.getTime()),
kind: "url",
data: { url: metadata.url, pathname: derivedPathname },
},
});

This guarantees every session has at least one URL anchor. Stats queries can assume "first url-marker = session start" without special-casing single-page sessions.
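The derivedPathname above comes from parsing metadata.url. One plausible sketch using the standard URL API (the real helper may handle more edge cases; the fallback value here is an assumption):

```typescript
// Sketch: derive a pathname from the session's initial URL, with a
// fallback for unparseable values. Hypothetical helper, not the
// actual implementation.
function derivePathname(url: string): string {
  try {
    return new URL(url).pathname;
  } catch {
    return "/"; // assumption: unparseable URL falls back to root
  }
}
```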
Step 4 — Fire-and-forget lastUsedAt
prisma.project.update({ where: { id: project.id }, data: { lastUsedAt: new Date() } }).catch(() => {});

A slow or failed update here must never break ingestion itself. The
lastUsedAt field is for the dashboard's "last seen" UI hint — if
it's a few seconds stale or skipped on one batch, nobody notices.
Stage 5 — Response
HTTP/1.1 204 No Content
Access-Control-Allow-Origin: *
Cache-Control: no-store

Empty body. The SDK proceeds to the next batch.
Logging surface
Every state-mutating step emits a structured pino log with a
domain:entity:action[:state] tag — see
Architecture → Observability
for the full convention. Tags emitted by ingest:
| Tag | When | Level |
|---|---|---|
| ingest:batch:received | Successful batch acceptance | info |
| ingest:tracked_user:linked | Identity linked or relinked | debug |
| ingest:auth:invalid_key | 401 from withPublicKey | warn |
These are the search keys for finding ingestion-related events in your host's function logs (Vercel logs, or wherever you ship pino output).
Why no queue?
The whole stage 4 happens inside a single function invocation. On a warm-DB, same-region deploy the round-trip is fast enough that the SDK's batching cadence (60 s default) absorbs any spikes — measure on your own deploy if it matters.
If ingest volume grows past single-request capacity (Vercel's function timeout is 60 s on Hobby and 300 s on Pro, your host's limits will differ; Neon's connection limit is finite), the canonical next step is a queue + worker:
SDK → /api/ingest → enqueue (Inngest / Trigger.dev) → 204
↓ (later, async)
worker → batched Postgres writes

This is deliberately not implemented yet. The product is self-hosted, traffic profile is per-deploy, and the simpler path ("synchronous writes") works for the first several orders of magnitude. Adding a queue is a known next step, not a current need.
See Self-hosting reference → Capacity planning for concrete numbers on when this matters.
Code map (every file referenced)
| File | Role |
|---|---|
| src/app/api/ingest/route.ts | Entry point — composes the helpers below |
| src/app/api/_lib/with-public-key.ts | Public-key auth HOF + CORS |
| src/app/api/_lib/cors.ts | OPTIONS preflight + CORS headers |
| src/app/api/ingest/_helpers/parse-body.ts | gzip-aware body read + Zod schema |
| src/app/api/ingest/_helpers/session-upsert.ts | Step 1 — session upsert + identity link |
| src/app/api/ingest/_helpers/event-batch.ts | Step 2 — gzip + INSERT EventBatch |
| src/app/api/ingest/_helpers/markers.ts | Step 3 — dozor:* marker extraction + initial url-marker on first batch |
| prisma/schema.prisma | Tables: Session, EventBatch, Marker, TrackedUser, Project |
See also
- SDK → Wire format — the REST contract this pipeline serves.
- SDK → @kharko/dozor → Init — client-side counterparts to stages 1+2.