Phase 4 — Background Sync with Inngest
Detailed execution doc for Phase 4 of the MCP Marketing Tool Architecture Plan. Builds on Phases 1, 2, and 3.
Estimated effort: 3–5 days for one engineer (a week with the four pulled-forward scope items).
Phase 5 follow-up: (not yet written — will be docs/phase-5-hardening.md).
Goal
Move from lazy-only caching (Phase 2: fetch on first request, populate cache) to eager + lazy caching (Phase 4: pre-warm cache before TTL expiry so user requests almost always hit). Add the GDPR retention jobs the architecture mandates, eager token refresh, and ops-grade observability around sync health.
If Phase 4 is done correctly, a tool call from a connected tenant returns a cache hit ≥99% of the time during normal operation; expired metric_cache rows never accumulate; audit_log rows older than 12 months are archived; and a dead Inngest connection is detectable within 10 minutes.
Definition of Done (high-level — full checklist in §11)
- Inngest Cloud successfully invokes our `/api/inngest` endpoint, signature verification passes, and unsigned requests are rejected.
- For every (tenant, platform, report-type, date-range) tuple with active credentials, a scheduled function refreshes the cache before its TTL expires.
- After 3 retries fail, a `platform_credentials.last_sync_failure_at` flag is set; tools serve the stale row with a `stale: true` marker; the next scheduled sync clears the flag on success.
- Tokens expiring within the next hour get refreshed proactively without any user request.
- `metric_cache` rows older than 90d are hard-deleted nightly; `sync_log` rows older than 30d are hard-deleted nightly; `audit_log` rows older than 12mo are moved to `audit_log_archive`.
- `GET /admin/metrics/sync` returns per-tenant/platform/report sync health.
- A heartbeat function runs every 5 min; absence of a fresh row is detectable via a single SQL check.
Workstream order & dependency graph
A. Inngest client + signed webhook ──┬──▶ B. Sync fan-out per cadence ──▶ C. Retry exhaustion → unhealthy state
│
├──▶ D. Eager token refresh
├──▶ E. GDPR purge + audit archive
└──▶ G. Heartbeat + dead-canary
F. Sync metrics endpoint (reads sync_log — independent of Inngest)
H. Tests run alongside everything
Critical path: A → B → C. D, E, G can land in any order once A is in place. F is independent.
Workstream A — Inngest client + signed webhook
A1. Client
File: src/sync/inngest.ts
import { Inngest, EventSchemas } from 'inngest';
import { loadSecret } from '../security/secrets.loader.js';
interface Events {
'sync/refresh-tenant-report': {
data: {
tenantId: string;
platform: 'google' | 'meta' | 'tiktok';
reportType: string;
dateRange: 'last_7_days' | 'last_30_days' | 'last_90_days';
accountId: string;
};
};
'sync/refresh-token': {
data: { tenantId: string; platform: 'google' | 'meta' | 'tiktok' };
};
}
export const inngest = new Inngest({
id: 'deneva-mcp',
signingKey: (await loadSecret('INNGEST_SIGNING_KEY')).toString('utf8'),
eventKey: (await loadSecret('INNGEST_EVENT_KEY')).toString('utf8'),
schemas: new EventSchemas().fromRecord<Events>(),
});
Add `'INNGEST_EVENT_KEY'` to `REQUIRED_SECRETS` in src/security/secrets.loader.ts. Inngest Cloud needs both keys: signing for inbound, event for outbound `inngest.send()`. Also extend scripts/dev-secrets.sh (Phase 1 §C3) so dev startup doesn’t fail-fast:
# append to scripts/dev-secrets.sh
[[ -f secrets/INNGEST_EVENT_KEY ]] || gen INNGEST_EVENT_KEY
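A quick usage sketch (not part of the file above) of what the typed Events record buys: `inngest.send()` is checked against the schema, so a mistyped event name or payload fails to compile. The tenant ID is a made-up placeholder.
// anywhere in the app — type-checked against the Events record
await inngest.send({
  name: 'sync/refresh-token',
  data: { tenantId: 'tn_123', platform: 'google' }, // 'tn_123' is illustrative
});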
A2. HTTP handler with signature verification
File: src/sync/handler.ts
import { serve } from 'inngest/fastify';
import type { FastifyInstance } from 'fastify';
import { inngest } from './inngest.js';
import * as functions from './functions.js';
export async function mountInngest(app: FastifyInstance): Promise<void> {
await app.register(serve, {
client: inngest,
functions: Object.values(functions),
// signature verification is automatic when signingKey is set on the Inngest client.
});
// serve mounts at /api/inngest by default
}
The Inngest SDK’s serve() plugin verifies signatures on every request automatically. Verify in test by sending an unsigned POST to /api/inngest and asserting 401 — a minimal sketch follows.
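A sketch of that test, assuming a vitest runner and a hypothetical `buildApp` helper that boots Fastify with `mountInngest` registered (adjust to whatever the Phase 1 test harness actually exposes):
// tests/integration/inngest-handler.test.ts (sketch)
import { test, expect } from 'vitest';
import { buildApp } from '../_helpers/app.js'; // hypothetical harness helper

test('unsigned POST /api/inngest → 401', async () => {
  const app = await buildApp();
  const res = await app.inject({
    method: 'POST',
    url: '/api/inngest',
    payload: {}, // no X-Inngest-Signature header
  });
  expect(res.statusCode).toBe(401);
  await app.close();
});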
A3. nginx exposure
The /api/inngest endpoint must be reachable from Inngest Cloud — extend Phase 5’s nginx config (or a Phase 4 stub config in dev):
location /api/inngest {
proxy_pass http://127.0.0.1:3001;
# No additional auth — signature verification is inside Fastify.
# Rate limit: Inngest's retry storms can be aggressive; set a generous limit.
limit_req zone=mcp_inngest burst=200 nodelay;
}A4. Acceptance
- An unsigned POST /api/inngest returns 401.
- A correctly-signed POST (use `inngest dev` locally) routes to the right function.
- Removing INNGEST_SIGNING_KEY at startup makes `verifyAllSecretsLoadable` fail.
Workstream B — Sync fan-out per report cadence
B1. Cadence config
Per-report cadences track TTLs from Phase 2 §A1. Refresh happens before TTL expires, so user requests almost always hit a fresh-or-being-refreshed row.
File: src/sync/cadence.ts
import type { Platform } from '../cache/ttl-config.js';
export interface CadenceEntry {
platform: Platform;
reportType: string;
cron: string; // standard cron expression
dateRanges: ReadonlyArray<'last_7_days' | 'last_30_days' | 'last_90_days'>;
}
// Refresh slightly more often than TTL — give 10–20% headroom.
export const CADENCE: CadenceEntry[] = [
// Hourly TTL → */50 matches minutes 0 and 50, so this fires at :00 and :50 each hour — comfortably inside the 1h TTL
{ platform: 'google', reportType: 'account_health', cron: '*/50 * * * *', dateRanges: ['last_7_days', 'last_30_days', 'last_90_days'] },
{ platform: 'google', reportType: 'budget_optimizer', cron: '*/50 * * * *', dateRanges: ['last_7_days', 'last_30_days'] },
// 2h TTL → fires at :40 of every second hour (standard cron can't express a true 100-min interval; the lazy path covers the boundary)
{ platform: 'google', reportType: 'search_term_waste', cron: '40 */2 * * *', dateRanges: ['last_7_days', 'last_30_days'] },
{ platform: 'google', reportType: 'quality_score', cron: '40 */2 * * *', dateRanges: ['last_30_days'] },
{ platform: 'google', reportType: 'pmax_breakdown', cron: '40 */2 * * *', dateRanges: ['last_30_days'] },
{ platform: 'google', reportType: 'weekly_anomaly', cron: '40 */2 * * *', dateRanges: ['last_7_days', 'last_30_days'] },
// 4h TTL → fires at :20 every 3 hours
{ platform: 'google', reportType: 'auction_insights', cron: '20 */3 * * *', dateRanges: ['last_30_days'] },
// Meta — same TTLs as their Google counterparts
{ platform: 'meta', reportType: 'account_health', cron: '*/50 * * * *', dateRanges: ['last_7_days', 'last_30_days', 'last_90_days'] },
{ platform: 'meta', reportType: 'budget_optimizer', cron: '*/50 * * * *', dateRanges: ['last_7_days', 'last_30_days'] },
{ platform: 'meta', reportType: 'search_term_waste', cron: '40 */2 * * *', dateRanges: ['last_7_days'] },
{ platform: 'meta', reportType: 'auction_insights', cron: '20 */3 * * *', dateRanges: ['last_30_days'] },
{ platform: 'meta', reportType: 'weekly_anomaly', cron: '40 */2 * * *', dateRanges: ['last_7_days', 'last_30_days'] },
// TikTok
{ platform: 'tiktok', reportType: 'account_health', cron: '40 */2 * * *', dateRanges: ['last_7_days', 'last_30_days'] },
{ platform: 'tiktok', reportType: 'budget_optimizer', cron: '40 */2 * * *', dateRanges: ['last_7_days', 'last_30_days'] },
{ platform: 'tiktok', reportType: 'weekly_anomaly', cron: '40 */2 * * *', dateRanges: ['last_7_days', 'last_30_days'] },
];
The cron expressions are stored alongside the (platform, reportType, dateRange) tuple so a single fan-out function can be generated per cadence row.
B2. Fan-out functions
File: src/sync/functions.ts
import { eq, lt, or, sql } from 'drizzle-orm'; // lt/or/sql serve the §D/§E extensions below
import { inngest } from './inngest.js';
import { db } from '../db/index.js';
import { platformCredentials, metricCache, syncLog, heartbeats } from '../db/schema.js';
import { CADENCE } from './cadence.js';
import { getAdapter } from '../adapters/registry.js';
import { readOrFetch } from '../cache/cache.service.js';
import { writeAuditEvent } from '../security/audit-log.service.js';
import { AdapterError } from '../adapters/errors.js';
import { recordSyncResult } from './sync-log.js';
import { markHealthy, markUnhealthy } from './health.js'; // §C
// Build one scheduled function per cadence row at module load.
export const scheduledSyncs = CADENCE.map(entry =>
inngest.createFunction(
{ id: `sync-${entry.platform}-${entry.reportType}`, retries: 0 }, // 0 retries for the fan-out itself; the worker retries
{ cron: entry.cron },
async ({ step }) => {
// For each connected tenant on this platform with a chosen account, enqueue a worker event.
const tenants = await step.run('list-tenants', async () =>
db.select({ tenantId: platformCredentials.tenantId, accountId: platformCredentials.accountId })
.from(platformCredentials)
.where(eq(platformCredentials.platform, entry.platform)),
);
const events = tenants.flatMap(t =>
t.accountId ? entry.dateRanges.map(dr => ({
name: 'sync/refresh-tenant-report' as const,
data: { tenantId: t.tenantId, platform: entry.platform, accountId: t.accountId,
reportType: entry.reportType, dateRange: dr },
})) : [],
);
if (events.length > 0) await step.sendEvent(`fanout-${entry.platform}-${entry.reportType}`, events);
},
),
);
// The worker — retried up to 3 times by Inngest with exponential backoff.
export const refreshTenantReport = inngest.createFunction(
{ id: 'refresh-tenant-report', retries: 3 },
{ event: 'sync/refresh-tenant-report' },
async ({ event, step, attempt }) => {
const { tenantId, platform, reportType, dateRange, accountId } = event.data;
const start = Date.now();
try {
const adapter = getAdapter(platform);
// force: true bypasses the cache-hit shortcut. The cron fires *before* TTL expiry
// by design, so without force, the worker would hit the still-fresh row and skip
// the platform fetch — defeating the whole point of eager sync.
await step.run('fetch-and-cache', async () => {
const fetcher = () => callAdapterFor(adapter, reportType, tenantId, dateRange);
return readOrFetch(
{ tenantId, platform, accountId, reportType: reportType as never, dateRangeKey: dateRange },
fetcher,
{ force: true },
);
});
await recordSyncResult({ tenantId, platform, reportType, dateRange, status: 'success', durationMs: Date.now() - start });
await markHealthy(tenantId, platform); // §C — clears any prior unhealthy flag
} catch (err) {
const isLast = attempt >= 3; // `attempt` is zero-indexed, so with retries: 3 the final attempt is index 3
await recordSyncResult({
tenantId, platform, reportType, dateRange,
status: isLast ? 'failure_final' : 'failure_retry',
durationMs: Date.now() - start,
errorCode: err instanceof AdapterError ? err.code : 'internal',
errorMessage: err instanceof Error ? err.message : String(err),
});
if (isLast) await markUnhealthy(tenantId, platform, err); // §C
throw err; // let Inngest schedule the next retry / mark final failure
}
},
);
function callAdapterFor(adapter: ReturnType<typeof getAdapter>, reportType: string, tenantId: string, dateRange: 'last_7_days' | 'last_30_days' | 'last_90_days') {
switch (reportType) {
case 'account_health': return adapter.fetchAccountHealth(tenantId, dateRange);
case 'search_term_waste': return adapter.fetchSearchTerms(tenantId, dateRange);
case 'auction_insights': return adapter.fetchAuctionInsights(tenantId, dateRange);
case 'quality_score': return adapter.fetchKeywordQualityScores(tenantId, dateRange);
case 'pmax_breakdown': return adapter.fetchAssetGroups(tenantId, dateRange);
case 'budget_optimizer': return adapter.fetchCampaigns(tenantId, dateRange); // proxy
case 'weekly_anomaly': return adapter.fetchAccountHealth(tenantId, dateRange); // proxy
default: throw new Error(`unknown_report_type: ${reportType}`);
}
}
Phase 2’s `readOrFetch` already handles thundering-herd via advisory locks. Two crons firing close together can’t double-fetch the same key — the second waits, sees the freshly-written row, and returns. A sketch of that pattern follows.
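Phase 2’s implementation isn’t reproduced here; this is only an illustrative sketch of the advisory-lock shape described above (`readFreshRow` / `writeCacheRow` are hypothetical helpers, not Phase 2 names):
// sketch — not Phase 2's actual readOrFetch body
async function withCacheLock<T>(cacheKey: string, fetcher: () => Promise<T>): Promise<T> {
  return db.transaction(async tx => {
    // blocks until the lock is free; released automatically at COMMIT/ROLLBACK
    await tx.execute(sql`SELECT pg_advisory_xact_lock(hashtext(${cacheKey}))`);
    const fresh = await readFreshRow(tx, cacheKey); // hypothetical: non-expired row or null
    if (fresh) return fresh as T; // the other caller already refreshed — skip the platform fetch
    const data = await fetcher();
    await writeCacheRow(tx, cacheKey, data); // hypothetical cache upsert
    return data;
  });
}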
Concurrency / pool sizing. The fan-out enqueues one worker event per (tenant × dateRange) for each of the 15 cadence rows. Inngest serialises by function ID (`refresh-tenant-report`) with default concurrency, so workers don’t actually run all at once — but each running worker holds one Postgres connection (cache read + cache write + RLS `set_config`). Phase 1 §B2 pins `max: 20` on the Pool, which covers Inngest’s default concurrency (10) plus headroom for live `/mcp` traffic. If you raise Inngest’s per-function concurrency beyond ~12, raise `max` in lockstep — saturated pools manifest as request timeouts, not loud errors.
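If you do tune it, the two knobs sit in different files — a sketch using Inngest’s standard per-function `concurrency` option and the Phase 1 Pool (the limit values here are illustrative, not mandated):
// src/sync/functions.ts — cap worker parallelism explicitly instead of relying on the default;
// this replaces the options object shown in §B2
export const refreshTenantReport = inngest.createFunction(
  { id: 'refresh-tenant-report', retries: 3, concurrency: { limit: 12 } },
  { event: 'sync/refresh-tenant-report' },
  async ({ event, step, attempt }) => { /* ... unchanged worker body ... */ },
);

// src/db/index.ts (Phase 1 §B2) — keep pool headroom above worker concurrency:
// 12 sync workers + ~8 connections for live /mcp traffic.
const pool = new Pool({ max: 20 });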
B3. sync_log helper
File: src/sync/sync-log.ts
import { db } from '../db/index.js';
import { syncLog } from '../db/schema.js';
interface SyncResult {
tenantId: string;
platform: 'google' | 'meta' | 'tiktok';
reportType: string;
dateRange: string;
status: 'success' | 'failure_retry' | 'failure_final';
durationMs: number;
errorCode?: string;
errorMessage?: string;
}
export async function recordSyncResult(r: SyncResult): Promise<void> {
await db.insert(syncLog).values({
tenantId: r.tenantId, platform: r.platform,
status: r.status, durationMs: r.durationMs,
errorMessage: r.errorMessage ?? null,
// Add reportType / dateRange / errorCode columns via Phase 4 migration:
reportType: r.reportType, dateRange: r.dateRange, errorCode: r.errorCode ?? null,
} as never);
}
Add to the syncLog schema (Phase 4 migration):
reportType: text('report_type'),
dateRange: text('date_range'),
errorCode: text('error_code'),
B4. Acceptance
- A test using Inngest’s local dev server triggers a cron event for `sync-google-account_health` and verifies a worker event is enqueued for each connected tenant with a selected account.
- A worker invocation hits the (mocked) adapter, writes a `metric_cache` row, and a `sync_log` row with `status: 'success'`.
- A worker failing once writes a `failure_retry` row; exhausting all retries writes a `failure_final` row.
Workstream C — Retry exhaustion → unhealthy state
C1. Schema
Add to platformCredentials (Phase 4 migration):
lastSyncSuccessAt: timestamp('last_sync_success_at'),
lastSyncFailureAt: timestamp('last_sync_failure_at'),
lastSyncErrorCode: text('last_sync_error_code'),
Add to metricCache:
isStale: boolean('is_stale').default(false).notNull(),
C2. Mark-unhealthy / mark-healthy
File: src/sync/health.ts
import { and, eq } from 'drizzle-orm';
import { db } from '../db/index.js';
import { platformCredentials, metricCache } from '../db/schema.js';
import { AdapterError } from '../adapters/errors.js';
import { writeAuditEvent } from '../security/audit-log.service.js';
export async function markUnhealthy(tenantId: string, platform: string, err: unknown): Promise<void> {
const code = err instanceof AdapterError ? err.code : 'internal';
await db.update(platformCredentials)
.set({ lastSyncFailureAt: new Date(), lastSyncErrorCode: code })
.where(and(eq(platformCredentials.tenantId, tenantId), eq(platformCredentials.platform, platform)));
// Mark all currently-cached rows for this tenant+platform as stale.
await db.update(metricCache).set({ isStale: true })
.where(and(eq(metricCache.tenantId, tenantId), eq(metricCache.platform, platform)));
// Background sync exhaustion — distinct from the request-path `mcp.tool_failed`
// emitted by the tool registry in Phase 1 §G1. Same code shape, different audience:
// dashboards/alerts can target `sync.exhausted` independently of tool-call failures.
await writeAuditEvent('sync.exhausted', 'failure', { tenantId, platform, code });
}
export async function markHealthy(tenantId: string, platform: string): Promise<void> {
await db.update(platformCredentials)
.set({ lastSyncSuccessAt: new Date(), lastSyncFailureAt: null, lastSyncErrorCode: null })
.where(and(eq(platformCredentials.tenantId, tenantId), eq(platformCredentials.platform, platform)));
await db.update(metricCache).set({ isStale: false })
.where(and(eq(metricCache.tenantId, tenantId), eq(metricCache.platform, platform)));
}
The successful branch of refreshTenantReport calls markHealthy; the final-failure branch calls markUnhealthy. Both calls are wired into the worker — see refreshTenantReport in §B2.
C3. Tool-side exposure
Add a small helper alongside markHealthy / markUnhealthy:
// src/sync/health.ts (extend)
export async function platformHealth(tenantId: string, platform: string): Promise<{ stale: boolean; lastFailureCode: string | null }> {
const [row] = await db.select({
lastFailureAt: platformCredentials.lastSyncFailureAt,
lastFailureCode: platformCredentials.lastSyncErrorCode,
}).from(platformCredentials).where(and(
eq(platformCredentials.tenantId, tenantId),
eq(platformCredentials.platform, platform),
));
if (!row) return { stale: false, lastFailureCode: null };
return { stale: row.lastFailureAt !== null, lastFailureCode: row.lastFailureCode ?? null };
}
makeTool (Phase 2 §E1) annotates responses:
// after readOrFetch:
const health = await platformHealth(ctx.tenantId, input.platform);
return {
data, cache,
...(health.stale ? { stale: true, lastFailureCode: health.lastFailureCode } : {}),
};
Tools never block on staleness — clients see the data and the warning together.
C4. Acceptance
- A worker that exhausts 3 retries → `lastSyncFailureAt` set, all `metric_cache` rows for that tenant+platform marked `isStale: true`.
- A subsequent successful sync → flags cleared, rows un-staled.
- A tool call during the unhealthy window returns `{ data, cache: 'hit', stale: true, lastFailureCode: 'token_revoked' }`.
Workstream D — Eager token refresh
D1. Function
File: src/sync/functions.ts (extend)
export const eagerTokenRefresh = inngest.createFunction(
{ id: 'eager-token-refresh', retries: 0 },
{ cron: '*/15 * * * *' }, // every 15 min
async ({ step }) => {
const soon = new Date(Date.now() + 60 * 60 * 1000); // expiring within 1h
const rows = await step.run('list-expiring', () =>
db.select({ tenantId: platformCredentials.tenantId, platform: platformCredentials.platform })
.from(platformCredentials)
.where(lt(platformCredentials.tokenExpiresAt, soon)),
);
if (rows.length > 0) await step.sendEvent('eager-refresh-fanout', rows.map(r => ({
name: 'sync/refresh-token' as const,
data: { tenantId: r.tenantId, platform: r.platform },
})));
},
);
export const refreshOneToken = inngest.createFunction(
{ id: 'refresh-one-token', retries: 2 },
{ event: 'sync/refresh-token' },
async ({ event }) => {
const { tenantId, platform } = event.data;
const adapter = getAdapter(platform);
await adapter.ensureValidToken(tenantId); // Phase 2 / 3 logic: refreshes if needed
},
);
The Phase 2 lazy-refresh path stays — eager is belt-and-braces. If both fire near-simultaneously, the row-level lock on `UPDATE platform_credentials` serialises them and the second sees the freshly-rotated token.
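A sketch of why that serialisation holds, written with an explicit `SELECT ... FOR UPDATE` serialisation point (column names are illustrative, not the Phase 1 schema verbatim):
-- Both refreshers run this; the second blocks on the row lock until the first commits.
BEGIN;
SELECT token_expires_at FROM platform_credentials
 WHERE tenant_id = $1 AND platform = $2 FOR UPDATE;  -- serialisation point
-- if token_expires_at is already fresh here, the late refresher bails out
UPDATE platform_credentials
   SET token_ciphertext = $3, token_expires_at = $4
 WHERE tenant_id = $1 AND platform = $2;
COMMIT;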
D2. Acceptance
- Set a tenant’s `tokenExpiresAt` to now + 30min. Trigger `eager-token-refresh` manually. Assert `refreshOneToken` fires for that tenant and `tokenExpiresAt` advances.
- A row with `tokenExpiresAt` 24h out is NOT in the eager-refresh fan-out.
Workstream E — GDPR purge + audit archive
E1. Cache purge
File: src/sync/functions.ts (extend)
export const purgeExpiredCache = inngest.createFunction(
{ id: 'gdpr-purge-cache', retries: 1 },
{ cron: '0 2 * * *' }, // 02:00 daily
async ({ step }) => {
const ninetyDaysAgo = new Date(Date.now() - 90 * 24 * 60 * 60 * 1000);
const deleted = await step.run('delete-expired-cache', async () => {
const r = await db.delete(metricCache)
.where(or(lt(metricCache.expiresAt, new Date()), lt(metricCache.fetchedAt, ninetyDaysAgo)))
.returning({ id: metricCache.id });
return r.length;
});
await writeAuditEvent('gdpr.purge_cache', 'success', { deleted });
},
);
Add `'gdpr.purge_cache'`, `'gdpr.purge_sync_log'`, `'gdpr.archive_audit'`, and `'sync.exhausted'` (the §C2 background-path event) to the `AuditEventType` union in audit-log.service.ts (Phase 1 file) — sketched below.
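A sketch of that extension, assuming the Phase 1 union is a plain string-literal type (existing members elided):
// src/security/audit-log.service.ts (extend)
export type AuditEventType =
  // ...existing Phase 1 members, e.g. 'mcp.tool_failed'...
  | 'sync.exhausted'
  | 'gdpr.purge_cache'
  | 'gdpr.purge_sync_log'
  | 'gdpr.archive_audit';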
E2. sync_log purge
export const purgeSyncLog = inngest.createFunction(
{ id: 'gdpr-purge-sync-log', retries: 1 },
{ cron: '15 2 * * *' },
async ({ step }) => {
const cutoff = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
const deleted = await step.run('delete-old-sync', async () => {
const r = await db.delete(syncLog).where(lt(syncLog.createdAt, cutoff)).returning({ id: syncLog.id });
return r.length;
});
await writeAuditEvent('gdpr.purge_sync_log', 'success', { deleted });
},
);
E3. Audit archive
Add a new table audit_log_archive in a Phase 4 migration with the same schema as audit_log. The archive table has no insert restrictions for mcp_app — it’s append-once write from the archive job, and read access is restricted via role.
export const auditLogArchive = pgTable('audit_log_archive', { /* same columns as audit_log */ });
export const archiveAuditLog = inngest.createFunction(
{ id: 'gdpr-archive-audit', retries: 1 },
{ cron: '30 2 * * *' },
async ({ step }) => {
const cutoff = new Date(Date.now() - 365 * 24 * 60 * 60 * 1000);
const moved = await step.run('move-rows', async () => {
// Single SQL statement — atomic. CTE moves and deletes in one shot.
const r = await db.execute(sql`
WITH moved AS (
DELETE FROM audit_log
WHERE created_at < ${cutoff}
RETURNING *
)
INSERT INTO audit_log_archive SELECT * FROM moved RETURNING id
`);
return r.rowCount ?? 0;
});
await writeAuditEvent('gdpr.archive_audit', 'success', { moved });
},
);
The CTE-based atomic move is critical: a non-atomic copy-then-delete loses rows on failure. Verify this works with the `mcp_app` role’s INSERT-only policy on `audit_log` — the archive write needs INSERT on `audit_log_archive`, not `audit_log`, so the existing REVOKE is fine.
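The “total row count preserved” acceptance check (§E5) reduces to one query run before and after the job — a sketch:
-- must return the same number before and after archiveAuditLog runs
SELECT (SELECT count(*) FROM audit_log)
     + (SELECT count(*) FROM audit_log_archive) AS total_rows;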
E4. Grants for archive
GRANT SELECT, INSERT ON audit_log_archive TO mcp_app;
REVOKE UPDATE, DELETE ON audit_log_archive FROM mcp_app;
GRANT DELETE ON audit_log TO mcp_app; -- needed for the archive CTE; tighten in Phase 5 to "via this stored proc only"
E5. Acceptance
- Insert a `metric_cache` row with `fetchedAt` 91d ago → `purgeExpiredCache` deletes it.
- Insert a `sync_log` row with `createdAt` 31d ago → `purgeSyncLog` deletes it.
- Insert an `audit_log` row with `createdAt` 13mo ago → `archiveAuditLog` moves it to `audit_log_archive`. Total row count across both tables is preserved.
Workstream F — Sync metrics endpoint
F1. Service
File: src/sync/metrics.ts
import { and, eq, gte, sql } from 'drizzle-orm';
import { db } from '../db/index.js';
import { syncLog, platformCredentials } from '../db/schema.js';
export async function syncMetrics(opts: { sinceHours?: number } = {}) {
const since = new Date(Date.now() - (opts.sinceHours ?? 24) * 3600_000);
const rows = await db.select({
tenantId: syncLog.tenantId,
platform: syncLog.platform,
reportType: syncLog.reportType,
successes: sql<number>`count(*) filter (where ${syncLog.status} = 'success')`,
retries: sql<number>`count(*) filter (where ${syncLog.status} = 'failure_retry')`,
finals: sql<number>`count(*) filter (where ${syncLog.status} = 'failure_final')`,
avgDurationMs: sql<number>`coalesce(avg(${syncLog.durationMs}) filter (where ${syncLog.status} = 'success'), 0)::int`,
}).from(syncLog)
.where(gte(syncLog.createdAt, since))
.groupBy(syncLog.tenantId, syncLog.platform, syncLog.reportType);
// Join in current health state.
const credentials = await db.select({
tenantId: platformCredentials.tenantId,
platform: platformCredentials.platform,
lastSuccess: platformCredentials.lastSyncSuccessAt,
lastFailure: platformCredentials.lastSyncFailureAt,
failureCode: platformCredentials.lastSyncErrorCode,
}).from(platformCredentials);
return { window: { sinceHours: opts.sinceHours ?? 24, since }, syncs: rows, credentials };
}
F2. Route
Add to src/auth/admin-routes.ts:
fastify.get('/admin/metrics/sync', async (req, reply) => {
if (req.headers['x-admin-token'] !== adminToken) return reply.code(401).send();
const Q = z.object({ sinceHours: z.coerce.number().int().min(1).max(168).optional() }).parse(req.query);
return syncMetrics(Q);
});
F3. Acceptance
- After seeding 3 successful + 1 failed sync, the endpoint returns the right counts and a non-zero `avgDurationMs`.
- Unauthenticated request → 401.
- The `credentials` array reflects the unhealthy state set by §C.
Workstream G — Heartbeat + dead-canary
G1. Heartbeats table
Phase 4 migration:
export const heartbeats = pgTable('heartbeats', {
source: text('source').primaryKey(), // 'inngest' for now
lastBeatAt: timestamp('last_beat_at').notNull(),
});
G2. Heartbeat function
export const heartbeatInngest = inngest.createFunction(
{ id: 'heartbeat-inngest', retries: 0 },
{ cron: '*/5 * * * *' },
async () => {
await db.insert(heartbeats).values({ source: 'inngest', lastBeatAt: new Date() })
.onConflictDoUpdate({ target: heartbeats.source, set: { lastBeatAt: new Date() } });
},
);
G3. Canary check
A single SQL query exposes liveness. GET /admin/health/inngest:
fastify.get('/admin/health/inngest', async (req, reply) => {
if (req.headers['x-admin-token'] !== adminToken) return reply.code(401).send();
const [row] = await db.select().from(heartbeats).where(eq(heartbeats.source, 'inngest'));
const ageMs = row ? Date.now() - row.lastBeatAt.getTime() : Infinity;
const alive = ageMs < 10 * 60_000; // 10-min threshold
reply.code(alive ? 200 : 503);
return { alive, lastBeatAt: row?.lastBeatAt ?? null, ageMs };
});
Phase 5 will wire this to nginx/uptime monitoring (the architecture doc mentions external monitoring as part of SOC 2). For Phase 4 the endpoint is enough.
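For monitors that can only run a query, the same check expressed as the single SQL statement the DoD promises — a sketch against the §G1 table:
-- true if the Inngest heartbeat landed within the last 10 minutes; false if stale or missing
SELECT coalesce(max(last_beat_at) > now() - interval '10 minutes', false) AS alive
FROM heartbeats
WHERE source = 'inngest';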
G4. Acceptance
- After server start, within 6 minutes the `heartbeats` row exists and /admin/health/inngest returns `{ alive: true, ageMs: < 360000 }`.
- Manually setting `lastBeatAt` to 11 minutes ago makes the endpoint return 503.
Workstream H — Tests
H1. Test files
| File | Covers |
|---|---|
| tests/integration/inngest-handler.test.ts | A — signature verification, route mounting |
| tests/integration/sync-fanout.test.ts | B — fan-out events, worker happy path |
| tests/integration/sync-retry-exhausted.test.ts | C — markUnhealthy / markHealthy |
| tests/integration/eager-token-refresh.test.ts | D |
| tests/integration/gdpr-jobs.test.ts | E — all three retention jobs |
| tests/integration/sync-metrics.test.ts | F |
| tests/integration/heartbeat.test.ts | G |
H2. Inngest test harness
Use Inngest’s local dev server (npx inngest-cli dev) for integration tests. A small helper:
// tests/_helpers/inngest.ts
export async function triggerCron(functionId: string) {
// POST to the local inngest dev server's invoke endpoint
await fetch(`http://localhost:8288/v0/invoke/${functionId}`, { method: 'POST' });
}
The CI workflow gains a service container for inngest/inngest-dev (see Inngest’s docs).
H3. Adapter mocking
Same pattern as Phase 2/3 — mock at the fetch boundary; everything below runs against real Postgres + real envelope encryption. The Inngest worker code path is exercised end-to-end.
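A minimal sketch of that boundary mock, assuming a vitest runner (the actual Phase 2/3 helper isn’t reproduced here; the fixture shape is whatever the adapter under test expects):
// tests/_helpers/mock-fetch.ts (sketch)
import { vi } from 'vitest';

export function mockPlatformFetch(fixture: unknown): void {
  // Every outbound platform call now resolves with the fixture; cache writes,
  // RLS, and envelope encryption below the adapter still hit real Postgres.
  vi.stubGlobal('fetch', vi.fn(async () =>
    new Response(JSON.stringify(fixture), {
      status: 200,
      headers: { 'content-type': 'application/json' },
    }),
  ));
}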
§11 — Definition of Done (full checklist)
A. Inngest client + signed webhook
- `INNGEST_SIGNING_KEY` and `INNGEST_EVENT_KEY` in `REQUIRED_SECRETS`.
- /api/inngest mounted; unsigned POSTs return 401.
- nginx/dev config exposes /api/inngest to Inngest Cloud.
B. Sync fan-out
- One scheduled function per cadence row, all firing on the configured cron.
- Worker `refreshTenantReport` retried 3 times, writes `sync_log` rows for each attempt.
- Cache rows are populated/updated by the worker; thundering-herd impossible (Phase 2 advisory locks still in play).
C. Retry exhaustion → unhealthy state
- `platform_credentials.lastSyncFailureAt` / `lastSyncErrorCode` columns added.
- `metric_cache.isStale` column added; flipped on/off by `markUnhealthy` / `markHealthy`.
- Tools annotate responses with `stale: true` + `lastFailureCode` when applicable.
D. Eager token refresh
- Every 15 minutes, tokens expiring within 1h are refreshed proactively.
- Lazy refresh from Phase 2 still works (defence in depth).
E. GDPR purge + audit archive
- `metric_cache` rows expired or older than 90d hard-deleted nightly.
- `sync_log` older than 30d hard-deleted nightly.
- `audit_log` older than 12mo moved atomically to `audit_log_archive`; total row count preserved.
- All three jobs write `gdpr.*` audit events.
F. Sync metrics endpoint
- `GET /admin/metrics/sync` returns counts + avg duration per (tenant, platform, report).
- `?sinceHours=` query param respected (default 24h).
- Auth-protected.
G. Heartbeat + dead-canary
- `heartbeats` table populated every 5 min by the Inngest function.
- `GET /admin/health/inngest` returns 200 fresh, 503 stale.
H. Tests
- All seven new test files pass with the Inngest dev server in CI.
§12 — Out of scope (deferred)
| Item | Phase |
|---|---|
| External alerting on /admin/health/inngest (PagerDuty / OpsGenie) | 5 |
| Real Prometheus / OTel exporter (replaces Phase 2 in-memory + Phase 4 SQL counters) | 5 |
| Horizontal scaling of the Fastify cluster (Inngest concurrency tuning) | 5 |
| Penetration test of /api/inngest (verify replay protection, signature spoofing) | 5 |
| Multi-region failover for the sync layer | 5 |
| Stored-proc gate around audit_log DELETE (currently a broad GRANT for the archive CTE) | 5 |
§13 — Manual smoke test
# 0. Phase 3 stack running. Inngest signing key + event key seeded into secrets/.
# 1. Apply Phase 4 migrations
npm run db:migrate
psql "..." -c "GRANT DELETE ON audit_log TO mcp_app;"
psql "..." -c "GRANT SELECT, INSERT ON audit_log_archive TO mcp_app;"
# 2. Boot Inngest dev server
npx inngest-cli@latest dev -u http://127.0.0.1:3001/api/inngest &
# 3. Confirm Inngest discovered the functions
curl http://localhost:8288/v0/functions | jq '.[].slug' | sort
# → expect: sync-google-account_health, sync-meta-account_health, ..., refresh-tenant-report,
# refresh-one-token, eager-token-refresh, gdpr-purge-cache, gdpr-purge-sync-log,
# gdpr-archive-audit, heartbeat-inngest
# 4. Manually trigger account_health sync for Google
curl -X POST http://localhost:8288/v0/invoke/sync-google-account_health
sleep 5
psql "..." -c "SELECT count(*), max(created_at) FROM sync_log WHERE platform='google' AND report_type='account_health' AND created_at > now() - interval '1 minute';"
# → count > 0 (one row per connected tenant × dateRange combo)
# 5. Force a tenant unhealthy: temporarily revoke their Meta token in Meta UI, then trigger Meta sync.
curl -X POST http://localhost:8288/v0/invoke/sync-meta-account_health
sleep 30 # wait for retries to exhaust
psql "..." -c "SELECT tenant_id, last_sync_error_code FROM platform_credentials WHERE platform='meta';"
# → last_sync_error_code = 'token_revoked'
# 6. Tool call should now return stale data
curl -X POST http://127.0.0.1:3001/mcp -H "X-Api-Key: $KEY" -H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"get_account_health","arguments":{"platform":"meta","dateRange":"last_7_days"}}}'
# → response includes "stale":true, "lastFailureCode":"token_revoked"
# 7. Check sync metrics
curl -H "x-admin-token: $ADMIN_TOKEN" 'http://127.0.0.1:3001/admin/metrics/sync?sinceHours=1' | jq
# → grouped counts, avg duration, current credential health
# 8. Heartbeat liveness
curl -i -H "x-admin-token: $ADMIN_TOKEN" http://127.0.0.1:3001/admin/health/inngest
# → HTTP 200 within 6 minutes of starting; ageMs < 360000
# 9. GDPR purge — insert an old row and trigger
psql "..." -c "UPDATE metric_cache SET expires_at = now() - interval '1 day' WHERE id = (SELECT id FROM metric_cache LIMIT 1);"
curl -X POST http://localhost:8288/v0/invoke/gdpr-purge-cache
sleep 2
psql "..." -c "SELECT count(*) FROM metric_cache WHERE expires_at < now();"
# → 0
# 10. Archive a synthetic-old audit row
psql "..." -c "INSERT INTO audit_log (event_type, outcome, created_at) VALUES ('test', 'success', now() - interval '13 months');"
curl -X POST http://localhost:8288/v0/invoke/gdpr-archive-audit
sleep 2
psql "..." -c "SELECT count(*) FROM audit_log_archive WHERE event_type='test';"
# → 1
# 11. Restore Meta connection: complete OAuth again, run sync, expect markHealthy
curl -X POST http://localhost:8288/v0/invoke/sync-meta-account_health
sleep 5
psql "..." -c "SELECT last_sync_failure_at, last_sync_error_code FROM platform_credentials WHERE platform='meta';"
# → both NULL
# 12. Tool call returns fresh, no stale flag
curl -X POST http://127.0.0.1:3001/mcp -H "X-Api-Key: $KEY" -H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"get_account_health","arguments":{"platform":"meta","dateRange":"last_7_days"}}}'
# → no "stale" field in response
If every step produces the expected outcome, Phase 4 is shipped. Move on to Phase 5 (production hardening + compliance).