mono/packages/kbot/cpp/orchestrator/test-ipc.mjs
2026-03-30 10:46:50 +02:00

181 lines
6.7 KiB
JavaScript

/**
* orchestrator/test-ipc.mjs
*
* Integration test: spawn the C++ worker in UDS mode, exchange messages, verify responses.
*
* Run: npm run test:ipc
*/
import { spawn } from 'node:child_process';
import { resolve, dirname } from 'node:path';
import { fileURLToPath } from 'node:url';
import net from 'node:net';
import { randomUUID } from 'node:crypto';
import { existsSync, unlinkSync } from 'node:fs';
const __dirname = dirname(fileURLToPath(import.meta.url));
const IS_WIN = process.platform === 'win32';
const EXE_NAME = IS_WIN ? 'kbot.exe' : 'kbot';
const EXE = resolve(__dirname, '..', 'dist', EXE_NAME);
let passed = 0;
let failed = 0;
function assert(condition, label) {
if (condition) {
console.log(`${label}`);
passed++;
} else {
console.error(`${label}`);
failed++;
}
}
async function run() {
console.log('\n🔧 IPC [UDS] Integration Tests\n');
if (!existsSync(EXE)) {
console.error(`❌ Binary not found at ${EXE}`);
process.exit(1);
}
const CPP_UDS_ARG = IS_WIN ? '4001' : '/tmp/kbot-test-ipc.sock';
if (!IS_WIN && existsSync(CPP_UDS_ARG)) {
unlinkSync(CPP_UDS_ARG);
}
// ── 1. Spawn & ready ────────────────────────────────────────────────────
console.log('1. Spawn worker (UDS mode) and wait for ready signal');
const workerProc = spawn(EXE, ['worker', '--uds', CPP_UDS_ARG], { stdio: 'pipe' });
workerProc.stderr.on('data', d => {
const txt = d.toString().trim();
if (txt) console.error(`[worker:stderr] ${txt}`);
});
let socket;
for (let i = 0; i < 15; i++) {
try {
await new Promise((res, rej) => {
if (IS_WIN) {
socket = net.connect({ port: 4001, host: '127.0.0.1' });
} else {
socket = net.connect(CPP_UDS_ARG);
}
socket.once('connect', res);
socket.once('error', rej);
});
break;
} catch (e) {
if (i === 14) throw e;
await new Promise(r => setTimeout(r, 400));
}
}
assert(true, 'Socket connected successfully');
// Pending request map: id → { resolve, reject, timer }
const pending = new Map();
let readyResolve;
const readyPromise = new Promise(res => { readyResolve = res; });
let buffer = Buffer.alloc(0);
socket.on('data', chunk => {
buffer = Buffer.concat([buffer, chunk]);
while (buffer.length >= 4) {
const len = buffer.readUInt32LE(0);
if (buffer.length >= 4 + len) {
const payload = buffer.toString('utf8', 4, 4 + len);
buffer = buffer.subarray(4 + len);
try {
const msg = JSON.parse(payload);
if (msg.type === 'ready') {
readyResolve(msg);
} else if (msg.id && pending.has(msg.id)) {
const p = pending.get(msg.id);
clearTimeout(p.timer);
pending.delete(msg.id);
p.resolve(msg);
}
} catch (e) {
console.error('[orchestrator] frame parse error', e);
}
} else {
break;
}
}
});
function request(msg, timeoutMs = 5000) {
return new Promise((resolve, reject) => {
const id = msg.id || randomUUID();
msg.id = id;
const timer = setTimeout(() => {
pending.delete(id);
reject(new Error(`IPC request timed out`));
}, timeoutMs);
pending.set(id, { resolve, reject, timer });
const str = JSON.stringify(msg);
const lenBuf = Buffer.alloc(4);
lenBuf.writeUInt32LE(Buffer.byteLength(str));
socket.write(lenBuf);
socket.write(str);
});
}
const readyMsg = await readyPromise;
assert(readyMsg.type === 'ready', 'Worker sends ready message on startup');
// ── 2. Ping / Pong ─────────────────────────────────────────────────────
console.log('2. Ping → Pong');
const pong = await request({ type: 'ping' });
assert(pong.type === 'pong', `Response type is "pong" (got "${pong.type}")`);
// ── 3. Job echo ─────────────────────────────────────────────────────────
console.log('3. Job → Job Result (echo payload)');
const payload = { action: 'resize', width: 1024, format: 'webp' };
const jobResult = await request({ type: 'job', payload });
assert(jobResult.type === 'job_result', `Response type is "job_result" (got "${jobResult.type}")`);
assert(
jobResult.payload?.action === 'resize' && jobResult.payload?.width === 1024,
'Payload echoed back correctly'
);
// ── 4. Unknown type → error ─────────────────────────────────────────────
console.log('4. Unknown type → error response');
const errResp = await request({ type: 'nonsense' });
assert(errResp.type === 'error', `Response type is "error" (got "${errResp.type}")`);
// ── 5. Multiple rapid requests ──────────────────────────────────────────
console.log('5. Multiple concurrent requests');
const promises = [];
for (let i = 0; i < 10; i++) {
promises.push(request({ type: 'ping', payload: { seq: i } }));
}
const results = await Promise.all(promises);
assert(results.length === 10, `All 10 responses received`);
assert(results.every(r => r.type === 'pong'), 'All responses are pong');
// ── 6. Graceful shutdown ────────────────────────────────────────────────
console.log('6. Graceful shutdown');
const shutdownRes = await request({ type: 'shutdown' });
assert(shutdownRes.type === 'shutdown_ack', `Shutdown acknowledged (got "${shutdownRes.type}")`);
// Wait a beat for process exit
await new Promise(r => setTimeout(r, 200));
socket.destroy();
assert(workerProc.exitCode === 0, `Worker exited with code 0 (got ${workerProc.exitCode})`);
// ── Summary ─────────────────────────────────────────────────────────────
console.log(`\n────────────────────────────────`);
console.log(` Passed: ${passed} Failed: ${failed}`);
console.log(`────────────────────────────────\n`);
process.exit(failed > 0 ? 1 : 0);
}
run().catch((err) => {
console.error('Test runner error:', err);
process.exit(1);
});