// mono/packages/kbot/cpp/orchestrator/test-ipc-classifier.mjs — 408 lines, 13 KiB, JavaScript

/**
* orchestrator/test-ipc-classifier.mjs
*
* IPC + local llama: one kbot-ai call — semantic distance from anchor "machine workshop"
* to every business label (JobViewer.tsx ~205). Output is a single JSON array (+ meta).
*
* Run: npm run test:ipc:classifier
*
* Env:
* KBOT_IPC_CLASSIFIER_LLAMA — set 0 to use OpenRouter (KBOT_ROUTER, KBOT_IPC_MODEL) instead of local llama :8888
* KBOT_IPC_LLAMA_AUTOSTART — 0 to skip spawning run-7b.sh (llama mode only)
* KBOT_ROUTER / KBOT_IPC_MODEL — when classifier llama is off (same as test-ipc step 6)
* KBOT_CLASSIFIER_LIMIT — max labels in the batch (default: all)
* KBOT_CLASSIFIER_TIMEOUT_MS — LLM HTTP timeout in ms for the single batched kbot-ai call (default: 300000); the IPC wait deadline adds 60000 ms on top
*
* OpenRouter: npm run test:ipc:classifier:openrouter (sets KBOT_IPC_CLASSIFIER_LLAMA=0)
*
* Reports (reports.js): cwd/tests/test-ipc-classifier__HH-mm.{json,md}; array-only distances in
* test-ipc-classifier-distances__HH-mm.json (same timestamp as the main JSON).
*/
import { spawn } from 'node:child_process';
import { mkdir, writeFile } from 'node:fs/promises';
import { dirname } from 'node:path';
import { fileURLToPath } from 'node:url';
import net from 'node:net';
import { existsSync, unlinkSync } from 'node:fs';
import {
distExePath,
platform,
uds,
timeouts,
kbotAiPayloadLlamaLocal,
kbotAiPayloadFromEnv,
ensureLlamaLocalServer,
llama,
router,
} from './presets.js';
import {
createAssert,
payloadObj,
llamaAutostartEnabled,
ipcClassifierLlamaEnabled,
createIpcClient,
pipeWorkerStderr,
} from './test-commons.js';
import {
reportFilePathWithExt,
timeParts,
createMetricsCollector,
buildMetricsBundle,
writeTestReports,
} from './reports.js';
// ES modules have no built-in __dirname; derive it from this module's URL.
const __dirname = dirname(fileURLToPath(import.meta.url));
/**
 * Run-scoped state shared with the top-level `run().catch(...)` handler.
 * Both are assigned at the start of `run()` and stay null until then, so the
 * handler falls back to fresh values when `run()` failed before setting them.
 */
let classifierMetricsCollector = null;
let classifierRunStartedAt = null;
// Path of the kbot binary; `run()` checks that it exists and spawns it as the worker.
const EXE = distExePath(__dirname);
// Assertion helper; `stats.passed` / `stats.failed` feed the report and the exit code.
const stats = createAssert();
const { assert } = stats;
/**
 * Candidate business-type labels scored against ANCHOR in one batched prompt.
 * `run()` copies this array (optionally truncated by KBOT_CLASSIFIER_LIMIT), and
 * the model is asked to echo each label character-for-character, so the exact
 * spelling and order here matter.
 *
 * @see packages/kbot/.../JobViewer.tsx — business type options
 */
export const JOB_VIEWER_MACHINE_LABELS = [
'3D printing service',
'Drafting service',
'Engraver',
'Furniture maker',
'Industrial engineer',
'Industrial equipment supplier',
'Laser cutting service',
'Machine construction',
'Machine repair service',
'Machine shop',
'Machine workshop',
'Machinery parts manufacturer',
'Machining manufacturer',
'Manufacturer',
'Mechanic',
'Mechanical engineer',
'Mechanical plant',
'Metal fabricator',
'Metal heat treating service',
'Metal machinery supplier',
'Metal working shop',
'Metal workshop',
'Novelty store',
'Plywood supplier',
'Sign shop',
'Tool manufacturer',
'Trophy shop',
];
// Anchor business type every label is measured against: interpolated into the
// prompt and stored as `anchor` in the report. It is the lowercase spelling of
// the 'Machine workshop' entry in the list above.
const ANCHOR = 'machine workshop';
/**
 * Compose the single batched prompt sent to the model.
 *
 * Every label becomes a 1-based numbered line with the label JSON-quoted, so the
 * model can copy it back verbatim. The instructions ask for a bare JSON array of
 * `{"label", "distance"}` objects, one per label, in the same order.
 *
 * @param {string[]} labels — candidate labels, in the order they should be answered
 * @returns {string} the full prompt text
 */
