219 lines
7.2 KiB
JavaScript
219 lines
7.2 KiB
JavaScript
/**
|
|
* orchestrator/test-gridsearch-ipc-uds-meta.mjs
|
|
*
|
|
* E2E test for Unix Domain Sockets / Windows Named Pipes (Meta Enrichment)!
|
|
* Spawns the worker in `--uds` mode and tests direct high-throughput
|
|
* lock-free JSON binary framing over a net.Socket.
|
|
*/
|
|
|
|
import { spawn } from 'node:child_process';
|
|
import { resolve, dirname, join } from 'node:path';
|
|
import { readFileSync, existsSync, unlinkSync } from 'node:fs';
|
|
import { fileURLToPath } from 'node:url';
|
|
import net from 'node:net';
|
|
import { tmpdir } from 'node:os';
|
|
|
|
const __dirname = dirname(fileURLToPath(import.meta.url));
|
|
const IS_WIN = process.platform === 'win32';
|
|
const EXE_NAME = IS_WIN ? 'polymech-cli.exe' : 'polymech-cli';
|
|
const EXE = resolve(__dirname, '..', 'dist', EXE_NAME);
|
|
const TEST_CANCEL = false;
|
|
|
|
if (!existsSync(EXE)) {
|
|
console.error(`❌ Binary not found at ${EXE}`);
|
|
process.exit(1);
|
|
}
|
|
|
|
const PIPE_NAME = 'polymech-test-uds-meta';
|
|
const CPP_UDS_ARG = IS_WIN ? '4001' : join(tmpdir(), `${PIPE_NAME}.sock`);
|
|
|
|
if (!IS_WIN && existsSync(CPP_UDS_ARG)) {
|
|
unlinkSync(CPP_UDS_ARG);
|
|
}
|
|
|
|
console.log(`Binary: ${EXE}`);
|
|
console.log(`C++ Arg: ${CPP_UDS_ARG}\n`);
|
|
|
|
// ── Event collector ─────────────────────────────────────────────────────────
|
|
function createCollector() {
|
|
const events = {};
|
|
for (const t of ['grid-ready', 'waypoint-start', 'area', 'location',
|
|
'enrich-start', 'node', 'node-error', 'nodePage', 'job_result']) {
|
|
events[t] = [];
|
|
}
|
|
return {
|
|
events,
|
|
onComplete: null,
|
|
handler(msg) {
|
|
const t = msg.type;
|
|
if (events[t]) events[t].push(msg);
|
|
else events[t] = [msg];
|
|
|
|
const d = msg.data ?? {};
|
|
if (t === 'waypoint-start') {
|
|
process.stdout.write(`\r 🔍 Searching waypoint ${(d.index ?? 0) + 1}/${d.total ?? '?'}...`);
|
|
} else if (t === 'node') {
|
|
process.stdout.write(`\r 📧 Enriched: ${d.title?.substring(0, 40) ?? ''} `);
|
|
} else if (t === 'node-error') {
|
|
process.stdout.write(`\r ⚠️ Error: ${d.node?.title?.substring(0, 40) ?? ''} `);
|
|
} else if (t === 'job_result') {
|
|
console.log(`\n 🏁 Pipeline complete!`);
|
|
if (this.onComplete) this.onComplete(msg);
|
|
}
|
|
},
|
|
};
|
|
}
|
|
|
|
let passed = 0;
|
|
let failed = 0;
|
|
function assert(condition, label) {
|
|
if (condition) { console.log(` ✅ ${label}`); passed++; }
|
|
else { console.error(` ❌ ${label}`); failed++; }
|
|
}
|
|
|
|
async function run() {
|
|
console.log('🧪 Gridsearch UDS Meta E2E Test\n');
|
|
|
|
// 1. Spawn worker in UDS mode
|
|
console.log('1. Spawning remote C++ Taskflow Daemon');
|
|
const worker = spawn(EXE, ['worker', '--uds', CPP_UDS_ARG, '--daemon'], { stdio: 'inherit' });
|
|
|
|
// Give the daemon a moment to boot
|
|
console.log('2. Connecting net.Socket with retries...');
|
|
|
|
let socket;
|
|
for (let i = 0; i < 15; i++) {
|
|
try {
|
|
await new Promise((resolve, reject) => {
|
|
if (IS_WIN) {
|
|
socket = net.connect({ port: 4001, host: '127.0.0.1' });
|
|
} else {
|
|
socket = net.connect(CPP_UDS_ARG);
|
|
}
|
|
socket.once('connect', resolve);
|
|
socket.once('error', reject);
|
|
});
|
|
console.log(' ✅ Socket Connected to UDS!');
|
|
break;
|
|
} catch (e) {
|
|
if (i === 14) throw e;
|
|
await new Promise(r => setTimeout(r, 500));
|
|
}
|
|
}
|
|
|
|
const collector = createCollector();
|
|
let buffer = Buffer.alloc(0);
|
|
|
|
// Buffer framing logic (length-prefixed streaming)
|
|
socket.on('data', (chunk) => {
|
|
buffer = Buffer.concat([buffer, chunk]);
|
|
while (buffer.length >= 4) {
|
|
const len = buffer.readUInt32LE(0);
|
|
if (buffer.length >= 4 + len) {
|
|
const payload = buffer.toString('utf8', 4, 4 + len);
|
|
buffer = buffer.subarray(4 + len);
|
|
try {
|
|
const msg = JSON.parse(payload);
|
|
collector.handler(msg);
|
|
} catch (e) {
|
|
console.error("JSON PARSE ERROR:", e, payload);
|
|
}
|
|
} else {
|
|
break; // Wait for more chunks
|
|
}
|
|
}
|
|
});
|
|
|
|
// 3. Send Gridsearch payload
|
|
// USE gridsearch-sample.json instead of gridsearch-bcn-universities.json
|
|
const sampleConfig = JSON.parse(
|
|
readFileSync(resolve(__dirname, '..', 'config', 'gridsearch-sample.json'), 'utf8')
|
|
);
|
|
|
|
sampleConfig.configPath = resolve(__dirname, '..', 'config', 'postgres.toml');
|
|
sampleConfig.jobId = 'uds-meta-test-abc';
|
|
sampleConfig.noCache = true; // force re-enrichment even if cached
|
|
|
|
console.log('3. Writing serialized IPC Payload over pipe...');
|
|
const jsonStr = JSON.stringify(sampleConfig);
|
|
const lenBuf = Buffer.alloc(4);
|
|
lenBuf.writeUInt32LE(Buffer.byteLength(jsonStr));
|
|
socket.write(lenBuf);
|
|
socket.write(jsonStr);
|
|
|
|
// 4. Wait for pipeline completion (job_result event) or timeout
|
|
console.log('\n4. Awaiting multi-threaded Execution Pipeline (can take minutes)...\n');
|
|
|
|
await new Promise((resolve) => {
|
|
collector.onComplete = () => {
|
|
// Send stop command to gracefully shut down the daemon
|
|
console.log(' 📤 Sending stop command to daemon...');
|
|
const stopPayload = JSON.stringify({ action: 'stop' });
|
|
const stopLen = Buffer.alloc(4);
|
|
stopLen.writeUInt32LE(Buffer.byteLength(stopPayload));
|
|
socket.write(stopLen);
|
|
socket.write(stopPayload);
|
|
setTimeout(resolve, 1000); // Give daemon a moment to ack
|
|
};
|
|
|
|
// Safety timeout
|
|
setTimeout(() => {
|
|
console.log('\n ⏰ Timeout reached (300s) — forcing shutdown.');
|
|
resolve();
|
|
}, 300000); // Wait up to 5 minutes
|
|
});
|
|
|
|
console.log('\n\n5. Event summary');
|
|
for (const [k, v] of Object.entries(collector.events)) {
|
|
console.log(` ${k}: ${v.length}`);
|
|
}
|
|
|
|
// Assertions
|
|
const ev = collector.events;
|
|
assert(ev['grid-ready'].length === 1, 'grid-ready emitted once');
|
|
assert(ev['waypoint-start'].length > 0, 'waypoint-start events received');
|
|
assert(ev['location'].length > 0, 'location events received');
|
|
assert(ev['enrich-start'].length === 1, 'enrich-start emitted once');
|
|
assert(ev['job_result'].length === 1, 'job_result emitted once');
|
|
|
|
// Verify social profiles and md body
|
|
const nodes = ev['node'];
|
|
let foundSocial = false;
|
|
let foundSiteMd = false;
|
|
|
|
for (const n of nodes) {
|
|
const d = n.data;
|
|
if (!d) continue;
|
|
|
|
if (d.socials && d.socials.length > 0) {
|
|
foundSocial = true;
|
|
}
|
|
|
|
if (d.sites && Array.isArray(d.sites) && d.sites.length > 0) {
|
|
foundSiteMd = true;
|
|
}
|
|
}
|
|
|
|
if (foundSocial) {
|
|
assert(foundSocial, 'At least one enriched node has social media profiles discovered');
|
|
} else {
|
|
console.log(' ⚠️ No social media profiles discovered in this run (data-dependent), but pipeline completed.');
|
|
}
|
|
|
|
assert(foundSiteMd, 'At least one enriched node has markdown sites mapped');
|
|
|
|
console.log('6. Cleanup');
|
|
socket.destroy();
|
|
worker.kill('SIGTERM');
|
|
|
|
console.log(`\n────────────────────────────────`);
|
|
console.log(` Passed: ${passed} Failed: ${failed}`);
|
|
console.log(`────────────────────────────────`);
|
|
process.exit(failed > 0 ? 1 : 0);
|
|
}
|
|
|
|
run().catch(e => {
|
|
console.error(e);
|
|
process.exit(1);
|
|
});
|