/**
 * orchestrator/test-gridsearch-ipc-uds.mjs
 *
 * E2E test for Unix Domain Sockets / Windows Named Pipes.
 * Spawns the worker in `--uds` mode and tests direct high-throughput
 * lock-free JSON binary framing over a net.Socket.
 */

import { spawn } from 'node:child_process';
import { resolve, dirname, join } from 'node:path';
import { readFileSync, existsSync, unlinkSync } from 'node:fs';
import { fileURLToPath } from 'node:url';
import net from 'node:net';
import { tmpdir } from 'node:os';

// Locate the compiled CLI binary relative to this script.
const __dirname = dirname(fileURLToPath(import.meta.url));
const IS_WIN = process.platform === 'win32';
const EXE_NAME = IS_WIN ? 'polymech-cli.exe' : 'polymech-cli';
const EXE = resolve(__dirname, '..', 'dist', EXE_NAME);

// Flip to true to exercise the mid-run cancellation path.
const TEST_CANCEL = false;

// Bail out early when the worker binary has not been built.
if (!existsSync(EXE)) {
  console.error(`❌ Binary not found at ${EXE}`);
  process.exit(1);
}

// On Windows the daemon listens on a TCP port; on POSIX it binds a
// filesystem socket under the OS temp dir.
const PIPE_NAME = 'polymech-test-uds';
const CPP_UDS_ARG = IS_WIN ? '4000' : join(tmpdir(), `${PIPE_NAME}.sock`);

// Remove a stale socket file left behind by a previous run (POSIX only).
if (!IS_WIN) {
  if (existsSync(CPP_UDS_ARG)) unlinkSync(CPP_UDS_ARG);
}

console.log(`Binary: ${EXE}`);
console.log(`C++ Arg: ${CPP_UDS_ARG}\n`);

// ── Event collector ─────────────────────────────────────────────────────────

/**
 * Builds a collector that buckets incoming IPC messages by `type` and
 * prints a one-line progress indicator for the noisy event kinds.
 * When the terminal `job_result` message arrives, `onComplete` (if set)
 * is invoked with that message.
 */
function createCollector() {
  const KNOWN_TYPES = ['grid-ready', 'waypoint-start', 'area', 'location',
    'enrich-start', 'node', 'node-error', 'nodePage', 'job_result'];
  // Pre-seed a bucket per known type so the summary always lists them.
  const events = Object.fromEntries(KNOWN_TYPES.map((type) => [type, []]));

  return {
    events,
    onComplete: null,
    handler(msg) {
      const { type } = msg;
      // Unknown types get a bucket on first sight.
      (events[type] ??= []).push(msg);

      const data = msg.data ?? {};
      switch (type) {
        case 'waypoint-start':
          process.stdout.write(`\r   🔍 Searching waypoint ${(data.index ?? 0) + 1}/${data.total ?? '?'}...`);
          break;
        case 'node':
          process.stdout.write(`\r   📧 Enriched: ${data.title?.substring(0, 40) ?? ''}   `);
          break;
        case 'node-error':
          process.stdout.write(`\r   ⚠️ Error: ${data.node?.title?.substring(0, 40) ?? ''}   `);
          break;
        case 'job_result':
          console.log(`\n   🏁 Pipeline complete!`);
          if (this.onComplete) this.onComplete(msg);
          break;
      }
    },
  };
}

// Pass/fail tallies printed in the summary at the end of the run.
let passed = 0;
let failed = 0;

/**
 * Records a single check: logs a ✅/❌ line and bumps the matching counter.
 * @param {boolean} condition - outcome of the check
 * @param {string} label - human-readable description of the check
 */
function assert(condition, label) {
  if (!condition) {
    console.error(`  ❌ ${label}`);
    failed++;
    return;
  }
  console.log(`  ✅ ${label}`);
  passed++;
}

/**
 * Drives the full E2E flow:
 *   1. spawn the worker daemon in --uds mode,
 *   2. connect a net.Socket (retrying while the daemon boots),
 *   3. stream the gridsearch config as a length-prefixed JSON frame,
 *   4. collect events until `job_result` (or a 120 s safety timeout),
 *   5. assert on the collected event stream, then tear everything down.
 * Exits the process with code 0 on success, 1 if any assertion failed.
 */
