/**
 * orchestrator/test-gridsearch-ipc-uds.mjs
 *
 * E2E test for the gridsearch worker's socket IPC.
 * Spawns the worker in `--uds` mode and exchanges length-prefixed JSON frames
 * (4-byte little-endian length header followed by a UTF-8 JSON body) over a
 * net.Socket.
 *
 * Transport used by this script: a Unix domain socket file in the OS temp
 * directory on non-Windows platforms; on Windows it connects over TCP to
 * 127.0.0.1:4000.
 */

import { spawn } from 'node:child_process';
import { resolve, dirname, join } from 'node:path';
import { readFileSync, existsSync, unlinkSync } from 'node:fs';
import { fileURLToPath } from 'node:url';
import net from 'node:net';
import { tmpdir } from 'node:os';

const __dirname = dirname(fileURLToPath(import.meta.url));
const IS_WIN = process.platform === 'win32';
const EXE_NAME = IS_WIN ? 'polymech-cli.exe' : 'polymech-cli';
const EXE = resolve(__dirname, '..', 'dist', EXE_NAME);
const TEST_CANCEL = false;

if (!existsSync(EXE)) {
  console.error(`❌ Binary not found at ${EXE}`);
  process.exit(1);
}

const PIPE_NAME = 'polymech-test-uds';
const WIN_TCP_PORT = 4000;
const CPP_UDS_ARG = IS_WIN ? String(WIN_TCP_PORT) : join(tmpdir(), `${PIPE_NAME}.sock`);

// Job id written into the gridsearch payload and reused by the optional cancel request.
const JOB_ID = 'uds-test-cancel-abc';

const CONNECT_ATTEMPTS = 15;
const CONNECT_RETRY_DELAY_MS = 500;
const CANCEL_DELAY_MS = 5000;
const STOP_GRACE_MS = 1000;
const PIPELINE_TIMEOUT_MS = 120000;

// Remove a stale socket file left behind by a previous run.
if (!IS_WIN && existsSync(CPP_UDS_ARG)) {
  unlinkSync(CPP_UDS_ARG);
}

console.log(`Binary: ${EXE}`);
console.log(`C++ Arg: ${CPP_UDS_ARG}\n`);

// ── Event collector ─────────────────────────────────────────────────────────

/**
 * Builds an accumulator for messages received from the worker.
 *
 * @returns {{events: Object<string, object[]>, onComplete: (Function|null), handler: Function}}
 *   `events` maps a message `type` to every message of that type seen so far
 *   (the known types start out as empty arrays); `handler(msg)` records a
 *   message and prints progress; `onComplete`, when set, is invoked with each
 *   `job_result` message.
 */
function createCollector() {
  const events = {};
  for (const t of [
    'grid-ready',
    'waypoint-start',
    'area',
    'location',
    'enrich-start',
    'node',
    'node-error',
    'nodePage',
    'job_result',
  ]) {
    events[t] = [];
  }

  return {
    events,
    onComplete: null,
    handler(msg) {
      const t = msg.type;
      // Unknown types are still collected (e.g. 'log', which the assertions read later).
      if (events[t]) events[t].push(msg);
      else events[t] = [msg];

      const d = msg.data ?? {};
      if (t === 'waypoint-start') {
        process.stdout.write(`\r 🔍 Searching waypoint ${(d.index ?? 0) + 1}/${d.total ?? '?'}...`);
      } else if (t === 'node') {
        process.stdout.write(`\r 📧 Enriched: ${d.title?.substring(0, 40) ?? ''} `);
      } else if (t === 'node-error') {
        process.stdout.write(`\r ⚠️ Error: ${d.node?.title?.substring(0, 40) ?? ''} `);
      } else if (t === 'job_result') {
        console.log(`\n 🏁 Pipeline complete!`);
        if (this.onComplete) this.onComplete(msg);
      }
    },
  };
}

let passed = 0;
let failed = 0;

/**
 * Records one check: prints the label and bumps the pass/fail counters.
 * Never throws, so every check runs even after a failure.
 *
 * @param {*} condition - Truthy when the check passed.
 * @param {string} label - Text printed next to the result marker.
 */
function assert(condition, label) {
  if (condition) {
    console.log(` ✅ ${label}`);
    passed++;
  } else {
    console.error(` ❌ ${label}`);
    failed++;
  }
}