function classifierBatchPrompt(labels) {
  const toNumberedLine = (label, index) => `${index + 1}. ${JSON.stringify(label)}`;
  const candidateList = labels.map(toNumberedLine).join('\n');
  return `You classify business types against one anchor. Output ONLY a JSON array, no markdown fences, no commentary.
Rules for each element:
- Use shape: {"label": <exact string from the list below>, "distance": <number>}
- "distance" is semantic distance from 0 (same as anchor or direct synonym) to 10 (unrelated). One decimal allowed.
- Include EXACTLY one object per line item below, in the SAME ORDER, with "label" copied character-for-character from the list.
Anchor business type: ${ANCHOR}
Candidate labels (in order):
${candidateList}
Output: one JSON array, e.g. [{"label":"...","distance":2.5},...]`;
}
function extractJsonArray(text) {
if (!text || typeof text !== 'string') return null;
let s = text.trim().replace(/^```(?:json)?\s*/i, '').replace(/\s*```$/u, '').trim();
try {
const v = JSON.parse(s);
return Array.isArray(v) ? v : null;
} catch {
/* fall through */
}
const i = s.indexOf('[');
const j = s.lastIndexOf(']');
if (i < 0 || j <= i) return null;
try {
const v = JSON.parse(s.slice(i, j + 1));
return Array.isArray(v) ? v : null;
} catch {
return null;
}
}
/**
 * Align the model's answer with the expected label list.
 *
 * Entries are kept only when they are objects with a string `label` that is one
 * of `expectedLabels` and a finite `distance` (numeric strings are run through
 * `parseFloat`). When a label appears more than once, the last entry wins.
 *
 * @param {unknown[]} arr — parsed model output
 * @param {string[]} expectedLabels — ordered
 * @returns {{ distances: { label: string, distance: number | null }[], missing: string[] }}
 *   `distances` follows `expectedLabels` order with `null` for unanswered labels;
 *   `missing` lists those unanswered labels.
 */
function normalizeBatchArray(arr, expectedLabels) {
  const allowed = new Set(expectedLabels);
  const scored = new Map();

  for (const entry of arr) {
    if (entry === null || typeof entry !== 'object') continue;
    const { label, distance } = entry;
    const value = typeof distance === 'string' ? parseFloat(distance) : distance;
    const usable =
      typeof label === 'string' &&
      typeof value === 'number' &&
      Number.isFinite(value) &&
      allowed.has(label);
    if (usable) scored.set(label, value);
  }

  const distances = [];
  const missing = [];
  for (const label of expectedLabels) {
    const distance = scored.has(label) ? scored.get(label) : null;
    distances.push({ label, distance });
    if (distance == null) missing.push(label);
  }
  return { distances, missing };
}
/**
 * Resolve the timeout (ms) for the batched kbot-ai call.
 *
 * Reads KBOT_CLASSIFIER_TIMEOUT_MS as a base-10 integer; anything unset, empty,
 * non-numeric, zero or negative falls back to 300000.
 *
 * @returns {number} timeout in milliseconds
 */
function batchTimeoutMs() {
  const fallbackMs = 300_000;
  const configured = process.env.KBOT_CLASSIFIER_TIMEOUT_MS ?? '';
  const parsedMs = Number.parseInt(configured, 10);
  // parseInt yields NaN for empty or non-numeric text, and NaN > 0 is false.
  if (parsedMs > 0) return parsedMs;
  return fallbackMs;
}
/**
 * Print a progress line every 15 s while `promise` is pending, so a long LLM
 * call does not look like a hang. The timer is cleared as soon as the promise
 * settles, whether it fulfils or rejects.
 *
 * @param {Promise<unknown>} promise — the pending call to wait on
 * @param {number} ipcTimeoutMs — IPC deadline, shown (in seconds) in each progress line
 * @param {string} backendLabel — backend name shown in each progress line
 * @returns {Promise<unknown>} settles the same way as `promise`
 */
