Dozor

Ingest pipeline

Traces a single rrweb event from browser capture to Postgres row, naming every module along the way. Read the high-level walkthrough first if you haven't.

Pipeline is synchronous Postgres writes, end to end. No queue, no worker, no Kafka. Simplicity over throughput — explained at the bottom of this page.

Stage 1 — Capture (browser-side)

When Dozor.init({ apiKey, endpoint }) runs, the SDK wires up rrweb and binds a few listeners.

What rrweb captures

The recorder subscribes to:

  • MutationObserver — every DOM node added, removed, attribute changed
  • mousemove / mousedown / mouseup / click / scroll — pointer
    • scroll positions
  • input / focus / blur — form interactions (with masking applied before the event is buffered)
  • console.log/warn/error/info/debug — when recordConsole: true (default)

Each capture produces an rrweb event:

type rrwebEvent = {
  type: number; // discriminator: 2 = FullSnapshot, 3 = IncrementalSnapshot, 4 = Meta, ...
  data: unknown; // type-specific payload
  timestamp: number; // ms since epoch
};

Privacy applies at the source

Before an event lands in the buffer, the privacy layer runs:

  • Element with data-dozor-mask → text content replaced with asterisks in the captured node tree
  • Element with data-dozor-block → entire subtree replaced with a same-size empty placeholder
  • <input> / <textarea> / <select> value → asterisks (when privacyMaskInputs: true)

The unmasked content never enters the buffer. There's nothing to redact later — masked stays masked.

Buffer accumulation

Captured events go into an in-memory array. The buffer fills until one of these triggers:

TriggerWhen
Bootstrap (eager)Immediately after start() returns — ships the initial Meta + FullSnapshot via the regular path.
Flush intervalEvery flushInterval ms (default 60_000)
Buffer size capWhen events.length >= batchSize (default 2000)
Tab visibility changevisibilitychange → hidden
Page unloadbeforeunload / pagehide
Manual release()When transport was held

In-stream marker events

The SDK emits rrweb custom events (type=5) inline in the event stream — no separate channel. Two tags are reserved:

  • dozor:url — fires on SPA navigation (popstate, pushState, replaceState). Payload: { url, pathname }. Hash-only and query-only changes deliberately don't fire — they're not real navigations. Each url marker is followed by an rrweb FullSnapshot (type=2) so replay can render the new DOM.
  • dozor:identity — fires when dozor.identify(userId, traits) is called mid-session. Payload: { userId, traits? }. Pre-start identify rides on the first batch's metadata.userIdentity instead.

The server extracts these tags into typed Marker rows on ingest.

Stage 2 — Transport (browser → server)

When a flush triggers, the SDK serialises the buffer to JSON and ships it.

Serialisation + compression

Buffer (array of events) → JSON.stringify → byte string

            ┌─ regular flush ────────────────────┐   ┌─ keepalive flush (page unload) ─┐
            │ if size > 1 KB:                    │   │ always:                          │
            │   CompressionStream("gzip") → bytes│   │   fflate.gzipSync → bytes        │
            │   Content-Encoding: gzip           │   │   Content-Encoding: gzip         │
            └────────────────────────────────────┘   └──────────────────────────────────┘

Two compression paths exist because they have incompatible constraints. Regular flushes (the live page) use the browser-native CompressionStream — async streaming, ratio-optimal, no bundle cost. Keepalive flushes can't await the CompressionStream drain inside beforeunload / visibilitychange:hidden, so the SDK ships a sync gzip via fflate. Without that, a CSS-heavy page's bootstrap (Meta + FullSnapshot, 200–400 KB raw uncompressed) would silently overshoot the browser's ~64 KB keepalive body cap and the request would never reach the server.

Browsers without CompressionStream (older Safari, older Firefox) fall back to uncompressed JSON for regular flushes; keepalive always uses fflate so it stays compressed regardless.

POST request shape

POST /api/ingest HTTP/1.1
Host: your-dashboard.com
Content-Type: application/json
Content-Encoding: gzip                  # only if gzipped
X-Dozor-Public-Key: dp_a1b2c3...32hex

[gzipped JSON body]

The body is the IngestPayload shape:

