Deneva MCP Plan 4 — Inngest Sync

Phase 4 — Background Sync with Inngest

Detailed execution doc for Phase 4 of the MCP Marketing Tool Architecture Plan. Builds on Phases 1, 2, and 3.

Estimated effort: 3–5 days for one engineer (about a week with the four pulled-forward scope items). The Phase 5 follow-up doc is not yet written; it will live at docs/phase-5-hardening.md.


Goal

Move from lazy-only caching (Phase 2: fetch on first request, populate cache) to eager + lazy caching (Phase 4: pre-warm cache before TTL expiry so user requests almost always hit). Add the GDPR retention jobs the architecture mandates, eager token refresh, and ops-grade observability around sync health.

If Phase 4 is done correctly, a tool call from a connected tenant returns a cache hit ≥99% of the time during normal operation; expired metric_cache rows never accumulate; audit_log rows older than 12 months are archived; and a dead Inngest connection is detectable within 10 minutes.

Definition of Done (high-level — full checklist in §11)

  • Inngest Cloud successfully invokes our /api/inngest endpoint, signature verification passes, and unsigned requests are rejected.
  • For every (tenant, platform, report-type, date-range) tuple with active credentials, a scheduled function refreshes the cache before its TTL expires.
  • After 3 retries fail, a platform_credentials.last_sync_failure_at flag is set; tools serve the stale row with a stale: true marker; the next scheduled sync clears the flag on success.
  • Tokens expiring within the next hour get refreshed proactively without any user request.
  • metric_cache rows older than 90d are hard-deleted nightly; sync_log rows older than 30d are hard-deleted nightly; audit_log rows older than 12mo are moved to audit_log_archive.
  • GET /admin/metrics/sync returns per-tenant/platform/report sync health.
  • A heartbeat function runs every 5 min; absence of a fresh row is detectable via a single SQL check.

Workstream order & dependency graph

A. Inngest client + signed webhook ──┬──▶ B. Sync fan-out per cadence ──▶ C. Retry exhaustion → unhealthy state
                                     ├──▶ D. Eager token refresh
                                     ├──▶ E. GDPR purge + audit archive
                                     └──▶ G. Heartbeat + dead-canary

F. Sync metrics endpoint (reads sync_log — independent of Inngest)
H. Tests run alongside everything

Critical path: A → B → C. D, E, G can land in any order once A is in place. F is independent.


Workstream A — Inngest client + signed webhook

A1. Client

File: src/sync/inngest.ts

import { Inngest, EventSchemas } from 'inngest';
import { loadSecret } from '../security/secrets.loader.js';

interface Events {
  'sync/refresh-tenant-report': {
    data: {
      tenantId: string;
      platform: 'google' | 'meta' | 'tiktok';
      reportType: string;
      dateRange: 'last_7_days' | 'last_30_days' | 'last_90_days';
      accountId: string;
    };
  };
  'sync/refresh-token': {
    data: { tenantId: string; platform: 'google' | 'meta' | 'tiktok' };
  };
}

export const inngest = new Inngest({
  id: 'deneva-mcp',
  signingKey: (await loadSecret('INNGEST_SIGNING_KEY')).toString('utf8'),
  eventKey: (await loadSecret('INNGEST_EVENT_KEY')).toString('utf8'),
  schemas: new EventSchemas().fromRecord<Events>(),
});

Add 'INNGEST_EVENT_KEY' to REQUIRED_SECRETS in src/security/secrets.loader.ts. Inngest Cloud needs both keys: signing for inbound, event for outbound inngest.send(). Also extend scripts/dev-secrets.sh (Phase 1 §C3) so dev startup doesn’t fail-fast:

# append to scripts/dev-secrets.sh
[[ -f secrets/INNGEST_EVENT_KEY ]] || gen INNGEST_EVENT_KEY
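For reference, the REQUIRED_SECRETS change is a plain append — a minimal sketch, assuming the Phase 1 array shape (existing entries elided, not verified):

// src/security/secrets.loader.ts (sketch)
export const REQUIRED_SECRETS = [
  // ...existing Phase 1–3 entries...
  'INNGEST_SIGNING_KEY',
  'INNGEST_EVENT_KEY',
] as const;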

A2. HTTP handler with signature verification

File: src/sync/handler.ts

import { serve } from 'inngest/fastify';
import type { FastifyInstance } from 'fastify';
import { inngest } from './inngest.js';
import * as functions from './functions.js';

export async function mountInngest(app: FastifyInstance): Promise<void> {
  await app.register(serve, {
    client: inngest,
    // scheduledSyncs is itself an array (one function per cadence row), so flatten
    // before handing the list to serve().
    functions: Object.values(functions).flat(),
    // Signature verification is automatic when signingKey is set on the Inngest client.
  });
  // serve mounts at /api/inngest by default.
}

The Inngest SDK’s serve() plugin verifies signatures on every request automatically. Verify in test by sending an unsigned POST to /api/inngest and asserting 401.
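A minimal sketch of that check, assuming the Vitest + Fastify inject() harness used in earlier phases (the buildApp helper name is illustrative, not a confirmed export):

// tests/integration/inngest-handler.test.ts (sketch)
import { describe, expect, it } from 'vitest';
import { buildApp } from '../_helpers/app.js'; // hypothetical app factory from the existing test harness

describe('POST /api/inngest', () => {
  it('rejects unsigned requests', async () => {
    const app = await buildApp();
    const res = await app.inject({
      method: 'POST',
      url: '/api/inngest',
      payload: { name: 'forged/event' }, // no signature header at all
    });
    expect(res.statusCode).toBe(401);
  });
});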

A3. nginx exposure

The /api/inngest endpoint must be reachable from Inngest Cloud — extend Phase 5’s nginx config (or a Phase 4 stub config in dev):

location /api/inngest {
    proxy_pass http://127.0.0.1:3001;
    # No additional auth — signature verification is inside Fastify.
    # Rate limit: Inngest's retry storms can be aggressive; set a generous limit.
    limit_req zone=mcp_inngest burst=200 nodelay;
}

A4. Acceptance

  • An unsigned POST /api/inngest returns 401.
  • A correctly-signed POST (use inngest dev locally) routes to the right function.
  • Removing INNGEST_SIGNING_KEY at startup makes verifyAllSecretsLoadable fail.

Workstream B — Sync fan-out per report cadence

B1. Cadence config

Per-report cadences track TTLs from Phase 2 §A1. Refresh happens before TTL expires so user requests always hit a fresh-or-being-refreshed row.

File: src/sync/cadence.ts

import type { Platform } from '../cache/ttl-config.js';

export interface CadenceEntry {
  platform: Platform;
  reportType: string;
  cron: string; // standard cron expression
  dateRanges: ReadonlyArray<'last_7_days' | 'last_30_days' | 'last_90_days'>;
}

// Refresh slightly more often than TTL — give 10–20% headroom.
export const CADENCE: CadenceEntry[] = [
  // Hourly TTL → refresh every 50min
  { platform: 'google', reportType: 'account_health', cron: '*/50 * * * *', dateRanges: ['last_7_days', 'last_30_days', 'last_90_days'] },
  { platform: 'google', reportType: 'budget_optimizer', cron: '*/50 * * * *', dateRanges: ['last_7_days', 'last_30_days'] },
  // 2h TTL → refresh every 100min (1h40)
  { platform: 'google', reportType: 'search_term_waste', cron: '40 */2 * * *', dateRanges: ['last_7_days', 'last_30_days'] },
  { platform: 'google', reportType: 'quality_score', cron: '40 */2 * * *', dateRanges: ['last_30_days'] },
  { platform: 'google', reportType: 'pmax_breakdown', cron: '40 */2 * * *', dateRanges: ['last_30_days'] },
  { platform: 'google', reportType: 'weekly_anomaly', cron: '40 */2 * * *', dateRanges: ['last_7_days', 'last_30_days'] },
  // 4h TTL → refresh every 3h20
  { platform: 'google', reportType: 'auction_insights', cron: '20 */3 * * *', dateRanges: ['last_30_days'] },
  // Meta — same TTLs as their Google counterparts
  { platform: 'meta', reportType: 'account_health', cron: '*/50 * * * *', dateRanges: ['last_7_days', 'last_30_days', 'last_90_days'] },
  { platform: 'meta', reportType: 'budget_optimizer', cron: '*/50 * * * *', dateRanges: ['last_7_days', 'last_30_days'] },
  { platform: 'meta', reportType: 'search_term_waste', cron: '40 */2 * * *', dateRanges: ['last_7_days'] },
  { platform: 'meta', reportType: 'auction_insights', cron: '20 */3 * * *', dateRanges: ['last_30_days'] },
  { platform: 'meta', reportType: 'weekly_anomaly', cron: '40 */2 * * *', dateRanges: ['last_7_days', 'last_30_days'] },
  // TikTok
  { platform: 'tiktok', reportType: 'account_health', cron: '40 */2 * * *', dateRanges: ['last_7_days', 'last_30_days'] },
  { platform: 'tiktok', reportType: 'budget_optimizer', cron: '40 */2 * * *', dateRanges: ['last_7_days', 'last_30_days'] },
  { platform: 'tiktok', reportType: 'weekly_anomaly', cron: '40 */2 * * *', dateRanges: ['last_7_days', 'last_30_days'] },
];

The cron expressions are stored alongside the (platform, reportType, dateRange) tuple so a single fan-out function can be generated per cadence row.

B2. Fan-out functions

File: src/sync/functions.ts

import { and, eq, isNull, lt, or, sql } from 'drizzle-orm'; // lt / or / sql are used by the §D and §E functions added to this file below
import { inngest } from './inngest.js';
import { db } from '../db/index.js';
import { platformCredentials, metricCache, syncLog } from '../db/schema.js'; // metricCache / syncLog used by the §E purge jobs below
import { CADENCE } from './cadence.js';
import { getAdapter } from '../adapters/registry.js';
import { readOrFetch } from '../cache/cache.service.js';
import { writeAuditEvent } from '../security/audit-log.service.js';
import { AdapterError } from '../adapters/errors.js';
import { recordSyncResult } from './sync-log.js';
import { markHealthy, markUnhealthy } from './health.js'; // §C

// Build one scheduled function per cadence row at module load.
export const scheduledSyncs = CADENCE.map(entry =>
  inngest.createFunction(
    { id: `sync-${entry.platform}-${entry.reportType}`, retries: 0 }, // 0 retries for the fan-out itself; the worker retries
    { cron: entry.cron },
    async ({ step }) => {
      // For each connected tenant on this platform with a chosen account, enqueue a worker event.
      const tenants = await step.run('list-tenants', async () =>
        db.select({ tenantId: platformCredentials.tenantId, accountId: platformCredentials.accountId })
          .from(platformCredentials)
          .where(eq(platformCredentials.platform, entry.platform)),
      );
      const events = tenants.flatMap(t =>
        t.accountId
          ? entry.dateRanges.map(dr => ({
              name: 'sync/refresh-tenant-report' as const,
              data: { tenantId: t.tenantId, platform: entry.platform, accountId: t.accountId, reportType: entry.reportType, dateRange: dr },
            }))
          : [],
      );
      if (events.length > 0) await step.sendEvent(`fanout-${entry.platform}-${entry.reportType}`, events);
    },
  ),
);

// The worker — retried up to 3 times by Inngest with exponential backoff.
export const refreshTenantReport = inngest.createFunction(
  { id: 'refresh-tenant-report', retries: 3 },
  { event: 'sync/refresh-tenant-report' },
  async ({ event, step, attempt }) => {
    const { tenantId, platform, reportType, dateRange, accountId } = event.data;
    const start = Date.now();
    try {
      const adapter = getAdapter(platform);
      // force: true bypasses the cache-hit shortcut. The cron fires *before* TTL expiry
      // by design, so without force, the worker would hit the still-fresh row and skip
      // the platform fetch — defeating the whole point of eager sync.
      await step.run('fetch-and-cache', async () => {
        const fetcher = () => callAdapterFor(adapter, reportType, tenantId, dateRange);
        return readOrFetch(
          { tenantId, platform, accountId, reportType: reportType as never, dateRangeKey: dateRange },
          fetcher,
          { force: true },
        );
      });
      await recordSyncResult({ tenantId, platform, reportType, dateRange, status: 'success', durationMs: Date.now() - start });
      await markHealthy(tenantId, platform); // §C — clears any prior unhealthy flag
    } catch (err) {
      const isLast = attempt >= 3;
      await recordSyncResult({
        tenantId, platform, reportType, dateRange,
        status: isLast ? 'failure_final' : 'failure_retry',
        durationMs: Date.now() - start,
        errorCode: err instanceof AdapterError ? err.code : 'internal',
        errorMessage: err instanceof Error ? err.message : String(err),
      });
      if (isLast) await markUnhealthy(tenantId, platform, err); // §C
      throw err; // let Inngest schedule the next retry / mark final failure
    }
  },
);

function callAdapterFor(
  adapter: ReturnType<typeof getAdapter>,
  reportType: string,
  tenantId: string,
  dateRange: 'last_7_days' | 'last_30_days' | 'last_90_days',
) {
  switch (reportType) {
    case 'account_health': return adapter.fetchAccountHealth(tenantId, dateRange);
    case 'search_term_waste': return adapter.fetchSearchTerms(tenantId, dateRange);
    case 'auction_insights': return adapter.fetchAuctionInsights(tenantId, dateRange);
    case 'quality_score': return adapter.fetchKeywordQualityScores(tenantId, dateRange);
    case 'pmax_breakdown': return adapter.fetchAssetGroups(tenantId, dateRange);
    case 'budget_optimizer': return adapter.fetchCampaigns(tenantId, dateRange); // proxy
    case 'weekly_anomaly': return adapter.fetchAccountHealth(tenantId, dateRange); // proxy
    default: throw new Error(`unknown_report_type: ${reportType}`);
  }
}

Phase 2’s readOrFetch already handles thundering-herd via advisory locks. Two crons firing close together can’t double-fetch the same key — the second waits, sees the freshly-written row, and returns.
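For context, the shape of that advisory-lock guard — a sketch of what Phase 2 is described as doing, not its actual code; the helper name and lock-key derivation are assumptions:

// Per-key advisory lock (sketch). The lock is released automatically at commit.
import { sql } from 'drizzle-orm';
import { db } from '../db/index.js';

async function withCacheLock<T>(cacheKey: string, fn: () => Promise<T>): Promise<T> {
  return db.transaction(async tx => {
    // hashtext() maps the cache key to an integer lock id; concurrent callers for the
    // same key block on this line, then re-read the cache and find the fresh row.
    await tx.execute(sql`SELECT pg_advisory_xact_lock(hashtext(${cacheKey}))`);
    return fn();
  });
}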

Concurrency / pool sizing. The fan-out enqueues one worker event per (tenant × dateRange) for each of the 15 cadence rows. Inngest serialises by function ID (refresh-tenant-report) with default concurrency, so workers don’t actually run all at once — but each running worker holds one Postgres connection (cache read + cache write + RLS set_config). Phase 1 §B2 pins max: 20 on the Pool, which covers Inngest’s default concurrency (10) plus headroom for live /mcp traffic. If you raise Inngest’s per-function concurrency beyond ~12, raise max in lockstep — saturated pools manifest as request timeouts, not loud errors.
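If per-function concurrency is ever raised explicitly, it is a one-line option on the worker — a sketch, where the limit value is illustrative rather than a tuned recommendation:

// Explicit concurrency cap on the worker (sketch). Keep the limit comfortably below
// the pg Pool max from Phase 1 §B2 (20) so live /mcp traffic still gets connections.
export const refreshTenantReport = inngest.createFunction(
  { id: 'refresh-tenant-report', retries: 3, concurrency: { limit: 12 } },
  { event: 'sync/refresh-tenant-report' },
  async ({ event, step }) => { /* worker body unchanged from §B2 */ },
);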

B3. sync_log helper

File: src/sync/sync-log.ts

import { db } from '../db/index.js';
import { syncLog } from '../db/schema.js';

interface SyncResult {
  tenantId: string;
  platform: 'google' | 'meta' | 'tiktok';
  reportType: string;
  dateRange: string;
  status: 'success' | 'failure_retry' | 'failure_final';
  durationMs: number;
  errorCode?: string;
  errorMessage?: string;
}

export async function recordSyncResult(r: SyncResult): Promise<void> {
  await db.insert(syncLog).values({
    tenantId: r.tenantId,
    platform: r.platform,
    status: r.status,
    durationMs: r.durationMs,
    errorMessage: r.errorMessage ?? null,
    // Add reportType / dateRange / errorCode columns via Phase 4 migration:
    reportType: r.reportType,
    dateRange: r.dateRange,
    errorCode: r.errorCode ?? null,
  } as never);
}

Add to syncLog schema (Phase 4 migration):

reportType: text('report_type'),
dateRange: text('date_range'),
errorCode: text('error_code'),

B4. Acceptance

  • A test using Inngest’s local dev server triggers a cron event for sync-google-account_health and verifies a worker event is enqueued for each connected tenant with a selected account.
  • A worker invocation hits the (mocked) adapter, writes a metric_cache row, and a sync_log row with status: 'success'.
  • A worker failing on a retryable attempt writes a failure_retry row; the failure on the final attempt (after all retries are exhausted) writes a failure_final row.

Workstream C — Retry exhaustion → unhealthy state

C1. Schema

Add to platformCredentials (Phase 4 migration):

lastSyncSuccessAt: timestamp('last_sync_success_at'),
lastSyncFailureAt: timestamp('last_sync_failure_at'),
lastSyncErrorCode: text('last_sync_error_code'),

Add to metricCache:

isStale: boolean('is_stale').default(false).notNull(),

C2. Mark-unhealthy / mark-healthy

File: src/sync/health.ts

import { and, eq } from 'drizzle-orm';
import { db } from '../db/index.js';
import { platformCredentials, metricCache } from '../db/schema.js';
import { AdapterError } from '../adapters/errors.js';
import { writeAuditEvent } from '../security/audit-log.service.js';

export async function markUnhealthy(tenantId: string, platform: string, err: unknown): Promise<void> {
  const code = err instanceof AdapterError ? err.code : 'internal';
  await db.update(platformCredentials)
    .set({ lastSyncFailureAt: new Date(), lastSyncErrorCode: code })
    .where(and(eq(platformCredentials.tenantId, tenantId), eq(platformCredentials.platform, platform)));
  // Mark all currently-cached rows for this tenant+platform as stale.
  await db.update(metricCache).set({ isStale: true })
    .where(and(eq(metricCache.tenantId, tenantId), eq(metricCache.platform, platform)));
  // Background sync exhaustion — distinct from the request-path `mcp.tool_failed`
  // emitted by the tool registry in Phase 1 §G1. Same code shape, different audience:
  // dashboards/alerts can target `sync.exhausted` independently of tool-call failures.
  await writeAuditEvent('sync.exhausted', 'failure', { tenantId, platform, code });
}

export async function markHealthy(tenantId: string, platform: string): Promise<void> {
  await db.update(platformCredentials)
    .set({ lastSyncSuccessAt: new Date(), lastSyncFailureAt: null, lastSyncErrorCode: null })
    .where(and(eq(platformCredentials.tenantId, tenantId), eq(platformCredentials.platform, platform)));
  await db.update(metricCache).set({ isStale: false })
    .where(and(eq(metricCache.tenantId, tenantId), eq(metricCache.platform, platform)));
}

The successful branch of refreshTenantReport calls markHealthy; the final-failure branch calls markUnhealthy. The markHealthy call is wired into the worker — see the §B2 patch for refreshTenantReport.

C3. Tool-side exposure

Add a small helper alongside markHealthy / markUnhealthy:

// src/sync/health.ts (extend)
export async function platformHealth(tenantId: string, platform: string): Promise<{ stale: boolean; lastFailureCode: string | null }> {
  const [row] = await db.select({
    lastFailureAt: platformCredentials.lastSyncFailureAt,
    lastFailureCode: platformCredentials.lastSyncErrorCode,
  }).from(platformCredentials).where(and(
    eq(platformCredentials.tenantId, tenantId),
    eq(platformCredentials.platform, platform),
  ));
  if (!row) return { stale: false, lastFailureCode: null };
  return { stale: row.lastFailureAt !== null, lastFailureCode: row.lastFailureCode ?? null };
}

makeTool (Phase 2 §E1) annotates responses:

// after readOrFetch:
const health = await platformHealth(ctx.tenantId, input.platform);
return {
  data,
  cache,
  ...(health.stale ? { stale: true, lastFailureCode: health.lastFailureCode } : {}),
};

Tools never block on staleness — clients see the data and the warning together.

C4. Acceptance

  • A worker that exhausts 3 retries → lastSyncFailureAt set, all metric_cache rows for that tenant+platform marked isStale: true.
  • A subsequent successful sync → flags cleared, rows un-staled.
  • A tool call during the unhealthy window returns { data, cache: 'hit', stale: true, lastFailureCode: 'token_revoked' }.

Workstream D — Eager token refresh

D1. Function

File: src/sync/functions.ts (extend)

export const eagerTokenRefresh = inngest.createFunction(
  { id: 'eager-token-refresh', retries: 0 },
  { cron: '*/15 * * * *' }, // every 15 min
  async ({ step }) => {
    const soon = new Date(Date.now() + 60 * 60 * 1000); // expiring within 1h
    const rows = await step.run('list-expiring', () =>
      db.select({ tenantId: platformCredentials.tenantId, platform: platformCredentials.platform })
        .from(platformCredentials)
        .where(lt(platformCredentials.tokenExpiresAt, soon)),
    );
    if (rows.length > 0) await step.sendEvent('eager-refresh-fanout', rows.map(r => ({
      name: 'sync/refresh-token' as const,
      data: { tenantId: r.tenantId, platform: r.platform },
    })));
  },
);

export const refreshOneToken = inngest.createFunction(
  { id: 'refresh-one-token', retries: 2 },
  { event: 'sync/refresh-token' },
  async ({ event }) => {
    const { tenantId, platform } = event.data;
    const adapter = getAdapter(platform);
    await adapter.ensureValidToken(tenantId); // Phase 2 / 3 logic: refreshes if needed
  },
);

The Phase 2 lazy-refresh path stays — eager is belt-and-braces. If both fire near-simultaneously, the row-level lock on UPDATE platform_credentials serialises them and the second sees the freshly-rotated token.
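A sketch of the serialisation this relies on — illustrative of what ensureValidToken is described as doing in Phase 2/3, not its actual code; the helper name and threshold are assumptions:

// Both refresh paths take the same row lock, so only one of them talks to the platform.
import { and, eq } from 'drizzle-orm';
import { db } from '../db/index.js';
import { platformCredentials } from '../db/schema.js';

async function ensureValidTokenSketch(tenantId: string, platform: string): Promise<void> {
  await db.transaction(async tx => {
    const [cred] = await tx
      .select({ tokenExpiresAt: platformCredentials.tokenExpiresAt })
      .from(platformCredentials)
      .where(and(eq(platformCredentials.tenantId, tenantId), eq(platformCredentials.platform, platform)))
      .for('update'); // concurrent eager + lazy refreshers queue here
    // Re-check after acquiring the lock: the other caller may have already rotated the token.
    if (cred?.tokenExpiresAt && cred.tokenExpiresAt.getTime() - Date.now() > 60 * 60 * 1000) return;
    // ...otherwise call the platform token endpoint and UPDATE the row inside this transaction...
  });
}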

D2. Acceptance

  • Set a tenant’s tokenExpiresAt to now + 30min. Trigger eager-token-refresh manually. Assert refreshOneToken fires for that tenant and tokenExpiresAt advances.
  • A row with tokenExpiresAt 24h out is NOT in the eager-refresh fan-out.

Workstream E — GDPR purge + audit archive

E1. Cache purge

File: src/sync/functions.ts (extend)

export const purgeExpiredCache = inngest.createFunction(
  { id: 'gdpr-purge-cache', retries: 1 },
  { cron: '0 2 * * *' }, // 02:00 daily
  async ({ step }) => {
    const ninetyDaysAgo = new Date(Date.now() - 90 * 24 * 60 * 60 * 1000);
    const deleted = await step.run('delete-expired-cache', async () => {
      const r = await db.delete(metricCache)
        .where(or(lt(metricCache.expiresAt, new Date()), lt(metricCache.fetchedAt, ninetyDaysAgo)))
        .returning({ id: metricCache.id });
      return r.length;
    });
    await writeAuditEvent('gdpr.purge_cache', 'success', { deleted });
  },
);

Add 'gdpr.purge_cache', 'gdpr.purge_sync_log', 'gdpr.archive_audit', and 'sync.exhausted' (the §C2 background-path event) to the AuditEventType union in audit-log.service.ts (Phase 1 file).
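The union extension itself is mechanical — a sketch, with the existing Phase 1–3 members elided rather than reproduced:

// src/security/audit-log.service.ts (sketch)
export type AuditEventType =
  // | ...existing Phase 1–3 members...
  | 'gdpr.purge_cache'
  | 'gdpr.purge_sync_log'
  | 'gdpr.archive_audit'
  | 'sync.exhausted';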

E2. sync_log purge

export const purgeSyncLog = inngest.createFunction(
  { id: 'gdpr-purge-sync-log', retries: 1 },
  { cron: '15 2 * * *' },
  async ({ step }) => {
    const cutoff = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
    const deleted = await step.run('delete-old-sync', async () => {
      const r = await db.delete(syncLog).where(lt(syncLog.createdAt, cutoff)).returning({ id: syncLog.id });
      return r.length;
    });
    await writeAuditEvent('gdpr.purge_sync_log', 'success', { deleted });
  },
);

E3. Audit archive

Add a new table audit_log_archive in a Phase 4 migration with the same schema as audit_log. The archive table carries no INSERT restriction for mcp_app: only the archive job writes to it (append-only), and read access is restricted via role.

export const auditLogArchive = pgTable('audit_log_archive', { /* same columns as audit_log */ });
export const archiveAuditLog = inngest.createFunction(
  { id: 'gdpr-archive-audit', retries: 1 },
  { cron: '30 2 * * *' },
  async ({ step }) => {
    const cutoff = new Date(Date.now() - 365 * 24 * 60 * 60 * 1000);
    const moved = await step.run('move-rows', async () => {
      // Single SQL statement — atomic. CTE moves and deletes in one shot.
      const r = await db.execute(sql`
        WITH moved AS (
          DELETE FROM audit_log WHERE created_at < ${cutoff} RETURNING *
        )
        INSERT INTO audit_log_archive SELECT * FROM moved RETURNING id
      `);
      return r.rowCount ?? 0;
    });
    await writeAuditEvent('gdpr.archive_audit', 'success', { moved });
  },
);

The CTE-based atomic move is critical: a non-atomic copy-then-delete loses rows on failure. Verify it against the mcp_app role’s INSERT-only policy on audit_log: the archive’s INSERT targets audit_log_archive, not audit_log, so the existing REVOKE stands, while the DELETE half of the CTE is covered by the §E4 grant.

E4. Grants for archive

GRANT SELECT, INSERT ON audit_log_archive TO mcp_app;
REVOKE UPDATE, DELETE ON audit_log_archive FROM mcp_app;
GRANT DELETE ON audit_log TO mcp_app; -- needed for the archive CTE; tighten in Phase 5 to "via this stored proc only"

E5. Acceptance

  • Insert a metric_cache row with fetchedAt 91d ago → purgeExpiredCache deletes it.
  • Insert a sync_log row with createdAt 31d ago → purgeSyncLog deletes it.
  • Insert an audit_log row with createdAt 13mo ago → archiveAuditLog moves it to audit_log_archive. Total row count across both tables is preserved.
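The archive acceptance reduces to one invariant worth asserting directly — a sketch, assuming the triggerCron helper from §H2; the seeding and row-count helpers are hypothetical:

// tests/integration/gdpr-jobs.test.ts (sketch)
import { expect, it } from 'vitest';
import { triggerCron } from '../_helpers/inngest.js';
// seedAuditRow / monthsAgo / countRows are hypothetical test helpers.

it('archives 13-month-old audit rows without losing any', async () => {
  await seedAuditRow({ createdAt: monthsAgo(13) });
  const before = (await countRows('audit_log')) + (await countRows('audit_log_archive'));
  await triggerCron('gdpr-archive-audit');
  const after = (await countRows('audit_log')) + (await countRows('audit_log_archive'));
  expect(after).toBe(before);
  expect(await countRows('audit_log_archive')).toBeGreaterThan(0);
});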

Workstream F — Sync metrics endpoint

F1. Service

File: src/sync/metrics.ts

import { and, eq, gte, sql } from 'drizzle-orm';
import { db } from '../db/index.js';
import { syncLog, platformCredentials } from '../db/schema.js';

export async function syncMetrics(opts: { sinceHours?: number } = {}) {
  const since = new Date(Date.now() - (opts.sinceHours ?? 24) * 3600_000);
  const rows = await db.select({
    tenantId: syncLog.tenantId,
    platform: syncLog.platform,
    reportType: syncLog.reportType,
    successes: sql<number>`count(*) filter (where ${syncLog.status} = 'success')`,
    retries: sql<number>`count(*) filter (where ${syncLog.status} = 'failure_retry')`,
    finals: sql<number>`count(*) filter (where ${syncLog.status} = 'failure_final')`,
    avgDurationMs: sql<number>`coalesce(avg(${syncLog.durationMs}) filter (where ${syncLog.status} = 'success'), 0)::int`,
  }).from(syncLog)
    .where(gte(syncLog.createdAt, since))
    .groupBy(syncLog.tenantId, syncLog.platform, syncLog.reportType);

  // Join in current health state.
  const credentials = await db.select({
    tenantId: platformCredentials.tenantId,
    platform: platformCredentials.platform,
    lastSuccess: platformCredentials.lastSyncSuccessAt,
    lastFailure: platformCredentials.lastSyncFailureAt,
    failureCode: platformCredentials.lastSyncErrorCode,
  }).from(platformCredentials);

  return { window: { sinceHours: opts.sinceHours ?? 24, since }, syncs: rows, credentials };
}

F2. Route

Add to src/auth/admin-routes.ts:

fastify.get('/admin/metrics/sync', async (req, reply) => {
  if (req.headers['x-admin-token'] !== adminToken) return reply.code(401).send();
  const Q = z.object({ sinceHours: z.coerce.number().int().min(1).max(168).optional() }).parse(req.query);
  return syncMetrics(Q);
});

F3. Acceptance

  • After seeding 3 successful + 1 failed sync, the endpoint returns the right counts and a non-zero avgDurationMs.
  • Unauthenticated request → 401.
  • The credentials array reflects the unhealthy state set by §C.

Workstream G — Heartbeat + dead-canary

G1. Heartbeats table

Phase 4 migration:

export const heartbeats = pgTable('heartbeats', {
  source: text('source').primaryKey(), // 'inngest' for now
  lastBeatAt: timestamp('last_beat_at').notNull(),
});

G2. Heartbeat function

export const heartbeatInngest = inngest.createFunction(
  { id: 'heartbeat-inngest', retries: 0 },
  { cron: '*/5 * * * *' },
  async () => {
    await db.insert(heartbeats).values({ source: 'inngest', lastBeatAt: new Date() })
      .onConflictDoUpdate({ target: heartbeats.source, set: { lastBeatAt: new Date() } });
  },
);

G3. Canary check

A single SQL query exposes liveness. GET /admin/health/inngest:

fastify.get('/admin/health/inngest', async (req, reply) => {
  if (req.headers['x-admin-token'] !== adminToken) return reply.code(401).send();
  const [row] = await db.select().from(heartbeats).where(eq(heartbeats.source, 'inngest'));
  const ageMs = row ? Date.now() - row.lastBeatAt.getTime() : Infinity;
  const alive = ageMs < 10 * 60_000; // 10-min threshold
  reply.code(alive ? 200 : 503);
  return { alive, lastBeatAt: row?.lastBeatAt ?? null, ageMs };
});

Phase 5 will wire this to nginx/uptime monitoring (the architecture doc mentions external monitoring as part of SOC 2). For Phase 4 the endpoint is enough.

G4. Acceptance

  • After server start, within 6 minutes the heartbeats row exists and /admin/health/inngest returns { alive: true, ageMs: < 360000 }.
  • Manually setting lastBeatAt to 11 minutes ago makes the endpoint return 503.

Workstream H — Tests

H1. Test files

File                                               Covers
tests/integration/inngest-handler.test.ts          A — signature verification, route mounting
tests/integration/sync-fanout.test.ts              B — fan-out events, worker happy path
tests/integration/sync-retry-exhausted.test.ts     C — markUnhealthy / markHealthy
tests/integration/eager-token-refresh.test.ts      D
tests/integration/gdpr-jobs.test.ts                E — all three retention jobs
tests/integration/sync-metrics.test.ts             F
tests/integration/heartbeat.test.ts                G

H2. Inngest test harness

Use Inngest’s local dev server (npx inngest-cli dev) for integration tests. A small helper:

// tests/_helpers/inngest.ts
export async function triggerCron(functionId: string) {
  // POST to the local Inngest dev server's invoke endpoint
  await fetch(`http://localhost:8288/v0/invoke/${functionId}`, { method: 'POST' });
}

CI workflow gains a service container for inngest/inngest-dev (see Inngest’s docs).

H3. Adapter mocking

Same pattern as Phase 2/3 — mock at the fetch boundary; everything below runs against real Postgres + real envelope encryption. The Inngest worker code path is exercised end-to-end.
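A minimal sketch of that boundary mock, assuming Vitest; the helper name, file path, and fixture shape are illustrative rather than the actual Phase 2/3 helper:

// tests/_helpers/platform-fetch-mock.ts (sketch)
import { vi } from 'vitest';

export function mockPlatformFetch(fixture: unknown): void {
  // Stub global fetch so adapter code runs unchanged down to the HTTP boundary.
  vi.stubGlobal('fetch', vi.fn(async () =>
    new Response(JSON.stringify(fixture), {
      status: 200,
      headers: { 'content-type': 'application/json' },
    }),
  ));
}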


§11 — Definition of Done (full checklist)

A. Inngest client + signed webhook

  • INNGEST_SIGNING_KEY and INNGEST_EVENT_KEY in REQUIRED_SECRETS.
  • /api/inngest mounted; unsigned POSTs return 401.
  • nginx/dev config exposes /api/inngest to Inngest Cloud.

B. Sync fan-out

  • One scheduled function per cadence row, all firing on the configured cron.
  • Worker refreshTenantReport retried 3 times, writes sync_log rows for each attempt.
  • Cache rows are populated/updated by the worker; thundering-herd impossible (Phase 2 advisory locks still in play).

C. Retry exhaustion → unhealthy state

  • platform_credentials.lastSyncFailureAt / lastSyncErrorCode columns added.
  • metric_cache.isStale column added; flipped on/off by markUnhealthy/markHealthy.
  • Tools annotate responses with stale: true + lastFailureCode when applicable.

D. Eager token refresh

  • Every 15 minutes, tokens expiring within 1h are refreshed proactively.
  • Lazy refresh from Phase 2 still works (defence in depth).

E. GDPR purge + audit archive

  • metric_cache rows expired or older than 90d hard-deleted nightly.
  • sync_log older than 30d hard-deleted nightly.
  • audit_log older than 12mo moved atomically to audit_log_archive; total row count preserved.
  • All three jobs write gdpr.* audit events.

F. Sync metrics endpoint

  • GET /admin/metrics/sync returns counts + avg duration per (tenant, platform, report).
  • ?sinceHours= query param respected (default 24h).
  • Auth-protected.

G. Heartbeat + dead-canary

  • heartbeats table populated every 5 min by the Inngest function.
  • GET /admin/health/inngest returns 200 fresh, 503 stale.

H. Tests

  • All seven new test files pass with the Inngest dev server in CI.

§12 — Out of scope (deferred)

Item                                                                                      Phase
External alerting on /admin/health/inngest (PagerDuty / OpsGenie)                         5
Real Prometheus / OTel exporter (replaces Phase 2 in-memory + Phase 4 SQL counters)       5
Horizontal scaling of the Fastify cluster (Inngest concurrency tuning)                    5
Penetration test of /api/inngest (verify replay protection, signature spoofing)           5
Multi-region failover for the sync layer                                                  5
Stored-proc gate around audit_log DELETE (currently a broad GRANT for the archive CTE)    5

§13 — Manual smoke test

# 0. Phase 3 stack running. Inngest signing key + event key seeded into secrets/.

# 1. Apply Phase 4 migrations
npm run db:migrate
psql "..." -c "GRANT DELETE ON audit_log TO mcp_app;"
psql "..." -c "GRANT SELECT, INSERT ON audit_log_archive TO mcp_app;"

# 2. Boot Inngest dev server
npx inngest-cli@latest dev -u http://127.0.0.1:3001/api/inngest &

# 3. Confirm Inngest discovered the functions
curl http://localhost:8288/v0/functions | jq '.[].slug' | sort
# → expect: sync-google-account_health, sync-meta-account_health, ..., refresh-tenant-report,
#   refresh-one-token, eager-token-refresh, gdpr-purge-cache, gdpr-purge-sync-log,
#   gdpr-archive-audit, heartbeat-inngest

# 4. Manually trigger account_health sync for Google
curl -X POST http://localhost:8288/v0/invoke/sync-google-account_health
sleep 5
psql "..." -c "SELECT count(*), max(created_at) FROM sync_log WHERE platform='google' AND report_type='account_health' AND created_at > now() - interval '1 minute';"
# → count > 0 (one row per connected tenant × dateRange combo)

# 5. Force a tenant unhealthy: temporarily revoke their Meta token in Meta UI, then trigger Meta sync.
curl -X POST http://localhost:8288/v0/invoke/sync-meta-account_health
sleep 30   # wait for retries to exhaust
psql "..." -c "SELECT tenant_id, last_sync_error_code FROM platform_credentials WHERE platform='meta';"
# → last_sync_error_code = 'token_revoked'

# 6. Tool call should now return stale data
curl -X POST http://127.0.0.1:3001/mcp -H "X-Api-Key: $KEY" -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"get_account_health","arguments":{"platform":"meta","dateRange":"last_7_days"}}}'
# → response includes "stale":true, "lastFailureCode":"token_revoked"

# 7. Check sync metrics
curl -H "x-admin-token: $ADMIN_TOKEN" 'http://127.0.0.1:3001/admin/metrics/sync?sinceHours=1' | jq
# → grouped counts, avg duration, current credential health

# 8. Heartbeat liveness
curl -i -H "x-admin-token: $ADMIN_TOKEN" http://127.0.0.1:3001/admin/health/inngest
# → HTTP 200 within 6 minutes of starting; ageMs < 360000

# 9. GDPR purge — insert an old row and trigger
psql "..." -c "UPDATE metric_cache SET expires_at = now() - interval '1 day' WHERE id = (SELECT id FROM metric_cache LIMIT 1);"
curl -X POST http://localhost:8288/v0/invoke/gdpr-purge-cache
sleep 2
psql "..." -c "SELECT count(*) FROM metric_cache WHERE expires_at < now();"
# → 0

# 10. Archive a synthetic-old audit row
psql "..." -c "INSERT INTO audit_log (event_type, outcome, created_at) VALUES ('test', 'success', now() - interval '13 months');"
curl -X POST http://localhost:8288/v0/invoke/gdpr-archive-audit
sleep 2
psql "..." -c "SELECT count(*) FROM audit_log_archive WHERE event_type='test';"
# → 1

# 11. Restore Meta connection: complete OAuth again, run sync, expect markHealthy
curl -X POST http://localhost:8288/v0/invoke/sync-meta-account_health
sleep 5
psql "..." -c "SELECT last_sync_failure_at, last_sync_error_code FROM platform_credentials WHERE platform='meta';"
# → both NULL

# 12. Tool call returns fresh, no stale flag
curl -X POST http://127.0.0.1:3001/mcp -H "X-Api-Key: $KEY" -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"get_account_health","arguments":{"platform":"meta","dateRange":"last_7_days"}}}'
# → no "stale" field in response

If every step produces the expected outcome, Phase 4 is shipped. Move on to Phase 5 (production hardening + compliance).