function withHeartbeat(promise, ipcTimeoutMs, backendLabel) {
  const intervalMs = 15_000;
  let ticks = 0;

  const logProgress = () => {
    ticks += 1;
    const elapsedSec = (ticks * intervalMs) / 1000;
    const deadlineSec = Math.round(ipcTimeoutMs / 1000);
    console.log(
      ` … still waiting on ${backendLabel} (batch is large; ${elapsedSec}s elapsed, IPC deadline ${deadlineSec}s)…`
    );
  };

  const timer = setInterval(logProgress, intervalMs);
  const stopTimer = () => clearInterval(timer);
  return promise.finally(stopTimer);
}
/**
 * Assemble the `kbot-ai` request payload for the batched classification prompt.
 *
 * With the local-llama classifier enabled, the base payload comes from
 * `kbotAiPayloadLlamaLocal({ prompt })`; otherwise from `kbotAiPayloadFromEnv()`
 * with the prompt added. In both cases `llm_timeout_ms` is set last, so it
 * overrides any value the base payload carried.
 *
 * @param {string[]} labels — labels to embed in the prompt
 * @param {number} tmo — LLM timeout in milliseconds
 * @returns {object} payload for the `kbot-ai` IPC request
 */
function buildKbotAiPayload(labels, tmo) {
  const prompt = classifierBatchPrompt(labels);
  const basePayload = ipcClassifierLlamaEnabled()
    ? kbotAiPayloadLlamaLocal({ prompt })
    : { ...kbotAiPayloadFromEnv(), prompt };
  return { ...basePayload, llm_timeout_ms: tmo };
}
/**
 * Kill the spawned worker unless it has already exited or been signalled.
 * Used so a failed or stuck run never leaves the worker process behind.
 *
 * @param {import('node:child_process').ChildProcess} workerProc
 */
function killWorkerIfRunning(workerProc) {
  if (workerProc.exitCode === null && workerProc.signalCode === null) {
    workerProc.kill();
  }
}

/**
 * Run the classifier test end to end.
 *
 * Steps:
 *  1. Record the start time and metrics collector in module state so the
 *     top-level catch handler can still write an error report.
 *  2. Check the binary exists (exit 1 if not) and, in llama mode, make sure the
 *     local llama server is available.
 *  3. Spawn the worker on the UDS endpoint and connect with retries.
 *  4. Send ONE `kbot-ai` request with every label, parse the returned JSON
 *     array and record assertions about its completeness.
 *  5. Ask the worker to shut down and assert that it exited with code 0.
 *  6. Write the main report plus an array-only distances file, print a
 *     summary, and exit with 1 if any assertion failed, 0 otherwise.
 *
 * If anything between connecting and shutdown throws, the socket is destroyed
 * and the worker is killed before the error is rethrown.
 *
 * @returns {Promise<void>} normally does not resolve: the process exits first
 */
