Phase 3 — Inngest Integration

Effort: L

Goal

Move every side-effectful operation (Meta API calls, webhook fan-out, media downloads, client notification push, retention crons) behind Inngest functions. Gain retries, concurrency control, durability across restarts, and a single observable history of what the system did.

Deliverables

Inngest SDK + wiring

  • src/inngest/client.ts — singleton Inngest({ id: 'wa-mcp', eventKey, signingKey }).
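  • /api/inngest mounted on the Express server via the SDK’s serve adapter (inngest/express), after express.json() so the SDK receives a parsed body. The route answers GET, POST and PUT (PUT is how Inngest syncs function definitions). No app-layer auth on this route: the SDK verifies INNGEST_SIGNING_KEY on every request. Nginx (Phase 7) rate-limits the path.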
  • Until Phase 7 brings Nginx, expose /api/inngest to Inngest Cloud via cloudflared or the Inngest dev tunnel.

Event schemas (zod)

  • src/inngest/events.ts — every event has a zod schema; types derive from the schemas.
    • wa/webhook.received: { phoneNumberId, rawPayload, derivedEventId }
    • wa/message.send.requested: { clientId, phoneNumberId, to, type, payload, requestId }
    • wa/message.send.completed: { requestId, wamid, status }
    • wa/media.download.requested: { clientId, phoneNumberId, mediaId, wamid }
    • wa/media.upload.requested: { clientId, phoneNumberId, localPath, mime, requestId }
    • mcp/client.notify: { clientId, phoneNumberId, kind, wamid }
    • wa/status.update.received: { phoneNumberId, wamid, status, error, raw }
    • cron/audit.archive, cron/messages.retention, cron/media.retention, cron/rate-limit.gc
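The catalogue can be pictured as a map from event name to payload type. The sketch below is dependency-free TypeScript, so a hand-rolled guard stands in for the zod schema of wa/message.send.requested. The field types (string, unknown, the status unions) and the helper names makeEvent and isSendRequested are illustrative assumptions, not the project's code. It also shows the { name, data } object shape that inngest.send accepts in SDK v3.

```typescript
// Hypothetical mirror of the types the zod schemas in src/inngest/events.ts
// would infer. Field types are assumptions for illustration.
type EventCatalogue = {
  "wa/webhook.received": { phoneNumberId: string; rawPayload: unknown; derivedEventId: string };
  "wa/message.send.requested": {
    clientId: string;
    phoneNumberId: string;
    to: string;
    type: string;
    payload: unknown;
    requestId: string;
  };
  "wa/message.send.completed": { requestId: string; wamid: string | null; status: "sent" | "failed" };
  "wa/status.update.received": {
    phoneNumberId: string;
    wamid: string;
    status: "delivered" | "read" | "failed";
    error: unknown;
    raw: unknown;
  };
};

type EventName = keyof EventCatalogue;

// inngest.send takes { name, data } objects; tying `data` to `name`
// makes the compiler reject a payload sent under the wrong event name.
type EventPayload<N extends EventName> = { name: N; data: EventCatalogue[N] };

function makeEvent<N extends EventName>(name: N, data: EventCatalogue[N]): EventPayload<N> {
  return { name, data };
}

// Hand-rolled stand-in for the zod schema of wa/message.send.requested:
// every string field must be present and non-empty, and payload must exist.
function isSendRequested(value: unknown): value is EventCatalogue["wa/message.send.requested"] {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  const required = ["clientId", "phoneNumberId", "to", "type", "requestId"];
  return required.every((k) => typeof v[k] === "string" && (v[k] as string).length > 0) && "payload" in v;
}
```

The accept/reject behaviour of the guard is what tests/unit/inngest/events.test.ts checks against the real zod schemas.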

Functions

  • src/inngest/functions/process-message.ts:
    • Triggered by wa/webhook.received.
    • Idempotency: SDK-level idempotency: 'event.data.derivedEventId' in the function config, plus INSERT INTO inngest_idempotency ... ON CONFLICT DO NOTHING as a belt-and-braces DB check.
    • Steps: normalise → upsert contact → insert message → if media, emit wa/media.download.requested; if interactive reply, record payload.selectedId → emit mcp/client.notify (Phase 4 fans out to all granted clients; Phase 3 emits one for the owner only).
  • src/inngest/functions/send-message.ts:
    • Triggered by wa/message.send.requested.
    • Concurrency: { limit: 1, key: 'event.data.phoneNumberId' }, i.e. one Meta call per phone number at a time, shared across all clients.
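    • Retries: retries: 3 (at most 4 attempts) with Inngest’s exponential backoff. 5xx and 429 responses throw an ordinary error so the run is retried; other Meta errors (e.g. 131047) are terminal and throw NonRetriableError.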
    • On success: update the queued messages row to status='sent' with the returned wamid; emit wa/message.send.completed.
    • On terminal failure: update the row to status='failed' with the typed error code and emit wa/message.send.completed with status='failed'.
    • Daily-cap enforcement lives here (after dequeue) so retries don’t double-count. (Implementation lands in Phase 4 with the rate-limit module; Phase 3 leaves a TODO calling an as-yet-untyped enforceDailyCap helper.)
  • src/inngest/functions/status-updater.ts:
    • Triggered by wa/status.update.received.
    • Updates messages.status (delivered / read / failed) and error_code if present.
  • src/inngest/functions/download-media.ts: stub with the function signature + idempotency wiring. Body comes in Phase 6.
  • src/inngest/functions/upload-media.ts: stub.
  • src/inngest/functions/push-client-notification.ts: stub (the in-process session map lives in Phase 5; here this function just logs).
  • src/inngest/functions/index.ts — array of all functions, exported to serve().
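The retry policy for send-message reduces to classifying each Meta response. A minimal sketch, assuming a hypothetical MetaSendResult shape and a function id of send-message: 5xx and 429 are retriable, and any other non-2xx (such as 131047) is terminal. In the real function a retriable outcome would be a thrown Error, letting Inngest back off and retry, while a terminal one would mark the row failed and throw the SDK’s NonRetriableError. The options object mirrors inngest.createFunction fields (retries, concurrency.limit, concurrency.key) but is not imported from the SDK here.

```typescript
// Options as they might be passed to inngest.createFunction (SDK v3).
// The id is hypothetical; retries: 3 means up to 4 attempts in total.
const sendMessageOptions = {
  id: "send-message",
  retries: 3,
  concurrency: { limit: 1, key: "event.data.phoneNumberId" },
} as const;

// Hypothetical view of one Graph API send attempt.
interface MetaSendResult {
  httpStatus: number;
  wamid?: string; // present on success
  errorCode?: number; // Meta's error.code on failure, e.g. 131047
}

type Decision =
  | { kind: "sent"; wamid: string }
  | { kind: "retry"; reason: string } // real function: throw an Error so Inngest retries
  | { kind: "failed"; errorCode: number }; // real function: mark row failed, throw NonRetriableError

function classify(result: MetaSendResult): Decision {
  const { httpStatus } = result;
  if (httpStatus >= 200 && httpStatus < 300 && result.wamid) {
    return { kind: "sent", wamid: result.wamid };
  }
  // Transient: server errors and rate limiting are retried with backoff.
  if (httpStatus === 429 || httpStatus >= 500) {
    return { kind: "retry", reason: `HTTP ${httpStatus}` };
  }
  // Everything else, including a 2xx without a wamid, is terminal.
  return { kind: "failed", errorCode: result.errorCode ?? httpStatus };
}
```

Keeping the classification a pure function lets the 5xx, 429 and 131047 cases be unit-tested without msw; the integration tests then only need to confirm the wiring.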

Tool refactor

  • src/tools/send-message.ts:
    • Validate input.
    • Insert messages row with status='queued' and a generated request_id.
    • inngest.send({ name: 'wa/message.send.requested', data: { ... } }).
    • Return { queueId, status: 'queued' } immediately.
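    • Optional wait=true: block until wa/message.send.completed arrives with the same requestId, via step.waitForEvent(id, { event: 'wa/message.send.completed', match: 'data.requestId', timeout: '30s' }); better LLM ergonomics when blocking on the result is helpful. step.* tools only exist inside an Inngest function handler, so this wait has to run inside one rather than in the tool body itself. Default wait=false so the model stays snappy.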
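The enqueue-then-optionally-wait flow of send_message can be sketched in-process. Here an EventEmitter stands in for Inngest, a Map for the messages table, and a fake worker for the send-message function; all three, plus the sendMessage and waitForCompleted names and the recipient regex, are assumptions for illustration. The waiter filters on requestId the same way match: 'data.requestId' does, and resolves to timed_out_waiting after the timeout while the row keeps settling on its own.

```typescript
import { EventEmitter } from "node:events";
import { randomUUID } from "node:crypto";

// In-memory stand-ins (assumptions): `rows` plays the messages table,
// `bus` plays Inngest's event stream.
type Status = "queued" | "sent" | "failed";
interface Row { requestId: string; to: string; status: Status; wamid?: string }
const rows = new Map<string, Row>();
const bus = new EventEmitter();

type Completed = { requestId: string; wamid: string | null; status: "sent" | "failed" };
type ToolResult =
  | { queueId: string; status: "queued" }
  | { queueId: string; status: "sent" | "failed"; wamid: string | null }
  | { queueId: string; status: "timed_out_waiting" };

// Resolves with the matching completion event, or null after timeoutMs.
function waitForCompleted(requestId: string, timeoutMs: number): Promise<Completed | null> {
  return new Promise((resolve) => {
    const onEvent = (evt: Completed) => {
      if (evt.requestId !== requestId) return; // same idea as match: 'data.requestId'
      clearTimeout(timer);
      bus.off("wa/message.send.completed", onEvent);
      resolve(evt);
    };
    const timer = setTimeout(() => {
      bus.off("wa/message.send.completed", onEvent);
      resolve(null);
    }, timeoutMs);
    bus.on("wa/message.send.completed", onEvent);
  });
}

async function sendMessage(input: { to: string; text: string; wait?: boolean; timeoutMs?: number }): Promise<ToolResult> {
  if (!/^\d{6,15}$/.test(input.to)) throw new Error("invalid recipient"); // placeholder validation
  const requestId = randomUUID();
  rows.set(requestId, { requestId, to: input.to, status: "queued" });
  // Subscribe before enqueueing so a fast completion cannot be missed.
  const completion = input.wait ? waitForCompleted(requestId, input.timeoutMs ?? 30_000) : null;
  bus.emit("wa/message.send.requested", { requestId, to: input.to, text: input.text });
  if (!completion) return { queueId: requestId, status: "queued" };
  const evt = await completion;
  if (!evt) return { queueId: requestId, status: "timed_out_waiting" };
  return { queueId: requestId, status: evt.status, wamid: evt.wamid };
}

// Fake worker standing in for the send-message function: marks the row
// sent after delayMs and emits the completion event.
function startFakeWorker(delayMs: number): void {
  bus.on("wa/message.send.requested", (evt: { requestId: string }) => {
    setTimeout(() => {
      const wamid = `wamid.${evt.requestId.slice(0, 8)}`;
      const row = rows.get(evt.requestId);
      if (row) { row.status = "sent"; row.wamid = wamid; }
      bus.emit("wa/message.send.completed", { requestId: evt.requestId, wamid, status: "sent" });
    }, delayMs);
  });
}
```

Subscribing before emitting the request matters: a fast worker could otherwise publish the completion before the waiter exists, and the tool would report a spurious timeout.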

Webhook refactor

  • src/webhook/meta.ts POST handler now:
    1. Verify signature.
    2. Derive event_id and INSERT it into inngest_idempotency with ON CONFLICT DO NOTHING.
    3. If the row was new: inngest.send({ name: 'wa/webhook.received', data: { ... } }).
    4. Return 200 immediately (≤ 50ms typical).
  • No DB writes to messages or contacts in the request handler anymore.
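The handler’s steps can be sketched with node:crypto alone. Meta signs the raw request body with HMAC-SHA256 keyed by the app secret and sends the hex digest as sha256=<digest> in the X-Hub-Signature-256 header. How event_id is derived is not specified above, so hashing the raw body is an assumption, as are the 401 rejection status, the handleWebhook/claimEventId names, and the in-memory Set standing in for inngest_idempotency.

```typescript
import { createHash, createHmac, timingSafeEqual } from "node:crypto";

// Compare the X-Hub-Signature-256 header against HMAC-SHA256(appSecret, rawBody).
function verifySignature(rawBody: Buffer, header: string | undefined, appSecret: string): boolean {
  if (!header || !header.startsWith("sha256=")) return false;
  const expected = createHmac("sha256", appSecret).update(rawBody).digest();
  const received = Buffer.from(header.slice("sha256=".length), "hex");
  // timingSafeEqual throws on length mismatch, so check length first.
  return received.length === expected.length && timingSafeEqual(received, expected);
}

// Assumption: event_id is SHA-256 of the raw body, so a byte-identical
// redelivery maps to the same id.
function deriveEventId(rawBody: Buffer): string {
  return createHash("sha256").update(rawBody).digest("hex");
}

// In-memory stand-in for
//   INSERT INTO inngest_idempotency (event_id) VALUES ($1) ON CONFLICT DO NOTHING
// returning true only when a new row was inserted.
const seen = new Set<string>();
function claimEventId(eventId: string): boolean {
  if (seen.has(eventId)) return false;
  seen.add(eventId);
  return true;
}

type HandlerResult = { status: 200 | 401; enqueued: boolean };

function handleWebhook(
  rawBody: Buffer,
  signature: string | undefined,
  appSecret: string,
  enqueue: (eventId: string) => void, // real code: inngest.send({ name: "wa/webhook.received", data: { ... } })
): HandlerResult {
  if (!verifySignature(rawBody, signature, appSecret)) return { status: 401, enqueued: false };
  const eventId = deriveEventId(rawBody);
  const isNew = claimEventId(eventId);
  if (isNew) enqueue(eventId);
  // Duplicates still get 200 so Meta stops redelivering.
  return { status: 200, enqueued: isNew };
}
```

In Postgres, INSERT ... ON CONFLICT DO NOTHING RETURNING event_id returns a row only when the insert happened, which answers "was this new?" in one round-trip, without a separate SELECT.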

Crons

  • src/inngest/functions/gc-rate-limits.ts — every 10m, deletes rate_limit_buckets rows where updated_at < now() - interval '25 hours'. (Phase 4 brings the bucket writes; the cron is wired now so it’s ready.)
  • Other cron functions (audit.archive, messages.retention, media.retention) are stubbed; bodies in Phase 4 / Phase 6.
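gc-rate-limits is a cron-triggered function that runs one DELETE. The sketch pairs that SQL with a pure in-memory equivalent so the 25-hour cutoff can be unit-tested without a database. The Bucket shape and partitionBuckets are hypothetical, and the trigger object mirrors Inngest’s { cron } trigger form rather than importing the SDK.

```typescript
// Trigger as it might be passed to inngest.createFunction: every 10 minutes.
const gcRateLimitsTrigger = { cron: "*/10 * * * *" } as const;

// The statement the step would run; now() is evaluated by Postgres.
const GC_SQL = "DELETE FROM rate_limit_buckets WHERE updated_at < now() - interval '25 hours'";

const RETENTION_MS = 25 * 60 * 60 * 1000;

// Hypothetical row shape; only the fields the cutoff needs.
interface Bucket { key: string; updatedAt: Date }

// In-memory equivalent of GC_SQL: strictly older than the cutoff is dropped.
function partitionBuckets(buckets: Bucket[], now: Date): { keep: Bucket[]; drop: Bucket[] } {
  const cutoff = now.getTime() - RETENTION_MS;
  const keep: Bucket[] = [];
  const drop: Bucket[] = [];
  for (const b of buckets) (b.updatedAt.getTime() < cutoff ? drop : keep).push(b);
  return { keep, drop };
}
```

The comparison is strict (<), matching the SQL, so a bucket updated exactly 25 hours ago survives until the next run.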

Docs (extended)

  • docs/architecture/inngest.md — event catalogue, function responsibilities, concurrency strategy, idempotency contract.
  • docs/components/inngest-runner.md — how the Inngest serve route is mounted, how the SDK verifies signatures, dev-vs-prod tunnel.
  • docs/architecture/webhook.md — updated: handler now enqueues; processing happens in process-message.
  • docs/api/mcp-tools.mdsend_message documented with both async (default) and wait=true modes.

Critical files

Tests

Unit

  • tests/unit/inngest/events.test.ts — every event’s zod schema accepts valid samples and rejects malformed.

Integration (testcontainers Postgres + Inngest test helpers)

  • tests/integration/inngest/process-message.test.ts:
    • Inbound text → row inserted in messages, contact upserted, mcp/client.notify emitted.
    • Replay same derivedEventId → second invocation no-ops; one row, one mcp/client.notify event.
    • Inbound media → wa/media.download.requested event emitted (no actual download yet).
  • tests/integration/inngest/send-message.test.ts:
    • msw mocks Meta. Happy path: row goes queued → sent with wamid; wa/message.send.completed emitted.
    • 5xx then success: two outbound HTTP calls, final status sent.
    • 131047 from Meta: row → failed, wa/message.send.completed with status='failed'.
    • Concurrency key: simulate 5 simultaneous sends to the same phone_number_id — assert only one outbound HTTP call is in-flight at any moment (use msw delay + counter).
  • tests/integration/inngest/status-updater.test.ts:
    • Delivered status update → row’s status='delivered'.
    • Read status update → status='read'.
  • tests/integration/inngest/idempotency.test.ts:
    • 10 concurrent invocations of process-message with the same derivedEventId → exactly one row inserted (race condition on inngest_idempotency insert).
  • tests/integration/tools/send-message-async.test.ts:
    • wait=false returns { queueId, status: 'queued' } immediately, row exists with status='queued', eventually transitions to sent after function runs.
    • wait=true returns the final { wamid, status: 'sent' } after the function completes.
    • wait=true with simulated 30s+ Meta delay → tool result includes a timeout indication (status: 'timed_out_waiting') but the row eventually settles correctly.
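The concurrency-key test boils down to tracking the peak number of simultaneous outbound calls. In this sketch makeInFlightTracker plays the role of the msw handler plus counter, and makeKeyLock is a hypothetical in-process model of concurrency { limit: 1, key }; in the real test Inngest itself does the serialising. With the lock, five calls on the same key peak at one in flight; without it, all five overlap.

```typescript
// Counts calls currently in flight and remembers the peak.
function makeInFlightTracker() {
  let current = 0;
  let max = 0;
  return {
    async track<T>(fn: () => Promise<T>): Promise<T> {
      current += 1;
      max = Math.max(max, current);
      try {
        return await fn();
      } finally {
        current -= 1;
      }
    },
    get max() {
      return max;
    },
  };
}

// Serialises calls that share a key: each call chains onto the previous
// promise for that key, so at most one runs at a time per key.
function makeKeyLock() {
  const tails = new Map<string, Promise<unknown>>();
  return function withKeyLock<T>(key: string, fn: () => Promise<T>): Promise<T> {
    const prev = tails.get(key) ?? Promise.resolve();
    const next = prev.then(fn, fn);
    tails.set(key, next.catch(() => undefined)); // a failure must not block later calls
    return next;
  };
}

const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
```

In the integration test, the track wrapper would sit inside the msw handler with delay simulating Meta latency, and the assertion is simply that the recorded peak equals 1.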

Coverage

  • Phase total ≥ 80%; src/inngest/functions/process-message.ts ≥ 90% (idempotency-critical).

Code documentation

  • TSDoc with @remarks on every function in src/inngest/functions/ covering idempotency contract, concurrency key, retry policy.
  • events.ts — each schema gets a description doc-comment.
  • File-level headers on all new files.
  • docs/architecture/inngest.md and docs/components/inngest-runner.md written.
  • docs/architecture/webhook.md updated to reflect the new flow.
  • docs/reference/ regenerated.

Acceptance

  1. Calling send_message with wait=false returns { queueId, status: 'queued' } in ≤ 100 ms; the actual Meta call happens in the Inngest function within a few seconds.
  2. Killing the app mid-send and restarting → Inngest retries the function; the message either succeeds or terminally fails with a row update, never silently drops.
  3. The Inngest dashboard shows green runs for every process-message and send-message invocation; idempotency hits are visible.
  4. POSTing the same webhook payload three times in 5s → exactly one messages row and exactly one inngest_idempotency row; any duplicates that reach Inngest show up as no-op invocations in the Inngest dashboard.
  5. pnpm test:ci green; coverage gates met.
  6. Webhook POST handler p99 response time ≤ 50ms for valid signatures, measured server-side (excluding the network round-trip).

Definition of Done

Inngest SDK + wiring

  • src/inngest/client.ts singleton.
  • POST /api/inngest mounted via SDK serve.
  • cloudflared/Inngest tunnel set up for dev reachability until Phase 7.
  • Signing-key verification confirmed in dashboard.

Event schemas

  • src/inngest/events.ts defines every event with a zod schema and exported types.
  • wa/webhook.received, wa/message.send.requested, wa/message.send.completed, wa/media.download.requested, wa/media.upload.requested, mcp/client.notify, wa/status.update.received, all cron events.

Functions

  • process-message — idempotent (SDK + DB), normalises, upserts, emits notifications.
  • send-message — concurrency key phone_number_id, retries, status transitions.
  • status-updater — handles delivered/read/failed.
  • download-media, upload-media, push-client-notification — stubs in place with correct signatures.
  • gc-rate-limits cron registered (body wired up; data lands in Phase 4).
  • archive-audit, prune-messages, prune-media cron stubs registered.

Tool + webhook refactor

  • send_message enqueues + returns immediately; wait=true mode uses step.waitForEvent.
  • Webhook POST handler does no messages/contacts writes — only idempotency check + enqueue + 200.

Tests

  • tests/unit/inngest/events.test.ts passes.
  • tests/integration/inngest/process-message.test.ts (happy + replay + media trigger) passes.
  • tests/integration/inngest/send-message.test.ts (happy + retry + 131047 + concurrency-key) passes.
  • tests/integration/inngest/status-updater.test.ts passes.
  • tests/integration/inngest/idempotency.test.ts (10 concurrent same-event) passes.
  • tests/integration/tools/send-message-async.test.ts (wait=false + wait=true + timeout) passes.
  • Phase coverage gate ≥ 80%; process-message ≥ 90%.

Documentation

  • docs/architecture/inngest.md written.
  • docs/components/inngest-runner.md written.
  • docs/architecture/webhook.md updated for the new flow.
  • docs/api/mcp-tools.md documents async + wait=true modes.
  • TSDoc @remarks on every Inngest function (idempotency + concurrency + retry).
  • docs/reference/ regenerated cleanly.

Acceptance verified

  • send_message wait=false returns in ≤ 100 ms; Meta call follows within seconds.
  • Mid-send kill + restart → message eventually succeeds or fails cleanly.
  • Inngest dashboard shows green runs and idempotency hits.
  • Triple-replay webhook → exactly one messages row.
  • Webhook POST p99 response time ≤ 50ms for valid signatures.

Phase signoff

  • Phase 3 complete. README.md status table updated to ✅.