mono-cpp/orchestrator/test-gridsearch-ipc-uds.mjs
2026-03-28 13:11:29 +01:00

256 lines
8.8 KiB
JavaScript
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* orchestrator/test-gridsearch-ipc-uds.mjs
*
* E2E test for Unix Domain Sockets / Windows Named Pipes!
* Spawns the worker in `--uds` mode and tests direct high-throughput
* lock-free JSON binary framing over a net.Socket.
*/
import { spawn } from 'node:child_process';
import { resolve, dirname, join } from 'node:path';
import { readFileSync, existsSync, unlinkSync } from 'node:fs';
import { fileURLToPath } from 'node:url';
import net from 'node:net';
import { tmpdir } from 'node:os';
const __dirname = dirname(fileURLToPath(import.meta.url));
const IS_WIN = process.platform === 'win32';
const EXE_NAME = IS_WIN ? 'polymech-cli.exe' : 'polymech-cli';
const EXE = resolve(__dirname, '..', 'dist', EXE_NAME);
const TEST_CANCEL = false;
if (!existsSync(EXE)) {
console.error(`❌ Binary not found at ${EXE}`);
process.exit(1);
}
const PIPE_NAME = 'polymech-test-uds';
const CPP_UDS_ARG = IS_WIN ? '4000' : join(tmpdir(), `${PIPE_NAME}.sock`);
if (!IS_WIN && existsSync(CPP_UDS_ARG)) {
unlinkSync(CPP_UDS_ARG);
}
console.log(`Binary: ${EXE}`);
console.log(`C++ Arg: ${CPP_UDS_ARG}\n`);
// ── Event collector ─────────────────────────────────────────────────────────
function createCollector() {
const events = {};
for (const t of ['grid-ready', 'waypoint-start', 'area', 'location',
'enrich-start', 'node', 'node-error', 'nodePage', 'job_result']) {
events[t] = [];
}
return {
events,
onComplete: null,
handler(msg) {
const t = msg.type;
if (events[t]) events[t].push(msg);
else events[t] = [msg];
const d = msg.data ?? {};
if (t === 'waypoint-start') {
process.stdout.write(`\r 🔍 Searching waypoint ${(d.index ?? 0) + 1}/${d.total ?? '?'}...`);
} else if (t === 'node') {
process.stdout.write(`\r 📧 Enriched: ${d.title?.substring(0, 40) ?? ''} `);
} else if (t === 'node-error') {
process.stdout.write(`\r ⚠️ Error: ${d.node?.title?.substring(0, 40) ?? ''} `);
} else if (t === 'job_result') {
console.log(`\n 🏁 Pipeline complete!`);
if (this.onComplete) this.onComplete(msg);
}
},
};
}
let passed = 0;
let failed = 0;
function assert(condition, label) {
if (condition) { console.log(`${label}`); passed++; }
else { console.error(`${label}`); failed++; }
}
async function run() {
console.log('🧪 Gridsearch UDS / Named Pipe E2E Test\n');
// 1. Spawn worker in UDS mode
console.log('1. Spawning remote C++ Taskflow Daemon');
const worker = spawn(EXE, ['worker', '--uds', CPP_UDS_ARG, '--daemon'], { stdio: 'inherit' });
// Give the daemon a moment to boot
console.log('2. Connecting net.Socket with retries...');
let socket;
for (let i = 0; i < 15; i++) {
try {
await new Promise((resolve, reject) => {
if (IS_WIN) {
socket = net.connect({ port: 4000, host: '127.0.0.1' });
} else {
socket = net.connect(CPP_UDS_ARG);
}
socket.once('connect', resolve);
socket.once('error', reject);
});
console.log(' ✅ Socket Connected to UDS!');
break;
} catch (e) {
if (i === 14) throw e;
await new Promise(r => setTimeout(r, 500));
}
}
const collector = createCollector();
let buffer = Buffer.alloc(0);
// Buffer framing logic (length-prefixed streaming)
socket.on('data', (chunk) => {
buffer = Buffer.concat([buffer, chunk]);
while (buffer.length >= 4) {
const len = buffer.readUInt32LE(0);
if (buffer.length >= 4 + len) {
const payload = buffer.toString('utf8', 4, 4 + len);
buffer = buffer.subarray(4 + len);
try {
const msg = JSON.parse(payload);
collector.handler(msg);
} catch (e) {
console.error("JSON PARSE ERROR:", e, payload);
}
} else {
break; // Wait for more chunks
}
}
});
// 3. Send Gridsearch payload
const sampleConfig = JSON.parse(
readFileSync(resolve(__dirname, '..', 'config', 'gridsearch-bcn-universities.json'), 'utf8')
);
sampleConfig.configPath = resolve(__dirname, '..', 'config', 'postgres.toml');
sampleConfig.jobId = 'uds-test-cancel-abc';
console.log('3. Writing serialized IPC Payload over pipe...');
const jsonStr = JSON.stringify(sampleConfig);
const lenBuf = Buffer.alloc(4);
lenBuf.writeUInt32LE(Buffer.byteLength(jsonStr));
socket.write(lenBuf);
socket.write(jsonStr);
// Send cancellation after 5 seconds
if (TEST_CANCEL) {
setTimeout(() => {
console.log('\n\n--> Testing Dynamic Cancellation (Sending cancel event for uds-test-cancel-abc)...');
const cancelPayload = JSON.stringify({ action: "cancel", jobId: "uds-test-cancel-abc" });
const cancelLenBuf = Buffer.alloc(4);
cancelLenBuf.writeUInt32LE(Buffer.byteLength(cancelPayload));
socket.write(cancelLenBuf);
socket.write(cancelPayload);
}, 5000);
}
// 4. Wait for pipeline completion (job_result event) or timeout
console.log('\n4. Awaiting multi-threaded Execution Pipeline (can take minutes)...\n');
await new Promise((resolve) => {
collector.onComplete = () => {
// Send stop command to gracefully shut down the daemon
console.log(' 📤 Sending stop command to daemon...');
const stopPayload = JSON.stringify({ action: 'stop' });
const stopLen = Buffer.alloc(4);
stopLen.writeUInt32LE(Buffer.byteLength(stopPayload));
socket.write(stopLen);
socket.write(stopPayload);
setTimeout(resolve, 1000); // Give daemon a moment to ack
};
// Safety timeout
setTimeout(() => {
console.log('\n ⏰ Timeout reached (120s) — forcing shutdown.');
resolve();
}, 120000);
});
console.log('\n\n5. Event summary');
for (const [k, v] of Object.entries(collector.events)) {
console.log(` ${k}: ${v.length}`);
}
// Assertions
const ev = collector.events;
assert(ev['grid-ready'].length === 1, 'grid-ready emitted once');
assert(ev['waypoint-start'].length > 0, 'waypoint-start events received');
assert(ev['location'].length > 0, 'location events received');
assert(ev['enrich-start'].length === 1, 'enrich-start emitted once');
assert(ev['job_result'].length === 1, 'job_result emitted once');
// Check enrichment skip log (if present in log events)
const logEvents = ev['log'] ?? [];
const skipLog = logEvents.find(l =>
typeof l.data === 'string' && l.data.includes('already enriched')
);
const nodeCount = ev['node'].length + ev['node-error'].length;
if (skipLog) {
console.log(` Pre-enrich skip detected: ${skipLog.data}`);
assert(nodeCount === 0, 'no enrichment needed (all skipped)');
} else {
console.log(' No pre-enrich skips (all locations are new or unenriched)');
assert(nodeCount > 0, 'enrichment node events received');
}
// Check filterTypes assertions: all locations must have website + matching type
const FILTER_TYPE = 'Recycling center';
const locations = ev['location'];
const badWebsite = locations.filter(l => {
const loc = l.data?.location;
return !loc?.website;
});
assert(badWebsite.length === 0, `all locations have website (${badWebsite.length} missing)`);
const badType = locations.filter(l => {
const loc = l.data?.location;
const types = loc?.types ?? [];
const type = loc?.type ?? '';
return !types.includes(FILTER_TYPE) && type !== FILTER_TYPE;
});
if (badType.length > 0) {
console.log(` ❌ Mismatched locations:`);
badType.slice(0, 3).forEach(l => console.log(JSON.stringify(l.data?.location, null, 2)));
}
assert(badType.length === 0, `all locations match type "${FILTER_TYPE}" (${badType.length} mismatched)`);
const filterLog = logEvents.find(l =>
typeof l.data === 'string' && l.data.includes('locations removed')
);
if (filterLog) {
console.log(` Filter applied: ${filterLog.data}`);
}
const filterTypesLog = logEvents.filter(l =>
typeof l.data === 'string' && (l.data.includes('filterTypes:') || l.data.includes(' - '))
);
if (filterTypesLog.length > 0) {
console.log(` Parsed filterTypes in C++:`);
filterTypesLog.forEach(l => console.log(` ${l.data}`));
}
console.log(` Locations after filter: ${locations.length}`);
console.log('6. Cleanup');
socket.destroy();
worker.kill('SIGTERM');
console.log(`\n────────────────────────────────`);
console.log(` Passed: ${passed} Failed: ${failed}`);
console.log(`────────────────────────────────`);
process.exit(failed > 0 ? 1 : 0);
}
run().catch(e => {
console.error(e);
process.exit(1);
});