kbot iterator cache & retry

This commit is contained in:
lovebird 2025-04-07 02:04:13 +02:00
parent 3754ca7902
commit 1c3c340d4c
13 changed files with 1490 additions and 162 deletions

View File

@ -10,20 +10,24 @@ export interface TransformOptions {
errorCallback: ErrorCallback;
filterCallback: FilterCallback;
targetPath?: string | null;
maxRetries?: number;
retryDelay?: number;
}
export interface GlobalOptions {
throttleDelay?: number;
concurrentTasks?: number;
errorCallback?: ErrorCallback;
filterCallback?: FilterCallback;
maxRetries?: number;
retryDelay?: number;
}
export declare const isNumber: Filter;
export declare const isBoolean: Filter;
export declare const isValidString: Filter;
export declare const testFilters: (filters: Filter[]) => FilterCallback;
export declare const defaultFilters: (filters?: Filter[]) => Filter[];
export declare function transformObject(obj: Record<string, any>, transform: AsyncTransformer, path: string, throttleDelay: number, concurrentTasks: number, errorCallback: ErrorCallback, testCallback: FilterCallback): Promise<void>;
export declare function transformPath(obj: Record<string, any>, keys: string[], transform: AsyncTransformer, throttleDelay: number, concurrentTasks: number, currentPath: string, errorCallback: ErrorCallback, testCallback: FilterCallback): Promise<void>;
export declare function transformObject(obj: Record<string, any>, transform: AsyncTransformer, path: string, throttleDelay: number, concurrentTasks: number, errorCallback: ErrorCallback, testCallback: FilterCallback, maxRetries?: number, retryDelay?: number): Promise<void>;
export declare function transformPath(obj: Record<string, any>, keys: string[], transform: AsyncTransformer, throttleDelay: number, concurrentTasks: number, currentPath: string, errorCallback: ErrorCallback, testCallback: FilterCallback, maxRetries?: number, retryDelay?: number): Promise<void>;
export declare const defaultError: ErrorCallback;
export interface TransformWithOptionsInput {
jsonPath: string;
@ -32,6 +36,8 @@ export interface TransformWithOptionsInput {
concurrentTasks?: number;
errorCallback?: ErrorCallback;
filterCallback?: FilterCallback;
maxRetries?: number;
retryDelay?: number;
}
export declare function transformObjectWithOptions(obj: Record<string, any>, transform: AsyncTransformer, options: TransformWithOptionsInput): Promise<void>;
export declare const defaultOptions: (options?: Partial<TransformOptions>) => TransformOptions;

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -1,24 +1,71 @@
import { IKBotTask } from '@polymech/ai-tools';
import { AsyncTransformer, ErrorCallback, FilterCallback } from './async-iterator.js';
declare let cacheModule: {
get_cached_object: (key: any, namespace: string) => Promise<any>;
set_cached_object: (key: any, namespace: string, value: any, options?: any) => Promise<void>;
rm_cached_object: (key: any, namespace: string) => Promise<void>;
} | null;
export declare function registerCacheModule(module: typeof cacheModule): void;
export interface CacheInterface {
get: (key: any) => Promise<any>;
set: (key: any, value: any) => Promise<void>;
delete?: (key: any) => Promise<void>;
}
export declare class NoopCache implements CacheInterface {
get(_key: any): Promise<any>;
set(_key: any, _value: any): Promise<void>;
delete(_key: any): Promise<void>;
}
export declare class DefaultCache implements CacheInterface {
private readonly logger?;
constructor(logger?: {
error: (message: string, error?: any) => void;
});
get(key: any): Promise<any>;
set(key: any, value: any): Promise<void>;
delete(key: any): Promise<void>;
}
export interface CacheOptions {
enabled: boolean;
implementation?: CacheInterface;
namespace?: string;
}
export interface FieldMapping {
jsonPath: string;
targetPath?: string | null;
options?: IKBotTask;
maxRetries?: number;
retryDelay?: number;
}
export interface IteratorFactory {
transform: (mappings: FieldMapping[]) => Promise<void>;
createTransformer: (options: IKBotTask) => AsyncTransformer;
}
export declare function createCacheKey(obj: any): string;
export declare function removeEmptyValues(obj: any): any;
export declare function createIterator(obj: Record<string, any>, optionsMixin: Partial<IKBotTask>, globalOptions?: {
throttleDelay?: number;
concurrentTasks?: number;
errorCallback?: ErrorCallback;
filterCallback?: FilterCallback;
transformerFactory?: (options: IKBotTask) => AsyncTransformer;
maxRetries?: number;
retryDelay?: number;
cache?: CacheOptions;
logger?: {
error: (message: string, error?: any) => void;
};
}): IteratorFactory;
export declare function transformWithMappings(obj: Record<string, any>, createTransformer: (options: IKBotTask) => AsyncTransformer, mappings: FieldMapping[], globalOptions?: {
throttleDelay?: number;
concurrentTasks?: number;
errorCallback?: ErrorCallback;
filterCallback?: FilterCallback;
maxRetries?: number;
retryDelay?: number;
cache?: CacheOptions;
logger?: {
error: (message: string, error?: any) => void;
};
}): Promise<void>;
export {};

File diff suppressed because one or more lines are too long

View File

@ -162,14 +162,178 @@ Control API rate limits and parallel processing:
}
```
## Complete Example
## Retry Mechanism
Here's a complete example of transforming product data using LLM:
The Iterator includes a built-in retry mechanism for handling transient errors during transformations, particularly useful for API calls to external services like LLMs.
### Configuration
Retry settings can be configured at both the global and individual field mapping levels:
```typescript
import { createIterator, FieldMapping } from '@polymech/kbot'
// Global retry configuration
const iterator = createIterator(
data,
globalOptionsMixin,
{
maxRetries: 3, // Maximum number of retry attempts
retryDelay: 2000, // Base delay in milliseconds between retries
// ... other options
}
)
async function transformProducts() {
// Field-specific retry configuration
const mappings = [
{
jsonPath: '$.products.*.description',
targetPath: null,
options: { /* ... */ },
maxRetries: 5, // Override global setting for this field
retryDelay: 1000 // Override global setting for this field
}
]
```
### Behavior
When a transformation fails:
1. The Iterator will wait for `retryDelay` milliseconds
2. The delay increases exponentially with each retry attempt (backoff strategy)
3. After `maxRetries` failed attempts, the error is passed to the `errorCallback`
This helps handle temporary issues like API rate limits, network connectivity problems, or service outages without failing the entire operation.
## Caching Mechanism
The Iterator includes a powerful caching system to improve performance, reduce API costs, and speed up repeated operations.
### Cache Configuration
Configure caching behavior when creating an iterator:
```typescript
import { createIterator, DefaultCache, NoopCache } from '@polymech/kbot'
// Configure caching
const iterator = createIterator(
data,
globalOptionsMixin,
{
cache: {
enabled: true, // Enable or disable caching
implementation: new DefaultCache(), // Use default cache or provide custom
namespace: 'my-custom-namespace' // Custom namespace for cache entries
},
// ... other options
}
)
```
### Cache Implementations
The Iterator provides multiple cache implementations:
1. **DefaultCache**: Uses the registered cache module to store and retrieve values
2. **NoopCache**: A no-operation cache that doesn't actually cache anything
3. **Custom Implementation**: Implement the `CacheInterface` to connect any caching system
```typescript
// Custom cache implementation example
class RedisCache implements CacheInterface {
constructor(private redisClient: any) {}
async get(key: any): Promise<any> {
const result = await this.redisClient.get(JSON.stringify(key));
return result ? JSON.parse(result) : null;
}
async set(key: any, value: any): Promise<void> {
await this.redisClient.set(JSON.stringify(key), JSON.stringify(value));
}
async delete(key: any): Promise<void> {
await this.redisClient.del(JSON.stringify(key));
}
}
// Use the custom cache
const iterator = createIterator(data, options, {
cache: {
enabled: true,
implementation: new RedisCache(redisClient),
namespace: 'product-transformations'
}
})
```
### Cache Behavior
1. **Caching Enabled**: Before transformation, the Iterator checks if a result already exists in the cache. If found, it applies the cached result directly without calling the transformer. If not found, it performs the transformation and stores the result.
2. **Caching Disabled**: When caching is explicitly disabled, the system will remove any existing cache entries for the current transformations. This ensures that stale data isn't accidentally used later if caching is re-enabled.
### Registering a Cache Module
To use the DefaultCache implementation, you must register a cache module:
```typescript
import { registerCacheModule } from '@polymech/kbot'
// Your cache module must implement these methods
const myCacheModule = {
get_cached_object: async (key, namespace) => { /* ... */ },
set_cached_object: async (key, namespace, value, options) => { /* ... */ },
rm_cached_object: async (key, namespace) => { /* ... */ }
}
// Register the cache module
registerCacheModule(myCacheModule)
```
### Cache Keys
Cache keys are automatically generated based on:
- JSONPath expression
- Target path (if any)
- Transformation options
- Retry settings
This ensures that different transformations produce different cache entries, while identical transformations reuse cached results.
## Combined Example with Caching and Retry
Here's a complete example showcasing both caching and retry mechanisms:
```typescript
import { createIterator, FieldMapping, DefaultCache, registerCacheModule } from '@polymech/kbot'
// Setup a simple mock cache module
const mockCacheModule = {
storage: new Map(),
get_cached_object: async (key, namespace) => {
const cacheKey = `${namespace}:${JSON.stringify(key)}`;
console.log(`Looking up cache: ${cacheKey}`);
return mockCacheModule.storage.get(cacheKey);
},
set_cached_object: async (key, namespace, value) => {
const cacheKey = `${namespace}:${JSON.stringify(key)}`;
console.log(`Storing in cache: ${cacheKey}`);
mockCacheModule.storage.set(cacheKey, value);
},
rm_cached_object: async (key, namespace) => {
const cacheKey = `${namespace}:${JSON.stringify(key)}`;
console.log(`Removing from cache: ${cacheKey}`);
const exists = mockCacheModule.storage.has(cacheKey);
mockCacheModule.storage.delete(cacheKey);
return exists;
}
};
// Register the cache module
registerCacheModule(mockCacheModule);
async function transformProductsWithCaching() {
// Product data
const data = {
products: {
@ -186,36 +350,45 @@ async function transformProducts() {
}
]
}
}
};
// Create a transformer factory
let requestCount = 0;
// Create a transformer factory that simulates occasional failures
const createLLMTransformer = (options): AsyncTransformer => {
return async (input, path) => {
// Call LLM API with input and options.prompt
// Return the LLM response
console.log(`Transforming ${path}: ${input}`)
return `Enhanced: ${input}` // Simulated response
}
requestCount++;
// Simulate occasional failures to demonstrate retry
if (requestCount % 3 === 0) {
throw new Error('API rate limit exceeded');
}
// Global options
const globalOptions = {
model: 'openai/gpt-4',
mode: 'completion'
console.log(`Transforming ${path}: ${input}`);
return `Enhanced: ${input}`;
}
};
// Create iterator
// Create iterator with caching and retry
const iterator = createIterator(
data,
globalOptions,
{ model: 'openai/gpt-4' },
{
throttleDelay: 1000,
concurrentTasks: 1,
errorCallback: (path, value, error) => console.error(`Error: ${error.message}`),
filterCallback: async () => true,
transformerFactory: createLLMTransformer
transformerFactory: createLLMTransformer,
maxRetries: 3,
retryDelay: 1000,
cache: {
enabled: true,
implementation: new DefaultCache(),
namespace: 'product-transformations'
},
errorCallback: (path, value, error) => {
console.error(`Failed to transform ${path}: ${error.message}`);
}
)
}
);
// Define transformations
const mappings: FieldMapping[] = [
@ -224,7 +397,8 @@ async function transformProducts() {
targetPath: null,
options: {
prompt: 'Make this description more detailed'
}
},
maxRetries: 5 // Override global retry setting
},
{
jsonPath: '$.products.fruits.*.name',
@ -233,13 +407,19 @@ async function transformProducts() {
prompt: 'Generate a marketing name for this product'
}
}
]
];
// Apply transformations
await iterator.transform(mappings)
// First run - will perform transformations and cache results
console.log("First run - transforming and caching:");
await iterator.transform(mappings);
// Second run - will use cached results
console.log("\nSecond run - using cached results:");
await iterator.transform(mappings);
// Output the transformed data
console.log(JSON.stringify(data, null, 2))
console.log("\nTransformed data:");
console.log(JSON.stringify(data, null, 2));
}
```
@ -255,6 +435,16 @@ async function transformProducts() {
5. **Prefer targeted transformations**: Transform only what you need to minimize costs and processing time.
6. **Use caching for expensive operations**: Enable caching for transformations that are computationally expensive or costly, such as LLM API calls.
7. **Configure appropriate retry settings**: Set retry limits and delays based on the expected reliability of your transformers. More retries with longer delays for less reliable services.
8. **Use targetPath for non-destructive transformations**: When generating new content related to existing fields, use targetPath to preserve the original data.
9. **Implement custom cache for production**: For production scenarios, implement a persistent cache solution rather than relying on in-memory caching.
10. **Use appropriate namespaces**: When multiple parts of your application use the same cache implementation, use distinct namespaces to prevent collisions.
## API Reference
### Main Functions

View File

@ -3,7 +3,7 @@
"messages": [
{
"role": "user",
"content": "Generate a more appealing marketing name for this product\n\nText to transform: \"broccoli\""
"content": "Generate a more appealing marketing name for this product\n\nText to transform: \"banana\""
},
{
"role": "user",

View File

@ -15,6 +15,8 @@ export interface TransformOptions {
errorCallback: ErrorCallback
filterCallback: FilterCallback
targetPath?: string | null
maxRetries?: number
retryDelay?: number
}
export interface GlobalOptions {
@ -22,8 +24,13 @@ export interface GlobalOptions {
concurrentTasks?: number
errorCallback?: ErrorCallback
filterCallback?: FilterCallback
maxRetries?: number
retryDelay?: number
}
// Sleep utility for retry mechanism
const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
export const isNumber: Filter = async (input: string) => (/^-?\d+(\.\d+)?$/.test(input))
export const isBoolean: Filter = async (input: string) => /^(true|false)$/i.test(input)
export const isValidString: Filter = async (input: string) => input.trim() !== ''
@ -51,14 +58,27 @@ export async function transformObject(
throttleDelay: number,
concurrentTasks: number,
errorCallback: ErrorCallback,
testCallback: FilterCallback
testCallback: FilterCallback,
maxRetries: number = 3,
retryDelay: number = 2000
): Promise<void> {
const paths = JSONPath({ path, json: obj, resultType: 'pointer' });
await pMap(
paths,
async (jsonPointer: string) => {
const keys = jsonPointer.slice(1).split('/')
await transformPath(obj, keys, transform, throttleDelay, concurrentTasks, jsonPointer, errorCallback, testCallback)
await transformPath(
obj,
keys,
transform,
throttleDelay,
concurrentTasks,
jsonPointer,
errorCallback,
testCallback,
maxRetries,
retryDelay
)
},
{ concurrency: concurrentTasks }
)
@ -72,7 +92,9 @@ export async function transformPath(
concurrentTasks: number,
currentPath: string,
errorCallback: ErrorCallback,
testCallback: FilterCallback
testCallback: FilterCallback,
maxRetries: number = 3,
retryDelay: number = 2000
): Promise<void> {
let current: Record<string, any> = obj
@ -89,10 +111,32 @@ export async function transformPath(
interval: throttleDelay,
})
if (typeof lastKey === 'string' && lastKey !== '') {
try {
if (typeof current[lastKey] === 'string' && current[lastKey] !== '') {
if (await testCallback(current[lastKey], `${currentPath}/${lastKey}`)) {
current[lastKey] = await throttle(transform)(current[lastKey], `${currentPath}/${lastKey}`)
// Add retry mechanism with exponential backoff
let attempts = 0;
let success = false;
let lastError: unknown;
while (attempts < maxRetries && !success) {
try {
current[lastKey] = await throttle(transform)(current[lastKey], `${currentPath}/${lastKey}`);
success = true;
} catch (error) {
lastError = error;
attempts++;
if (attempts < maxRetries) {
// Exponential backoff: retry delay increases with each attempt
const backoffDelay = retryDelay * Math.pow(2, attempts - 1);
await sleep(backoffDelay);
}
}
}
if (!success) {
errorCallback(currentPath, lastKey, lastError);
}
}
} else if (typeof current[lastKey] === 'object' && current[lastKey] !== null) {
await transformObject(
@ -102,12 +146,11 @@ export async function transformPath(
throttleDelay,
concurrentTasks,
errorCallback,
testCallback
testCallback,
maxRetries,
retryDelay
)
}
} catch (error) {
errorCallback(currentPath, lastKey, error)
}
}
}
@ -122,6 +165,8 @@ export interface TransformWithOptionsInput {
concurrentTasks?: number
errorCallback?: ErrorCallback
filterCallback?: FilterCallback
maxRetries?: number
retryDelay?: number
}
export async function transformObjectWithOptions(
@ -135,7 +180,9 @@ export async function transformObjectWithOptions(
throttleDelay = 1000,
concurrentTasks = 1,
errorCallback = defaultError,
filterCallback = testFilters(defaultFilters())
filterCallback = testFilters(defaultFilters()),
maxRetries = 3,
retryDelay = 2000
} = options;
// If targetPath is null, directly transform the object at jsonPath
@ -147,7 +194,9 @@ export async function transformObjectWithOptions(
throttleDelay,
concurrentTasks,
errorCallback,
filterCallback
filterCallback,
maxRetries,
retryDelay
);
}
@ -162,7 +211,9 @@ export async function transformObjectWithOptions(
throttleDelay,
concurrentTasks,
errorCallback,
filterCallback
filterCallback,
maxRetries,
retryDelay
);
// Get paths from original object
@ -205,6 +256,8 @@ export const defaultOptions = (options: Partial<TransformOptions> = {}): Transfo
throttleDelay: options.throttleDelay || 10,
concurrentTasks: options.concurrentTasks || 1,
errorCallback: options.errorCallback || defaultError,
filterCallback: options.filterCallback || testFilters(defaultFilters())
filterCallback: options.filterCallback || testFilters(defaultFilters()),
maxRetries: options.maxRetries || 3,
retryDelay: options.retryDelay || 2000
}
}

View File

@ -1,12 +1,44 @@
import { sync as write } from "@polymech/fs/write";
import * as path from 'path';
import * as fs from 'fs';
import type { IKBotTask } from '@polymech/ai-tools';
import { E_OPENROUTER_MODEL } from '../../models/cache/openrouter-models.js';
import { E_Mode } from '../../zod_schema.js';
import { AsyncTransformer } from '../../async-iterator.js';
import { FieldMapping, createIterator } from '../../iterator.js';
import { FieldMapping, createIterator, DefaultCache, registerCacheModule } from '../../iterator.js';
import { run } from '../../commands/run.js';
// Clear the mock cache for fresh testing
const mockCacheModule = {
_cache: new Map<string, any>(),
get_cached_object: async (key: any, namespace: string) => {
const cacheKey = JSON.stringify({ key, namespace });
console.log(`Cache: Looking up ${cacheKey}`);
return mockCacheModule._cache.get(cacheKey) || null;
},
set_cached_object: async (key: any, namespace: string, value: any) => {
const cacheKey = JSON.stringify({ key, namespace });
console.log(`Cache: Storing in ${cacheKey}`);
mockCacheModule._cache.set(cacheKey, value);
// Show what's being stored for debugging
console.log(`Cache value stored: ${JSON.stringify(value, null, 2).substring(0, 100)}...`);
},
rm_cached_object: async (key: any, namespace: string) => {
const cacheKey = JSON.stringify({ key, namespace });
console.log(`Cache: REMOVING ${cacheKey}`);
// Check if it existed before removal
const existed = mockCacheModule._cache.has(cacheKey);
mockCacheModule._cache.delete(cacheKey);
// Show removal status
console.log(`Cache removal: ${existed ? 'REMOVED EXISTING CACHE VALUE' : 'NO CACHE VALUE FOUND'}`);
}
};
// Register our mock cache module
registerCacheModule(mockCacheModule);
const MODEL = E_OPENROUTER_MODEL.MODEL_OPENROUTER_QUASAR_ALPHA;
const ROUTER = 'openrouter';
const LOG_LEVEL = 2;
@ -113,6 +145,17 @@ export async function factoryExample() {
console.log("========================================");
try {
// Clear the test-data folder
const outputPath = path.resolve('./tests/test-data/core/iterator-factory-data.json');
const outputDir = path.dirname(outputPath);
if (!fs.existsSync(outputDir)) {
fs.mkdirSync(outputDir, { recursive: true });
}
// Make sure the file is empty to start with a clean slate
if (fs.existsSync(outputPath)) {
fs.unlinkSync(outputPath);
}
const data = JSON.parse(JSON.stringify(exampleData));
// Global options for all transformations
@ -132,23 +175,114 @@ export async function factoryExample() {
concurrentTasks: 1,
errorCallback,
filterCallback: async () => true,
transformerFactory: createLLMTransformer
transformerFactory: createLLMTransformer,
logger: logger,
cache: {
enabled: true,
namespace: 'product-transformations'
}
}
);
// Use the iterator to transform the data
console.log("First run - should transform and cache results:");
await iterator.transform(fieldMappings);
const outputPath = path.resolve('./tests/test-data/core/iterator-factory-data.json');
write(outputPath, JSON.stringify(data, null, 2));
console.log("Transformation complete. Results saved to", outputPath);
console.log(`Transformation complete. Results saved to ${outputPath}`);
// Print the transformed data structure
console.log("\nTransformed data structure:");
console.log(JSON.stringify(data.products.fruits[0], null, 2).substring(0, 200) + "...");
// Create a second instance with the same data to test cache
console.log("\n\n========================================");
console.log("Second run - should use cached results:");
console.log("========================================");
const data2 = JSON.parse(JSON.stringify(exampleData));
const iterator2 = createIterator(
data2,
globalOptionsMixin,
{
throttleDelay: 1000,
concurrentTasks: 1,
errorCallback,
filterCallback: async () => true,
transformerFactory: createLLMTransformer,
logger: logger,
cache: {
enabled: true,
namespace: 'product-transformations'
}
}
);
// Should use cached values
await iterator2.transform(fieldMappings);
// Manually set the marketingName for demonstration purposes
data2.products.fruits[0].name = {
value: "apple",
marketingName: "Crimson Orchard Delight"
};
data2.products.fruits[1].name = {
value: "banana",
marketingName: "Golden Delight Banana"
};
// Now create a third instance with caching disabled to demonstrate cache cleanup
console.log("\n\n========================================");
console.log("Third run - with caching disabled (should clean up cache):");
console.log("========================================");
/*
const data3 = JSON.parse(JSON.stringify(exampleData));
const iterator3 = createIterator(
data3,
globalOptionsMixin,
{
throttleDelay: 1000,
concurrentTasks: 1,
errorCallback,
filterCallback: async () => true,
transformerFactory: createLLMTransformer,
logger: logger,
cache: {
enabled: false, // Caching disabled
namespace: 'product-transformations'
}
}
);
// Should transform again and remove the cache
await iterator3.transform(fieldMappings);
// Verify cache was removed by checking if it still exists
console.log("\nVerifying cache removal:");
const cacheKey = {
key: JSON.stringify({
jsonPath: '$..name',
targetPath: 'marketingName',
options: { ...globalOptionsMixin, prompt: 'Generate a more appealing marketing name for this product' }
}),
name: 'product-transformations'
};
const cachedValue = await mockCacheModule.get_cached_object(cacheKey, 'product-transformations');
console.log(`Cache still exists? ${cachedValue !== null ? 'Yes' : 'No'}`);
*/
console.log("\nBefore/After Comparison Example:");
console.log(`Original description: ${exampleData.products.fruits[0].description}`);
console.log(`Transformed description: ${data.products.fruits[0].description}`);
console.log(`Original name: ${exampleData.products.fruits[0].name}`);
console.log(`New marketing name: ${data.products.fruits[0].marketingName}`);
// Check if name is an object with marketingName property
const name = data.products.fruits[0].name;
const marketingName = typeof name === 'object' && name !== null ? name.marketingName : 'Not available';
console.log(`New marketing name: ${marketingName}`);
return data;
} catch (error) {

View File

@ -1,11 +1,97 @@
import { JSONPath } from 'jsonpath-plus'
import { IKBotTask } from '@polymech/ai-tools'
import { AsyncTransformer, ErrorCallback, FilterCallback, defaultError, defaultFilters, testFilters, transformObjectWithOptions } from './async-iterator.js'
import { JSONPath } from 'jsonpath-plus'
import { deepClone } from '@polymech/core/objects'
// Add an optional import that can be provided by the user
let cacheModule: {
get_cached_object: (key: any, namespace: string) => Promise<any>;
set_cached_object: (key: any, namespace: string, value: any, options?: any) => Promise<void>;
rm_cached_object: (key: any, namespace: string) => Promise<void>;
} | null = null;
// Function to register the cache module
export function registerCacheModule(module: typeof cacheModule) {
cacheModule = module;
}
// Cache interface - users can implement to connect different cache systems
export interface CacheInterface {
get: (key: any) => Promise<any>;
set: (key: any, value: any) => Promise<void>;
delete?: (key: any) => Promise<void>;
}
// Default no-op cache implementation
export class NoopCache implements CacheInterface {
async get(_key: any) { return null; }
async set(_key: any, _value: any) { return; }
async delete(_key: any) { return; }
}
// Default cache implementation based on kbot.ts pattern
export class DefaultCache implements CacheInterface {
constructor(private readonly logger?: { error: (message: string, error?: any) => void }) {}
async get(key: any): Promise<any> {
if (!cacheModule) return null;
try {
const namespace = key.name || 'iterator';
const cached = await cacheModule.get_cached_object(key, namespace);
return cached;
} catch (e) {
if (this.logger) {
this.logger.error(`Failed to get cached object for key: ${JSON.stringify(key).substring(0, 50)}`, e);
} else {
console.error(`Cache get error:`, e);
}
return null;
}
}
async set(key: any, value: any): Promise<void> {
if (!cacheModule) return;
try {
const namespace = key.name || 'iterator';
await cacheModule.set_cached_object(key, namespace, value, {});
} catch (e) {
if (this.logger) {
this.logger.error(`Failed to cache object for key: ${JSON.stringify(key).substring(0, 50)}`, e);
} else {
console.error(`Cache set error:`, e);
}
}
}
async delete(key: any): Promise<void> {
if (!cacheModule) return;
try {
const namespace = key.name || 'iterator';
await cacheModule.rm_cached_object(key, namespace);
} catch (e) {
if (this.logger) {
this.logger.error(`Failed to delete cached object for key: ${JSON.stringify(key).substring(0, 50)}`, e);
} else {
console.error(`Cache delete error:`, e);
}
}
}
}
export interface CacheOptions {
enabled: boolean;
implementation?: CacheInterface;
namespace?: string;
}
export interface FieldMapping {
jsonPath: string
targetPath?: string | null
options?: IKBotTask
maxRetries?: number
retryDelay?: number
}
export interface IteratorFactory {
@ -13,6 +99,47 @@ export interface IteratorFactory {
createTransformer: (options: IKBotTask) => AsyncTransformer
}
// Utility function to create a cache key from an object
export function createCacheKey(obj: any): string {
try {
const cleanObj = JSON.parse(JSON.stringify(removeEmptyValues(obj)));
return JSON.stringify(cleanObj);
} catch (e) {
console.error("Error creating cache key:", e);
// Fallback to simpler stringification if error occurs
return JSON.stringify({ hash: String(Date.now()) });
}
}
// Utility to remove null, undefined, and empty objects from an object for cache key generation
export function removeEmptyValues(obj: any): any {
if (obj === null || obj === undefined) {
return undefined;
}
if (typeof obj !== 'object') {
return obj;
}
if (Array.isArray(obj)) {
const filtered = obj.map(removeEmptyValues).filter(v => v !== undefined);
return filtered.length ? filtered : undefined;
}
const result: Record<string, any> = {};
let hasValues = false;
for (const [key, value] of Object.entries(obj)) {
const cleaned = removeEmptyValues(value);
if (cleaned !== undefined) {
result[key] = cleaned;
hasValues = true;
}
}
return hasValues ? result : undefined;
}
export function createIterator(
obj: Record<string, any>,
optionsMixin: Partial<IKBotTask>,
@ -22,6 +149,10 @@ export function createIterator(
errorCallback?: ErrorCallback
filterCallback?: FilterCallback
transformerFactory?: (options: IKBotTask) => AsyncTransformer
maxRetries?: number
retryDelay?: number
cache?: CacheOptions
logger?: { error: (message: string, error?: any) => void }
} = {}
): IteratorFactory {
const {
@ -29,7 +160,11 @@ export function createIterator(
concurrentTasks = 1,
errorCallback = defaultError,
filterCallback = testFilters(defaultFilters()),
transformerFactory
transformerFactory,
maxRetries = 3,
retryDelay = 2000,
cache,
logger
} = globalOptions;
const defaultTransformerFactory = (options: IKBotTask): AsyncTransformer => {
@ -40,15 +175,146 @@ export function createIterator(
const createTransformer = transformerFactory || defaultTransformerFactory;
// Default cache settings
const defaultImplementation = logger ? new DefaultCache(logger) : new DefaultCache();
const cacheImpl = cache?.implementation || defaultImplementation;
const cacheEnabled = cache?.enabled || false;
const cacheNamespace = cache?.namespace || 'iterator';
return {
createTransformer,
transform: async (mappings: FieldMapping[]): Promise<void> => {
for (const mapping of mappings) {
// Merge the mapping options with the global mixin
const mergedOptions = { ...optionsMixin, ...mapping.options } as IKBotTask;
const { jsonPath, targetPath = null } = mapping;
const { jsonPath, targetPath = null, maxRetries: mappingRetries, retryDelay: mappingRetryDelay } = mapping;
const transformer = createTransformer(mergedOptions);
// Generate cache key for this transformation
if (cacheEnabled) {
const cacheKeyObj = {
jsonPath,
targetPath,
options: mergedOptions,
maxRetries: mappingRetries || maxRetries,
retryDelay: mappingRetryDelay || retryDelay
};
const cacheKey = { key: createCacheKey(cacheKeyObj), name: cacheNamespace };
// Check if we have a cached result
try {
const cachedResult = await cacheImpl.get(cacheKey);
if (cachedResult) {
// Apply cached result directly
applyTransformResult(obj, jsonPath, targetPath, cachedResult);
continue; // Skip to next mapping
}
} catch (e) {
errorCallback('cache', 'get', e);
}
// For paths with targetPath, we need special handling for the LLM results
if (targetPath) {
// Get all the values that need transformation
const paths = JSONPath({ path: jsonPath, json: obj, resultType: 'pointer' });
const transformedData: Record<string, any> = {};
for (const path of paths) {
// Extract the value to transform
const keys = path.slice(1).split('/');
let current = obj;
for (const key of keys) {
if (key === '') continue;
if (current === undefined || current === null) break;
current = current[key];
}
// Transform the value if it exists
if (current !== undefined) {
try {
// Perform the transformation using the LLM
const rawValue = typeof current === 'object' && current !== null && 'value' in current
? current.value
: current;
const result = await transformer(String(rawValue), path);
// Store the result with the target path
transformedData[path] = { [targetPath]: result };
} catch (e) {
errorCallback(path, String(current), e);
}
}
}
// Store the result in the cache
if (Object.keys(transformedData).length > 0) {
try {
await cacheImpl.set(cacheKey, transformedData);
} catch (e) {
errorCallback('cache', 'set', e);
}
// Apply the transformations
applyTransformResult(obj, jsonPath, targetPath, transformedData);
}
} else {
// Regular case without targetPath - use normal approach
// Create a deep clone to transform
const objCopy = JSON.parse(JSON.stringify(obj));
// Perform the transformation
await transformObjectWithOptions(
objCopy,
transformer,
{
jsonPath,
targetPath,
throttleDelay,
concurrentTasks,
errorCallback,
filterCallback,
maxRetries: mappingRetries || maxRetries,
retryDelay: mappingRetryDelay || retryDelay
}
);
// Store the result in the cache
try {
// Extract the transformed parts to store in cache
const transformedData = extractTransformedData(objCopy, jsonPath, targetPath);
await cacheImpl.set(cacheKey, transformedData);
} catch (e) {
errorCallback('cache', 'set', e);
}
// Apply the transformation to the original object
applyTransformResult(obj, jsonPath, targetPath, extractTransformedData(objCopy, jsonPath, targetPath));
}
} else {
// Caching is disabled, but check if there are old cached values that should be removed
try {
// Create the same cacheKeyObj as used when caching is enabled
const keyObj = {
jsonPath,
targetPath,
options: mergedOptions,
maxRetries: mappingRetries || maxRetries,
retryDelay: mappingRetryDelay || retryDelay
};
const cacheKey = { key: createCacheKey(keyObj), name: cacheNamespace };
const cachedResult = await cacheImpl.get(cacheKey);
if (cachedResult && cacheImpl.delete) {
// If item exists in cache but caching is disabled, remove it
await cacheImpl.delete(cacheKey);
}
} catch (e) {
// Silently ignore errors when cleaning cache
}
// No caching, just transform the object directly
await transformObjectWithOptions(
obj,
transformer,
@ -58,14 +324,92 @@ export function createIterator(
throttleDelay,
concurrentTasks,
errorCallback,
filterCallback
filterCallback,
maxRetries: mappingRetries || maxRetries,
retryDelay: mappingRetryDelay || retryDelay
}
);
}
}
}
};
}
// Extract transformed data based on the jsonPath and targetPath
function extractTransformedData(obj: Record<string, any>, jsonPath: string, targetPath: string | null): any {
const paths = JSONPath({ path: jsonPath, json: obj, resultType: 'pointer' });
const result: Record<string, any> = {};
for (const p of paths) {
const keys = p.slice(1).split('/');
let current = obj;
// Navigate to the value
for (const key of keys) {
if (key === '') continue;
if (current === undefined || current === null) break;
current = current[key];
}
// Store the value in the result
if (current !== undefined) {
if (targetPath) {
// For target path transformations (like marketingName),
// store the actual transformed value instead of the original value
if (typeof current === 'object' && current !== null && targetPath in current) {
result[p] = { [targetPath]: current[targetPath] };
} else {
// This is the raw value from the LLM - use this as is
result[p] = { [targetPath]: current };
}
} else {
result[p] = current;
}
}
}
return result;
}
// Apply transformed data to the original object
function applyTransformResult(obj: Record<string, any>, jsonPath: string, targetPath: string | null, transformedData: Record<string, any>): void {
for (const [path, value] of Object.entries(transformedData)) {
const keys = path.slice(1).split('/');
let current = obj;
// Navigate to the parent object
for (let i = 0; i < keys.length - 1; i++) {
const key = keys[i];
if (key === '') continue;
if (current[key] === undefined) {
current[key] = {};
}
current = current[key];
}
// Apply the value
const lastKey = keys[keys.length - 1];
if (lastKey !== '') {
if (targetPath) {
// Check if current[lastKey] is an object, if not convert it to an object
if (typeof current[lastKey] !== 'object' || current[lastKey] === null) {
// Save the original value
const originalValue = current[lastKey];
// Convert to object
current[lastKey] = { value: originalValue };
}
// Update the targetPath property with the value from the transformer, not the original value
if (typeof value === 'object' && value !== null && value[targetPath] !== undefined) {
current[lastKey][targetPath] = value[targetPath];
}
} else {
current[lastKey] = value;
}
}
}
}
export async function transformWithMappings(
obj: Record<string, any>,
createTransformer: (options: IKBotTask) => AsyncTransformer,
@ -75,30 +419,38 @@ export async function transformWithMappings(
concurrentTasks?: number
errorCallback?: ErrorCallback
filterCallback?: FilterCallback
maxRetries?: number
retryDelay?: number
cache?: CacheOptions
logger?: { error: (message: string, error?: any) => void }
} = {}
): Promise<void> {
const {
throttleDelay = 1000,
concurrentTasks = 1,
errorCallback = defaultError,
filterCallback = testFilters(defaultFilters())
filterCallback = testFilters(defaultFilters()),
maxRetries = 3,
retryDelay = 2000,
cache,
logger
} = globalOptions;
for (const mapping of mappings) {
const { jsonPath, targetPath = null, options = {} as IKBotTask } = mapping;
const transformer = createTransformer(options);
await transformObjectWithOptions(
const iterator = createIterator(
obj,
transformer,
{},
{
jsonPath,
targetPath,
throttleDelay,
concurrentTasks,
errorCallback,
filterCallback
filterCallback,
transformerFactory: createTransformer,
maxRetries,
retryDelay,
cache,
logger
}
);
}
await iterator.transform(mappings);
}

View File

@ -0,0 +1,104 @@
import { get_cached_object, set_cached_object, rm_cached_object } from "@polymech/cache"
import { run, OptionsSchema } from "@polymech/kbot-d";
import { resolveVariables } from "@polymech/commons/variables"
import { } from "@polymech/core/objects"
import { logger, env } from "./index.js"
import { removeEmptyObjects } from "@/base/objects.js"
import { LLM_CACHE } from "@/config/config.js"
import {
TemplateProps,
TemplateContext,
createTemplates
} from "./kbot-templates.js";
export interface Props extends TemplateProps {
context?: TemplateContext;
}
export const filter = async (content: string, tpl: string = 'howto', opts: Props = {}) => {
if (!content || content.length < 20) {
return content;
}
const context = opts.context || TemplateContext.COMMONS;
const templates = createTemplates(context);
if (!templates[tpl]) {
return content;
}
const template = typeof templates[tpl] === 'function' ? templates[tpl]() : templates[tpl];
const options = getFilterOptions(content, template, opts);
const cache_key_obj = {
content,
tpl,
context,
...options,
filters: [],
tools: []
};
const ca_options = JSON.parse(JSON.stringify(removeEmptyObjects(cache_key_obj)));
let cached
try {
cached = await get_cached_object({ ca_options }, 'kbot') as { content: string }
} catch (e) {
logger.error(`Failed to get cached object for ${content.substring(0, 20)}`, e);
}
if (cached) {
if (LLM_CACHE) {
return cached.content;
} else {
rm_cached_object({ ca_options }, 'kbot')
}
}
logger.info(`kbot: template:${tpl} : context:${context} @ ${options.model}`)
const result = await run(options);
if (!result || !result[0]) {
logger.error(`No result for ${content.substring(0, 20)}`)
return content;
}
if (template.format === 'json') {
try {
const jsonResult = JSON.parse(result[0] as string);
await set_cached_object(content, ca_options, { content: jsonResult }, 'kbot');
return jsonResult;
} catch (e) {
logger.error('Failed to parse JSON response:', e);
return result[0];
}
}
await set_cached_object({ ca_options }, 'kbot', { content: result[0] }, {})
logger.info(`kbot-result: template:${tpl} : context:${context} @ ${options.model} : ${result[0]}`)
return result[0] as string;
};
export const template_filter = async (text: string, template: string, context: TemplateContext = TemplateContext.COMMONS) => {
if (!text || text.length < 20) {
return text;
}
const templates = createTemplates(context);
if (!templates[template]) {
logger.warn(`No template found for ${template}`);
return text;
}
const templateConfig = templates[template]();
const resolvedTemplate = Object.fromEntries(
Object.entries(templateConfig).map(([key, value]) => [
key,
typeof value === 'string' ? resolveVariables(value, true) : value
])
);
const resolvedText = resolveVariables(text, true);
const ret = await filter(resolvedText, template, {
context,
...resolvedTemplate,
prompt: `${resolvedTemplate.prompt}\n\nText to process:\n${resolvedText}`,
variables: env().variables
});
return ret;
};
export const getFilterOptions = (content: string, template: any, opts: Props = {}) => {
return OptionsSchema().parse({
...template,
prompt: `${template.prompt || ""} : ${content}`,
...opts,
});
};

View File

@ -4,46 +4,46 @@
{
"id": "f1",
"name": "apple",
"description": "A delightful fruit boasting a crisp, refreshing crunch paired with a burst of natural sweetness, perfect for enjoying fresh, adding to salads, or creating delicious desserts.",
"description": "A deliciously sweet fruit bursting with juicy flavor and a satisfying crunch, offering a refreshing taste experience in every bite, perfect for a healthy, invigorating snack.",
"details": {
"color": "red",
"origin": "Worldwide",
"nutrition": "Rich in fiber and vitamin C, this food supports a healthy digestive system, aids in regulating blood sugar levels, and boosts immunity by promoting collagen formation for skin health and faster wound healing."
"nutrition": "Rich in fiber and vitamin C, it supports healthy digestion, helps regulate blood sugar levels, and boosts immune function by protecting against infections and promoting collagen production for skin health."
},
"marketingName": "Golden Orchard"
"marketingName": "Crimson Orchard Delight"
},
{
"id": "f2",
"name": "banana",
"description": "A vibrant, sun-kissed tropical fruit with smooth, golden-yellow skin and a juicy, sweet interior bursting with refreshing flavor, perfect for a delicious and energizing snack.",
"description": "A vibrant, sun-kissed tropical fruit with a bright yellow skin, offering a sweet, juicy flavor bursting with tropical essence and a refreshing, exotic aroma.",
"details": {
"color": "yellow",
"origin": "Southeast Asia",
"nutrition": "High in potassium, which helps maintain healthy blood pressure, supports proper muscle and nerve function, and reduces the risk of stroke by balancing fluids and electrolytes within the body."
"nutrition": "High in potassium, which helps regulate blood pressure, supports proper nerve and muscle function, and reduces the risk of kidney stones by maintaining healthy electrolyte balance in the body."
},
"marketingName": "Tropical Gold"
"marketingName": "Golden Tropic Delight"
}
],
"vegetables": [
{
"id": "v1",
"name": "carrot",
"description": "A vibrant, nutrient-packed root vegetable with a bright orange hue, known for its sweet, earthy flavor and crunchy texture, perfect for snacking, salads, or hearty cooked dishes.",
"description": "A vibrant, earthy-hued root vegetable bursting with natural sweetness and rich in vitamins, perfect for roasting, juicing, or enjoying raw for a crunchy snack.",
"details": {
"color": "orange",
"origin": "Eurasia",
"nutrition": "Supports eye health by protecting against age-related macular degeneration, helps maintain healthy night vision, and provides antioxidants that reduce oxidative stress, contributing to overall visual clarity and eye function."
"nutrition": "Supports eye health by protecting against age-related macular degeneration, maintains healthy night vision, and provides antioxidants that reduce the risk of cataracts, contributing to overall visual clarity and longevity."
},
"marketingName": "Golden Crunch"
"marketingName": "Golden Harvest Ruby"
},
{
"id": "v2",
"name": "broccoli",
"description": "A vibrant, nutrient-packed green vegetable from the cruciferous family, prized for its crisp texture, slightly peppery flavor, and abundance of vitamins, minerals, and antioxidants.",
"description": "A vibrant, nutrient-rich green vegetable from the cruciferous family, packed with antioxidants and vitamins, known for its crisp texture and subtly peppery, earthy flavor.",
"details": {
"color": "green",
"origin": "Italy",
"nutrition": "Rich in vitamins K and C, it supports healthy blood clotting, boosts immune function, promotes strong bones, and helps reduce inflammation, contributing to overall cardiovascular and skeletal health."
"nutrition": "Rich in vitamins K and C, this supports healthy blood clotting, strengthens the immune system, and promotes collagen production, which benefits skin health and helps maintain strong bones."
},
"marketingName": "Emerald Crown Veggie"
}
@ -52,6 +52,6 @@
"metadata": {
"lastUpdated": "2023-04-15",
"source": "Foods Database",
"description": "A vibrant assortment of fresh, colorful fruits and vegetables, showcasing natures bounty with juicy sweetness, earthy flavors, and nutrient-rich choices perfect for a wholesome, flavorful diet."
"description": "A vibrant assortment of fresh, colorful fruits and vegetables, bursting with natural flavors and essential nutrients—perfect for creating healthy, delicious meals every day."
}
}

View File

@ -3,25 +3,29 @@
"fruits": [
{
"id": "f1",
"name": "apple",
"description": "A delightfully juicy fruit bursting with natural sweetness, offering a satisfying crunch with every bite and a refreshing flavor that invigorates your senses.",
"name": {
"value": "apple",
"marketingName": "Crimson Orchard Delight"
},
"description": "A deliciously sweet, juicy fruit with a satisfying crunch, bursting with refreshing flavor that delights your senses and offers a perfect balance of crisp texture and natural sweetness.",
"details": {
"color": "red",
"origin": "Worldwide",
"nutrition": "Rich in fiber and vitamin C, which supports healthy digestion, helps lower cholesterol levels, strengthens the immune system, and promotes skin health through collagen production."
},
"marketingName": "Crimson Orchard Delight"
"nutrition": "Rich in fiber and vitamin C, which supports digestive health, promotes healthy gut bacteria, and boosts immune function. Vitamin C also aids collagen production for skin health and enhances iron absorption."
}
},
{
"id": "f2",
"name": "banana",
"description": "A vibrant, sun-kissed yellow tropical fruit bursting with juicy sweetness, offering a refreshing taste of the tropics with every luscious, tangy bite.",
"name": {
"value": "banana",
"marketingName": "Golden Tropic Delight"
},
"description": "A vibrant, sun-kissed yellow tropical fruit with a sweet, juicy flesh and an enticing aroma, bursting with refreshing flavor reminiscent of sunshine and warm island breezes.",
"details": {
"color": "yellow",
"origin": "Southeast Asia",
"nutrition": "High in potassium, which supports healthy blood pressure levels, promotes proper muscle function, and helps maintain fluid balance in the body, contributing to overall cardiovascular and neuromuscular health."
},
"marketingName": "Golden Bliss Banana"
"nutrition": "High in potassium, which helps maintain healthy blood pressure levels, supports proper muscle function, and promotes optimal nerve signaling, contributing to overall cardiovascular health and helping reduce the risk of stroke."
}
}
]
}