Running Products via Native Node.js Worker Threads
Moving heavy queues (like ImagesProduct crunching images via sharp, or LocationsProduct running grid searches) out of the main Event Loop is essential to preserve API performance and maintain a high Event Loop FPS.
We orchestrate this entirely within Node.js using the native worker_threads module, driven by a centralized JSON configuration. No PM2 dependency is required.
Architecture: Config-Driven Worker Spawning
The application topology is defined in server/config/products.json. The main thread reads this file on boot. If a product has "workers" > 0, the main thread spawns dedicated native Worker threads to handle its pg-boss background jobs — while still registering the product's HTTP routes on the main thread.
1. The Configuration Format (config/products.json)
Each product entry specifies:
- `name` — maps to a key in `PRODUCT_IMPORTS` in `registry.ts`
- `enabled` — whether to load the product at all
- `workers` — how many native Worker threads to spawn (`0` = run everything on the main thread)
- `deps` — informational dependency list
```json
{
  "products": [
    { "name": "images", "enabled": true, "workers": 1, "deps": ["serving", "storage"] },
    { "name": "videos", "enabled": true, "workers": 0, "deps": ["serving", "storage"] },
    { "name": "locations", "enabled": true, "workers": 0, "deps": ["serving", "storage"] },
    { "name": "serving", "enabled": true, "workers": 0, "deps": ["images"] },
    { "name": "email", "enabled": true, "workers": 0, "deps": [] },
    { "name": "openai", "enabled": true, "workers": 0, "deps": [] },
    { "name": "analytics", "enabled": true, "workers": 0, "deps": [] },
    { "name": "storage", "enabled": true, "workers": 0, "deps": [] },
    { "name": "ecommerce", "enabled": true, "workers": 0, "deps": ["images"] },
    { "name": "contacts", "enabled": true, "workers": 0, "deps": [] },
    { "name": "campaigns", "enabled": true, "workers": 0, "deps": ["contacts"] },
    { "name": "mcp", "enabled": true, "workers": 0, "deps": ["serving"] }
  ]
}
```
2. Main Thread: The Orchestrator (src/products/registry.ts)
Boot-up is split into two phases:
Phase 1 — registerProductRoutes(app): Reads products.json, lazy-imports only the enabled product modules via a PRODUCT_IMPORTS map (avoids importing everything on boot), instantiates them, and registers their HTTP routes on the Hono app.
Phase 2 — startProducts(boss): For each product:
- If `workers > 0`, spawns native Worker threads (see §3).
- Always calls `product.start(boss)` on the main thread so the product can register pg-boss queue names and perform local init.
```ts
// Lazy imports — only loaded when the product is enabled
const PRODUCT_IMPORTS: Record<string, () => Promise<any>> = {
  'images': () => import('./images/index.js'),
  'videos': () => import('./videos/index.js'),
  'locations': () => import('./locations/index.js'),
  // ... all 12 products
};

export const startProducts = async (boss?: any) => {
  for (const product of instances) {
    const pConfig = product.__config;
    if (pConfig && pConfig.workers > 0) {
      const isDev = process.env.NODE_ENV !== 'production';
      // Dev: uses vite-node wrapper to load TS directly
      // Prod: uses pre-bundled worker.cjs
      const workerEntry = isDev
        ? path.resolve(process.cwd(), 'src', 'worker_wrapper.mjs')
        : path.resolve(process.cwd(), 'worker.cjs');
      // The TS entry the dev wrapper executes via vite-node
      const workerScript = path.resolve(process.cwd(), 'src', 'worker.ts');
      for (let i = 0; i < pConfig.workers; i++) {
        const worker = new Worker(workerEntry, {
          workerData: { productName: pConfig.name, workerScript }
        });
        nativeWorkers.push({ id: product.id, worker });
        // Forward EventBus events from worker → main thread
        worker.on('message', (msg) => {
          if (msg?.type === 'event' && msg.name) {
            EventBus.emit(msg.name, msg.data);
          }
        });
      }
    }
    // Main-thread init (HTTP deps, caching, boss queue creation)
    await product.start(boss);
  }
};
```
3. Worker Entrypoint (src/worker.ts)
When a Worker thread boots, worker.ts is loaded. It reads workerData.productName, instantiates the matching product class, and starts only its pg-boss consumers. It does not start an HTTP server.
Key responsibilities:
- PG-Boss queue consumers — the product's `onStart(boss)` registers workers for its queues.
- IPC health checks — responds to `{ type: 'ping' }` messages with `{ type: 'pong', activeJobs, ... }`.
- IPC job dispatch — handles `{ type: 'job' }` messages for synchronous request-response via `dispatchToWorker()`.
- EventBus bridging — forwards `job:progress`, `job:complete`, and `job:error` events to the parent thread via `parentPort.postMessage()`.
```ts
// worker.ts (runs inside the Worker thread)
import { workerData, isMainThread, parentPort } from 'worker_threads';

if (isMainThread) throw new Error('Must run inside a Worker thread.');

const ProductClass = PRODUCT_CLASSES[workerData.productName];
const instance = new ProductClass();

// IPC: ping/pong + job dispatch
parentPort.on('message', async (msg) => {
  if (msg.type === 'ping') return parentPort.postMessage({ type: 'pong', /* activeJobs, ... */ });
  if (msg.type === 'job') { /* handleJob → postMessage result */ }
});

// Bridge internal events to parent thread
EventBus.on('job:progress', (data) => parentPort.postMessage({ type: 'event', name: 'job:progress', data }));
EventBus.on('job:complete', (data) => parentPort.postMessage({ type: 'event', name: 'job:complete', data }));

// Start isolated PG-Boss and bind the product
const workerBoss = await startBoss();
await instance.start(workerBoss);
```
4. Dev Mode: worker_wrapper.mjs
In dev, Worker threads can't inherit tsx hooks from the parent process. To support TypeScript directly, a plain .mjs bootstrap uses vite-node's programmatic API to load and execute worker.ts with full TS resolution:
```js
// worker_wrapper.mjs
import { workerData } from 'node:worker_threads';
import { createServer } from 'vite';
import { ViteNodeRunner } from 'vite-node/client';

const server = await createServer({ /* hmr: false, @-alias setup */ });
const runner = new ViteNodeRunner({ root, base, fetchModule, resolveId });
await runner.executeFile(workerData.workerScript);
```
5. Smart Consumer Skipping
Products that support pg-boss workers (like LocationsProduct) use this pattern in onStart() to avoid double-consuming:
```ts
async onStart(boss?: PgBoss) {
  if (!boss) return;
  const { isMainThread } = await import('node:worker_threads');
  const workersConfig = this.__config?.workers ?? 0;
  // Consume only inside a Worker thread, or when no workers are configured
  const shouldConsume = !isMainThread || workersConfig === 0;
  for (const WorkerClass of this.workers) {
    const worker = new WorkerClass();
    await boss.createQueue(worker.queueName);
    if (shouldConsume) {
      await boss.work(worker.queueName, options, worker.handler.bind(worker));
    }
  }
}
```
If the product is running with dedicated Worker threads (workers > 0), the main thread skips consuming from pg-boss queues — only the Worker threads will consume them.
6. IPC Job Dispatch (src/commons/worker-ipc.ts)
For synchronous request-response between the main thread and worker threads (e.g., image processing called from an HTTP handler), there is a utility:
```ts
import { dispatchToWorker, hasWorker } from '@/commons/worker-ipc.js';

// Check if a live worker exists
if (await hasWorker('images')) {
  const result = await dispatchToWorker('images', 'process_image', { buffer, /* ... */ }, [buffer]);
}
```
- Uses round-robin across multiple worker threads for the same product.
- Supports zero-copy `ArrayBuffer` transfers via the `transferList` parameter.
- Has a configurable timeout (default 30s).
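A minimal sketch of the round-robin and timeout mechanics behind this utility (the names `pickWorker` and `withTimeout`, and the internal maps, are illustrative, not the actual `worker-ipc.ts` internals):

```typescript
// Hypothetical sketch: not the actual worker-ipc.ts implementation.
type WorkerRef = { postMessage: (msg: unknown, transfer?: ArrayBuffer[]) => void };

const pools = new Map<string, WorkerRef[]>();   // productName → live workers
const cursors = new Map<string, number>();      // productName → next index

// Round-robin: each dispatch for the same product advances the cursor.
function pickWorker(product: string): WorkerRef | undefined {
  const pool = pools.get(product);
  if (!pool || pool.length === 0) return undefined;
  const i = cursors.get(product) ?? 0;
  cursors.set(product, (i + 1) % pool.length);
  return pool[i];
}

// Timeout guard: reject if the worker doesn't answer within `ms` (default 30s).
function withTimeout<T>(p: Promise<T>, ms = 30_000): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const t = setTimeout(() => reject(new Error(`IPC timeout after ${ms}ms`)), ms);
    p.then(
      (v) => { clearTimeout(t); resolve(v); },
      (e) => { clearTimeout(t); reject(e); },
    );
  });
}
```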
Base Classes
AbstractProduct (src/products/AbstractProduct.ts)
All products extend this. Provides:
- `start(boss)` / `stop()` — lifecycle hooks
- `handleJob(action, msg)` — for IPC job dispatch from worker threads
- `handleStream()` — SSE streaming helper with cache-checking
- `generateHash()` — deterministic deep-sorted SHA-256 hashing
AbstractWorker (src/jobs/boss/AbstractWorker.ts)
PG-Boss queue consumers extend this. Provides:
- `queueName` — the pg-boss queue to consume
- `process(job)` — override with business logic
- `calculateCost(job, result)` — usage metering
- `handler()` — wraps `process()` with error handling and emits `job:complete` / `job:failed`

Worker classes use the `@Worker(queueName)` decorator for registration.
Case Study: ImagesProduct — The Canonical Worker-Offloaded Product
ImagesProduct (src/products/images/index.ts) is currently the only product running with workers: 1 in production. It demonstrates the full IPC lifecycle — from HTTP request through worker dispatch to cached response. It does not use AbstractWorker or pg-boss queues; instead, it uses the synchronous IPC dispatch pattern via worker-ipc.ts.
The Hybrid Pattern: hasWorker + Inline Fallback
Every image processing path checks whether a live worker thread exists. If yes, the heavy sharp work is offloaded. If no (e.g., during tests, or if workers: 0 in config), it falls back to inline processing on the main thread:
```ts
// src/products/images/index.ts — _ensureCachedImage()
if (await hasWorker('images')) {
  // Zero-copy transfer: copy Buffer into a transferable ArrayBuffer
  const arrayBuffer = new ArrayBuffer(inputBuffer.length);
  new Uint8Array(arrayBuffer).set(inputBuffer);
  await dispatchToWorker('images', 'process_image', {
    buffer: arrayBuffer, width, height, format, fit
  }, [arrayBuffer]); // ← transfer list: moves memory, doesn't clone
} else {
  // Inline fallback (same thread)
  const pipeline = sharp(inputBuffer).resize({ width, height, fit }).toFormat(format);
  await fs.writeFile(filepath, await pipeline.toBuffer());
}
```
This pattern is used in three HTTP handlers:
- `handlePostImage` — file upload → resize → cache (or forward to Supabase Storage)
- `handleRenderImage` — URL → fetch → resize → serve as binary (used by lazy srcset URLs)
- `handlePostResponsive` / `handleGetResponsive` — generate multi-format, multi-size srcset variants
Worker-Side: handleJob() Actions
Inside the worker thread, the ImagesProduct instance receives IPC job messages and routes them by action:
```ts
// src/products/images/index.ts — handleJob()
async handleJob(action: string, msg: any): Promise<any> {
  if (action === 'process_image') {
    const { filepath, filename, width, height, format, fit } = msg;
    // Reconstruct Buffer from transferred ArrayBuffer
    const inputBuffer = Buffer.from(msg.buffer);
    await this.performProcessImage(inputBuffer, filepath, { width, height, format, fit });
    return { filename };
  }
  if (action === 'render_image') {
    const { filepath, filename, width, height, format, square, contain } = msg;
    const inputBuffer = Buffer.from(msg.buffer);
    // Supports square crop, contain fit, etc.
    await this.performRenderImage(inputBuffer, filepath, { width, height, format, square, contain });
    return { filename };
  }
  return super.handleJob(action, msg); // Throws for unknown actions
}
```
Both actions write the processed image to the shared cache/ directory on disk. The main thread then reads the file to serve or forward the response.
The Responsive Image Pipeline
The responsive endpoint generates multiple width × format variants (e.g., [180, 640, 1024, 2048] × [avif, webp]). It splits work between eager and lazy generation:
| Variant Width | Strategy | What Happens |
|---|---|---|
| ≤ 600px | Eager | Processed immediately (via worker or inline) and cached to disk. Returns direct cache URL. |
| > 600px | Lazy | Returns a dynamic /api/images/render?url=...&width=...&format=... URL. Processed on-demand when the browser requests it. |
This avoids eagerly generating large, rarely-used variants for every upload while ensuring small thumbnails are always instant.
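The split can be sketched as a pure decision function. The helper name `planVariant` and parameter layout are illustrative; only the 600px threshold and the render-URL shape come from the table above:

```typescript
// Illustrative helper: decides eager vs lazy generation per the 600px threshold.
const EAGER_MAX_WIDTH = 600;

function planVariant(width: number, format: string, sourceUrl: string, cacheUrl: string) {
  if (width <= EAGER_MAX_WIDTH) {
    // Small variants: processed immediately, served straight from the cache.
    return { strategy: 'eager' as const, url: cacheUrl };
  }
  // Large variants: return a dynamic render URL, processed on first request.
  const q = new URLSearchParams({ url: sourceUrl, width: String(width), format });
  return { strategy: 'lazy' as const, url: `/api/images/render?${q}` };
}
```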
Request Coalescing
When multiple concurrent requests reference the same source URL, fetchImageCoalesced() deduplicates them using an in-flight Map<string, Promise<Buffer>>. Only one HTTP fetch goes out; all callers share the same Promise.
Data Flow Summary
```
HTTP Request (main thread)
  → hasWorker('images')? ──yes──→ dispatchToWorker()
  │                                   │
  │                                   ├─ postMessage({ type:'job', action:'render_image', buffer }, [buffer])
  │                                   │       ↓ (zero-copy ArrayBuffer transfer)
  │                                   │   Worker Thread: handleJob('render_image', msg)
  │                                   │       ↓
  │                                   │   sharp(buffer).resize().toFormat().toFile(filepath)
  │                                   │       ↓ (streams directly to disk)
  │                                   └─ postMessage({ type:'job_result', result: { filename } })
  │                                           ↓
  │                                   main thread: fs.readFile(cache/hash.format)
  │                                           ↓
  │                                   return c.redirect() or c.body()
  │
  └──no──→ Inline: sharp().resize().toFile(filepath) → serve
```
Why this Pattern is Powerful
- Zero PM2 Dependency: Entirely native to Node.js. Containerization, Nexe builds — nothing changes.
- True Multi-Core Utilization: `worker_threads` run on distinct OS threads. Setting `workers: 2` for `images` dedicates two CPU cores to sharp.
- API Immunity: Workers have their own V8 heap and Event Loop. A massive image resize has zero impact on the main API's Event Loop FPS.
- EventBus Bridging: Worker events (progress, completion) are forwarded to the main thread via IPC `postMessage`, enabling real-time SSE streams to API clients.
- Dev/Prod Parity: The `worker_wrapper.mjs` + vite-node setup means TypeScript runs natively in dev worker threads, while production uses pre-bundled JS — same behavior in both environments.
- Round-Robin Dispatch: The `worker-ipc.ts` utility distributes synchronous job requests across multiple threads, enabling true horizontal scaling within a single process.
Constraints & Gotchas (Lessons from Inngest + Our Benchmarks)
Node.js worker threads have real constraints that Go/Rust/Python developers would never expect. The Inngest post on worker threads formalizes these well. Here's how each constraint applies to our architecture:
1. Workers Are NOT Lightweight
Each worker thread is a full V8 isolate — its own heap, its own event loop. ~10 MB memory overhead per worker, with tens-of-milliseconds startup cost. This is why our products.json caps workers at 1-2 per product, and workers are spawned once at boot and persist for the process lifetime. We never create/destroy workers per-job.
2. You Can't Pass Logic — Only Messages
Unlike Go goroutines or Rust threads, you can't pass a function to new Worker(). The structured clone algorithm can't serialize functions. This is why:
- Our `EventBus` listeners live on the main thread — worker threads post `{ type: 'event' }` messages that get bridged to the main-thread EventBus.
- Pino `logger` instances can't cross the boundary — worker threads use their own logger.
- `pg-boss` connections are per-thread — each worker establishes its own.
3. Bundler Discovery Is Fragile
Bundlers (webpack) can't statically analyze `new Worker(path)`. Our approach:

- Dev: `worker_wrapper.mjs` uses vite-node's `ViteNodeRunner` to resolve TypeScript at runtime.
- Prod: `build.sh` compiles `worker.ts` → `worker.cjs` as a separate webpack entry point, and the registry uses `__dirname + '/worker.cjs'` — a plain string the bundler can't trace.
Both paths are hardcoded and tested — no dynamic path construction that could break silently.
4. Dev-Mode vite-node Overhead (CRITICAL)
Benchmarked 2024-03-24, same 386KB JPEG source at 800px webp:
| Path | Encode Time | Notes |
|---|---|---|
| Worker thread (vite-node) | 3.26s | IPC + vite-node module transform overhead |
| Main thread (inline) | 140ms | Direct sharp call, no IPC |
~23× slower in dev mode via worker thread. The vite-node ViteNodeRunner inside the worker's V8 isolate adds massive overhead for module resolution and transformation. Sharp itself (native C++ addon) runs at the same speed — the cost is entirely in the JS wrapper.
In production with pre-bundled worker.cjs, the worker thread runs at near-native speed. The overhead is a dev-only artifact.
Practical implication: Consider setting `"workers": 0` for `images` during local development to avoid the vite-node penalty. The main thread handles 140ms encodes without impacting dev-server responsiveness.
5. No Respawning (Current Gap)
Inngest implements exponential backoff respawning — if a worker thread crashes (unhandled exception, OOM), the main thread detects the exit event and spins up a replacement with increasing delay.
We don't do this yet. If a worker thread dies, it's gone until a full server restart. The registry.ts spawner doesn't watch for exit events. This is acceptable for now because:
- Workers are simple (sharp pipeline, no external connections beyond pg-boss)
- Crashes are rare in production
- The inline fallback (`hasWorker() === false`) means the main thread picks up the work
But for robustness, adding respawn-with-backoff to the worker spawner in registry.ts would be a good future improvement.
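One possible shape for that improvement, sketched as a hypothetical `spawnWithRespawn()` helper (the delay constants and the one-minute reset threshold are assumptions, not existing code):

```typescript
import { Worker } from 'node:worker_threads';

// Hypothetical respawn-with-backoff helper for registry.ts.
const BASE_DELAY_MS = 1_000;
const MAX_DELAY_MS = 30_000;

// Exponential backoff, capped: 1s, 2s, 4s, ... up to 30s.
function backoffDelay(crashCount: number): number {
  return Math.min(BASE_DELAY_MS * 2 ** crashCount, MAX_DELAY_MS);
}

function spawnWithRespawn(entry: string, workerData: unknown, crashCount = 0): Worker {
  const worker = new Worker(entry, { workerData });
  const startedAt = Date.now();
  worker.on('exit', (code) => {
    if (code === 0) return; // clean shutdown: do not respawn
    // If the worker survived over a minute before dying, reset the backoff.
    const next = Date.now() - startedAt > 60_000 ? 0 : crashCount + 1;
    setTimeout(() => spawnWithRespawn(entry, workerData, next), backoffDelay(next));
  });
  return worker;
}
```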
6. Elastic Autospawning & Tier-Based Limits (Grid Searches)
Monolithic jobs that process tens of thousands of items (e.g., massive Grid Searches) expose a flaw in static worker pools: head-of-line blocking. If all workers are occupied by a massive Enterprise search, Free/Pro users starve.
To solve this we use an Elastic Autospawn / Fan-Out Architecture:
- Fan-Out (Map-Reduce): Instead of processing 10,000 grid cells in a single Node.js worker loop, an Orchestrator job enumerates the area and splits it into 10,000 individual `gridsearch-cell` jobs pushed to PG-Boss.
- Tier-Based Queue Routing/Throttling: We use PG-Boss `singletonKey` (tied to `userId`) and tier-based concurrency limits (e.g., `teamConcurrency: 5` for Pro vs `20` for Enterprise) to ensure fairness at the database queue level.
- Distributed SSE (Pub/Sub): Because micro-jobs fan out across multiple elastic workers, tying SSE to a local `EventBus` via `parentPort` fails. Instead, workers emit progress via Postgres `NOTIFY` or Supabase Realtime channels. The main API process (handling the SSE route) uses `LISTEN` to receive events from any worker on any machine, bridging them back to the user's HTTP stream.
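The tier limits could be centralized in a small policy helper. This is an illustrative sketch: the option names mirror pg-boss (`singletonKey`, `teamSize`, `teamConcurrency`), but the exact wiring and the Free-tier number are assumptions:

```typescript
// Illustrative tier → queue-policy mapping; numbers for Pro/Enterprise come
// from the text above, the Free limit is an assumed placeholder.
type Tier = 'free' | 'pro' | 'enterprise';

function queuePolicy(tier: Tier, userId: string) {
  const limits: Record<Tier, number> = { free: 2, pro: 5, enterprise: 20 };
  return {
    // Applied when enqueuing each gridsearch-cell job (per-user fairness key).
    sendOptions: { singletonKey: `${userId}:gridsearch` },
    // Applied by consumers pulling from the queue (per-tier concurrency).
    workOptions: { teamSize: limits[tier], teamConcurrency: limits[tier] },
  };
}
```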
Exploring Native (Rust/C++) Replacements
Given the constraints of V8 Isolates (10MB overhead, slow startup, lack of shared memory serialization), a viable future replacement for CPU-bound or massively concurrent products (like images or locations grid searches) is replacing Node.js worker_threads with Per-Product Rust or C++ implementations (Binaries or N-API).
If a Native (Rust/C++) worker is implemented:
- Fast Autospawn: Native binaries spawn in under 1ms. If compiled as an N-API native module (via `napi-rs` or `node-addon-api` for C++), worker execution becomes effectively instantaneous function calls, avoiding V8 Isolate boot.
- IPC Performance:
  - Subprocesses communicating via raw UNIX sockets or `stdout` streams provide near-native memory transfer without structured-clone serialization bounds.
  - N-API bindings allow direct zero-copy memory (`SharedArrayBuffer`) access between main-thread JavaScript and native execution.
- Memory Efficiency: A single native concurrency pipeline scaling to 10,000 asynchronous grid cells uses a fraction of the RAM of dozens of isolated Node.js contexts.
Side-By-Side Comparison
| Feature | Node.js `worker_threads` | Rust (N-API / Subprocess) | C++ (N-API / Subprocess) |
|---|---|---|---|
| Startup Time | ~30-50ms (V8 Isolate boot) | <1ms (native binary spawn) | <1ms (native binary spawn) |
| Memory per Instance | High (~10-30MB baseline) | Minimal (<2MB) | Minimal (<2MB) |
| IPC Performance | Slow (`postMessage` structured clone) | High (zero-copy `SharedArrayBuffer` or MsgPack over UDS) | High (zero-copy `SharedArrayBuffer` or MsgPack over UDS) |
| Autospawning | Poor (spawn spikes cause OOM) | Excellent | Excellent |
| Development Speed | Fastest | Slower (strict compiler, borrow checker) | Slower (manual compilation, header management) |
| Memory Safety | High (V8 engine) | High (compiler-enforced lifetimes) | Lower (prone to segfaults / memory leaks) |
| Ecosystem (Parallelism) | Limited (libuv threadpool) | Best-in-class (Tokio, Rayon) | Strong (std::thread, Boost) |
7. Messaging: Internal & External Workers (Protobuf)
When moving to an Elastic Autospawn architecture with Native workers, the serialization format and communication transport become the most crucial factors for performance and system integrity.
Why Protobuf?
While MessagePack over Unix Domain Sockets works, Protocol Buffers (Protobuf) offers several distinct advantages, especially when scaling from "Internal Subprocesses" to "External Distributed Workers":
- Strict Type Contracts: Both Node.js (TypeScript) and Native (Rust/C++) share the exact same `.proto` schema. If a payload field is required, the compiler ensures it exists. If the Node.js API changes a field structure, the Native worker fails to compile, preventing silent production parsing errors.
- Backwards Compatibility: Protobuf is inherently designed for evolving APIs without breaking older workers.
- RPC Native (gRPC): As we expand from Internal Workers on the same machine to External Workers on entirely different physical servers, Protobuf naturally upgrades into gRPC with zero serialization changes.
The "Dual Model" Architecture
The beauty of standardizing on Protobuf is that the exact same serialization code is used regardless of where the worker lives.
1. Internal Workers (Local IPC via Subprocesses)
- The Scenario: The main Node.js API process spawns a native Rust/C++ executable as a child process on the same machine.
- The Transport: Unix Domain Sockets (UDS) / Named Pipes or Standard I/O (stdio). UDS is preferred because it's full-duplex and avoids Node's `stdout` buffering constraints.
- How it works:
  1. Node.js encodes the `JobPayload` message using the compiled `protobufjs` TypeScript library.
  2. Node.js writes the binary payload to the local Unix Domain Socket (e.g., `/tmp/worker_grid_123.sock`). Because UDS is a TCP-like stream, payloads must be length-prefixed (e.g., 4 bytes for length, followed by the Protobuf bytes) so the receiver knows when the message ends.
  3. The Rust/C++ subprocess reads the length prefix, reads the exact byte count, and uses `prost` (Rust) or the Google Protobuf C++ library to deserialize instantly.
  4. The worker executes the CPU-heavy logic, serializes the `JobResult`, prefixes the length, and streams it back.
2. External Workers (Distributed Execution)
- The Scenario: Fanning out 10,000 Grid Search cells across dozens of physical worker nodes to prevent local CPU exhaustion.
- The Transport: Pg-Boss / Postgres (or gRPC).
- How it works:
- The Queue: The main Node.js process encodes the job payload via Protobuf and saves the raw bytes (or Base64-encoded bytes) into the `pgboss.job` table.
- The Fleet: Hundreds of external Rust/C++ worker nodes connect directly to the database layer (or via a gRPC interface), pulling jobs.
- The Decoding: The remote execution node pulls the binary payload and deserializes the Protobuf bytes. Since the schema is strict, all external workers instantly understand the payload, ensuring perfect schema synchronization across the heterogeneous distributed fleet.
8. Storage & Database Integrations for Native Workers
Transitioning to Native Autospawning workers heavily impacts how the database and storage layers scale, specifically around connection pooling, payload limits, and blob storage.
Connection Limits (Supavisor)
If 5,000 autospawned native processes all open distinct libpq connections to Postgres, the database will instantly lock up with FATAL: too many clients.
The Rule: All native workers (whether internal executables or external nodes) must connect to Postgres via a connection pooler like Supavisor or PgBouncer, which transparently multiplexes thousands of transient client connections onto a handful of persistent database connections.
Event Bus Limits (Postgres NOTIFY)
As established, we use LISTEN / NOTIFY to bridge Server-Sent Events (SSE) from the Native workers back to the Node.js API stream.
The Constraint: Postgres NOTIFY string payloads are hard-limited to 8000 bytes. You cannot emit massive JSON/Protobuf result arrays over NOTIFY. It must only contain progression percentages or tiny metadata.
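A tiny guard keeps progress notifications within that limit; the helper name and JSON shape are illustrative, not existing code:

```typescript
// Illustrative guard: NOTIFY payloads carry only tiny progress metadata.
function progressPayload(jobId: string, pct: number): string {
  const payload = JSON.stringify({ jobId, pct: Math.round(pct) });
  // Postgres hard-limits NOTIFY payloads to 8000 bytes.
  if (Buffer.byteLength(payload) > 7999) throw new Error('NOTIFY payload too large');
  return payload; // used as: SELECT pg_notify('job_progress', $1)
}
```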
Returning Artifacts & Large Results
When a Native worker finishes crunching data, it needs to save the result.
- Small Results (JSON/Protobuf < 1MB): The native worker calls the `pg-boss.complete(jobId, protobuf_bytes)` equivalent, storing the payload back in the `pgboss.job` table.
- Tabular Results (Big Data): e.g., 50,000 scraped locations from a massive grid cell. The native worker uses the fast SQL `COPY` command (bulk insert) to write the data directly into a dedicated Postgres table (e.g., `places`), and completes the `pg-boss` job with an empty payload.
- Huge Blobs (Images / Videos / AI Models): The native worker does not touch Postgres for blobs. The Node API orchestrator pre-signs a Supabase Storage upload URL and embeds it in the job payload. The native worker generates the 50MB file and streams it via `libcurl` directly to S3/Supabase Storage, completely bypassing the database stack.
9. Next-level Abstracting: Embedded Scripting (Lua/WASM)
While writing the infrastructure layer (UDS reading, Protobuf decoding, Postgres connection pooling) in strictly-typed Native code (Rust/C++) is essential for performance, writing volatile business logic (like search heuristics) in C++ hurts developer velocity and requires constant recompilations.
To solve this we use the Native Host + Embedded Scripting pattern:
- The Architecture: We compile a standalone Native Executable (the "Host") in Rust or C++. This host statically embeds a lightweight scripting engine (like LuaJIT or a WASM runtime like Wasmtime).
- Execution: The Native Host safely handles all the heavy lifting—reading Unix Domain Sockets, managing DB connections, and parsing Protobuf. Once the payload is ready, it passes it into the embedded Lua state or WASM function instance.
- The Benefit: Developers write the actual product logic in high-level Lua (or AssemblyScript for WASM). It executes far faster than Node.js (LuaJIT approaches raw C speed) while maintaining the tiny <2MB memory footprint, and allows instant hot-reloading of the scripts without ever running a C++ compiler.