kbot cpp testing
This commit is contained in:
parent
b0916df0f5
commit
f0385f41ec
4
packages/kbot/cpp/.gitignore
vendored
4
packages/kbot/cpp/.gitignore
vendored
@ -29,4 +29,8 @@ Thumbs.db
|
||||
cache/
|
||||
config/postgres.toml
|
||||
dist
|
||||
|
||||
# Orchestrator reports (cwd/tests/*)
|
||||
tests/*.json
|
||||
tests/*.md
|
||||
src/cmd_grid*.cpp
|
||||
|
||||
@ -105,7 +105,7 @@ add_executable(${PROJECT_NAME}
|
||||
# Output file name is kbot.exe / kbot (not kbot-cli)
|
||||
set_target_properties(${PROJECT_NAME} PROPERTIES OUTPUT_NAME "kbot")
|
||||
|
||||
target_link_libraries(${PROJECT_NAME} PRIVATE CLI11::CLI11 tomlplusplus::tomlplusplus logger html postgres http json polymech ipc search kbot)
|
||||
target_link_libraries(${PROJECT_NAME} PRIVATE CLI11::CLI11 tomlplusplus::tomlplusplus logger html postgres http json polymech ipc kbot)
|
||||
|
||||
target_include_directories(${PROJECT_NAME} PRIVATE
|
||||
${asio_SOURCE_DIR}/asio/include
|
||||
|
||||
73
packages/kbot/cpp/orchestrator/presets.js
Normal file
73
packages/kbot/cpp/orchestrator/presets.js
Normal file
@ -0,0 +1,73 @@
|
||||
/**
|
||||
* orchestrator/presets.js — defaults for IPC integration tests (extend here as suites grow).
|
||||
*/
|
||||
|
||||
import { resolve } from 'node:path';
|
||||
|
||||
export const platform = {
|
||||
isWin: process.platform === 'win32',
|
||||
};
|
||||
|
||||
/** Dist binary name for the current OS. */
|
||||
export function exeName() {
|
||||
return platform.isWin ? 'kbot.exe' : 'kbot';
|
||||
}
|
||||
|
||||
/** Absolute path to kbot binary given orchestrator/ directory (where test-ipc.mjs lives). */
|
||||
export function distExePath(orchestratorDir) {
|
||||
return resolve(orchestratorDir, '..', 'dist', exeName());
|
||||
}
|
||||
|
||||
/** UDS / TCP listen argument passed to `kbot worker --uds <arg>`. */
|
||||
export const uds = {
|
||||
tcpPort: 4001,
|
||||
unixPath: '/tmp/kbot-test-ipc.sock',
|
||||
/** Value for `--uds` on this OS (Windows: port string; Unix: socket path). */
|
||||
workerArg() {
|
||||
return platform.isWin ? String(this.tcpPort) : this.unixPath;
|
||||
},
|
||||
/** Options for `net.connect` to reach the worker. */
|
||||
connectOpts(cppUdsArg) {
|
||||
return platform.isWin
|
||||
? { port: this.tcpPort, host: '127.0.0.1' }
|
||||
: cppUdsArg;
|
||||
},
|
||||
};
|
||||
|
||||
/** Millisecond timeouts — tune per step in new tests. */
|
||||
export const timeouts = {
|
||||
ipcDefault: 5000,
|
||||
kbotAi: 180_000,
|
||||
connectAttempts: 15,
|
||||
connectRetryMs: 400,
|
||||
postShutdownMs: 200,
|
||||
};
|
||||
|
||||
export const router = {
|
||||
default: 'openrouter',
|
||||
fromEnv() {
|
||||
return process.env.KBOT_ROUTER || this.default;
|
||||
},
|
||||
};
|
||||
|
||||
/** Stock prompts and assertions helpers for LLM smoke tests. */
|
||||
export const prompts = {
|
||||
germanyCapital: 'What is the capital of Germany? Reply in one short sentence.',
|
||||
};
|
||||
|
||||
/** Build `kbot-ai` IPC payload from env + presets (OpenRouter-friendly defaults). */
|
||||
export function kbotAiPayloadFromEnv() {
|
||||
const payload = {
|
||||
prompt: process.env.KBOT_IPC_PROMPT || prompts.germanyCapital,
|
||||
router: router.fromEnv(),
|
||||
};
|
||||
if (process.env.KBOT_IPC_MODEL) {
|
||||
payload.model = process.env.KBOT_IPC_MODEL;
|
||||
}
|
||||
return payload;
|
||||
}
|
||||
|
||||
/** True when using the default Germany prompt (for optional Berlin assertion). */
|
||||
export function usingDefaultGermanyPrompt() {
|
||||
return !process.env.KBOT_IPC_PROMPT;
|
||||
}
|
||||
296
packages/kbot/cpp/orchestrator/reports.js
Normal file
296
packages/kbot/cpp/orchestrator/reports.js
Normal file
@ -0,0 +1,296 @@
|
||||
/**
|
||||
* orchestrator/reports.js — JSON + Markdown reports under cwd/tests/
|
||||
*
|
||||
* File pattern (logical): test-name::hh:mm
|
||||
* On-disk: test-name__HH-mm.json / .md (Windows: no `:` in filenames)
|
||||
*/
|
||||
|
||||
import { mkdir, writeFile } from 'node:fs/promises';
|
||||
import { join, dirname } from 'node:path';
|
||||
import os from 'node:os';
|
||||
import { performance } from 'node:perf_hooks';
|
||||
import { resourceUsage } from 'node:process';
|
||||
|
||||
const WIN_BAD = /[<>:"/\\|?*\x00-\x1f]/g;
|
||||
|
||||
/** Strip characters invalid in Windows / POSIX filenames. */
|
||||
export function sanitizeTestName(name) {
|
||||
const s = String(name).trim().replace(WIN_BAD, '_').replace(/\s+/g, '_');
|
||||
return s || 'test';
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {Date} [now]
|
||||
* @returns {{ hh: string, mm: string, label: string }}
|
||||
*/
|
||||
export function timeParts(now = new Date()) {
|
||||
const hh = String(now.getHours()).padStart(2, '0');
|
||||
const mm = String(now.getMinutes()).padStart(2, '0');
|
||||
return { hh, mm, label: `${hh}:${mm}` };
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} testName
|
||||
* @param {string} ext — including dot, e.g. '.json'
|
||||
* @param {{ cwd?: string, now?: Date }} [options]
|
||||
*/
|
||||
export function reportFilePathWithExt(testName, ext, options = {}) {
|
||||
const cwd = options.cwd ?? process.cwd();
|
||||
const now = options.now ?? new Date();
|
||||
const base = sanitizeTestName(testName);
|
||||
const { hh, mm } = timeParts(now);
|
||||
const file = `${base}__${hh}-${mm}${ext}`;
|
||||
return join(cwd, 'tests', file);
|
||||
}
|
||||
|
||||
export function reportFilePath(testName, options = {}) {
|
||||
return reportFilePathWithExt(testName, '.json', options);
|
||||
}
|
||||
|
||||
export function reportMarkdownPath(testName, options = {}) {
|
||||
return reportFilePathWithExt(testName, '.md', options);
|
||||
}
|
||||
|
||||
function formatBytes(n) {
|
||||
if (typeof n !== 'number' || Number.isNaN(n)) return String(n);
|
||||
const u = ['B', 'KB', 'MB', 'GB'];
|
||||
let i = 0;
|
||||
let x = n;
|
||||
while (x >= 1024 && i < u.length - 1) {
|
||||
x /= 1024;
|
||||
i++;
|
||||
}
|
||||
return `${x < 10 && i > 0 ? x.toFixed(1) : Math.round(x)} ${u[i]}`;
|
||||
}
|
||||
|
||||
/** Snapshot of host / OS (cheap; call anytime). */
|
||||
export function hostSnapshot() {
|
||||
const cpus = os.cpus();
|
||||
const total = os.totalmem();
|
||||
const free = os.freemem();
|
||||
return {
|
||||
hostname: os.hostname(),
|
||||
platform: os.platform(),
|
||||
arch: os.arch(),
|
||||
release: os.release(),
|
||||
cpuCount: cpus.length,
|
||||
cpuModel: cpus[0]?.model?.trim() ?? '',
|
||||
totalMemBytes: total,
|
||||
freeMemBytes: free,
|
||||
usedMemBytes: total - free,
|
||||
loadAvg: os.loadavg(),
|
||||
osUptimeSec: os.uptime(),
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Call at test start; then call `.finalize()` at end for wall + CPU delta + memory.
|
||||
*/
|
||||
export function createMetricsCollector() {
|
||||
const cpu0 = process.cpuUsage();
|
||||
const perf0 = performance.now();
|
||||
const wall0 = Date.now();
|
||||
|
||||
return {
|
||||
hostSnapshot,
|
||||
|
||||
finalize() {
|
||||
const cpu = process.cpuUsage(cpu0);
|
||||
const perf1 = performance.now();
|
||||
let ru = null;
|
||||
try {
|
||||
ru = resourceUsage();
|
||||
} catch {
|
||||
/* older runtimes */
|
||||
}
|
||||
return {
|
||||
durationWallMs: Math.round((perf1 - perf0) * 1000) / 1000,
|
||||
durationClockMs: Date.now() - wall0,
|
||||
cpuUserUs: cpu.user,
|
||||
cpuSystemUs: cpu.system,
|
||||
cpuUserMs: cpu.user / 1000,
|
||||
cpuSystemMs: cpu.system / 1000,
|
||||
memory: process.memoryUsage(),
|
||||
resourceUsage: ru,
|
||||
pid: process.pid,
|
||||
node: process.version,
|
||||
processUptimeSec: process.uptime(),
|
||||
};
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {Record<string, unknown>} payload
|
||||
* @returns {string}
|
||||
*/
|
||||
export function renderMarkdownReport(payload) {
|
||||
const meta = payload.meta ?? {};
|
||||
const m = payload.metrics ?? {};
|
||||
const host = m.host ?? {};
|
||||
const timing = m.timing ?? {};
|
||||
const proc = m.process ?? {};
|
||||
const tStart = timing.startedAt ?? payload.startedAt;
|
||||
const tEnd = timing.finishedAt ?? payload.finishedAt;
|
||||
|
||||
const lines = [];
|
||||
|
||||
lines.push(`# Test report: ${meta.displayName ?? meta.testName ?? 'run'}`);
|
||||
lines.push('');
|
||||
lines.push('## Summary');
|
||||
lines.push('');
|
||||
lines.push(`| Key | Value |`);
|
||||
lines.push(`| --- | --- |`);
|
||||
lines.push(`| Result | ${payload.ok === true ? 'PASS' : payload.ok === false ? 'FAIL' : '—'} |`);
|
||||
if (payload.passed != null) lines.push(`| Assertions passed | ${payload.passed} |`);
|
||||
if (payload.failed != null) lines.push(`| Assertions failed | ${payload.failed} |`);
|
||||
if (payload.ipcLlm != null) lines.push(`| IPC LLM step | ${payload.ipcLlm ? 'enabled' : 'skipped'} |`);
|
||||
lines.push(`| CWD | \`${String(meta.cwd ?? '').replace(/`/g, "'")}\` |`);
|
||||
lines.push('');
|
||||
|
||||
lines.push('## Timing');
|
||||
lines.push('');
|
||||
lines.push(`| Metric | Value |`);
|
||||
lines.push(`| --- | --- |`);
|
||||
if (tStart) lines.push(`| Started (ISO) | ${tStart} |`);
|
||||
if (tEnd) lines.push(`| Finished (ISO) | ${tEnd} |`);
|
||||
if (proc.durationWallMs != null) lines.push(`| Wall time (perf) | ${proc.durationWallMs} ms |`);
|
||||
if (proc.durationClockMs != null) lines.push(`| Wall time (clock) | ${proc.durationClockMs} ms |`);
|
||||
lines.push('');
|
||||
|
||||
lines.push('## Process (Node)');
|
||||
lines.push('');
|
||||
lines.push(`| Metric | Value |`);
|
||||
lines.push(`| --- | --- |`);
|
||||
if (proc.pid != null) lines.push(`| PID | ${proc.pid} |`);
|
||||
if (proc.node) lines.push(`| Node | ${proc.node} |`);
|
||||
if (proc.processUptimeSec != null) lines.push(`| process.uptime() | ${proc.processUptimeSec.toFixed(3)} s |`);
|
||||
if (proc.cpuUserMs != null && proc.cpuSystemMs != null) {
|
||||
lines.push(`| CPU user (process.cpuUsage Δ) | ${proc.cpuUserMs.toFixed(3)} ms (${proc.cpuUserUs ?? '—'} µs) |`);
|
||||
lines.push(`| CPU system (process.cpuUsage Δ) | ${proc.cpuSystemMs.toFixed(3)} ms (${proc.cpuSystemUs ?? '—'} µs) |`);
|
||||
}
|
||||
const ru = proc.resourceUsage;
|
||||
if (ru && typeof ru === 'object') {
|
||||
if (ru.userCPUTime != null) {
|
||||
lines.push(`| CPU user (resourceUsage) | ${(ru.userCPUTime / 1000).toFixed(3)} ms |`);
|
||||
}
|
||||
if (ru.systemCPUTime != null) {
|
||||
lines.push(`| CPU system (resourceUsage) | ${(ru.systemCPUTime / 1000).toFixed(3)} ms |`);
|
||||
}
|
||||
if (ru.maxRSS != null) {
|
||||
lines.push(`| Max RSS (resourceUsage) | ${formatBytes(ru.maxRSS * 1024)} |`);
|
||||
}
|
||||
}
|
||||
const mem = proc.memory;
|
||||
if (mem && typeof mem === 'object') {
|
||||
lines.push(`| RSS | ${formatBytes(mem.rss)} (${mem.rss} B) |`);
|
||||
lines.push(`| Heap used | ${formatBytes(mem.heapUsed)} |`);
|
||||
lines.push(`| Heap total | ${formatBytes(mem.heapTotal)} |`);
|
||||
lines.push(`| External | ${formatBytes(mem.external)} |`);
|
||||
if (mem.arrayBuffers != null) lines.push(`| Array buffers | ${formatBytes(mem.arrayBuffers)} |`);
|
||||
}
|
||||
lines.push('');
|
||||
|
||||
lines.push('## Host');
|
||||
lines.push('');
|
||||
lines.push(`| Metric | Value |`);
|
||||
lines.push(`| --- | --- |`);
|
||||
if (host.hostname) lines.push(`| Hostname | ${host.hostname} |`);
|
||||
if (host.platform) lines.push(`| OS | ${host.platform} ${host.release ?? ''} |`);
|
||||
if (host.arch) lines.push(`| Arch | ${host.arch} |`);
|
||||
if (host.cpuCount != null) lines.push(`| CPUs | ${host.cpuCount} |`);
|
||||
if (host.cpuModel) lines.push(`| CPU model | ${host.cpuModel} |`);
|
||||
if (host.totalMemBytes != null) {
|
||||
lines.push(`| RAM total | ${formatBytes(host.totalMemBytes)} |`);
|
||||
lines.push(`| RAM free | ${formatBytes(host.freeMemBytes)} |`);
|
||||
lines.push(`| RAM used | ${formatBytes(host.usedMemBytes)} |`);
|
||||
}
|
||||
if (host.loadAvg && host.loadAvg.length) {
|
||||
lines.push(`| Load avg (1/5/15) | ${host.loadAvg.map((x) => x.toFixed(2)).join(' / ')} |`);
|
||||
}
|
||||
if (host.osUptimeSec != null) lines.push(`| OS uptime | ${(host.osUptimeSec / 3600).toFixed(2)} h |`);
|
||||
lines.push('');
|
||||
|
||||
if (payload.env && typeof payload.env === 'object') {
|
||||
lines.push('## Environment (selected)');
|
||||
lines.push('');
|
||||
lines.push(`| Variable | Value |`);
|
||||
lines.push(`| --- | --- |`);
|
||||
for (const [k, v] of Object.entries(payload.env)) {
|
||||
lines.push(`| \`${k}\` | ${v === null || v === undefined ? '—' : String(v)} |`);
|
||||
}
|
||||
lines.push('');
|
||||
}
|
||||
|
||||
if (payload.error) {
|
||||
lines.push('## Error');
|
||||
lines.push('');
|
||||
lines.push('```');
|
||||
lines.push(String(payload.error));
|
||||
lines.push('```');
|
||||
lines.push('');
|
||||
}
|
||||
|
||||
lines.push('---');
|
||||
lines.push(`*Written ${meta.writtenAt ?? new Date().toISOString()}*`);
|
||||
lines.push('');
|
||||
|
||||
return lines.join('\n');
|
||||
}
|
||||
|
||||
/**
|
||||
* Build metrics block for JSON + MD (host snapshot + process finalize).
|
||||
*/
|
||||
export function buildMetricsBundle(collector, startedAtIso, finishedAtIso) {
|
||||
const host = collector.hostSnapshot();
|
||||
const processMetrics = collector.finalize();
|
||||
return {
|
||||
timing: {
|
||||
startedAt: startedAtIso,
|
||||
finishedAt: finishedAtIso,
|
||||
},
|
||||
host,
|
||||
process: processMetrics,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} testName
|
||||
* @param {Record<string, unknown>} data — merged into payload (meta + metrics added)
|
||||
* @param {{ cwd?: string, now?: Date, metrics?: object }} [options]
|
||||
* @returns {Promise<{ jsonPath: string, mdPath: string }>}
|
||||
*/
|
||||
export async function writeTestReports(testName, data, options = {}) {
|
||||
const cwd = options.cwd ?? process.cwd();
|
||||
const now = options.now ?? new Date();
|
||||
const jsonPath = reportFilePath(testName, { cwd, now });
|
||||
const mdPath = reportMarkdownPath(testName, { cwd, now });
|
||||
const { hh, mm, label } = timeParts(now);
|
||||
|
||||
const base = sanitizeTestName(testName);
|
||||
const payload = {
|
||||
meta: {
|
||||
testName: base,
|
||||
displayName: `${base}::${label}`,
|
||||
cwd,
|
||||
writtenAt: now.toISOString(),
|
||||
jsonFile: jsonPath,
|
||||
mdFile: mdPath,
|
||||
},
|
||||
...data,
|
||||
};
|
||||
|
||||
await mkdir(dirname(jsonPath), { recursive: true });
|
||||
await writeFile(jsonPath, JSON.stringify(payload, null, 2), 'utf8');
|
||||
|
||||
const md = renderMarkdownReport(payload);
|
||||
await writeFile(mdPath, md, 'utf8');
|
||||
|
||||
return { jsonPath, mdPath };
|
||||
}
|
||||
|
||||
/** @deprecated Prefer writeTestReports */
|
||||
export async function writeJsonReport(testName, data, options = {}) {
|
||||
const { jsonPath } = await writeTestReports(testName, data, options);
|
||||
return jsonPath;
|
||||
}
|
||||
175
packages/kbot/cpp/orchestrator/test-commons.js
Normal file
175
packages/kbot/cpp/orchestrator/test-commons.js
Normal file
@ -0,0 +1,175 @@
|
||||
/**
|
||||
* orchestrator/test-commons.js — shared helpers for IPC orchestrator tests.
|
||||
*/
|
||||
|
||||
import { randomUUID } from 'node:crypto';
|
||||
|
||||
/** kbot-ai live call runs unless KBOT_IPC_LLM is explicitly disabled. */
|
||||
export function ipcLlmEnabled() {
|
||||
const v = process.env.KBOT_IPC_LLM;
|
||||
if (v === undefined || v === '') return true;
|
||||
const s = String(v).trim().toLowerCase();
|
||||
if (s === '0' || s === 'false' || s === 'no' || s === 'off') return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
/** Counters for a test run (create one per process / suite). */
|
||||
export function createAssert() {
|
||||
let passed = 0;
|
||||
let failed = 0;
|
||||
|
||||
function assert(condition, label) {
|
||||
if (condition) {
|
||||
console.log(` ✅ ${label}`);
|
||||
passed++;
|
||||
} else {
|
||||
console.error(` ❌ ${label}`);
|
||||
failed++;
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
assert,
|
||||
get passed() {
|
||||
return passed;
|
||||
},
|
||||
get failed() {
|
||||
return failed;
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/** Normalize IPC payload (object or JSON string). */
|
||||
export function payloadObj(msg) {
|
||||
const p = msg?.payload;
|
||||
if (p == null) return null;
|
||||
if (typeof p === 'string') {
|
||||
try {
|
||||
return JSON.parse(p);
|
||||
} catch {
|
||||
return { raw: p };
|
||||
}
|
||||
}
|
||||
return p;
|
||||
}
|
||||
|
||||
/** Print LLM job_result so it is easy to spot (stdout, not mixed with worker stderr). */
|
||||
export function logKbotAiResponse(stepLabel, msg) {
|
||||
const p = payloadObj(msg);
|
||||
const text = p?.text != null ? String(p.text) : '';
|
||||
const err = p?.error != null ? String(p.error) : '';
|
||||
const maxRaw = process.env.KBOT_IPC_LLM_LOG_MAX;
|
||||
const max =
|
||||
maxRaw === undefined || maxRaw === ''
|
||||
? Infinity
|
||||
: Number.parseInt(maxRaw, 10);
|
||||
|
||||
console.log('');
|
||||
console.log(` ┌── ${stepLabel} ──────────────────────────────────────────`);
|
||||
console.log(` │ type: ${msg?.type ?? '?'}`);
|
||||
if (p && typeof p === 'object') {
|
||||
console.log(` │ status: ${p.status ?? '?'}`);
|
||||
if (p.mode != null) console.log(` │ mode: ${p.mode}`);
|
||||
if (p.router != null) console.log(` │ router: ${p.router}`);
|
||||
if (p.model != null) console.log(` │ model: ${p.model}`);
|
||||
}
|
||||
if (err) {
|
||||
const showErr =
|
||||
Number.isFinite(max) && err.length > max
|
||||
? `${err.slice(0, max)}… [truncated, ${err.length} chars]`
|
||||
: err;
|
||||
console.log(` │ error: ${showErr.replace(/\n/g, '\n │ ')}`);
|
||||
}
|
||||
if (text) {
|
||||
let body = text;
|
||||
let note = '';
|
||||
if (Number.isFinite(max) && text.length > max) {
|
||||
body = text.slice(0, max);
|
||||
note = `\n │ … [truncated: ${text.length} chars total; set KBOT_IPC_LLM_LOG_MAX= to adjust]`;
|
||||
}
|
||||
console.log(' │ text:');
|
||||
for (const line of body.split('\n')) {
|
||||
console.log(` │ ${line}`);
|
||||
}
|
||||
if (note) console.log(note);
|
||||
} else if (!err) {
|
||||
console.log(' │ (no text in payload)');
|
||||
}
|
||||
console.log(' └────────────────────────────────────────────────────────────');
|
||||
console.log('');
|
||||
}
|
||||
|
||||
/**
|
||||
* Length-prefixed JSON framing used by the C++ UDS worker.
|
||||
* Call `attach()` once to wire `socket.on('data', ...)`.
|
||||
*/
|
||||
export function createIpcClient(socket) {
|
||||
const pending = new Map();
|
||||
let readyResolve;
|
||||
const readyPromise = new Promise((res) => {
|
||||
readyResolve = res;
|
||||
});
|
||||
|
||||
let buffer = Buffer.alloc(0);
|
||||
|
||||
function onData(chunk) {
|
||||
buffer = Buffer.concat([buffer, chunk]);
|
||||
while (buffer.length >= 4) {
|
||||
const len = buffer.readUInt32LE(0);
|
||||
if (buffer.length >= 4 + len) {
|
||||
const payload = buffer.toString('utf8', 4, 4 + len);
|
||||
buffer = buffer.subarray(4 + len);
|
||||
try {
|
||||
const msg = JSON.parse(payload);
|
||||
if (msg.type === 'ready') {
|
||||
readyResolve(msg);
|
||||
} else if (msg.id && pending.has(msg.id)) {
|
||||
const p = pending.get(msg.id);
|
||||
clearTimeout(p.timer);
|
||||
pending.delete(msg.id);
|
||||
p.resolve(msg);
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('[orchestrator] frame parse error', e);
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function request(msg, timeoutMs = 5000) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const id = msg.id || randomUUID();
|
||||
msg.id = id;
|
||||
const timer = setTimeout(() => {
|
||||
pending.delete(id);
|
||||
reject(new Error(`IPC request timed out`));
|
||||
}, timeoutMs);
|
||||
pending.set(id, { resolve, reject, timer });
|
||||
|
||||
const str = JSON.stringify(msg);
|
||||
const lenBuf = Buffer.alloc(4);
|
||||
lenBuf.writeUInt32LE(Buffer.byteLength(str));
|
||||
socket.write(lenBuf);
|
||||
socket.write(str);
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
pending,
|
||||
readyPromise,
|
||||
request,
|
||||
attach() {
|
||||
socket.on('data', onData);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/** Forward worker stderr lines to console (prefixed). */
|
||||
export function pipeWorkerStderr(workerProc, label = '[worker:stderr]') {
|
||||
workerProc.stderr.on('data', (d) => {
|
||||
const txt = d.toString().trim();
|
||||
if (txt) console.error(`${label} ${txt}`);
|
||||
});
|
||||
}
|
||||
@ -4,34 +4,58 @@
|
||||
* Integration test: spawn the C++ worker in UDS mode, exchange messages, verify responses.
|
||||
*
|
||||
* Run: npm run test:ipc
|
||||
*
|
||||
* Env:
|
||||
* KBOT_IPC_LLM — real LLM step is on by default; set to 0 / false / no / off to skip (CI / offline).
|
||||
* KBOT_ROUTER — router (default: openrouter; same defaults as C++ LLMClient / CLI)
|
||||
* KBOT_IPC_MODEL — optional model id (e.g. openrouter slug); else C++ default for that router
|
||||
* KBOT_IPC_PROMPT — custom prompt (default: capital of Germany; asserts "berlin" in reply)
|
||||
* KBOT_IPC_LLM_LOG_MAX — max chars to print for LLM text (default: unlimited)
|
||||
*
|
||||
* Shared: presets.js, test-commons.js, reports.js
|
||||
* Report: cwd/tests/test-ipc__HH-mm.{json,md} (see reports.js)
|
||||
*/
|
||||
|
||||
import { spawn } from 'node:child_process';
|
||||
import { resolve, dirname } from 'node:path';
|
||||
import { dirname } from 'node:path';
|
||||
import { fileURLToPath } from 'node:url';
|
||||
import net from 'node:net';
|
||||
import { randomUUID } from 'node:crypto';
|
||||
import { existsSync, unlinkSync } from 'node:fs';
|
||||
|
||||
import {
|
||||
distExePath,
|
||||
platform,
|
||||
uds,
|
||||
timeouts,
|
||||
kbotAiPayloadFromEnv,
|
||||
usingDefaultGermanyPrompt,
|
||||
} from './presets.js';
|
||||
import {
|
||||
createAssert,
|
||||
payloadObj,
|
||||
logKbotAiResponse,
|
||||
ipcLlmEnabled,
|
||||
createIpcClient,
|
||||
pipeWorkerStderr,
|
||||
} from './test-commons.js';
|
||||
import {
|
||||
createMetricsCollector,
|
||||
buildMetricsBundle,
|
||||
writeTestReports,
|
||||
} from './reports.js';
|
||||
|
||||
const __dirname = dirname(fileURLToPath(import.meta.url));
|
||||
const IS_WIN = process.platform === 'win32';
|
||||
const EXE_NAME = IS_WIN ? 'kbot.exe' : 'kbot';
|
||||
const EXE = resolve(__dirname, '..', 'dist', EXE_NAME);
|
||||
const EXE = distExePath(__dirname);
|
||||
const stats = createAssert();
|
||||
const { assert } = stats;
|
||||
|
||||
let passed = 0;
|
||||
let failed = 0;
|
||||
|
||||
function assert(condition, label) {
|
||||
if (condition) {
|
||||
console.log(` ✅ ${label}`);
|
||||
passed++;
|
||||
} else {
|
||||
console.error(` ❌ ${label}`);
|
||||
failed++;
|
||||
}
|
||||
}
|
||||
/** Set at run start for error reports */
|
||||
let ipcRunStartedAt = null;
|
||||
let ipcMetricsCollector = null;
|
||||
|
||||
async function run() {
|
||||
ipcMetricsCollector = createMetricsCollector();
|
||||
ipcRunStartedAt = new Date().toISOString();
|
||||
console.log('\n🔧 IPC [UDS] Integration Tests\n');
|
||||
|
||||
if (!existsSync(EXE)) {
|
||||
@ -39,102 +63,47 @@ async function run() {
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const CPP_UDS_ARG = IS_WIN ? '4001' : '/tmp/kbot-test-ipc.sock';
|
||||
if (!IS_WIN && existsSync(CPP_UDS_ARG)) {
|
||||
const CPP_UDS_ARG = uds.workerArg();
|
||||
if (!platform.isWin && existsSync(CPP_UDS_ARG)) {
|
||||
unlinkSync(CPP_UDS_ARG);
|
||||
}
|
||||
|
||||
// ── 1. Spawn & ready ────────────────────────────────────────────────────
|
||||
console.log('1. Spawn worker (UDS mode) and wait for ready signal');
|
||||
const workerProc = spawn(EXE, ['worker', '--uds', CPP_UDS_ARG], { stdio: 'pipe' });
|
||||
|
||||
workerProc.stderr.on('data', d => {
|
||||
const txt = d.toString().trim();
|
||||
if (txt) console.error(`[worker:stderr] ${txt}`);
|
||||
});
|
||||
pipeWorkerStderr(workerProc);
|
||||
|
||||
let socket;
|
||||
for (let i = 0; i < 15; i++) {
|
||||
for (let i = 0; i < timeouts.connectAttempts; i++) {
|
||||
try {
|
||||
await new Promise((res, rej) => {
|
||||
if (IS_WIN) {
|
||||
socket = net.connect({ port: 4001, host: '127.0.0.1' });
|
||||
} else {
|
||||
socket = net.connect(CPP_UDS_ARG);
|
||||
}
|
||||
socket = net.connect(uds.connectOpts(CPP_UDS_ARG));
|
||||
socket.once('connect', res);
|
||||
socket.once('error', rej);
|
||||
});
|
||||
break;
|
||||
} catch (e) {
|
||||
if (i === 14) throw e;
|
||||
await new Promise(r => setTimeout(r, 400));
|
||||
if (i === timeouts.connectAttempts - 1) throw e;
|
||||
await new Promise((r) => setTimeout(r, timeouts.connectRetryMs));
|
||||
}
|
||||
}
|
||||
assert(true, 'Socket connected successfully');
|
||||
|
||||
// Pending request map: id → { resolve, reject, timer }
|
||||
const pending = new Map();
|
||||
let readyResolve;
|
||||
const readyPromise = new Promise(res => { readyResolve = res; });
|
||||
const ipc = createIpcClient(socket);
|
||||
ipc.attach();
|
||||
|
||||
let buffer = Buffer.alloc(0);
|
||||
socket.on('data', chunk => {
|
||||
buffer = Buffer.concat([buffer, chunk]);
|
||||
while (buffer.length >= 4) {
|
||||
const len = buffer.readUInt32LE(0);
|
||||
if (buffer.length >= 4 + len) {
|
||||
const payload = buffer.toString('utf8', 4, 4 + len);
|
||||
buffer = buffer.subarray(4 + len);
|
||||
try {
|
||||
const msg = JSON.parse(payload);
|
||||
if (msg.type === 'ready') {
|
||||
readyResolve(msg);
|
||||
} else if (msg.id && pending.has(msg.id)) {
|
||||
const p = pending.get(msg.id);
|
||||
clearTimeout(p.timer);
|
||||
pending.delete(msg.id);
|
||||
p.resolve(msg);
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('[orchestrator] frame parse error', e);
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
function request(msg, timeoutMs = 5000) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const id = msg.id || randomUUID();
|
||||
msg.id = id;
|
||||
const timer = setTimeout(() => {
|
||||
pending.delete(id);
|
||||
reject(new Error(`IPC request timed out`));
|
||||
}, timeoutMs);
|
||||
pending.set(id, { resolve, reject, timer });
|
||||
|
||||
const str = JSON.stringify(msg);
|
||||
const lenBuf = Buffer.alloc(4);
|
||||
lenBuf.writeUInt32LE(Buffer.byteLength(str));
|
||||
socket.write(lenBuf);
|
||||
socket.write(str);
|
||||
});
|
||||
}
|
||||
|
||||
const readyMsg = await readyPromise;
|
||||
const readyMsg = await ipc.readyPromise;
|
||||
assert(readyMsg.type === 'ready', 'Worker sends ready message on startup');
|
||||
|
||||
// ── 2. Ping / Pong ─────────────────────────────────────────────────────
|
||||
console.log('2. Ping → Pong');
|
||||
const pong = await request({ type: 'ping' });
|
||||
const pong = await ipc.request({ type: 'ping' }, timeouts.ipcDefault);
|
||||
assert(pong.type === 'pong', `Response type is "pong" (got "${pong.type}")`);
|
||||
|
||||
// ── 3. Job echo ─────────────────────────────────────────────────────────
|
||||
console.log('3. Job → Job Result (echo payload)');
|
||||
const payload = { action: 'resize', width: 1024, format: 'webp' };
|
||||
const jobResult = await request({ type: 'job', payload });
|
||||
const jobResult = await ipc.request({ type: 'job', payload }, timeouts.ipcDefault);
|
||||
assert(jobResult.type === 'job_result', `Response type is "job_result" (got "${jobResult.type}")`);
|
||||
assert(
|
||||
jobResult.payload?.action === 'resize' && jobResult.payload?.width === 1024,
|
||||
@ -143,38 +112,114 @@ async function run() {
|
||||
|
||||
// ── 4. Unknown type → error ─────────────────────────────────────────────
|
||||
console.log('4. Unknown type → error response');
|
||||
const errResp = await request({ type: 'nonsense' });
|
||||
const errResp = await ipc.request({ type: 'nonsense' }, timeouts.ipcDefault);
|
||||
assert(errResp.type === 'error', `Response type is "error" (got "${errResp.type}")`);
|
||||
|
||||
// ── 5. Multiple rapid requests ──────────────────────────────────────────
|
||||
console.log('5. Multiple concurrent requests');
|
||||
const promises = [];
|
||||
for (let i = 0; i < 10; i++) {
|
||||
promises.push(request({ type: 'ping', payload: { seq: i } }));
|
||||
promises.push(ipc.request({ type: 'ping', payload: { seq: i } }, timeouts.ipcDefault));
|
||||
}
|
||||
const results = await Promise.all(promises);
|
||||
assert(results.length === 10, `All 10 responses received`);
|
||||
assert(results.every(r => r.type === 'pong'), 'All responses are pong');
|
||||
assert(results.every((r) => r.type === 'pong'), 'All responses are pong');
|
||||
|
||||
// ── 6. Graceful shutdown ────────────────────────────────────────────────
|
||||
console.log('6. Graceful shutdown');
|
||||
const shutdownRes = await request({ type: 'shutdown' });
|
||||
// ── 6. kbot-ai — real LLM (optional via ipcLlmEnabled) ─────────────────
|
||||
if (ipcLlmEnabled()) {
|
||||
const aiPayload = kbotAiPayloadFromEnv();
|
||||
const r = aiPayload.router;
|
||||
console.log(`6. kbot-ai → real LLM (router=${r}, timeout 3m)`);
|
||||
const live = await ipc.request(
|
||||
{
|
||||
type: 'kbot-ai',
|
||||
payload: aiPayload,
|
||||
},
|
||||
timeouts.kbotAi
|
||||
);
|
||||
assert(live.type === 'job_result', `LLM response type job_result (got "${live.type}")`);
|
||||
const lp = payloadObj(live);
|
||||
assert(lp?.status === 'success', `payload status success (got "${lp?.status}")`);
|
||||
assert(
|
||||
typeof lp?.text === 'string' && lp.text.trim().length >= 3,
|
||||
`assistant text present (length ${(lp?.text || '').length})`
|
||||
);
|
||||
if (usingDefaultGermanyPrompt()) {
|
||||
assert(
|
||||
/berlin/i.test(String(lp?.text || '')),
|
||||
'assistant text mentions Berlin (capital of Germany)'
|
||||
);
|
||||
}
|
||||
logKbotAiResponse('kbot-ai response', live);
|
||||
} else {
|
||||
console.log('6. kbot-ai — skipped (KBOT_IPC_LLM=0/false/no/off; default is to run live LLM)');
|
||||
}
|
||||
|
||||
// ── 7. Graceful shutdown ────────────────────────────────────────────────
|
||||
console.log('7. Graceful shutdown');
|
||||
const shutdownRes = await ipc.request({ type: 'shutdown' }, timeouts.ipcDefault);
|
||||
assert(shutdownRes.type === 'shutdown_ack', `Shutdown acknowledged (got "${shutdownRes.type}")`);
|
||||
|
||||
// Wait a beat for process exit
|
||||
await new Promise(r => setTimeout(r, 200));
|
||||
await new Promise((r) => setTimeout(r, timeouts.postShutdownMs));
|
||||
socket.destroy();
|
||||
assert(workerProc.exitCode === 0, `Worker exited with code 0 (got ${workerProc.exitCode})`);
|
||||
|
||||
// ── Summary ─────────────────────────────────────────────────────────────
|
||||
console.log(`\n────────────────────────────────`);
|
||||
console.log(` Passed: ${passed} Failed: ${failed}`);
|
||||
console.log(` Passed: ${stats.passed} Failed: ${stats.failed}`);
|
||||
console.log(`────────────────────────────────\n`);
|
||||
|
||||
process.exit(failed > 0 ? 1 : 0);
|
||||
try {
|
||||
const finishedAt = new Date().toISOString();
|
||||
const { jsonPath, mdPath } = await writeTestReports(
|
||||
'test-ipc',
|
||||
{
|
||||
startedAt: ipcRunStartedAt,
|
||||
finishedAt,
|
||||
passed: stats.passed,
|
||||
failed: stats.failed,
|
||||
ok: stats.failed === 0,
|
||||
ipcLlm: ipcLlmEnabled(),
|
||||
env: {
|
||||
KBOT_IPC_LLM: process.env.KBOT_IPC_LLM ?? null,
|
||||
KBOT_ROUTER: process.env.KBOT_ROUTER ?? null,
|
||||
KBOT_IPC_MODEL: process.env.KBOT_IPC_MODEL ?? null,
|
||||
KBOT_IPC_PROMPT: process.env.KBOT_IPC_PROMPT ?? null,
|
||||
},
|
||||
metrics: buildMetricsBundle(ipcMetricsCollector, ipcRunStartedAt, finishedAt),
|
||||
},
|
||||
{ cwd: process.cwd() }
|
||||
);
|
||||
console.log(` 📄 Report JSON: ${jsonPath}`);
|
||||
console.log(` 📄 Report MD: ${mdPath}\n`);
|
||||
} catch (e) {
|
||||
console.error(' ⚠️ Failed to write report:', e?.message ?? e);
|
||||
}
|
||||
|
||||
process.exit(stats.failed > 0 ? 1 : 0);
|
||||
}
|
||||
|
||||
run().catch((err) => {
|
||||
run().catch(async (err) => {
|
||||
console.error('Test runner error:', err);
|
||||
try {
|
||||
const finishedAt = new Date().toISOString();
|
||||
const c = ipcMetricsCollector ?? createMetricsCollector();
|
||||
const started = ipcRunStartedAt ?? finishedAt;
|
||||
await writeTestReports(
|
||||
'test-ipc',
|
||||
{
|
||||
startedAt: started,
|
||||
finishedAt,
|
||||
error: String(err?.stack ?? err),
|
||||
passed: stats.passed,
|
||||
failed: stats.failed,
|
||||
ok: false,
|
||||
metrics: buildMetricsBundle(c, started, finishedAt),
|
||||
},
|
||||
{ cwd: process.cwd() }
|
||||
);
|
||||
} catch (_) {
|
||||
/* ignore */
|
||||
}
|
||||
process.exit(1);
|
||||
});
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
{
|
||||
"name": "kbot-cpp",
|
||||
"version": "1.0.0",
|
||||
"type": "module",
|
||||
"description": "KBot C++ CLI built with CMake.",
|
||||
"directories": {
|
||||
"test": "tests"
|
||||
|
||||
@ -31,7 +31,7 @@ LLMClient::LLMClient(const KBotOptions& opts)
|
||||
else if (router_ == "openai") model_ = "gpt-4o";
|
||||
else if (router_ == "deepseek") model_ = "deepseek-chat";
|
||||
else if (router_ == "huggingface") model_ = "meta-llama/2";
|
||||
else if (router_ == "ollama") model_ = "leonard";
|
||||
else if (router_ == "ollama") model_ = "llama3.2";
|
||||
else if (router_ == "fireworks") model_ = "llama-v2-70b-chat";
|
||||
else if (router_ == "gemini") model_ = "gemini-1.5-pro";
|
||||
else if (router_ == "xai") model_ = "grok-1";
|
||||
|
||||
2
packages/kbot/cpp/scripts/qwen3_4b.sh
Normal file
2
packages/kbot/cpp/scripts/qwen3_4b.sh
Normal file
@ -0,0 +1,2 @@
|
||||
ollama run qwen2.5-coder:latest
|
||||
|
||||
9
packages/kbot/cpp/scripts/run-7b.sh
Normal file
9
packages/kbot/cpp/scripts/run-7b.sh
Normal file
@ -0,0 +1,9 @@
|
||||
./llama-server.exe \
|
||||
--hf-repo paultimothymooney/Qwen2.5-7B-Instruct-Q4_K_M-GGUF \
|
||||
--hf-file qwen2.5-7b-instruct-q4_k_m.gguf \
|
||||
-t 16 \
|
||||
-c 2048 \
|
||||
-b 512 \
|
||||
--temp 0.2 \
|
||||
--top-p 0.9 \
|
||||
--port 8888
|
||||
4
packages/kbot/cpp/scripts/setup-7b.sh
Normal file
4
packages/kbot/cpp/scripts/setup-7b.sh
Normal file
@ -0,0 +1,4 @@
|
||||
mkdir -p models/qwen2.5-7b
|
||||
cd models/qwen2.5-7b
|
||||
|
||||
curl -L -O https://huggingface.co/Qwen/Qwen2.5-7B-Instruct-GGUF/resolve/main/qwen2.5-7b-instruct-q4_k_m.gguf
|
||||
370
packages/kbot/cpp/src/cmd_kbot_uds.cpp
Normal file
370
packages/kbot/cpp/src/cmd_kbot_uds.cpp
Normal file
@ -0,0 +1,370 @@
|
||||
// cmd_kbot_uds.cpp — UDS/TCP worker for KBot LLM IPC (length-prefixed JSON frames).
|
||||
// Framing matches orchestrator tests: [uint32_le length][utf-8 JSON object with id, type, payload].
|
||||
|
||||
#include "cmd_kbot.h"
|
||||
#include "concurrentqueue.h"
|
||||
#include "logger/logger.h"
|
||||
#include "rapidjson/document.h"
|
||||
#include "rapidjson/stringbuffer.h"
|
||||
#include "rapidjson/writer.h"
|
||||
#include <asio.hpp>
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <cstdio>
|
||||
#include <mutex>
|
||||
#include <spdlog/sinks/base_sink.h>
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <taskflow/taskflow.hpp>
|
||||
#include <thread>
|
||||
#include <unordered_map>
|
||||
|
||||
namespace polymech {
|
||||
namespace {
|
||||
|
||||
#ifdef _WIN32
|
||||
using ipc_endpoint = asio::ip::tcp::endpoint;
|
||||
using ipc_acceptor = asio::ip::tcp::acceptor;
|
||||
using ipc_socket = asio::ip::tcp::socket;
|
||||
#else
|
||||
using ipc_endpoint = asio::local::stream_protocol::endpoint;
|
||||
using ipc_acceptor = asio::local::stream_protocol::acceptor;
|
||||
using ipc_socket = asio::local::stream_protocol::socket;
|
||||
#endif
|
||||
|
||||
std::shared_ptr<ipc_socket> g_active_uds_socket;
|
||||
std::mutex g_uds_socket_mutex;
|
||||
|
||||
std::string json_escape_log_line(const std::string &s) {
|
||||
rapidjson::StringBuffer buf;
|
||||
rapidjson::Writer<rapidjson::StringBuffer> w(buf);
|
||||
w.String(s.c_str(), static_cast<rapidjson::SizeType>(s.length()));
|
||||
std::string out(buf.GetString(), buf.GetSize());
|
||||
if (out.size() >= 2 && out.front() == '"' && out.back() == '"')
|
||||
return out.substr(1, out.size() - 2);
|
||||
return out;
|
||||
}
|
||||
|
||||
template <typename Mutex>
|
||||
class kbot_uds_sink : public spdlog::sinks::base_sink<Mutex> {
|
||||
protected:
|
||||
void sink_it_(const spdlog::details::log_msg &msg) override {
|
||||
spdlog::memory_buf_t formatted;
|
||||
this->formatter_->format(msg, formatted);
|
||||
std::string text = fmt::to_string(formatted);
|
||||
if (!text.empty() && text.back() == '\n')
|
||||
text.pop_back();
|
||||
|
||||
std::lock_guard<std::mutex> lock(g_uds_socket_mutex);
|
||||
if (!g_active_uds_socket)
|
||||
return;
|
||||
try {
|
||||
std::string escaped = json_escape_log_line(text);
|
||||
std::string frame = "{\"type\":\"log\",\"data\":\"" + escaped + "\"}";
|
||||
uint32_t len = static_cast<uint32_t>(frame.size());
|
||||
asio::write(*g_active_uds_socket, asio::buffer(&len, 4));
|
||||
asio::write(*g_active_uds_socket, asio::buffer(frame));
|
||||
} catch (...) {
|
||||
}
|
||||
}
|
||||
void flush_() override {}
|
||||
};
|
||||
|
||||
using kbot_uds_sink_mt = kbot_uds_sink<std::mutex>;
|
||||
|
||||
struct KbotUdsJob {
|
||||
std::string payload;
|
||||
std::string job_id;
|
||||
std::shared_ptr<ipc_socket> socket;
|
||||
std::shared_ptr<std::atomic<bool>> cancel_token;
|
||||
};
|
||||
|
||||
std::string request_id_string(const rapidjson::Document &doc) {
|
||||
if (doc.HasMember("id") && doc["id"].IsString())
|
||||
return doc["id"].GetString();
|
||||
if (doc.HasMember("jobId") && doc["jobId"].IsString())
|
||||
return doc["jobId"].GetString();
|
||||
return "kbot-uds-" +
|
||||
std::to_string(
|
||||
std::chrono::system_clock::now().time_since_epoch().count());
|
||||
}
|
||||
|
||||
void write_raw_frame(const std::shared_ptr<ipc_socket> &sock,
|
||||
const std::string &json_body) {
|
||||
uint32_t len = static_cast<uint32_t>(json_body.size());
|
||||
std::lock_guard<std::mutex> lock(g_uds_socket_mutex);
|
||||
asio::write(*sock, asio::buffer(&len, 4));
|
||||
asio::write(*sock, asio::buffer(json_body));
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
int run_cmd_kbot_uds(const std::string &pipe_path) {
|
||||
logger::info("Starting KBot UDS on " + pipe_path);
|
||||
std::atomic<bool> running{true};
|
||||
asio::io_context io_context;
|
||||
std::shared_ptr<ipc_acceptor> acceptor;
|
||||
|
||||
try {
|
||||
#ifdef _WIN32
|
||||
int port = 4000;
|
||||
try {
|
||||
port = std::stoi(pipe_path);
|
||||
} catch (...) {
|
||||
}
|
||||
ipc_endpoint ep(asio::ip::tcp::v4(), static_cast<unsigned short>(port));
|
||||
acceptor = std::make_shared<ipc_acceptor>(io_context, ep);
|
||||
logger::info("KBot UDS: bound TCP 127.0.0.1:" + std::to_string(port));
|
||||
#else
|
||||
std::remove(pipe_path.c_str());
|
||||
ipc_endpoint ep(pipe_path);
|
||||
acceptor = std::make_shared<ipc_acceptor>(io_context, ep);
|
||||
#endif
|
||||
} catch (const std::exception &e) {
|
||||
logger::error(std::string("KBot UDS bind failed: ") + e.what());
|
||||
return 1;
|
||||
}
|
||||
|
||||
const int k_frame_max = 50 * 1024 * 1024;
|
||||
const int k_queue_depth_max = 10000;
|
||||
int threads = static_cast<int>(std::thread::hardware_concurrency());
|
||||
if (threads <= 0)
|
||||
threads = 2;
|
||||
tf::Executor executor(threads);
|
||||
|
||||
moodycamel::ConcurrentQueue<KbotUdsJob> queue;
|
||||
|
||||
auto log_sink = std::make_shared<kbot_uds_sink_mt>();
|
||||
log_sink->set_pattern("%^%l%$ %v");
|
||||
spdlog::default_logger()->sinks().push_back(log_sink);
|
||||
|
||||
std::thread uds_job_queue_thread([&]() {
|
||||
KbotUdsJob job;
|
||||
while (running.load()) {
|
||||
if (!queue.try_dequeue(job)) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(5));
|
||||
continue;
|
||||
}
|
||||
|
||||
tf::Taskflow tf;
|
||||
tf.emplace([job]() {
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(g_uds_socket_mutex);
|
||||
g_active_uds_socket = job.socket;
|
||||
}
|
||||
|
||||
kbot::KBotCallbacks cb;
|
||||
cb.onEvent = [sock = job.socket, jid = job.job_id](
|
||||
const std::string &type, const std::string &json) {
|
||||
try {
|
||||
std::string resolved_id =
|
||||
(type == "job_result" || type == "error") ? jid : "0";
|
||||
std::string msg = "{\"id\":\"" + resolved_id +
|
||||
"\",\"type\":\"" + type + "\",\"payload\":" +
|
||||
json + "}";
|
||||
uint32_t len = static_cast<uint32_t>(msg.size());
|
||||
std::lock_guard<std::mutex> lock(g_uds_socket_mutex);
|
||||
asio::write(*sock, asio::buffer(&len, 4));
|
||||
asio::write(*sock, asio::buffer(msg));
|
||||
} catch (...) {
|
||||
}
|
||||
};
|
||||
|
||||
rapidjson::Document doc;
|
||||
doc.Parse(job.payload.c_str());
|
||||
if (doc.HasParseError()) {
|
||||
cb.onEvent("error", "\"invalid JSON payload\"");
|
||||
std::lock_guard<std::mutex> lock(g_uds_socket_mutex);
|
||||
g_active_uds_socket.reset();
|
||||
return;
|
||||
}
|
||||
|
||||
std::string job_type;
|
||||
if (doc.HasMember("type") && doc["type"].IsString())
|
||||
job_type = doc["type"].GetString();
|
||||
|
||||
if (job_type == "job") {
|
||||
std::string payload_str = "{}";
|
||||
if (doc.HasMember("payload")) {
|
||||
rapidjson::StringBuffer sbuf;
|
||||
rapidjson::Writer<rapidjson::StringBuffer> writer(sbuf);
|
||||
doc["payload"].Accept(writer);
|
||||
payload_str = sbuf.GetString();
|
||||
}
|
||||
cb.onEvent("job_result", payload_str);
|
||||
} else if (job_type == "kbot-ai") {
|
||||
kbot::KBotCallbacks ai_cb;
|
||||
ai_cb.onEvent = [&cb](const std::string &t, const std::string &j) {
|
||||
cb.onEvent(t, j);
|
||||
};
|
||||
std::string payload_str = "{}";
|
||||
if (doc.HasMember("payload")) {
|
||||
if (doc["payload"].IsString()) {
|
||||
payload_str = doc["payload"].GetString();
|
||||
} else {
|
||||
rapidjson::StringBuffer sbuf;
|
||||
rapidjson::Writer<rapidjson::StringBuffer> writer(sbuf);
|
||||
doc["payload"].Accept(writer);
|
||||
payload_str = sbuf.GetString();
|
||||
}
|
||||
}
|
||||
polymech::run_kbot_ai_ipc(payload_str, job.job_id, ai_cb);
|
||||
} else if (job_type == "kbot-run") {
|
||||
kbot::KBotCallbacks run_cb;
|
||||
run_cb.onEvent = [&cb](const std::string &t, const std::string &j) {
|
||||
cb.onEvent(t, j);
|
||||
};
|
||||
std::string payload_str = "{}";
|
||||
if (doc.HasMember("payload")) {
|
||||
if (doc["payload"].IsString()) {
|
||||
payload_str = doc["payload"].GetString();
|
||||
} else {
|
||||
rapidjson::StringBuffer sbuf;
|
||||
rapidjson::Writer<rapidjson::StringBuffer> writer(sbuf);
|
||||
doc["payload"].Accept(writer);
|
||||
payload_str = sbuf.GetString();
|
||||
}
|
||||
}
|
||||
polymech::run_kbot_run_ipc(payload_str, job.job_id, run_cb);
|
||||
} else {
|
||||
rapidjson::StringBuffer sbuf;
|
||||
rapidjson::Writer<rapidjson::StringBuffer> w(sbuf);
|
||||
w.StartObject();
|
||||
w.Key("message");
|
||||
std::string m = "unsupported type: " + job_type;
|
||||
w.String(m.c_str(), static_cast<rapidjson::SizeType>(m.size()));
|
||||
w.EndObject();
|
||||
cb.onEvent("error", sbuf.GetString());
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(g_uds_socket_mutex);
|
||||
g_active_uds_socket.reset();
|
||||
}
|
||||
});
|
||||
|
||||
executor.run(tf).wait();
|
||||
}
|
||||
});
|
||||
|
||||
logger::info("KBot UDS ready; waiting for connections…");
|
||||
while (running.load()) {
|
||||
auto socket = std::make_shared<ipc_socket>(io_context);
|
||||
asio::error_code ec;
|
||||
acceptor->accept(*socket, ec);
|
||||
if (ec || !running.load())
|
||||
break;
|
||||
|
||||
logger::info("KBot UDS client connected");
|
||||
|
||||
std::thread(
|
||||
[socket, &queue, &running, acceptor, k_frame_max, k_queue_depth_max]() {
|
||||
std::unordered_map<std::string, std::shared_ptr<std::atomic<bool>>>
|
||||
socket_jobs;
|
||||
|
||||
try {
|
||||
{
|
||||
std::string ready =
|
||||
R"({"id":"0","type":"ready","payload":{}})";
|
||||
uint32_t rlen = static_cast<uint32_t>(ready.size());
|
||||
std::lock_guard<std::mutex> lock(g_uds_socket_mutex);
|
||||
asio::write(*socket, asio::buffer(&rlen, 4));
|
||||
asio::write(*socket, asio::buffer(ready));
|
||||
}
|
||||
|
||||
while (true) {
|
||||
uint32_t len = 0;
|
||||
asio::read(*socket, asio::buffer(&len, 4));
|
||||
if (len == 0 ||
|
||||
len > static_cast<uint32_t>(k_frame_max))
|
||||
break;
|
||||
|
||||
std::string raw(len, '\0');
|
||||
asio::read(*socket, asio::buffer(raw.data(), len));
|
||||
|
||||
rapidjson::Document doc;
|
||||
doc.Parse(raw.c_str());
|
||||
|
||||
if (!doc.HasParseError()) {
|
||||
std::string action;
|
||||
if (doc.HasMember("action") && doc["action"].IsString())
|
||||
action = doc["action"].GetString();
|
||||
else if (doc.HasMember("type") && doc["type"].IsString())
|
||||
action = doc["type"].GetString();
|
||||
|
||||
auto id_for = [&doc]() -> std::string {
|
||||
if (doc.HasMember("id") && doc["id"].IsString())
|
||||
return doc["id"].GetString();
|
||||
return "0";
|
||||
};
|
||||
|
||||
if (action == "ping") {
|
||||
std::string res_id = id_for();
|
||||
std::string ack = "{\"id\":\"" + res_id +
|
||||
"\",\"type\":\"pong\",\"payload\":{}}";
|
||||
write_raw_frame(socket, ack);
|
||||
continue;
|
||||
}
|
||||
if (action == "nonsense") {
|
||||
std::string res_id = id_for();
|
||||
std::string ack = "{\"id\":\"" + res_id +
|
||||
"\",\"type\":\"error\",\"payload\":{}}";
|
||||
write_raw_frame(socket, ack);
|
||||
continue;
|
||||
}
|
||||
if (action == "cancel") {
|
||||
if (doc.HasMember("jobId") && doc["jobId"].IsString()) {
|
||||
std::string jid = doc["jobId"].GetString();
|
||||
if (socket_jobs.count(jid) && socket_jobs[jid]) {
|
||||
*socket_jobs[jid] = true;
|
||||
std::string ack =
|
||||
"{\"type\":\"cancel_ack\",\"data\":\"" + jid + "\"}";
|
||||
write_raw_frame(socket, ack);
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (action == "stop" || action == "shutdown") {
|
||||
logger::info("KBot UDS: shutdown requested");
|
||||
std::string res_id = id_for();
|
||||
std::string ack = "{\"id\":\"" + res_id +
|
||||
"\",\"type\":\"shutdown_ack\","
|
||||
"\"payload\":{}}";
|
||||
write_raw_frame(socket, ack);
|
||||
running.store(false);
|
||||
try {
|
||||
acceptor->close();
|
||||
} catch (...) {
|
||||
}
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
|
||||
std::string jid = request_id_string(doc);
|
||||
auto cancel_token = std::make_shared<std::atomic<bool>>(false);
|
||||
socket_jobs[jid] = cancel_token;
|
||||
|
||||
KbotUdsJob job{raw, jid, socket, cancel_token};
|
||||
while (queue.size_approx() >=
|
||||
static_cast<size_t>(k_queue_depth_max)) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
}
|
||||
queue.enqueue(std::move(job));
|
||||
}
|
||||
} catch (const std::exception &) {
|
||||
for (auto &kv : socket_jobs) {
|
||||
if (kv.second)
|
||||
*kv.second = true;
|
||||
}
|
||||
}
|
||||
})
|
||||
.detach();
|
||||
}
|
||||
|
||||
running.store(false);
|
||||
uds_job_queue_thread.join();
|
||||
return 0;
|
||||
}
|
||||
|
||||
} // namespace polymech
|
||||
Loading…
Reference in New Issue
Block a user