Phase 3 — Inngest Integration
Effort: L
Goal
Move every side-effectful operation (Meta API calls, webhook fan-out, media downloads, client notification push, retention crons) behind Inngest functions. Gain retries, concurrency control, durability across restarts, and a single observable history of what the system did.
Deliverables
Inngest SDK + wiring
- `src/inngest/client.ts`: singleton `Inngest({ id: 'wa-mcp', eventKey, signingKey })`.
- `POST /api/inngest` mounted on the Express server via the SDK's `serve` adapter. No app-layer auth on this route; the SDK verifies `INNGEST_SIGNING_KEY` on every request. Nginx (Phase 7) rate-limits the path.
- Until Phase 7 brings Nginx, expose `/api/inngest` to Inngest Cloud via `cloudflared` or the Inngest dev tunnel.
Event schemas (zod)
- `src/inngest/events.ts`: every event has a zod schema; types derive from the schemas.
  - `wa/webhook.received`: `{ phoneNumberId, rawPayload, derivedEventId }`
  - `wa/message.send.requested`: `{ clientId, phoneNumberId, to, type, payload, requestId }`
  - `wa/message.send.completed`: `{ requestId, wamid, status }`
  - `wa/media.download.requested`: `{ clientId, phoneNumberId, mediaId, wamid }`
  - `wa/media.upload.requested`: `{ clientId, phoneNumberId, localPath, mime, requestId }`
  - `mcp/client.notify`: `{ clientId, phoneNumberId, kind, wamid }`
  - `wa/status.update.received`: `{ phoneNumberId, wamid, status, error, raw }`
  - `cron/audit.archive`, `cron/messages.retention`, `cron/media.retention`, `cron/rate-limit.gc`
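To illustrate what "types derive from the schemas" means, here is a minimal sketch that avoids any dependency. The `Schema`, `Infer`, `str`, and `object` helpers are hypothetical stand-ins; in the real module zod's `z.object` and `z.infer` would presumably fill those roles. The field types (all strings) are also an assumption, since the plan lists only field names.

```typescript
// Dependency-free stand-in for a schema library (hypothetical helpers).
type Schema<T> = { parse(input: unknown): T };
type Infer<S> = S extends Schema<infer T> ? T : never;

// A schema that accepts only strings.
const str: Schema<string> = {
  parse(input: unknown): string {
    if (typeof input !== "string") throw new Error("expected a string");
    return input;
  },
};

// Builds an object schema from a shape of field schemas.
// Keys that are not part of the shape are dropped from the result.
function object<S extends Record<string, Schema<unknown>>>(
  shape: S,
): Schema<{ [K in keyof S]: Infer<S[K]> }> {
  return {
    parse(input: unknown) {
      if (typeof input !== "object" || input === null) {
        throw new Error("expected an object");
      }
      const record = input as Record<string, unknown>;
      const out: Record<string, unknown> = {};
      for (const key of Object.keys(shape)) {
        out[key] = shape[key].parse(record[key]);
      }
      return out as unknown as { [K in keyof S]: Infer<S[K]> };
    },
  };
}

// Field types are assumptions: the plan names the fields but not their types.
const mediaDownloadRequested = object({
  clientId: str,
  phoneNumberId: str,
  mediaId: str,
  wamid: str,
});

// The payload type is derived from the schema, never written by hand.
type MediaDownloadRequested = Infer<typeof mediaDownloadRequested>;

// Consumers receive a fully typed payload after parsing.
function mediaKey(event: MediaDownloadRequested): string {
  return `${event.clientId}:${event.mediaId}`;
}
```

Because the type is inferred from the schema, adding or renaming a field in one place changes both the runtime validation and the compile-time type.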
Functions
- `src/inngest/functions/process-message.ts`:
  - Triggered by `wa/webhook.received`.
  - Idempotency: SDK-level `idempotencyKey = data.derivedEventId` and `INSERT INTO inngest_idempotency ... ON CONFLICT DO NOTHING`, belt-and-braces.
  - Steps: `normalise` → `upsert contact` → `insert message` → if media, emit `wa/media.download.requested`; if interactive reply, record `payload.selectedId`; → emit `mcp/client.notify` (Phase 4 fans out to all granted clients; Phase 3 emits one for the owner only).
- `src/inngest/functions/send-message.ts`:
  - Triggered by `wa/message.send.requested`.
  - Concurrency key: `phone_number_id` (one Meta call per number at a time, capped across clients).
  - Retries: SDK defaults for 5xx/429 with exponential backoff, max 4 attempts.
  - On success: update the queued `messages` row to `status='sent'` with the returned `wamid`; emit `wa/message.send.completed`.
  - On terminal failure: update row to `status='failed'` with the typed error code.
  - Daily-cap enforcement lives here (after dequeue) so retries don't double-count. (Implementation lands in Phase 4 with the rate-limit module; Phase 3 leaves a TODO calling an as-yet-untyped `enforceDailyCap` helper.)
- `src/inngest/functions/status-updater.ts`:
  - Triggered by `wa/status.update.received`.
  - Updates `messages.status` (delivered / read / failed) and `error_code` if present.
- `src/inngest/functions/download-media.ts`: stub with the function signature + idempotency wiring. Body comes in Phase 6.
- `src/inngest/functions/upload-media.ts`: stub.
- `src/inngest/functions/push-client-notification.ts`: stub (in-process session map lives in Phase 5; here this function just logs).
- `src/inngest/functions/index.ts`: array of all functions, exported to `serve()`.
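The retry policy described for `send-message` (retry on 5xx/429 with exponential backoff, at most 4 attempts, then a terminal `failed` outcome) can be sketched in plain TypeScript. This is not the Inngest SDK's retry engine, which handles retries itself; it only illustrates the classification and the attempt budget. All names (`sendWithRetry`, `isRetryable`, `backoffMs`) and the 1-second base delay are assumptions.

```typescript
// Result of one (hypothetical) Meta API call.
type SendResult =
  | { ok: true; wamid: string }
  | { ok: false; httpStatus: number; code?: number };

type Outcome =
  | { status: "sent"; wamid: string; attempts: number }
  | { status: "failed"; errorCode: number; attempts: number };

const MAX_ATTEMPTS = 4;

// 429 and 5xx are treated as transient; everything else is terminal.
function isRetryable(httpStatus: number): boolean {
  return httpStatus === 429 || (httpStatus >= 500 && httpStatus <= 599);
}

// Exponential backoff: base, 2x base, 4x base, ... (base delay is an assumption).
function backoffMs(attempt: number, baseMs = 1000): number {
  return baseMs * 2 ** (attempt - 1);
}

// Calls `call` until it succeeds, fails terminally, or the attempt budget
// is exhausted. `sleep` is injected so tests can skip real waiting.
async function sendWithRetry(
  call: () => Promise<SendResult>,
  sleep: (ms: number) => Promise<void>,
): Promise<Outcome> {
  for (let attempt = 1; ; attempt++) {
    const res = await call();
    if (res.ok) {
      return { status: "sent", wamid: res.wamid, attempts: attempt };
    }
    const terminal = !isRetryable(res.httpStatus) || attempt >= MAX_ATTEMPTS;
    if (terminal) {
      // Prefer the provider error code (e.g. 131047) over the HTTP status.
      return {
        status: "failed",
        errorCode: res.code ?? res.httpStatus,
        attempts: attempt,
      };
    }
    await sleep(backoffMs(attempt));
  }
}
```

The caller would map `sent` and `failed` onto the `messages` row update and the `wa/message.send.completed` event described above.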
Tool refactor
- `src/tools/send-message.ts`:
  - Validate input.
  - Insert `messages` row with `status='queued'` and a generated `request_id`.
  - `inngest.send('wa/message.send.requested', { ... })`.
  - Return `{ queueId, status: 'queued' }` immediately.
  - Optional `wait=true`: uses `step.waitForEvent('wa/message.send.completed', { match: 'data.requestId' })` with a 30s timeout, which gives better LLM ergonomics when blocking on the result is helpful. Default `wait=false` so the model stays snappy.
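The two modes of the refactored tool can be sketched as follows. This is a simplified model, not the planned implementation: the `ToolDeps` interface, `sendMessageTool`, and `waitWithTimeout` are hypothetical names, the database and Inngest calls are injected as plain functions, and the sketch assumes `queueId` is the generated `request_id`. The timeout branch returns the `timed_out_waiting` status used in the tests section.

```typescript
type Completed = { wamid: string; status: "sent" | "failed" };
type TimedOut = { status: "timed_out_waiting" };
type ToolResult = { queueId: string; status: "queued" } | Completed | TimedOut;

// Hypothetical seams for the database and the event bus.
interface ToolDeps {
  insertQueuedRow(requestId: string): Promise<void>;
  enqueue(requestId: string): Promise<void>;
  completionOf(requestId: string): Promise<Completed>;
}

// Resolves with the completion, or with a timeout marker if it takes too long.
async function waitWithTimeout(
  completion: Promise<Completed>,
  timeoutMs: number,
): Promise<Completed | TimedOut> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<TimedOut>((resolve) => {
    timer = setTimeout(() => resolve({ status: "timed_out_waiting" }), timeoutMs);
  });
  try {
    return await Promise.race([completion, timeout]);
  } finally {
    // Always clear the timer so it does not keep the process alive.
    clearTimeout(timer);
  }
}

async function sendMessageTool(
  requestId: string,
  wait: boolean,
  deps: ToolDeps,
  timeoutMs = 30000,
): Promise<ToolResult> {
  await deps.insertQueuedRow(requestId);
  // Subscribe before enqueueing so a fast completion cannot be missed.
  const completion = wait ? deps.completionOf(requestId) : undefined;
  await deps.enqueue(requestId);
  if (completion === undefined) {
    return { queueId: requestId, status: "queued" };
  }
  return waitWithTimeout(completion, timeoutMs);
}
```

A timeout here only ends the wait; the queued row is untouched and still settles when the function eventually completes.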
Webhook refactor
- `src/webhook/meta.ts` POST handler now:
  - Verify signature.
  - Derive `event_id`, `INSERT ... ON CONFLICT DO NOTHING` into `inngest_idempotency`.
  - If new: `inngest.send('wa/webhook.received', { ... })`.
  - Return 200 immediately (≤ 50ms typical).
  - No DB writes to `messages` or `contacts` in the request handler anymore.
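The handler flow above (verify, derive, claim, enqueue if new, answer 200) can be sketched with every side effect injected. The `WebhookDeps` interface and all function names are hypothetical, the `401` for a bad signature is an assumption the plan does not specify, and `inMemoryClaims` is only a stand-in for the `INSERT ... ON CONFLICT DO NOTHING` against `inngest_idempotency`.

```typescript
// Hypothetical seams; the real handler would use Express, Postgres and Inngest.
interface WebhookDeps {
  verifySignature(rawBody: string, signatureHeader: string | undefined): boolean;
  deriveEventId(rawBody: string): string;
  // Resolves true only for the first caller with a given id.
  claim(eventId: string): Promise<boolean>;
  enqueue(eventId: string, rawBody: string): Promise<void>;
}

async function handleMetaWebhook(
  deps: WebhookDeps,
  rawBody: string,
  signatureHeader: string | undefined,
): Promise<{ status: number; enqueued: boolean }> {
  if (!deps.verifySignature(rawBody, signatureHeader)) {
    // Status code for a bad signature is an assumption.
    return { status: 401, enqueued: false };
  }
  const eventId = deps.deriveEventId(rawBody);
  const isNew = await deps.claim(eventId);
  if (isNew) {
    await deps.enqueue(eventId, rawBody);
  }
  // Duplicates are acknowledged too, so the sender stops retrying.
  return { status: 200, enqueued: isNew };
}

// In-memory stand-in for the idempotency table. The check and the insert
// happen without an await in between, so concurrent callers cannot both win.
function inMemoryClaims(): (eventId: string) => Promise<boolean> {
  const seen = new Set<string>();
  return async (eventId: string) => {
    if (seen.has(eventId)) return false;
    seen.add(eventId);
    return true;
  };
}
```

Note that the handler never touches `messages` or `contacts`; its only write is the idempotency claim.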
Crons
- `src/inngest/functions/gc-rate-limits.ts`: every 10m, deletes `rate_limit_buckets` rows where `updated_at < now() - interval '25 hours'`. (Phase 4 brings the bucket writes; the cron is wired now so it's ready.)
- Other cron functions (`audit.archive`, `messages.retention`, `media.retention`) are stubbed; bodies in Phase 4 / Phase 6.
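The garbage-collection rule for `rate_limit_buckets` can be mirrored in memory to make the cutoff explicit. This is only an illustration of the predicate `updated_at < now() - interval '25 hours'`; the `BucketRow` shape, `staleCutoff`, and `gcBuckets` are hypothetical names, and the real cron would issue a SQL `DELETE` instead.

```typescript
// Standard five-field cron expression for "every 10 minutes".
const GC_CRON = "*/10 * * * *";

// 25 hours, expressed in milliseconds.
const RETENTION_MS = 25 * 60 * 60 * 1000;

// Hypothetical in-memory view of a rate_limit_buckets row.
interface BucketRow {
  key: string;
  updatedAt: Date;
}

// Rows last updated strictly before this instant are eligible for deletion.
function staleCutoff(now: Date): Date {
  return new Date(now.getTime() - RETENTION_MS);
}

// Splits rows the same way the SQL predicate would.
function gcBuckets(
  rows: BucketRow[],
  now: Date,
): { kept: BucketRow[]; deleted: BucketRow[] } {
  const cutoff = staleCutoff(now).getTime();
  const kept: BucketRow[] = [];
  const deleted: BucketRow[] = [];
  for (const row of rows) {
    if (row.updatedAt.getTime() < cutoff) {
      deleted.push(row);
    } else {
      kept.push(row);
    }
  }
  return { kept, deleted };
}
```

The comparison is strict, so a row updated exactly 25 hours ago survives until the next run.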
Docs (extended)
- `docs/architecture/inngest.md`: event catalogue, function responsibilities, concurrency strategy, idempotency contract.
- `docs/components/inngest-runner.md`: how the Inngest serve route is mounted, how the SDK verifies signatures, dev-vs-prod tunnel.
- `docs/architecture/webhook.md`: updated so that the handler now enqueues; processing happens in `process-message`.
- `docs/api/mcp-tools.md`: `send_message` documented with both async (default) and `wait=true` modes.
Critical files
- src/inngest/client.ts
- src/inngest/events.ts
- src/inngest/functions/{process-message,send-message,status-updater,download-media,upload-media,push-client-notification,gc-rate-limits,index}.ts
- src/transport/http.ts — mounts the Inngest serve route
- src/tools/send-message.ts — refactored to enqueue
- src/webhook/meta.ts — refactored to enqueue
Tests
Unit
- `tests/unit/inngest/events.test.ts`: every event's zod schema accepts valid samples and rejects malformed ones.
Integration (testcontainers Postgres + Inngest test helpers)
- `tests/integration/inngest/process-message.test.ts`:
  - Inbound text → row inserted in `messages`, contact upserted, `mcp/client.notify` emitted.
  - Replay same `derivedEventId` → second invocation no-ops; one row, one `mcp/client.notify` event.
  - Inbound media → `wa/media.download.requested` event emitted (no actual download yet).
- `tests/integration/inngest/send-message.test.ts`: `msw` mocks Meta.
  - Happy path: row goes `queued` → `sent` with `wamid`; `wa/message.send.completed` emitted.
  - 5xx + retry: two outbound HTTP calls, final `sent`.
  - 131047 from Meta: row → `failed`, `wa/message.send.completed` with `status='failed'`.
  - Concurrency key: simulate 5 simultaneous sends to the same `phone_number_id`; assert only one outbound HTTP call is in flight at any moment (use msw delay + counter).
- `tests/integration/inngest/status-updater.test.ts`:
  - Delivered status update → row's `status='delivered'`.
  - Read status update → `status='read'`.
- `tests/integration/inngest/idempotency.test.ts`:
  - 10 concurrent invocations of `process-message` with the same `derivedEventId` → exactly one row inserted (race condition on `inngest_idempotency` insert).
- `tests/integration/tools/send-message-async.test.ts`:
  - `wait=false` returns `{ queueId, status: 'queued' }` immediately, row exists with `status='queued'`, eventually transitions to `sent` after function runs.
  - `wait=true` returns the final `{ wamid, status: 'sent' }` after the function completes.
  - `wait=true` with simulated 30s+ Meta delay → tool result includes a timeout indication (`status: 'timed_out_waiting'`) but the row eventually settles correctly.
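The "delay + counter" technique named in the concurrency-key test for `send-message` can be sketched without msw or Inngest. `KeyedSerialQueue` is a hypothetical stand-in for the per-`phone_number_id` concurrency limit, and `measureMaxInFlight` plays the role of the mocked Meta endpoint: each fake call bumps a counter, waits, and decrements it, so the recorded maximum shows how many calls overlapped.

```typescript
// Runs tasks for the same key one at a time; different keys run in parallel.
// Stand-in for the function-level concurrency key (hypothetical class).
class KeyedSerialQueue {
  private tails = new Map<string, Promise<void>>();

  run<T>(key: string, task: () => Promise<T>): Promise<T> {
    const previous = this.tails.get(key) ?? Promise.resolve();
    const result = previous.then(() => task());
    // Store a tail that never rejects so one failure does not block the key.
    this.tails.set(
      key,
      result.then(
        () => undefined,
        () => undefined,
      ),
    );
    return result;
  }
}

// Issues one fake outbound call per entry in `keys` and reports the highest
// number of calls that were in flight at the same moment.
async function measureMaxInFlight(
  queue: KeyedSerialQueue,
  keys: string[],
  delayMs: number,
): Promise<number> {
  let inFlight = 0;
  let maxInFlight = 0;
  const fakeMetaCall = async (): Promise<void> => {
    inFlight += 1;
    maxInFlight = Math.max(maxInFlight, inFlight);
    // The delay keeps the call open long enough for overlaps to show up.
    await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
    inFlight -= 1;
  };
  await Promise.all(keys.map((key) => queue.run(key, fakeMetaCall)));
  return maxInFlight;
}
```

In the real test the counter would live inside the msw handler for the Meta endpoint, and the assertion would be that the maximum never exceeds 1 for a single `phone_number_id`.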
Coverage
- Phase total ≥ 80%; `src/inngest/functions/process-message.ts` ≥ 90% (idempotency-critical).
Code documentation
- TSDoc with `@remarks` on every function in `src/inngest/functions/` covering idempotency contract, concurrency key, retry policy.
- `events.ts`: each schema gets a description doc-comment.
- File-level headers on all new files.
- `docs/architecture/inngest.md` and `docs/components/inngest-runner.md` written.
- `docs/architecture/webhook.md` updated to reflect the new flow.
- `docs/reference/` regenerated.
Acceptance
- Calling `send_message` with `wait=false` returns `{ queueId, status: 'queued' }` in ≤ 100 ms; the actual Meta call happens in the Inngest function within a few seconds.
- Killing the app mid-send and restarting → Inngest retries the function; the message either succeeds or terminally fails with a row update, never silently drops.
- The Inngest dashboard shows green runs for every `process-message` and `send-message` invocation; idempotency hits are visible.
- POSTing the same webhook payload three times in 5s → exactly one `messages` row and exactly one `inngest_idempotency` row; the duplicates are visible as no-op invocations in the Inngest dashboard.
- `pnpm test:ci` green; coverage gates met.
- Webhook POST handler response time (measured) ≤ 50ms for valid signatures (excluding the network round-trip).
Definition of Done
Inngest SDK + wiring
- `src/inngest/client.ts` singleton.
- `POST /api/inngest` mounted via SDK `serve`.
- `cloudflared`/Inngest tunnel set up for dev reachability until Phase 7.
- Signing-key verification confirmed in dashboard.
Event schemas
- `src/inngest/events.ts` defines every event with a zod schema and exported types.
- `wa/webhook.received`, `wa/message.send.requested`, `wa/message.send.completed`, `wa/media.download.requested`, `wa/media.upload.requested`, `mcp/client.notify`, `wa/status.update.received`, all cron events.
Functions
- `process-message`: idempotent (SDK + DB), normalises, upserts, emits notifications.
- `send-message`: concurrency key `phone_number_id`, retries, status transitions.
- `status-updater`: handles delivered/read/failed.
- `download-media`, `upload-media`, `push-client-notification`: stubs in place with correct signatures.
- `gc-rate-limits` cron registered (body wired up; data lands in Phase 4).
- `archive-audit`, `prune-messages`, `prune-media` cron stubs registered.
Tool + webhook refactor
- `send_message` enqueues + returns immediately; `wait=true` mode uses `step.waitForEvent`.
- Webhook POST handler does no `messages`/`contacts` writes; only idempotency check + enqueue + 200.
Tests
- `tests/unit/inngest/events.test.ts` passes.
- `tests/integration/inngest/process-message.test.ts` (happy + replay + media trigger) passes.
- `tests/integration/inngest/send-message.test.ts` (happy + retry + 131047 + concurrency-key) passes.
- `tests/integration/inngest/status-updater.test.ts` passes.
- `tests/integration/inngest/idempotency.test.ts` (10 concurrent same-event) passes.
- `tests/integration/tools/send-message-async.test.ts` (wait=false + wait=true + timeout) passes.
- Phase coverage gate ≥ 80%; `process-message` ≥ 90%.
Documentation
- `docs/architecture/inngest.md` written.
- `docs/components/inngest-runner.md` written.
- `docs/architecture/webhook.md` updated for the new flow.
- `docs/api/mcp-tools.md` documents async + `wait=true` modes.
- TSDoc `@remarks` on every Inngest function (idempotency + concurrency + retry).
- `docs/reference/` regenerated cleanly.
Acceptance verified
- `send_message` `wait=false` returns in ≤ 100 ms; Meta call follows within seconds.
- Mid-send kill + restart → message eventually succeeds or fails cleanly.
- Inngest dashboard shows green runs and idempotency hits.
- Triple-replay webhook → exactly one `messages` row.
- Webhook POST p99 response time ≤ 50ms for valid signatures.
Phase signoff
- Phase 3 complete. README.md status table updated to ✅.