mono-cpp/orchestrator/test-gridsearch-ipc-uds-meta.mjs
2026-03-28 13:11:29 +01:00

219 lines
7.2 KiB
JavaScript

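/**
 * orchestrator/test-gridsearch-ipc-uds-meta.mjs
 *
 * E2E test for the worker's socket IPC (Meta Enrichment).
 * Spawns the worker in `--uds` mode and tests direct high-throughput
 * JSON framing over a net.Socket: every frame is a 4-byte little-endian
 * length prefix followed by a UTF-8 JSON payload.
 *
 * Transport: a Unix domain socket in the OS temp directory on POSIX;
 * on Windows this test connects over TCP to 127.0.0.1:4001 instead.
 */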
import { spawn } from 'node:child_process';
import { resolve, dirname, join } from 'node:path';
import { readFileSync, existsSync, unlinkSync } from 'node:fs';
import { fileURLToPath } from 'node:url';
import net from 'node:net';
import { tmpdir } from 'node:os';
const __dirname = dirname(fileURLToPath(import.meta.url));
const IS_WIN = process.platform === 'win32';
const EXE_NAME = IS_WIN ? 'polymech-cli.exe' : 'polymech-cli';
const EXE = resolve(__dirname, '..', 'dist', EXE_NAME);
const TEST_CANCEL = false;
if (!existsSync(EXE)) {
console.error(`❌ Binary not found at ${EXE}`);
process.exit(1);
}
const PIPE_NAME = 'polymech-test-uds-meta';
const CPP_UDS_ARG = IS_WIN ? '4001' : join(tmpdir(), `${PIPE_NAME}.sock`);
if (!IS_WIN && existsSync(CPP_UDS_ARG)) {
unlinkSync(CPP_UDS_ARG);
}
console.log(`Binary: ${EXE}`);
console.log(`C++ Arg: ${CPP_UDS_ARG}\n`);
// ── Event collector ─────────────────────────────────────────────────────────
function createCollector() {
const events = {};
for (const t of ['grid-ready', 'waypoint-start', 'area', 'location',
'enrich-start', 'node', 'node-error', 'nodePage', 'job_result']) {
events[t] = [];
}
return {
events,
onComplete: null,
handler(msg) {
const t = msg.type;
if (events[t]) events[t].push(msg);
else events[t] = [msg];
const d = msg.data ?? {};
if (t === 'waypoint-start') {
process.stdout.write(`\r 🔍 Searching waypoint ${(d.index ?? 0) + 1}/${d.total ?? '?'}...`);
} else if (t === 'node') {
process.stdout.write(`\r 📧 Enriched: ${d.title?.substring(0, 40) ?? ''} `);
} else if (t === 'node-error') {
process.stdout.write(`\r ⚠️ Error: ${d.node?.title?.substring(0, 40) ?? ''} `);
} else if (t === 'job_result') {
console.log(`\n 🏁 Pipeline complete!`);
if (this.onComplete) this.onComplete(msg);
}
},
};
}
let passed = 0;
let failed = 0;
function assert(condition, label) {
if (condition) { console.log(`${label}`); passed++; }
else { console.error(`${label}`); failed++; }
}
async function run() {
console.log('🧪 Gridsearch UDS Meta E2E Test\n');
// 1. Spawn worker in UDS mode
console.log('1. Spawning remote C++ Taskflow Daemon');
const worker = spawn(EXE, ['worker', '--uds', CPP_UDS_ARG, '--daemon'], { stdio: 'inherit' });
// Give the daemon a moment to boot
console.log('2. Connecting net.Socket with retries...');
let socket;
for (let i = 0; i < 15; i++) {
try {
await new Promise((resolve, reject) => {
if (IS_WIN) {
socket = net.connect({ port: 4001, host: '127.0.0.1' });
} else {
socket = net.connect(CPP_UDS_ARG);
}
socket.once('connect', resolve);
socket.once('error', reject);
});
console.log(' ✅ Socket Connected to UDS!');
break;
} catch (e) {
if (i === 14) throw e;
await new Promise(r => setTimeout(r, 500));
}
}
const collector = createCollector();
let buffer = Buffer.alloc(0);
// Buffer framing logic (length-prefixed streaming)
socket.on('data', (chunk) => {
buffer = Buffer.concat([buffer, chunk]);
while (buffer.length >= 4) {
const len = buffer.readUInt32LE(0);
if (buffer.length >= 4 + len) {
const payload = buffer.toString('utf8', 4, 4 + len);
buffer = buffer.subarray(4 + len);
try {
const msg = JSON.parse(payload);
collector.handler(msg);
} catch (e) {
console.error("JSON PARSE ERROR:", e, payload);
}
} else {
break; // Wait for more chunks
}
}
});
// 3. Send Gridsearch payload
// USE gridsearch-sample.json instead of gridsearch-bcn-universities.json
const sampleConfig = JSON.parse(
readFileSync(resolve(__dirname, '..', 'config', 'gridsearch-sample.json'), 'utf8')
);
sampleConfig.configPath = resolve(__dirname, '..', 'config', 'postgres.toml');
sampleConfig.jobId = 'uds-meta-test-abc';
sampleConfig.noCache = true; // force re-enrichment even if cached
console.log('3. Writing serialized IPC Payload over pipe...');
const jsonStr = JSON.stringify(sampleConfig);
const lenBuf = Buffer.alloc(4);
lenBuf.writeUInt32LE(Buffer.byteLength(jsonStr));
socket.write(lenBuf);
socket.write(jsonStr);
// 4. Wait for pipeline completion (job_result event) or timeout
console.log('\n4. Awaiting multi-threaded Execution Pipeline (can take minutes)...\n');
await new Promise((resolve) => {
collector.onComplete = () => {
// Send stop command to gracefully shut down the daemon
console.log(' 📤 Sending stop command to daemon...');
const stopPayload = JSON.stringify({ action: 'stop' });
const stopLen = Buffer.alloc(4);
stopLen.writeUInt32LE(Buffer.byteLength(stopPayload));
socket.write(stopLen);
socket.write(stopPayload);
setTimeout(resolve, 1000); // Give daemon a moment to ack
};
// Safety timeout
setTimeout(() => {
console.log('\n ⏰ Timeout reached (300s) — forcing shutdown.');
resolve();
}, 300000); // Wait up to 5 minutes
});
console.log('\n\n5. Event summary');
for (const [k, v] of Object.entries(collector.events)) {
console.log(` ${k}: ${v.length}`);
}
// Assertions
const ev = collector.events;
assert(ev['grid-ready'].length === 1, 'grid-ready emitted once');
assert(ev['waypoint-start'].length > 0, 'waypoint-start events received');
assert(ev['location'].length > 0, 'location events received');
assert(ev['enrich-start'].length === 1, 'enrich-start emitted once');
assert(ev['job_result'].length === 1, 'job_result emitted once');
// Verify social profiles and md body
const nodes = ev['node'];
let foundSocial = false;
let foundSiteMd = false;
for (const n of nodes) {
const d = n.data;
if (!d) continue;
if (d.socials && d.socials.length > 0) {
foundSocial = true;
}
if (d.sites && Array.isArray(d.sites) && d.sites.length > 0) {
foundSiteMd = true;
}
}
if (foundSocial) {
assert(foundSocial, 'At least one enriched node has social media profiles discovered');
} else {
console.log(' ⚠️ No social media profiles discovered in this run (data-dependent), but pipeline completed.');
}
assert(foundSiteMd, 'At least one enriched node has markdown sites mapped');
console.log('6. Cleanup');
socket.destroy();
worker.kill('SIGTERM');
console.log(`\n────────────────────────────────`);
console.log(` Passed: ${passed} Failed: ${failed}`);
console.log(`────────────────────────────────`);
process.exit(failed > 0 ? 1 : 0);
}
run().catch(e => {
console.error(e);
process.exit(1);
});