diff --git a/packages/kbot/cpp/.gitignore b/packages/kbot/cpp/.gitignore index 33c9da8a..cb16910a 100644 --- a/packages/kbot/cpp/.gitignore +++ b/packages/kbot/cpp/.gitignore @@ -29,4 +29,8 @@ Thumbs.db cache/ config/postgres.toml dist + +# Orchestrator reports (cwd/tests/*) +tests/*.json +tests/*.md src/cmd_grid*.cpp diff --git a/packages/kbot/cpp/CMakeLists.txt b/packages/kbot/cpp/CMakeLists.txt index b85d7d76..b2c4db99 100644 --- a/packages/kbot/cpp/CMakeLists.txt +++ b/packages/kbot/cpp/CMakeLists.txt @@ -105,7 +105,7 @@ add_executable(${PROJECT_NAME} # Output file name is kbot.exe / kbot (not kbot-cli) set_target_properties(${PROJECT_NAME} PROPERTIES OUTPUT_NAME "kbot") -target_link_libraries(${PROJECT_NAME} PRIVATE CLI11::CLI11 tomlplusplus::tomlplusplus logger html postgres http json polymech ipc search kbot) +target_link_libraries(${PROJECT_NAME} PRIVATE CLI11::CLI11 tomlplusplus::tomlplusplus logger html postgres http json polymech ipc kbot) target_include_directories(${PROJECT_NAME} PRIVATE ${asio_SOURCE_DIR}/asio/include diff --git a/packages/kbot/cpp/orchestrator/presets.js b/packages/kbot/cpp/orchestrator/presets.js new file mode 100644 index 00000000..2e6c5c7d --- /dev/null +++ b/packages/kbot/cpp/orchestrator/presets.js @@ -0,0 +1,73 @@ +/** + * orchestrator/presets.js — defaults for IPC integration tests (extend here as suites grow). + */ + +import { resolve } from 'node:path'; + +export const platform = { + isWin: process.platform === 'win32', +}; + +/** Dist binary name for the current OS. */ +export function exeName() { + return platform.isWin ? 'kbot.exe' : 'kbot'; +} + +/** Absolute path to kbot binary given orchestrator/ directory (where test-ipc.mjs lives). */ +export function distExePath(orchestratorDir) { + return resolve(orchestratorDir, '..', 'dist', exeName()); +} + +/** UDS / TCP listen argument passed to `kbot worker --uds `. */ +export const uds = { + tcpPort: 4001, + unixPath: '/tmp/kbot-test-ipc.sock', + /** Value for `--uds` on this OS (Windows: port string; Unix: socket path). */ + workerArg() { + return platform.isWin ? String(this.tcpPort) : this.unixPath; + }, + /** Options for `net.connect` to reach the worker. */ + connectOpts(cppUdsArg) { + return platform.isWin + ? { port: this.tcpPort, host: '127.0.0.1' } + : cppUdsArg; + }, +}; + +/** Millisecond timeouts — tune per step in new tests. */ +export const timeouts = { + ipcDefault: 5000, + kbotAi: 180_000, + connectAttempts: 15, + connectRetryMs: 400, + postShutdownMs: 200, +}; + +export const router = { + default: 'openrouter', + fromEnv() { + return process.env.KBOT_ROUTER || this.default; + }, +}; + +/** Stock prompts and assertions helpers for LLM smoke tests. */ +export const prompts = { + germanyCapital: 'What is the capital of Germany? Reply in one short sentence.', +}; + +/** Build `kbot-ai` IPC payload from env + presets (OpenRouter-friendly defaults). */ +export function kbotAiPayloadFromEnv() { + const payload = { + prompt: process.env.KBOT_IPC_PROMPT || prompts.germanyCapital, + router: router.fromEnv(), + }; + if (process.env.KBOT_IPC_MODEL) { + payload.model = process.env.KBOT_IPC_MODEL; + } + return payload; +} + +/** True when using the default Germany prompt (for optional Berlin assertion). */ +export function usingDefaultGermanyPrompt() { + return !process.env.KBOT_IPC_PROMPT; +} diff --git a/packages/kbot/cpp/orchestrator/reports.js b/packages/kbot/cpp/orchestrator/reports.js new file mode 100644 index 00000000..e87763ec --- /dev/null +++ b/packages/kbot/cpp/orchestrator/reports.js @@ -0,0 +1,296 @@ +/** + * orchestrator/reports.js — JSON + Markdown reports under cwd/tests/ + * + * File pattern (logical): test-name::hh:mm + * On-disk: test-name__HH-mm.json / .md (Windows: no `:` in filenames) + */ + +import { mkdir, writeFile } from 'node:fs/promises'; +import { join, dirname } from 'node:path'; +import os from 'node:os'; +import { performance } from 'node:perf_hooks'; +import { resourceUsage } from 'node:process'; + +const WIN_BAD = /[<>:"/\\|?*\x00-\x1f]/g; + +/** Strip characters invalid in Windows / POSIX filenames. */ +export function sanitizeTestName(name) { + const s = String(name).trim().replace(WIN_BAD, '_').replace(/\s+/g, '_'); + return s || 'test'; +} + +/** + * @param {Date} [now] + * @returns {{ hh: string, mm: string, label: string }} + */ +export function timeParts(now = new Date()) { + const hh = String(now.getHours()).padStart(2, '0'); + const mm = String(now.getMinutes()).padStart(2, '0'); + return { hh, mm, label: `${hh}:${mm}` }; +} + +/** + * @param {string} testName + * @param {string} ext — including dot, e.g. '.json' + * @param {{ cwd?: string, now?: Date }} [options] + */ +export function reportFilePathWithExt(testName, ext, options = {}) { + const cwd = options.cwd ?? process.cwd(); + const now = options.now ?? new Date(); + const base = sanitizeTestName(testName); + const { hh, mm } = timeParts(now); + const file = `${base}__${hh}-${mm}${ext}`; + return join(cwd, 'tests', file); +} + +export function reportFilePath(testName, options = {}) { + return reportFilePathWithExt(testName, '.json', options); +} + +export function reportMarkdownPath(testName, options = {}) { + return reportFilePathWithExt(testName, '.md', options); +} + +function formatBytes(n) { + if (typeof n !== 'number' || Number.isNaN(n)) return String(n); + const u = ['B', 'KB', 'MB', 'GB']; + let i = 0; + let x = n; + while (x >= 1024 && i < u.length - 1) { + x /= 1024; + i++; + } + return `${x < 10 && i > 0 ? x.toFixed(1) : Math.round(x)} ${u[i]}`; +} + +/** Snapshot of host / OS (cheap; call anytime). */ +export function hostSnapshot() { + const cpus = os.cpus(); + const total = os.totalmem(); + const free = os.freemem(); + return { + hostname: os.hostname(), + platform: os.platform(), + arch: os.arch(), + release: os.release(), + cpuCount: cpus.length, + cpuModel: cpus[0]?.model?.trim() ?? '', + totalMemBytes: total, + freeMemBytes: free, + usedMemBytes: total - free, + loadAvg: os.loadavg(), + osUptimeSec: os.uptime(), + }; +} + +/** + * Call at test start; then call `.finalize()` at end for wall + CPU delta + memory. + */ +export function createMetricsCollector() { + const cpu0 = process.cpuUsage(); + const perf0 = performance.now(); + const wall0 = Date.now(); + + return { + hostSnapshot, + + finalize() { + const cpu = process.cpuUsage(cpu0); + const perf1 = performance.now(); + let ru = null; + try { + ru = resourceUsage(); + } catch { + /* older runtimes */ + } + return { + durationWallMs: Math.round((perf1 - perf0) * 1000) / 1000, + durationClockMs: Date.now() - wall0, + cpuUserUs: cpu.user, + cpuSystemUs: cpu.system, + cpuUserMs: cpu.user / 1000, + cpuSystemMs: cpu.system / 1000, + memory: process.memoryUsage(), + resourceUsage: ru, + pid: process.pid, + node: process.version, + processUptimeSec: process.uptime(), + }; + }, + }; +} + +/** + * @param {Record} payload + * @returns {string} + */ +export function renderMarkdownReport(payload) { + const meta = payload.meta ?? {}; + const m = payload.metrics ?? {}; + const host = m.host ?? {}; + const timing = m.timing ?? {}; + const proc = m.process ?? {}; + const tStart = timing.startedAt ?? payload.startedAt; + const tEnd = timing.finishedAt ?? payload.finishedAt; + + const lines = []; + + lines.push(`# Test report: ${meta.displayName ?? meta.testName ?? 'run'}`); + lines.push(''); + lines.push('## Summary'); + lines.push(''); + lines.push(`| Key | Value |`); + lines.push(`| --- | --- |`); + lines.push(`| Result | ${payload.ok === true ? 'PASS' : payload.ok === false ? 'FAIL' : '—'} |`); + if (payload.passed != null) lines.push(`| Assertions passed | ${payload.passed} |`); + if (payload.failed != null) lines.push(`| Assertions failed | ${payload.failed} |`); + if (payload.ipcLlm != null) lines.push(`| IPC LLM step | ${payload.ipcLlm ? 'enabled' : 'skipped'} |`); + lines.push(`| CWD | \`${String(meta.cwd ?? '').replace(/`/g, "'")}\` |`); + lines.push(''); + + lines.push('## Timing'); + lines.push(''); + lines.push(`| Metric | Value |`); + lines.push(`| --- | --- |`); + if (tStart) lines.push(`| Started (ISO) | ${tStart} |`); + if (tEnd) lines.push(`| Finished (ISO) | ${tEnd} |`); + if (proc.durationWallMs != null) lines.push(`| Wall time (perf) | ${proc.durationWallMs} ms |`); + if (proc.durationClockMs != null) lines.push(`| Wall time (clock) | ${proc.durationClockMs} ms |`); + lines.push(''); + + lines.push('## Process (Node)'); + lines.push(''); + lines.push(`| Metric | Value |`); + lines.push(`| --- | --- |`); + if (proc.pid != null) lines.push(`| PID | ${proc.pid} |`); + if (proc.node) lines.push(`| Node | ${proc.node} |`); + if (proc.processUptimeSec != null) lines.push(`| process.uptime() | ${proc.processUptimeSec.toFixed(3)} s |`); + if (proc.cpuUserMs != null && proc.cpuSystemMs != null) { + lines.push(`| CPU user (process.cpuUsage Δ) | ${proc.cpuUserMs.toFixed(3)} ms (${proc.cpuUserUs ?? '—'} µs) |`); + lines.push(`| CPU system (process.cpuUsage Δ) | ${proc.cpuSystemMs.toFixed(3)} ms (${proc.cpuSystemUs ?? '—'} µs) |`); + } + const ru = proc.resourceUsage; + if (ru && typeof ru === 'object') { + if (ru.userCPUTime != null) { + lines.push(`| CPU user (resourceUsage) | ${(ru.userCPUTime / 1000).toFixed(3)} ms |`); + } + if (ru.systemCPUTime != null) { + lines.push(`| CPU system (resourceUsage) | ${(ru.systemCPUTime / 1000).toFixed(3)} ms |`); + } + if (ru.maxRSS != null) { + lines.push(`| Max RSS (resourceUsage) | ${formatBytes(ru.maxRSS * 1024)} |`); + } + } + const mem = proc.memory; + if (mem && typeof mem === 'object') { + lines.push(`| RSS | ${formatBytes(mem.rss)} (${mem.rss} B) |`); + lines.push(`| Heap used | ${formatBytes(mem.heapUsed)} |`); + lines.push(`| Heap total | ${formatBytes(mem.heapTotal)} |`); + lines.push(`| External | ${formatBytes(mem.external)} |`); + if (mem.arrayBuffers != null) lines.push(`| Array buffers | ${formatBytes(mem.arrayBuffers)} |`); + } + lines.push(''); + + lines.push('## Host'); + lines.push(''); + lines.push(`| Metric | Value |`); + lines.push(`| --- | --- |`); + if (host.hostname) lines.push(`| Hostname | ${host.hostname} |`); + if (host.platform) lines.push(`| OS | ${host.platform} ${host.release ?? ''} |`); + if (host.arch) lines.push(`| Arch | ${host.arch} |`); + if (host.cpuCount != null) lines.push(`| CPUs | ${host.cpuCount} |`); + if (host.cpuModel) lines.push(`| CPU model | ${host.cpuModel} |`); + if (host.totalMemBytes != null) { + lines.push(`| RAM total | ${formatBytes(host.totalMemBytes)} |`); + lines.push(`| RAM free | ${formatBytes(host.freeMemBytes)} |`); + lines.push(`| RAM used | ${formatBytes(host.usedMemBytes)} |`); + } + if (host.loadAvg && host.loadAvg.length) { + lines.push(`| Load avg (1/5/15) | ${host.loadAvg.map((x) => x.toFixed(2)).join(' / ')} |`); + } + if (host.osUptimeSec != null) lines.push(`| OS uptime | ${(host.osUptimeSec / 3600).toFixed(2)} h |`); + lines.push(''); + + if (payload.env && typeof payload.env === 'object') { + lines.push('## Environment (selected)'); + lines.push(''); + lines.push(`| Variable | Value |`); + lines.push(`| --- | --- |`); + for (const [k, v] of Object.entries(payload.env)) { + lines.push(`| \`${k}\` | ${v === null || v === undefined ? '—' : String(v)} |`); + } + lines.push(''); + } + + if (payload.error) { + lines.push('## Error'); + lines.push(''); + lines.push('```'); + lines.push(String(payload.error)); + lines.push('```'); + lines.push(''); + } + + lines.push('---'); + lines.push(`*Written ${meta.writtenAt ?? new Date().toISOString()}*`); + lines.push(''); + + return lines.join('\n'); +} + +/** + * Build metrics block for JSON + MD (host snapshot + process finalize). + */ +export function buildMetricsBundle(collector, startedAtIso, finishedAtIso) { + const host = collector.hostSnapshot(); + const processMetrics = collector.finalize(); + return { + timing: { + startedAt: startedAtIso, + finishedAt: finishedAtIso, + }, + host, + process: processMetrics, + }; +} + +/** + * @param {string} testName + * @param {Record} data — merged into payload (meta + metrics added) + * @param {{ cwd?: string, now?: Date, metrics?: object }} [options] + * @returns {Promise<{ jsonPath: string, mdPath: string }>} + */ +export async function writeTestReports(testName, data, options = {}) { + const cwd = options.cwd ?? process.cwd(); + const now = options.now ?? new Date(); + const jsonPath = reportFilePath(testName, { cwd, now }); + const mdPath = reportMarkdownPath(testName, { cwd, now }); + const { hh, mm, label } = timeParts(now); + + const base = sanitizeTestName(testName); + const payload = { + meta: { + testName: base, + displayName: `${base}::${label}`, + cwd, + writtenAt: now.toISOString(), + jsonFile: jsonPath, + mdFile: mdPath, + }, + ...data, + }; + + await mkdir(dirname(jsonPath), { recursive: true }); + await writeFile(jsonPath, JSON.stringify(payload, null, 2), 'utf8'); + + const md = renderMarkdownReport(payload); + await writeFile(mdPath, md, 'utf8'); + + return { jsonPath, mdPath }; +} + +/** @deprecated Prefer writeTestReports */ +export async function writeJsonReport(testName, data, options = {}) { + const { jsonPath } = await writeTestReports(testName, data, options); + return jsonPath; +} diff --git a/packages/kbot/cpp/orchestrator/test-commons.js b/packages/kbot/cpp/orchestrator/test-commons.js new file mode 100644 index 00000000..0e050b51 --- /dev/null +++ b/packages/kbot/cpp/orchestrator/test-commons.js @@ -0,0 +1,175 @@ +/** + * orchestrator/test-commons.js — shared helpers for IPC orchestrator tests. + */ + +import { randomUUID } from 'node:crypto'; + +/** kbot-ai live call runs unless KBOT_IPC_LLM is explicitly disabled. */ +export function ipcLlmEnabled() { + const v = process.env.KBOT_IPC_LLM; + if (v === undefined || v === '') return true; + const s = String(v).trim().toLowerCase(); + if (s === '0' || s === 'false' || s === 'no' || s === 'off') return false; + return true; +} + +/** Counters for a test run (create one per process / suite). */ +export function createAssert() { + let passed = 0; + let failed = 0; + + function assert(condition, label) { + if (condition) { + console.log(` ✅ ${label}`); + passed++; + } else { + console.error(` ❌ ${label}`); + failed++; + } + } + + return { + assert, + get passed() { + return passed; + }, + get failed() { + return failed; + }, + }; +} + +/** Normalize IPC payload (object or JSON string). */ +export function payloadObj(msg) { + const p = msg?.payload; + if (p == null) return null; + if (typeof p === 'string') { + try { + return JSON.parse(p); + } catch { + return { raw: p }; + } + } + return p; +} + +/** Print LLM job_result so it is easy to spot (stdout, not mixed with worker stderr). */ +export function logKbotAiResponse(stepLabel, msg) { + const p = payloadObj(msg); + const text = p?.text != null ? String(p.text) : ''; + const err = p?.error != null ? String(p.error) : ''; + const maxRaw = process.env.KBOT_IPC_LLM_LOG_MAX; + const max = + maxRaw === undefined || maxRaw === '' + ? Infinity + : Number.parseInt(maxRaw, 10); + + console.log(''); + console.log(` ┌── ${stepLabel} ──────────────────────────────────────────`); + console.log(` │ type: ${msg?.type ?? '?'}`); + if (p && typeof p === 'object') { + console.log(` │ status: ${p.status ?? '?'}`); + if (p.mode != null) console.log(` │ mode: ${p.mode}`); + if (p.router != null) console.log(` │ router: ${p.router}`); + if (p.model != null) console.log(` │ model: ${p.model}`); + } + if (err) { + const showErr = + Number.isFinite(max) && err.length > max + ? `${err.slice(0, max)}… [truncated, ${err.length} chars]` + : err; + console.log(` │ error: ${showErr.replace(/\n/g, '\n │ ')}`); + } + if (text) { + let body = text; + let note = ''; + if (Number.isFinite(max) && text.length > max) { + body = text.slice(0, max); + note = `\n │ … [truncated: ${text.length} chars total; set KBOT_IPC_LLM_LOG_MAX= to adjust]`; + } + console.log(' │ text:'); + for (const line of body.split('\n')) { + console.log(` │ ${line}`); + } + if (note) console.log(note); + } else if (!err) { + console.log(' │ (no text in payload)'); + } + console.log(' └────────────────────────────────────────────────────────────'); + console.log(''); +} + +/** + * Length-prefixed JSON framing used by the C++ UDS worker. + * Call `attach()` once to wire `socket.on('data', ...)`. + */ +export function createIpcClient(socket) { + const pending = new Map(); + let readyResolve; + const readyPromise = new Promise((res) => { + readyResolve = res; + }); + + let buffer = Buffer.alloc(0); + + function onData(chunk) { + buffer = Buffer.concat([buffer, chunk]); + while (buffer.length >= 4) { + const len = buffer.readUInt32LE(0); + if (buffer.length >= 4 + len) { + const payload = buffer.toString('utf8', 4, 4 + len); + buffer = buffer.subarray(4 + len); + try { + const msg = JSON.parse(payload); + if (msg.type === 'ready') { + readyResolve(msg); + } else if (msg.id && pending.has(msg.id)) { + const p = pending.get(msg.id); + clearTimeout(p.timer); + pending.delete(msg.id); + p.resolve(msg); + } + } catch (e) { + console.error('[orchestrator] frame parse error', e); + } + } else { + break; + } + } + } + + function request(msg, timeoutMs = 5000) { + return new Promise((resolve, reject) => { + const id = msg.id || randomUUID(); + msg.id = id; + const timer = setTimeout(() => { + pending.delete(id); + reject(new Error(`IPC request timed out`)); + }, timeoutMs); + pending.set(id, { resolve, reject, timer }); + + const str = JSON.stringify(msg); + const lenBuf = Buffer.alloc(4); + lenBuf.writeUInt32LE(Buffer.byteLength(str)); + socket.write(lenBuf); + socket.write(str); + }); + } + + return { + pending, + readyPromise, + request, + attach() { + socket.on('data', onData); + }, + }; +} + +/** Forward worker stderr lines to console (prefixed). */ +export function pipeWorkerStderr(workerProc, label = '[worker:stderr]') { + workerProc.stderr.on('data', (d) => { + const txt = d.toString().trim(); + if (txt) console.error(`${label} ${txt}`); + }); +} diff --git a/packages/kbot/cpp/orchestrator/test-ipc.mjs b/packages/kbot/cpp/orchestrator/test-ipc.mjs index f1d52994..d54e9332 100644 --- a/packages/kbot/cpp/orchestrator/test-ipc.mjs +++ b/packages/kbot/cpp/orchestrator/test-ipc.mjs @@ -4,34 +4,58 @@ * Integration test: spawn the C++ worker in UDS mode, exchange messages, verify responses. * * Run: npm run test:ipc + * + * Env: + * KBOT_IPC_LLM — real LLM step is on by default; set to 0 / false / no / off to skip (CI / offline). + * KBOT_ROUTER — router (default: openrouter; same defaults as C++ LLMClient / CLI) + * KBOT_IPC_MODEL — optional model id (e.g. openrouter slug); else C++ default for that router + * KBOT_IPC_PROMPT — custom prompt (default: capital of Germany; asserts "berlin" in reply) + * KBOT_IPC_LLM_LOG_MAX — max chars to print for LLM text (default: unlimited) + * + * Shared: presets.js, test-commons.js, reports.js + * Report: cwd/tests/test-ipc__HH-mm.{json,md} (see reports.js) */ import { spawn } from 'node:child_process'; -import { resolve, dirname } from 'node:path'; +import { dirname } from 'node:path'; import { fileURLToPath } from 'node:url'; import net from 'node:net'; -import { randomUUID } from 'node:crypto'; import { existsSync, unlinkSync } from 'node:fs'; +import { + distExePath, + platform, + uds, + timeouts, + kbotAiPayloadFromEnv, + usingDefaultGermanyPrompt, +} from './presets.js'; +import { + createAssert, + payloadObj, + logKbotAiResponse, + ipcLlmEnabled, + createIpcClient, + pipeWorkerStderr, +} from './test-commons.js'; +import { + createMetricsCollector, + buildMetricsBundle, + writeTestReports, +} from './reports.js'; + const __dirname = dirname(fileURLToPath(import.meta.url)); -const IS_WIN = process.platform === 'win32'; -const EXE_NAME = IS_WIN ? 'kbot.exe' : 'kbot'; -const EXE = resolve(__dirname, '..', 'dist', EXE_NAME); +const EXE = distExePath(__dirname); +const stats = createAssert(); +const { assert } = stats; -let passed = 0; -let failed = 0; - -function assert(condition, label) { - if (condition) { - console.log(` ✅ ${label}`); - passed++; - } else { - console.error(` ❌ ${label}`); - failed++; - } -} +/** Set at run start for error reports */ +let ipcRunStartedAt = null; +let ipcMetricsCollector = null; async function run() { + ipcMetricsCollector = createMetricsCollector(); + ipcRunStartedAt = new Date().toISOString(); console.log('\n🔧 IPC [UDS] Integration Tests\n'); if (!existsSync(EXE)) { @@ -39,102 +63,47 @@ async function run() { process.exit(1); } - const CPP_UDS_ARG = IS_WIN ? '4001' : '/tmp/kbot-test-ipc.sock'; - if (!IS_WIN && existsSync(CPP_UDS_ARG)) { + const CPP_UDS_ARG = uds.workerArg(); + if (!platform.isWin && existsSync(CPP_UDS_ARG)) { unlinkSync(CPP_UDS_ARG); } // ── 1. Spawn & ready ──────────────────────────────────────────────────── console.log('1. Spawn worker (UDS mode) and wait for ready signal'); const workerProc = spawn(EXE, ['worker', '--uds', CPP_UDS_ARG], { stdio: 'pipe' }); - - workerProc.stderr.on('data', d => { - const txt = d.toString().trim(); - if (txt) console.error(`[worker:stderr] ${txt}`); - }); + pipeWorkerStderr(workerProc); let socket; - for (let i = 0; i < 15; i++) { + for (let i = 0; i < timeouts.connectAttempts; i++) { try { await new Promise((res, rej) => { - if (IS_WIN) { - socket = net.connect({ port: 4001, host: '127.0.0.1' }); - } else { - socket = net.connect(CPP_UDS_ARG); - } + socket = net.connect(uds.connectOpts(CPP_UDS_ARG)); socket.once('connect', res); socket.once('error', rej); }); break; } catch (e) { - if (i === 14) throw e; - await new Promise(r => setTimeout(r, 400)); + if (i === timeouts.connectAttempts - 1) throw e; + await new Promise((r) => setTimeout(r, timeouts.connectRetryMs)); } } assert(true, 'Socket connected successfully'); - // Pending request map: id → { resolve, reject, timer } - const pending = new Map(); - let readyResolve; - const readyPromise = new Promise(res => { readyResolve = res; }); + const ipc = createIpcClient(socket); + ipc.attach(); - let buffer = Buffer.alloc(0); - socket.on('data', chunk => { - buffer = Buffer.concat([buffer, chunk]); - while (buffer.length >= 4) { - const len = buffer.readUInt32LE(0); - if (buffer.length >= 4 + len) { - const payload = buffer.toString('utf8', 4, 4 + len); - buffer = buffer.subarray(4 + len); - try { - const msg = JSON.parse(payload); - if (msg.type === 'ready') { - readyResolve(msg); - } else if (msg.id && pending.has(msg.id)) { - const p = pending.get(msg.id); - clearTimeout(p.timer); - pending.delete(msg.id); - p.resolve(msg); - } - } catch (e) { - console.error('[orchestrator] frame parse error', e); - } - } else { - break; - } - } - }); - - function request(msg, timeoutMs = 5000) { - return new Promise((resolve, reject) => { - const id = msg.id || randomUUID(); - msg.id = id; - const timer = setTimeout(() => { - pending.delete(id); - reject(new Error(`IPC request timed out`)); - }, timeoutMs); - pending.set(id, { resolve, reject, timer }); - - const str = JSON.stringify(msg); - const lenBuf = Buffer.alloc(4); - lenBuf.writeUInt32LE(Buffer.byteLength(str)); - socket.write(lenBuf); - socket.write(str); - }); - } - - const readyMsg = await readyPromise; + const readyMsg = await ipc.readyPromise; assert(readyMsg.type === 'ready', 'Worker sends ready message on startup'); // ── 2. Ping / Pong ───────────────────────────────────────────────────── console.log('2. Ping → Pong'); - const pong = await request({ type: 'ping' }); + const pong = await ipc.request({ type: 'ping' }, timeouts.ipcDefault); assert(pong.type === 'pong', `Response type is "pong" (got "${pong.type}")`); // ── 3. Job echo ───────────────────────────────────────────────────────── console.log('3. Job → Job Result (echo payload)'); const payload = { action: 'resize', width: 1024, format: 'webp' }; - const jobResult = await request({ type: 'job', payload }); + const jobResult = await ipc.request({ type: 'job', payload }, timeouts.ipcDefault); assert(jobResult.type === 'job_result', `Response type is "job_result" (got "${jobResult.type}")`); assert( jobResult.payload?.action === 'resize' && jobResult.payload?.width === 1024, @@ -143,38 +112,114 @@ async function run() { // ── 4. Unknown type → error ───────────────────────────────────────────── console.log('4. Unknown type → error response'); - const errResp = await request({ type: 'nonsense' }); + const errResp = await ipc.request({ type: 'nonsense' }, timeouts.ipcDefault); assert(errResp.type === 'error', `Response type is "error" (got "${errResp.type}")`); // ── 5. Multiple rapid requests ────────────────────────────────────────── console.log('5. Multiple concurrent requests'); const promises = []; for (let i = 0; i < 10; i++) { - promises.push(request({ type: 'ping', payload: { seq: i } })); + promises.push(ipc.request({ type: 'ping', payload: { seq: i } }, timeouts.ipcDefault)); } const results = await Promise.all(promises); assert(results.length === 10, `All 10 responses received`); - assert(results.every(r => r.type === 'pong'), 'All responses are pong'); + assert(results.every((r) => r.type === 'pong'), 'All responses are pong'); - // ── 6. Graceful shutdown ──────────────────────────────────────────────── - console.log('6. Graceful shutdown'); - const shutdownRes = await request({ type: 'shutdown' }); + // ── 6. kbot-ai — real LLM (optional via ipcLlmEnabled) ───────────────── + if (ipcLlmEnabled()) { + const aiPayload = kbotAiPayloadFromEnv(); + const r = aiPayload.router; + console.log(`6. kbot-ai → real LLM (router=${r}, timeout 3m)`); + const live = await ipc.request( + { + type: 'kbot-ai', + payload: aiPayload, + }, + timeouts.kbotAi + ); + assert(live.type === 'job_result', `LLM response type job_result (got "${live.type}")`); + const lp = payloadObj(live); + assert(lp?.status === 'success', `payload status success (got "${lp?.status}")`); + assert( + typeof lp?.text === 'string' && lp.text.trim().length >= 3, + `assistant text present (length ${(lp?.text || '').length})` + ); + if (usingDefaultGermanyPrompt()) { + assert( + /berlin/i.test(String(lp?.text || '')), + 'assistant text mentions Berlin (capital of Germany)' + ); + } + logKbotAiResponse('kbot-ai response', live); + } else { + console.log('6. kbot-ai — skipped (KBOT_IPC_LLM=0/false/no/off; default is to run live LLM)'); + } + + // ── 7. Graceful shutdown ──────────────────────────────────────────────── + console.log('7. Graceful shutdown'); + const shutdownRes = await ipc.request({ type: 'shutdown' }, timeouts.ipcDefault); assert(shutdownRes.type === 'shutdown_ack', `Shutdown acknowledged (got "${shutdownRes.type}")`); - // Wait a beat for process exit - await new Promise(r => setTimeout(r, 200)); + await new Promise((r) => setTimeout(r, timeouts.postShutdownMs)); socket.destroy(); assert(workerProc.exitCode === 0, `Worker exited with code 0 (got ${workerProc.exitCode})`); // ── Summary ───────────────────────────────────────────────────────────── console.log(`\n────────────────────────────────`); - console.log(` Passed: ${passed} Failed: ${failed}`); + console.log(` Passed: ${stats.passed} Failed: ${stats.failed}`); console.log(`────────────────────────────────\n`); - process.exit(failed > 0 ? 1 : 0); + try { + const finishedAt = new Date().toISOString(); + const { jsonPath, mdPath } = await writeTestReports( + 'test-ipc', + { + startedAt: ipcRunStartedAt, + finishedAt, + passed: stats.passed, + failed: stats.failed, + ok: stats.failed === 0, + ipcLlm: ipcLlmEnabled(), + env: { + KBOT_IPC_LLM: process.env.KBOT_IPC_LLM ?? null, + KBOT_ROUTER: process.env.KBOT_ROUTER ?? null, + KBOT_IPC_MODEL: process.env.KBOT_IPC_MODEL ?? null, + KBOT_IPC_PROMPT: process.env.KBOT_IPC_PROMPT ?? null, + }, + metrics: buildMetricsBundle(ipcMetricsCollector, ipcRunStartedAt, finishedAt), + }, + { cwd: process.cwd() } + ); + console.log(` 📄 Report JSON: ${jsonPath}`); + console.log(` 📄 Report MD: ${mdPath}\n`); + } catch (e) { + console.error(' ⚠️ Failed to write report:', e?.message ?? e); + } + + process.exit(stats.failed > 0 ? 1 : 0); } -run().catch((err) => { +run().catch(async (err) => { console.error('Test runner error:', err); + try { + const finishedAt = new Date().toISOString(); + const c = ipcMetricsCollector ?? createMetricsCollector(); + const started = ipcRunStartedAt ?? finishedAt; + await writeTestReports( + 'test-ipc', + { + startedAt: started, + finishedAt, + error: String(err?.stack ?? err), + passed: stats.passed, + failed: stats.failed, + ok: false, + metrics: buildMetricsBundle(c, started, finishedAt), + }, + { cwd: process.cwd() } + ); + } catch (_) { + /* ignore */ + } process.exit(1); }); diff --git a/packages/kbot/cpp/package.json b/packages/kbot/cpp/package.json index a681e7af..f76e68bd 100644 --- a/packages/kbot/cpp/package.json +++ b/packages/kbot/cpp/package.json @@ -1,6 +1,7 @@ { "name": "kbot-cpp", "version": "1.0.0", + "type": "module", "description": "KBot C++ CLI built with CMake.", "directories": { "test": "tests" diff --git a/packages/kbot/cpp/packages/kbot/llm_client.cpp b/packages/kbot/cpp/packages/kbot/llm_client.cpp index 752407c5..7ba7fe79 100644 --- a/packages/kbot/cpp/packages/kbot/llm_client.cpp +++ b/packages/kbot/cpp/packages/kbot/llm_client.cpp @@ -31,7 +31,7 @@ LLMClient::LLMClient(const KBotOptions& opts) else if (router_ == "openai") model_ = "gpt-4o"; else if (router_ == "deepseek") model_ = "deepseek-chat"; else if (router_ == "huggingface") model_ = "meta-llama/2"; - else if (router_ == "ollama") model_ = "leonard"; + else if (router_ == "ollama") model_ = "llama3.2"; else if (router_ == "fireworks") model_ = "llama-v2-70b-chat"; else if (router_ == "gemini") model_ = "gemini-1.5-pro"; else if (router_ == "xai") model_ = "grok-1"; diff --git a/packages/kbot/cpp/scripts/qwen3_4b.sh b/packages/kbot/cpp/scripts/qwen3_4b.sh new file mode 100644 index 00000000..6a635d55 --- /dev/null +++ b/packages/kbot/cpp/scripts/qwen3_4b.sh @@ -0,0 +1,2 @@ +ollama run qwen2.5-coder:latest + diff --git a/packages/kbot/cpp/scripts/run-7b.sh b/packages/kbot/cpp/scripts/run-7b.sh new file mode 100644 index 00000000..b65a22d1 --- /dev/null +++ b/packages/kbot/cpp/scripts/run-7b.sh @@ -0,0 +1,9 @@ +./llama-server.exe \ + --hf-repo paultimothymooney/Qwen2.5-7B-Instruct-Q4_K_M-GGUF \ + --hf-file qwen2.5-7b-instruct-q4_k_m.gguf \ + -t 16 \ + -c 2048 \ + -b 512 \ + --temp 0.2 \ + --top-p 0.9 \ + --port 8888 diff --git a/packages/kbot/cpp/scripts/setup-7b.sh b/packages/kbot/cpp/scripts/setup-7b.sh new file mode 100644 index 00000000..0be849ee --- /dev/null +++ b/packages/kbot/cpp/scripts/setup-7b.sh @@ -0,0 +1,4 @@ +mkdir -p models/qwen2.5-7b +cd models/qwen2.5-7b + +curl -L -O https://huggingface.co/Qwen/Qwen2.5-7B-Instruct-GGUF/resolve/main/qwen2.5-7b-instruct-q4_k_m.gguf diff --git a/packages/kbot/cpp/src/cmd_kbot_uds.cpp b/packages/kbot/cpp/src/cmd_kbot_uds.cpp new file mode 100644 index 00000000..7de318d3 --- /dev/null +++ b/packages/kbot/cpp/src/cmd_kbot_uds.cpp @@ -0,0 +1,370 @@ +// cmd_kbot_uds.cpp — UDS/TCP worker for KBot LLM IPC (length-prefixed JSON frames). +// Framing matches orchestrator tests: [uint32_le length][utf-8 JSON object with id, type, payload]. + +#include "cmd_kbot.h" +#include "concurrentqueue.h" +#include "logger/logger.h" +#include "rapidjson/document.h" +#include "rapidjson/stringbuffer.h" +#include "rapidjson/writer.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace polymech { +namespace { + +#ifdef _WIN32 +using ipc_endpoint = asio::ip::tcp::endpoint; +using ipc_acceptor = asio::ip::tcp::acceptor; +using ipc_socket = asio::ip::tcp::socket; +#else +using ipc_endpoint = asio::local::stream_protocol::endpoint; +using ipc_acceptor = asio::local::stream_protocol::acceptor; +using ipc_socket = asio::local::stream_protocol::socket; +#endif + +std::shared_ptr g_active_uds_socket; +std::mutex g_uds_socket_mutex; + +std::string json_escape_log_line(const std::string &s) { + rapidjson::StringBuffer buf; + rapidjson::Writer w(buf); + w.String(s.c_str(), static_cast(s.length())); + std::string out(buf.GetString(), buf.GetSize()); + if (out.size() >= 2 && out.front() == '"' && out.back() == '"') + return out.substr(1, out.size() - 2); + return out; +} + +template +class kbot_uds_sink : public spdlog::sinks::base_sink { +protected: + void sink_it_(const spdlog::details::log_msg &msg) override { + spdlog::memory_buf_t formatted; + this->formatter_->format(msg, formatted); + std::string text = fmt::to_string(formatted); + if (!text.empty() && text.back() == '\n') + text.pop_back(); + + std::lock_guard lock(g_uds_socket_mutex); + if (!g_active_uds_socket) + return; + try { + std::string escaped = json_escape_log_line(text); + std::string frame = "{\"type\":\"log\",\"data\":\"" + escaped + "\"}"; + uint32_t len = static_cast(frame.size()); + asio::write(*g_active_uds_socket, asio::buffer(&len, 4)); + asio::write(*g_active_uds_socket, asio::buffer(frame)); + } catch (...) { + } + } + void flush_() override {} +}; + +using kbot_uds_sink_mt = kbot_uds_sink; + +struct KbotUdsJob { + std::string payload; + std::string job_id; + std::shared_ptr socket; + std::shared_ptr> cancel_token; +}; + +std::string request_id_string(const rapidjson::Document &doc) { + if (doc.HasMember("id") && doc["id"].IsString()) + return doc["id"].GetString(); + if (doc.HasMember("jobId") && doc["jobId"].IsString()) + return doc["jobId"].GetString(); + return "kbot-uds-" + + std::to_string( + std::chrono::system_clock::now().time_since_epoch().count()); +} + +void write_raw_frame(const std::shared_ptr &sock, + const std::string &json_body) { + uint32_t len = static_cast(json_body.size()); + std::lock_guard lock(g_uds_socket_mutex); + asio::write(*sock, asio::buffer(&len, 4)); + asio::write(*sock, asio::buffer(json_body)); +} + +} // namespace + +int run_cmd_kbot_uds(const std::string &pipe_path) { + logger::info("Starting KBot UDS on " + pipe_path); + std::atomic running{true}; + asio::io_context io_context; + std::shared_ptr acceptor; + + try { +#ifdef _WIN32 + int port = 4000; + try { + port = std::stoi(pipe_path); + } catch (...) { + } + ipc_endpoint ep(asio::ip::tcp::v4(), static_cast(port)); + acceptor = std::make_shared(io_context, ep); + logger::info("KBot UDS: bound TCP 127.0.0.1:" + std::to_string(port)); +#else + std::remove(pipe_path.c_str()); + ipc_endpoint ep(pipe_path); + acceptor = std::make_shared(io_context, ep); +#endif + } catch (const std::exception &e) { + logger::error(std::string("KBot UDS bind failed: ") + e.what()); + return 1; + } + + const int k_frame_max = 50 * 1024 * 1024; + const int k_queue_depth_max = 10000; + int threads = static_cast(std::thread::hardware_concurrency()); + if (threads <= 0) + threads = 2; + tf::Executor executor(threads); + + moodycamel::ConcurrentQueue queue; + + auto log_sink = std::make_shared(); + log_sink->set_pattern("%^%l%$ %v"); + spdlog::default_logger()->sinks().push_back(log_sink); + + std::thread uds_job_queue_thread([&]() { + KbotUdsJob job; + while (running.load()) { + if (!queue.try_dequeue(job)) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + continue; + } + + tf::Taskflow tf; + tf.emplace([job]() { + { + std::lock_guard lock(g_uds_socket_mutex); + g_active_uds_socket = job.socket; + } + + kbot::KBotCallbacks cb; + cb.onEvent = [sock = job.socket, jid = job.job_id]( + const std::string &type, const std::string &json) { + try { + std::string resolved_id = + (type == "job_result" || type == "error") ? jid : "0"; + std::string msg = "{\"id\":\"" + resolved_id + + "\",\"type\":\"" + type + "\",\"payload\":" + + json + "}"; + uint32_t len = static_cast(msg.size()); + std::lock_guard lock(g_uds_socket_mutex); + asio::write(*sock, asio::buffer(&len, 4)); + asio::write(*sock, asio::buffer(msg)); + } catch (...) { + } + }; + + rapidjson::Document doc; + doc.Parse(job.payload.c_str()); + if (doc.HasParseError()) { + cb.onEvent("error", "\"invalid JSON payload\""); + std::lock_guard lock(g_uds_socket_mutex); + g_active_uds_socket.reset(); + return; + } + + std::string job_type; + if (doc.HasMember("type") && doc["type"].IsString()) + job_type = doc["type"].GetString(); + + if (job_type == "job") { + std::string payload_str = "{}"; + if (doc.HasMember("payload")) { + rapidjson::StringBuffer sbuf; + rapidjson::Writer writer(sbuf); + doc["payload"].Accept(writer); + payload_str = sbuf.GetString(); + } + cb.onEvent("job_result", payload_str); + } else if (job_type == "kbot-ai") { + kbot::KBotCallbacks ai_cb; + ai_cb.onEvent = [&cb](const std::string &t, const std::string &j) { + cb.onEvent(t, j); + }; + std::string payload_str = "{}"; + if (doc.HasMember("payload")) { + if (doc["payload"].IsString()) { + payload_str = doc["payload"].GetString(); + } else { + rapidjson::StringBuffer sbuf; + rapidjson::Writer writer(sbuf); + doc["payload"].Accept(writer); + payload_str = sbuf.GetString(); + } + } + polymech::run_kbot_ai_ipc(payload_str, job.job_id, ai_cb); + } else if (job_type == "kbot-run") { + kbot::KBotCallbacks run_cb; + run_cb.onEvent = [&cb](const std::string &t, const std::string &j) { + cb.onEvent(t, j); + }; + std::string payload_str = "{}"; + if (doc.HasMember("payload")) { + if (doc["payload"].IsString()) { + payload_str = doc["payload"].GetString(); + } else { + rapidjson::StringBuffer sbuf; + rapidjson::Writer writer(sbuf); + doc["payload"].Accept(writer); + payload_str = sbuf.GetString(); + } + } + polymech::run_kbot_run_ipc(payload_str, job.job_id, run_cb); + } else { + rapidjson::StringBuffer sbuf; + rapidjson::Writer w(sbuf); + w.StartObject(); + w.Key("message"); + std::string m = "unsupported type: " + job_type; + w.String(m.c_str(), static_cast(m.size())); + w.EndObject(); + cb.onEvent("error", sbuf.GetString()); + } + + { + std::lock_guard lock(g_uds_socket_mutex); + g_active_uds_socket.reset(); + } + }); + + executor.run(tf).wait(); + } + }); + + logger::info("KBot UDS ready; waiting for connections…"); + while (running.load()) { + auto socket = std::make_shared(io_context); + asio::error_code ec; + acceptor->accept(*socket, ec); + if (ec || !running.load()) + break; + + logger::info("KBot UDS client connected"); + + std::thread( + [socket, &queue, &running, acceptor, k_frame_max, k_queue_depth_max]() { + std::unordered_map>> + socket_jobs; + + try { + { + std::string ready = + R"({"id":"0","type":"ready","payload":{}})"; + uint32_t rlen = static_cast(ready.size()); + std::lock_guard lock(g_uds_socket_mutex); + asio::write(*socket, asio::buffer(&rlen, 4)); + asio::write(*socket, asio::buffer(ready)); + } + + while (true) { + uint32_t len = 0; + asio::read(*socket, asio::buffer(&len, 4)); + if (len == 0 || + len > static_cast(k_frame_max)) + break; + + std::string raw(len, '\0'); + asio::read(*socket, asio::buffer(raw.data(), len)); + + rapidjson::Document doc; + doc.Parse(raw.c_str()); + + if (!doc.HasParseError()) { + std::string action; + if (doc.HasMember("action") && doc["action"].IsString()) + action = doc["action"].GetString(); + else if (doc.HasMember("type") && doc["type"].IsString()) + action = doc["type"].GetString(); + + auto id_for = [&doc]() -> std::string { + if (doc.HasMember("id") && doc["id"].IsString()) + return doc["id"].GetString(); + return "0"; + }; + + if (action == "ping") { + std::string res_id = id_for(); + std::string ack = "{\"id\":\"" + res_id + + "\",\"type\":\"pong\",\"payload\":{}}"; + write_raw_frame(socket, ack); + continue; + } + if (action == "nonsense") { + std::string res_id = id_for(); + std::string ack = "{\"id\":\"" + res_id + + "\",\"type\":\"error\",\"payload\":{}}"; + write_raw_frame(socket, ack); + continue; + } + if (action == "cancel") { + if (doc.HasMember("jobId") && doc["jobId"].IsString()) { + std::string jid = doc["jobId"].GetString(); + if (socket_jobs.count(jid) && socket_jobs[jid]) { + *socket_jobs[jid] = true; + std::string ack = + "{\"type\":\"cancel_ack\",\"data\":\"" + jid + "\"}"; + write_raw_frame(socket, ack); + } + } + continue; + } + if (action == "stop" || action == "shutdown") { + logger::info("KBot UDS: shutdown requested"); + std::string res_id = id_for(); + std::string ack = "{\"id\":\"" + res_id + + "\",\"type\":\"shutdown_ack\"," + "\"payload\":{}}"; + write_raw_frame(socket, ack); + running.store(false); + try { + acceptor->close(); + } catch (...) { + } + break; + } + } else { + continue; + } + + std::string jid = request_id_string(doc); + auto cancel_token = std::make_shared>(false); + socket_jobs[jid] = cancel_token; + + KbotUdsJob job{raw, jid, socket, cancel_token}; + while (queue.size_approx() >= + static_cast(k_queue_depth_max)) { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + queue.enqueue(std::move(job)); + } + } catch (const std::exception &) { + for (auto &kv : socket_jobs) { + if (kv.second) + *kv.second = true; + } + } + }) + .detach(); + } + + running.store(false); + uds_job_queue_thread.join(); + return 0; +} + +} // namespace polymech