/**
 * Writes one frame to the socket: a 4-byte little-endian byte length followed
 * by the UTF-8 JSON encoding of `message`.
 *
 * @param {net.Socket} socket - Connected socket to write to.
 * @param {object} message - Value to serialize with JSON.stringify.
 */
function sendFrame(socket, message) {
  const body = Buffer.from(JSON.stringify(message), 'utf8');
  const header = Buffer.alloc(4);
  header.writeUInt32LE(body.length);
  socket.write(Buffer.concat([header, body]));
}

/**
 * Creates a 'data' listener that reassembles length-prefixed frames from
 * arbitrary chunk boundaries and passes each parsed JSON message to `onMessage`.
 *
 * @param {(msg: object) => void} onMessage - Called once per complete frame.
 * @returns {(chunk: Buffer) => void} Listener suitable for `socket.on('data', ...)`.
 */
function createFrameDecoder(onMessage) {
  let pending = Buffer.alloc(0);
  return (chunk) => {
    pending = Buffer.concat([pending, chunk]);
    while (pending.length >= 4) {
      const len = pending.readUInt32LE(0);
      if (pending.length < 4 + len) {
        break; // Wait for more chunks
      }
      const payload = pending.toString('utf8', 4, 4 + len);
      pending = pending.subarray(4 + len);
      try {
        onMessage(JSON.parse(payload));
      } catch (e) {
        console.error("JSON PARSE ERROR:", e, payload);
      }
    }
  };
}

/**
 * Connects to the worker, retrying while it boots.
 * Makes up to CONNECT_ATTEMPTS attempts, CONNECT_RETRY_DELAY_MS apart.
 *
 * @returns {Promise<net.Socket>} The connected socket.
 * @throws The error from the last attempt when every attempt fails.
 */
async function connectWithRetry() {
  for (let attempt = 1; ; attempt++) {
    const candidate = IS_WIN
      ? net.connect({ port: WIN_TCP_PORT, host: '127.0.0.1' })
      : net.connect(CPP_UDS_ARG);
    try {
      await new Promise((onConnect, onError) => {
        candidate.once('error', onError);
        candidate.once('connect', () => {
          // Drop the connect-time listener so later socket errors are not silently swallowed.
          candidate.off('error', onError);
          onConnect();
        });
      });
      return candidate;
    } catch (err) {
      // Release the failed socket before retrying or giving up.
      candidate.destroy();
      if (attempt >= CONNECT_ATTEMPTS) throw err;
      await new Promise((wake) => setTimeout(wake, CONNECT_RETRY_DELAY_MS));
    }
  }
}

/**
 * Prints the per-type event counts and runs every assertion on the collected events.
 *
 * @param {Object<string, object[]>} ev - The collector's `events` map.
 */
function summarizeAndVerify(ev) {
  console.log('\n\n5. Event summary');
  for (const [k, v] of Object.entries(ev)) {
    console.log(` ${k}: ${v.length}`);
  }

  // Assertions
  assert(ev['grid-ready'].length === 1, 'grid-ready emitted once');
  assert(ev['waypoint-start'].length > 0, 'waypoint-start events received');
  assert(ev['location'].length > 0, 'location events received');
  assert(ev['enrich-start'].length === 1, 'enrich-start emitted once');
  assert(ev['job_result'].length === 1, 'job_result emitted once');

  // Check enrichment skip log (if present in log events)
  const logEvents = ev['log'] ?? [];
  const skipLog = logEvents.find(
    (l) => typeof l.data === 'string' && l.data.includes('already enriched'),
  );
  const nodeCount = ev['node'].length + ev['node-error'].length;
  if (skipLog) {
    console.log(` ℹ️ Pre-enrich skip detected: ${skipLog.data}`);
    assert(nodeCount === 0, 'no enrichment needed (all skipped)');
  } else {
    console.log(' ℹ️ No pre-enrich skips (all locations are new or unenriched)');
    assert(nodeCount > 0, 'enrichment node events received');
  }

  // Check filterTypes assertions: all locations must have website + matching type
  const FILTER_TYPE = 'Recycling center';
  const locations = ev['location'];

  const badWebsite = locations.filter((l) => {
    const loc = l.data?.location;
    return !loc?.website;
  });
  assert(badWebsite.length === 0, `all locations have website (${badWebsite.length} missing)`);

  const badType = locations.filter((l) => {
    const loc = l.data?.location;
    const types = loc?.types ?? [];
    const type = loc?.type ?? '';
    return !types.includes(FILTER_TYPE) && type !== FILTER_TYPE;
  });
  if (badType.length > 0) {
    console.log(` ❌ Mismatched locations:`);
    badType.slice(0, 3).forEach((l) => console.log(JSON.stringify(l.data?.location, null, 2)));
  }
  assert(badType.length === 0, `all locations match type "${FILTER_TYPE}" (${badType.length} mismatched)`);

  const filterLog = logEvents.find(
    (l) => typeof l.data === 'string' && l.data.includes('locations removed'),
  );
  if (filterLog) {
    console.log(` ℹ️ Filter applied: ${filterLog.data}`);
  }

  const filterTypesLog = logEvents.filter(
    (l) => typeof l.data === 'string' && (l.data.includes('filterTypes:') || l.data.includes(' - ')),
  );
  if (filterTypesLog.length > 0) {
    console.log(` ℹ️ Parsed filterTypes in C++:`);
    filterTypesLog.forEach((l) => console.log(` ${l.data}`));
  }

  console.log(` ℹ️ Locations after filter: ${locations.length}`);
}