async function run() {
  console.log('🧪 Gridsearch UDS / Named Pipe E2E Test\n');

  // 1. Spawn worker in UDS mode
  console.log('1. Spawning remote C++ Taskflow Daemon');
  const worker = spawn(EXE, ['worker', '--uds', CPP_UDS_ARG, '--daemon'], { stdio: 'inherit' });

  // Give the daemon a moment to boot: retry the connect until it listens.
  console.log('2. Connecting net.Socket with retries...');

  const MAX_ATTEMPTS = 15;
  let socket;
  for (let i = 0; i < MAX_ATTEMPTS; i++) {
    try {
      await new Promise((resolve, reject) => {
        if (IS_WIN) {
          // FIX: derive the port from CPP_UDS_ARG instead of duplicating the
          // literal 4000 — keeps the daemon arg and the client in sync.
          socket = net.connect({ port: Number(CPP_UDS_ARG), host: '127.0.0.1' });
        } else {
          socket = net.connect(CPP_UDS_ARG);
        }
        socket.once('connect', resolve);
        socket.once('error', reject);
      });
      console.log('   ✅ Socket Connected to UDS!');
      break;
    } catch (e) {
      // FIX: destroy the failed socket so each retry doesn't leak a
      // half-open handle while the daemon is still booting.
      socket?.destroy();
      if (i === MAX_ATTEMPTS - 1) throw e;
      await new Promise(r => setTimeout(r, 500));
    }
  }

  // FIX: keep a persistent error handler attached for the connected phase;
  // without one a late socket error raises an unhandled 'error' event and
  // crashes the test harness instead of reaching the summary.
  socket.on('error', (err) => {
    console.error('Socket error:', err.message);
  });

  const collector = createCollector();
  let buffer = Buffer.alloc(0);

  // Buffer framing logic (length-prefixed streaming):
  // each frame is [u32-LE payload length][UTF-8 JSON payload].
  socket.on('data', (chunk) => {
    buffer = Buffer.concat([buffer, chunk]);
    while (buffer.length >= 4) {
      const len = buffer.readUInt32LE(0);
      if (buffer.length < 4 + len) break; // Wait for more chunks
      const payload = buffer.toString('utf8', 4, 4 + len);
      buffer = buffer.subarray(4 + len);
      try {
        const msg = JSON.parse(payload);
        collector.handler(msg);
      } catch (e) {
        console.error("JSON PARSE ERROR:", e, payload);
      }
    }
  });

  // Helper: serialize `obj` and send it as one length-prefixed frame.
  // (Deduplicates the three write sites: job payload, cancel, stop.)
  const writeFrame = (obj) => {
    const json = JSON.stringify(obj);
    const lenBuf = Buffer.alloc(4);
    lenBuf.writeUInt32LE(Buffer.byteLength(json));
    socket.write(lenBuf);
    socket.write(json);
  };

  // 3. Send Gridsearch payload
  const sampleConfig = JSON.parse(
    readFileSync(resolve(__dirname, '..', 'config', 'gridsearch-bcn-universities.json'), 'utf8')
  );
  sampleConfig.configPath = resolve(__dirname, '..', 'config', 'postgres.toml');
  sampleConfig.jobId = 'uds-test-cancel-abc';

  console.log('3. Writing serialized IPC Payload over pipe...');
  writeFrame(sampleConfig);

  // Optionally send a cancellation after 5 seconds
  if (TEST_CANCEL) {
    setTimeout(() => {
      console.log('\n\n--> Testing Dynamic Cancellation (Sending cancel event for uds-test-cancel-abc)...');
      writeFrame({ action: "cancel", jobId: "uds-test-cancel-abc" });
    }, 5000);
  }

  // 4. Wait for pipeline completion (job_result event) or timeout
  console.log('\n4. Awaiting multi-threaded Execution Pipeline (can take minutes)...\n');

  await new Promise((resolve) => {
    // Safety timeout — FIX: keep the handle so it can be cleared when the
    // pipeline completes normally instead of lingering until process exit.
    const safetyTimer = setTimeout(() => {
      console.log('\n   ⏰ Timeout reached (120s) — forcing shutdown.');
      resolve();
    }, 120000);

    collector.onComplete = () => {
      clearTimeout(safetyTimer);
      // Send stop command to gracefully shut down the daemon
      console.log('   📤 Sending stop command to daemon...');
      writeFrame({ action: 'stop' });
      setTimeout(resolve, 1000); // Give daemon a moment to ack
    };
  });

  console.log('\n\n5. Event summary');
  for (const [k, v] of Object.entries(collector.events)) {
    console.log(`   ${k}: ${v.length}`);
  }

  // Assertions on the collected event stream
  const ev = collector.events;
  assert(ev['grid-ready'].length === 1, 'grid-ready emitted once');
  assert(ev['waypoint-start'].length > 0, 'waypoint-start events received');
  assert(ev['location'].length > 0, 'location events received');
  assert(ev['enrich-start'].length === 1, 'enrich-start emitted once');
  assert(ev['job_result'].length === 1, 'job_result emitted once');

  // Check enrichment skip log (if present in log events)
  const logEvents = ev['log'] ?? [];
  const skipLog = logEvents.find(l =>
    typeof l.data === 'string' && l.data.includes('already enriched')
  );
  const nodeCount = ev['node'].length + ev['node-error'].length;
  if (skipLog) {
    console.log(`   ℹ️ Pre-enrich skip detected: ${skipLog.data}`);
    assert(nodeCount === 0, 'no enrichment needed (all skipped)');
  } else {
    console.log('   ℹ️ No pre-enrich skips (all locations are new or unenriched)');
    assert(nodeCount > 0, 'enrichment node events received');
  }

  // Check filterTypes assertions: all locations must have website + matching type
  const FILTER_TYPE = 'Recycling center';
  const locations = ev['location'];
  const badWebsite = locations.filter(l => {
    const loc = l.data?.location;
    return !loc?.website;
  });

  assert(badWebsite.length === 0, `all locations have website (${badWebsite.length} missing)`);

  const badType = locations.filter(l => {
    const loc = l.data?.location;
    const types = loc?.types ?? [];
    const type = loc?.type ?? '';
    return !types.includes(FILTER_TYPE) && type !== FILTER_TYPE;
  });
  if (badType.length > 0) {
    console.log(`   ❌ Mismatched locations:`);
    badType.slice(0, 3).forEach(l => console.log(JSON.stringify(l.data?.location, null, 2)));
  }
  assert(badType.length === 0, `all locations match type "${FILTER_TYPE}" (${badType.length} mismatched)`);

  const filterLog = logEvents.find(l =>
    typeof l.data === 'string' && l.data.includes('locations removed')
  );
  if (filterLog) {
    console.log(`   ℹ️ Filter applied: ${filterLog.data}`);
  }

  const filterTypesLog = logEvents.filter(l =>
    typeof l.data === 'string' && (l.data.includes('filterTypes:') || l.data.includes(' - '))
  );
  if (filterTypesLog.length > 0) {
    console.log(`   ℹ️ Parsed filterTypes in C++:`);
    filterTypesLog.forEach(l => console.log(`     ${l.data}`));
  }

  console.log(`   ℹ️ Locations after filter: ${locations.length}`);

  // 6. Tear down: drop the socket and terminate the worker process.
  console.log('6. Cleanup');
  socket.destroy();
  worker.kill('SIGTERM');

  console.log(`\n────────────────────────────────`);
  console.log(`  Passed: ${passed}  Failed: ${failed}`);
  console.log(`────────────────────────────────`);
  process.exit(failed > 0 ? 1 : 0);
}

// Entry point: surface any unhandled failure and exit non-zero.
(async () => {
  try {
    await run();
  } catch (err) {
    console.error(err);
    process.exit(1);
  }
})();