{
  sessionId: string;          // UUID, stable per browser session
  events: rrwebEvent[];       // ≤ 500 per batch (incl. `dozor:*` markers)
  metadata?: SessionMetadata; // first batch + after `identify()`
}

metadata lands in the first batch and is re-shipped after a mid-session identify() so the server learns the updated identity. It carries url, referrer, userAgent, screenWidth, screenHeight, language, plus the optional userIdentity: { userId, traits }.

Retry semantics

Server responseSDK behaviour
204Success — proceed to next batch
4xx (400, 401, etc.)No retry. Buffer dropped, error logged in [dozor]
5xx or network errorRetry with exponential backoff: 1s → 2s → 4s
All retries exhaustedEvents re-queued to buffer, retried on the next flush cycle
Buffer reaches 10,000 eventsOldest dropped first (FIFO)

Page-unload flushes use fetch({ keepalive: true }) with a sync-gzipped body — the browser ships the request even after navigation, fire-and-forget. No retry on these. The eager bootstrap flush after start() covers the same window with a regular gzip + retry path, so unload-time keepalive typically carries only the incremental tail.

Stage 3 — Receive (server-side)

The request arrives at src/app/api/ingest/route.ts.

withPublicKey HOF — auth + CORS

src/app/api/ingest/route.ts
// `withPublicKey` is from src/app/api/_lib/with-public-key.ts (simplified usage shown)
export const POST = withPublicKey(async ({ project, req }) => {
  // ... handler body
});

The HOF runs before the handler:

  1. Reads X-Dozor-Public-Key header
  2. Looks up Project by key @unique (single indexed query)
  3. If not found → 401 Unauthorized with structured { kind: "auth", ... }
  4. If found → calls handler with { project, req }
  5. Applies CORS headers to the response (success or failure)
  6. Catches HttpError / ZodError and serialises to JSON

The handler never sees auth code, never sees CORS code. It sees a validated project and a req and produces a response.

parseIngestBody — gzip + JSON

src/app/api/ingest/_helpers/parse-body.ts
export async function parseIngestBody(req: Request): Promise<unknown> {
  if (req.headers.get("Content-Encoding") === "gzip") {
    const decoder = new DecompressionStream("gzip");
    const decompressed = req.body!.pipeThrough(decoder);
    const text = await new Response(decompressed).text();
    return JSON.parse(text);
  }
  return req.json();
}

Returns unknown. The route then runs ingestSchema.parse(...) — malformed payload throws ZodError, the HOF converts to 400.

Stage 4 — Persist (database writes)

Five sequential steps. Each one reads rows the previous wrote — no parallelism because the data dependencies are real.

src/app/api/ingest/_helpers/session-upsert.ts
// Simplified — full helper handles error paths + metadata first-batch logic
const session = await prisma.session.upsert({
  where: { projectId_externalId: { projectId, externalId } },
  create: { externalId, projectId, url, userAgent, ..., startedAt, endedAt, eventCount, duration },
  update: hasEvents ? { endedAt: maxTs, eventCount: { increment: events.length } } : {},
});

if (metadata?.userIdentity) {
  const trackedUser = await prisma.trackedUser.upsert({
    where: { projectId_externalId: { projectId, externalId: userId } },
    create: { externalId: userId, projectId, traits },
    update: { traits },
  });
  await prisma.session.update({ where: { id: session.id }, data: { trackedUserId: trackedUser.id } });
}

Key invariants:

  • Idempotent — same (projectId, externalId) upserts to the same row. Duplicate batches don't double-count.
  • First batch creates the row with full metadata. Subsequent batches only bump endedAt + eventCount.
  • Identity is one-way — once a session is linked to a tracked user, batches without userIdentity don't unlink. Re-identifying with a different userId does relink.

Step 2 — EventBatch insert

The events array is gzip-compressed as a JSON blob and stored in a single EventBatch row:

src/app/api/ingest/_helpers/event-batch.ts
const json = JSON.stringify(events);
const stream = new Blob([json]).stream().pipeThrough(new CompressionStream("gzip"));
const data = Buffer.from(await new Response(stream).arrayBuffer());

await prisma.eventBatch.create({
  data: {
    sessionId,
    firstTimestamp: BigInt(min(events.map(e => e.timestamp))),
    lastTimestamp: BigInt(max(events.map(e => e.timestamp))),
    eventCount: events.length,
    data,
  },
});