/**
 * Runs the end-to-end scenario: spawn the worker, connect, send the gridsearch
 * payload, wait for `job_result` (or the safety timeout), verify the collected
 * events, then exit with 0 when every check passed and 1 otherwise.
 *
 * The worker process and the socket are released in a `finally` block so they
 * are cleaned up even when a step throws.
 */
async function run() {
  console.log('🧪 Gridsearch UDS / Named Pipe E2E Test\n');

  // 1. Spawn worker in UDS mode
  console.log('1. Spawning remote C++ Taskflow Daemon');
  const worker = spawn(EXE, ['worker', '--uds', CPP_UDS_ARG, '--daemon'], { stdio: 'inherit' });

  let socket;
  try {
    // 2. The daemon needs a moment to boot, so the connection is retried.
    console.log('2. Connecting net.Socket with retries...');
    socket = await connectWithRetry();
    console.log(' ✅ Socket Connected to UDS!');

    // Without a listener, an 'error' event on the socket would be thrown as an
    // uncaught exception (e.g. a reset while the daemon shuts down).
    socket.on('error', (err) => {
      console.error(`\n ⚠️ Socket error: ${err.message}`);
    });

    const collector = createCollector();

    // Buffer framing logic (length-prefixed streaming)
    socket.on('data', createFrameDecoder((msg) => collector.handler(msg)));

    // 3. Send Gridsearch payload
    const sampleConfig = JSON.parse(
      readFileSync(resolve(__dirname, '..', 'config', 'gridsearch-bcn-universities.json'), 'utf8'),
    );
    sampleConfig.configPath = resolve(__dirname, '..', 'config', 'postgres.toml');
    sampleConfig.jobId = JOB_ID;

    console.log('3. Writing serialized IPC Payload over pipe...');
    sendFrame(socket, sampleConfig);

    // Send cancellation after 5 seconds
    if (TEST_CANCEL) {
      setTimeout(() => {
        console.log(`\n\n--> Testing Dynamic Cancellation (Sending cancel event for ${JOB_ID})...`);
        sendFrame(socket, { action: "cancel", jobId: JOB_ID });
      }, CANCEL_DELAY_MS);
    }

    // 4. Wait for pipeline completion (job_result event) or timeout
    console.log('\n4. Awaiting multi-threaded Execution Pipeline (can take minutes)...\n');
    await new Promise((done) => {
      // Safety timeout
      const safetyTimer = setTimeout(() => {
        console.log('\n ⏰ Timeout reached (120s) — forcing shutdown.');
        done();
      }, PIPELINE_TIMEOUT_MS);

      collector.onComplete = () => {
        // The job finished; the safety timeout must not fire during the stop grace period.
        clearTimeout(safetyTimer);
        // Send stop command to gracefully shut down the daemon
        console.log(' 📤 Sending stop command to daemon...');
        sendFrame(socket, { action: 'stop' });
        setTimeout(done, STOP_GRACE_MS); // Give daemon a moment to ack
      };
    });

    summarizeAndVerify(collector.events);

    console.log('6. Cleanup');
  } finally {
    // Always release the socket and the daemon, including on failure paths.
    socket?.destroy();
    worker.kill('SIGTERM');
  }

  console.log(`\n────────────────────────────────`);
  console.log(` Passed: ${passed} Failed: ${failed}`);
  console.log(`────────────────────────────────`);
  process.exit(failed > 0 ? 1 : 0);
}

run().catch((e) => {
  console.error(e);
  process.exit(1);
});