async function run() {
  classifierMetricsCollector = createMetricsCollector();
  classifierRunStartedAt = new Date().toISOString();
  const startedAt = classifierRunStartedAt;
  const useLlama = ipcClassifierLlamaEnabled();
  const backendLabel = useLlama ? `llama @ :${llama.port}` : `router=${router.fromEnv()}`;
  console.log(`\n📐 IPC classifier (${backendLabel}) — one batch, distance vs "machine workshop"\n`);

  if (!existsSync(EXE)) {
    console.error(`❌ Binary not found at ${EXE}`);
    process.exit(1);
  }
  if (useLlama) {
    await ensureLlamaLocalServer({
      autostart: llamaAutostartEnabled(),
      startTimeoutMs: timeouts.llamaServerStart,
    });
  }

  // Optional cap on the batch size; invalid or non-positive values mean "all labels".
  const limitRaw = process.env.KBOT_CLASSIFIER_LIMIT;
  let labels = [...JOB_VIEWER_MACHINE_LABELS];
  if (limitRaw !== undefined && limitRaw !== '') {
    const lim = Number.parseInt(limitRaw, 10);
    if (Number.isFinite(lim) && lim > 0) labels = labels.slice(0, lim);
  }

  const CPP_UDS_ARG = uds.workerArg();
  // Remove a socket file left over from a previous run (skipped on Windows).
  if (!platform.isWin && existsSync(CPP_UDS_ARG)) {
    unlinkSync(CPP_UDS_ARG);
  }
  const workerProc = spawn(EXE, ['worker', '--uds', CPP_UDS_ARG], { stdio: 'pipe' });
  pipeWorkerStderr(workerProc);

  let socket;
  let p;
  let rawText = null;
  let distances = [];
  let parseError = null;
  try {
    // The worker needs time to open its endpoint: retry the connect a bounded
    // number of times and rethrow the last failure.
    for (let i = 0; i < timeouts.connectAttempts; i++) {
      try {
        await new Promise((res, rej) => {
          socket = net.connect(uds.connectOpts(CPP_UDS_ARG));
          socket.once('connect', res);
          socket.once('error', rej);
        });
        break;
      } catch (e) {
        if (i === timeouts.connectAttempts - 1) throw e;
        await new Promise((r) => setTimeout(r, timeouts.connectRetryMs));
      }
    }
    const ipc = createIpcClient(socket);
    ipc.attach();
    await ipc.readyPromise;

    const tmo = batchTimeoutMs();
    // The IPC wait must outlast the HTTP timeout, hence the extra minute.
    const ipcDeadlineMs = tmo + 60_000;
    console.log(` Single kbot-ai batch: ${labels.length} labels`);
    console.log(` liboai HTTP timeout: ${tmo} ms (llm_timeout_ms) — rebuild kbot if this was stuck at ~30s before`);
    console.log(` IPC wait deadline: ${ipcDeadlineMs} ms (HTTP + margin)`);
    console.log(` (Large batches can take many minutes; heartbeat every 15s…)\n`);

    const payload = buildKbotAiPayload(labels, tmo);
    const waitLabel = useLlama ? 'llama' : router.fromEnv();
    const msg = await withHeartbeat(
      ipc.request({ type: 'kbot-ai', payload }, ipcDeadlineMs),
      ipcDeadlineMs,
      waitLabel
    );
    p = payloadObj(msg);

    if (p?.status === 'success' && typeof p?.text === 'string') {
      rawText = p.text;
      const arr = extractJsonArray(p.text);
      if (arr) {
        const norm = normalizeBatchArray(arr, labels);
        distances = norm.distances;
        const { missing } = norm;
        if (missing.length === 0) {
          assert(true, 'batch JSON array: all labels have distance');
        } else {
          assert(false, `batch array complete (${missing.length} missing labels)`);
          parseError = `missing: ${missing.join('; ')}`;
        }
      } else {
        assert(false, 'batch response parses as JSON array');
        parseError = 'could not parse JSON array from model text';
      }
    } else {
      assert(false, 'kbot-ai success');
      parseError = p?.error ?? 'not success';
    }

    const shutdownRes = await ipc.request({ type: 'shutdown' }, timeouts.ipcDefault);
    assert(shutdownRes.type === 'shutdown_ack', 'shutdown ack');
    // Give the worker a moment to exit before checking its exit code.
    await new Promise((r) => setTimeout(r, timeouts.postShutdownMs));
    socket.destroy();
    assert(workerProc.exitCode === 0, 'worker exit 0');
    // A worker that ignored the shutdown must not outlive this test run.
    killWorkerIfRunning(workerProc);
  } catch (err) {
    // Connect, request or shutdown failed: release the socket and the worker
    // before handing the error to the top-level handler.
    socket?.destroy();
    killWorkerIfRunning(workerProc);
    throw err;
  }

  const finishedAt = new Date().toISOString();
  /** Final array: sorted by distance (nulls last). */
  const byDistance = [...distances].sort((a, b) => {
    if (a.distance == null && b.distance == null) return 0;
    if (a.distance == null) return 1;
    if (b.distance == null) return -1;
    return a.distance - b.distance;
  });

  // One timestamp for both artifacts so their file names match.
  const reportNow = new Date();
  const cwd = process.cwd();
  const reportData = {
    startedAt,
    finishedAt,
    passed: stats.passed,
    failed: stats.failed,
    ok: stats.failed === 0,
    ipcClassifierLlama: useLlama,
    env: {
      KBOT_IPC_CLASSIFIER_LLAMA: process.env.KBOT_IPC_CLASSIFIER_LLAMA ?? null,
      KBOT_IPC_LLAMA_AUTOSTART: process.env.KBOT_IPC_LLAMA_AUTOSTART ?? null,
      KBOT_ROUTER: process.env.KBOT_ROUTER ?? null,
      KBOT_IPC_MODEL: process.env.KBOT_IPC_MODEL ?? null,
      KBOT_CLASSIFIER_LIMIT: process.env.KBOT_CLASSIFIER_LIMIT ?? null,
      KBOT_CLASSIFIER_TIMEOUT_MS: process.env.KBOT_CLASSIFIER_TIMEOUT_MS ?? null,
      KBOT_LLAMA_PORT: process.env.KBOT_LLAMA_PORT ?? null,
      KBOT_LLAMA_BASE_URL: process.env.KBOT_LLAMA_BASE_URL ?? null,
    },
    metrics: buildMetricsBundle(classifierMetricsCollector, startedAt, finishedAt),
    anchor: ANCHOR,
    source: 'JobViewer.tsx:205',
    batch: true,
    backend: useLlama ? 'llama_local' : 'remote_router',
    ...(useLlama
      ? {
          llama: {
            baseURL: llama.baseURL,
            port: llama.port,
            router: llama.router,
            model: llama.model,
          },
        }
      : {
          router: router.fromEnv(),
          model: process.env.KBOT_IPC_MODEL ?? null,
        }),
    labelCount: labels.length,
    /** Provider metadata from API (usage, model, id, OpenRouter fields) — see LLMClient + kbot `llm` key */
    llm: p?.llm ?? null,
    distances,
    byDistance,
    rawText,
    parseError: parseError ?? null,
  };

  // A failed main-report write is only a warning; the run result still stands.
  let jsonPath = '';
  let mdPath = '';
  try {
    const written = await writeTestReports('test-ipc-classifier', reportData, { cwd, now: reportNow });
    jsonPath = written.jsonPath;
    mdPath = written.mdPath;
  } catch (e) {
    console.error(' ⚠️ Failed to write report:', e?.message ?? e);
  }

  /** Array-only artifact (same timestamp as main report). */
  const arrayPath = reportFilePathWithExt('test-ipc-classifier-distances', '.json', { cwd, now: reportNow });
  await mkdir(dirname(arrayPath), { recursive: true });
  await writeFile(arrayPath, `${JSON.stringify(distances, null, 2)}\n`, 'utf8');

  const { label: timeLabel } = timeParts(reportNow);
  console.log(`\n────────────────────────────────`);
  console.log(` Passed: ${stats.passed} Failed: ${stats.failed}`);
  if (jsonPath) console.log(` Report JSON: ${jsonPath}`);
  if (mdPath) console.log(` Report MD: ${mdPath}`);
  console.log(` Distances JSON: ${arrayPath}`);
  console.log(` Run id: test-ipc-classifier::${timeLabel}`);
  console.log(` distances.length: ${distances.length}`);
  console.log(`────────────────────────────────\n`);
  process.exit(stats.failed > 0 ? 1 : 0);
}
// Entry point. Any error escaping run() is logged, recorded in a best-effort
// error report, and turned into exit code 1.
run().catch(async (err) => {
  console.error('Classifier error:', err);
  try {
    const finishedAt = new Date().toISOString();
    // run() may have failed before it set the shared collector / start time.
    const c = classifierMetricsCollector ?? createMetricsCollector();
    const started = classifierRunStartedAt ?? finishedAt;
    await writeTestReports(
      'test-ipc-classifier',
      {
        startedAt: started,
        finishedAt,
        error: String(err?.stack ?? err),
        passed: stats.passed,
        failed: stats.failed,
        ok: false,
        ipcClassifierLlama: ipcClassifierLlamaEnabled(),
        metrics: buildMetricsBundle(c, started, finishedAt),
      },
      { cwd: process.cwd() }
    );
  } catch (reportErr) {
    // Still best effort: the run has already failed, so only warn that the
    // error report could not be written instead of hiding it.
    console.error(' ⚠️ Failed to write error report:', reportErr?.message ?? reportErr);
  }
  process.exit(1);
});