One INSERT per ingest POST. No row-per-event, no slice aggregates. Out-of-order or concurrent batches are non-issues — each lands as its own row. Read paths sort by (sessionId, firstTimestamp ASC, id ASC) to produce a stable replay timeline.

Step 3 — Marker extraction

Scan the batch for rrweb custom events (type=5) whose data.tag starts with dozor:. Each match produces a typed Marker row:

src/app/api/ingest/_helpers/markers.ts
const rows = events
  .filter((e) => e.type === 5 && e.data?.tag?.startsWith("dozor:"))
  .map((e) => ({
    sessionId,
    timestamp: BigInt(e.timestamp),
    kind: e.data.tag.slice("dozor:".length),  // "url" | "identity"
    data: e.data.payload,
  }));

if (rows.length > 0) await prisma.marker.createMany({ data: rows });

Stats queries (page distribution, activity histogram, timeline periods) read this table directly — no decompression of event blobs needed.

Step 3 (alt) — Initial url marker on session creation

When upsertSessionAndLinkTrackedUser reports wasCreated: true (first batch of a brand-new session), the route synthesises an initial Marker(kind="url") from metadata.url:

src/app/api/ingest/_helpers/markers.ts
await prisma.marker.create({
  data: {
    sessionId,
    timestamp: BigInt(session.startedAt.getTime()),
    kind: "url",
    data: { url: metadata.url, pathname: derivedPathname },
  },
});

This guarantees every session has at least one URL anchor. Stats queries can assume "first url-marker = session start" without special-casing single-page sessions.

Step 4 — Fire-and-forget lastUsedAt

src/app/api/ingest/route.ts
prisma.project.update({ where: { id: project.id }, data: { lastUsedAt: new Date() } }).catch(() => {});

A slow or failed update here must never break ingestion itself. The lastUsedAt field is for the dashboard's "last seen" UI hint — if it's a few seconds stale or skipped on one batch, nobody notices.

Stage 5 — Response

HTTP/1.1 204 No Content
Access-Control-Allow-Origin: *
Cache-Control: no-store

Empty body. The SDK proceeds to the next batch.

Logging surface

Every state-mutating step emits a structured pino log with a domain:entity:action[:state] tag — see Architecture → Observability for the full convention. Tags emitted by ingest:

TagWhenLevel
ingest:batch:receivedSuccessful batch acceptanceinfo
ingest:tracked_user:linkedIdentity linked or relinkeddebug
ingest:auth:invalid_key401 from withPublicKeywarn

These are the search keys for finding ingestion-related events in your host's function logs (Vercel logs, or wherever you ship pino output).

Why no queue?

The whole stage 4 happens inside a single function invocation. On a warm-DB, same-region deploy the round-trip is fast enough that the SDK's batching cadence (60 s default) absorbs any spikes — measure on your own deploy if it matters.

If ingest volume grows past single-request capacity (Vercel's function timeout is 60 s on Hobby and 300 s on Pro, your host's limits will differ; Neon's connection limit is finite), the canonical next step is a queue + worker:

SDK → /api/ingest → enqueue (Inngest / Trigger.dev) → 204
                  ↓ (later, async)
                  worker → batched Postgres writes

This is deliberately not implemented yet. The product is self-hosted, traffic profile is per-deploy, and the simpler path ("synchronous writes") works for the first several orders of magnitude. Adding a queue is a known next step, not a current need.

See Self-hosting reference → Capacity planning for concrete numbers on when this matters.

Code map (every file referenced)

FileRole
src/app/api/ingest/route.tsEntry point — composes the helpers below
src/app/api/_lib/with-public-key.tsPublic-key auth HOF + CORS
src/app/api/_lib/cors.tsOPTIONS preflight + CORS headers
src/app/api/ingest/_helpers/parse-body.tsgzip-aware body read + Zod schema
src/app/api/ingest/_helpers/session-upsert.tsStep 1 — session upsert + identity link
src/app/api/ingest/_helpers/event-batch.tsStep 2 — gzip + INSERT EventBatch
src/app/api/ingest/_helpers/markers.tsStep 3 — dozor:* marker extraction + initial url-marker on first batch
prisma/schema.prismaTables: Session, EventBatch, Marker, TrackedUser, Project

See also

On this page