mono/packages/kbot/docs_/iterator.md
2025-04-07 18:52:59 +02:00

32 KiB

Data Transformation with Iterators

The Iterator module provides a powerful and flexible way to transform complex data structures (like JSON objects) using asynchronous operations, especially useful for applying LLM transformations to specific fields.

Key Concepts

  • Targeted Transformation: Use JSONPath expressions to select specific fields or elements within your data for transformation.
  • LLM Integration: Seamlessly integrate Large Language Models (LLMs) to modify text, generate content, or analyze data based on prompts.
  • Structured Output: Define JSON schemas using the format option to ensure LLM outputs conform to a required structure.
  • In-Place or New Fields: Choose whether to modify data directly or add transformed results to new fields.
  • Customization: Control concurrency, rate limiting, error handling, filtering, and caching.
  • Callbacks: Hook into the transformation process using onTransform (before LLM call) and onTransformed (after LLM call) callbacks.

Core Components & Usage

There are two main ways to use the transformation capabilities:

  1. transform Helper Function: A simplified approach for common use cases.
  2. createIterator Factory: Provides more control and customization options.

1. Simplified Usage: transform Function

For straightforward transformations, the transform function offers a minimal setup:

import { transform, FieldMapping, E_Mode } from '@polymech/kbot';

// Sample Data
const data = { product: { description: "Old description" } };

// Field mapping definition
const mappings: FieldMapping[] = [
    {
        jsonPath: '$.product.description',
        targetPath: null, // Transform in-place
        options: {
            prompt: 'Rewrite this description to be more exciting'
        }
    }
];

// Global LLM options
const llmOptions = {
    model: 'openai/gpt-4o',
    router: 'openai',
    mode: E_Mode.COMPLETION
};

// Optional callbacks
const callbacks = {
    onTransform: async (path, value) => { console.log(`Transforming: ${path}`); return value; },
    onTransformed: async (path, newValue) => { console.log(`Transformed: ${path}`); return newValue; }
};

// Perform the transformation
await transform(data, mappings, llmOptions, callbacks);

console.log(data.product.description); // Output: The exciting new description

See the simpleTransformExample function within src/examples/core/iterator-factory-example.ts for a runnable demonstration.

2. Advanced Usage: createIterator Factory

The createIterator factory provides maximum flexibility for complex scenarios, including fine-grained control over network options, caching, logging, and transformer creation.

import {
    createIterator,
    createLLMTransformer,
    FieldMapping,
    IOptions,
    CacheConfig,
    INetworkOptions,
    E_Mode
} from '@polymech/kbot';
import { getLogger } from '@polymech/kbot'; // Assuming logger setup

const logger = getLogger({ logLevel: 4 });

// Sample Data
const data = { products: [{ id: 'p1', name: 'Apple', details: { nutrition: 'Rich in fiber' } }] };

// Global LLM options mixin
const globalOptionsMixin = {
    model: 'anthropic/claude-3.5-sonnet',
    router: 'openrouter',
    mode: E_Mode.COMPLETION
};

// Field Mappings
const fieldMappings: FieldMapping[] = [
    {
        jsonPath: '$.products[*].name',
        targetPath: 'marketingName', // Add a new field
        options: { prompt: 'Create a catchy marketing name' }
    },
    {
        jsonPath: '$.products[*].details.nutrition',
        targetPath: null, // Transform in-place
        options: { prompt: 'Expand nutrition info with health benefits (20 words)' }
    }
];

// Network Configuration
const networkOptions: INetworkOptions = {
    throttleDelay: 500,
    concurrentTasks: 2,
    maxRetries: 3,
    retryDelay: 1000
};

// Cache Configuration
const cacheConfig: CacheConfig = {
    enabled: true,
    namespace: 'product-info-transforms',
    expiration: 3600 // 1 hour in seconds
};

// Iterator Options
const iteratorOptions: IOptions = {
    network: networkOptions,
    errorCallback: (path, value, error) => logger.error(`Error at ${path}: ${error.message}`),
    filterCallback: async () => true, // Example: only transform strings
    transformerFactory: (opts) => createLLMTransformer(opts, logger, cacheConfig),
    logger,
    cacheConfig,
    onTransform: async (path, value, opts) => { logger.debug(`About to transform ${path}`); return value; },
    onTransformed: async (path, transformedValue, opts) => { logger.debug(`Finished transforming ${path}`); return transformedValue; }
};

// Create the iterator instance
const iterator = createIterator(
    data,
    globalOptionsMixin,
    iteratorOptions
);

// Apply transformations
await iterator.transform(fieldMappings);

console.log(JSON.stringify(data, null, 2));
/* Output might look like:
{
  "products": [
    {
      "id": "p1",
      "name": "Apple",
      "details": {
        "nutrition": "Rich in fiber, supporting digestion and heart health. Apples provide essential vitamins for overall well-being."
      },
      "marketingName": "Orchard Crisp Delight"
    }
  ]
}
*/

Refer to the factoryExample function in src/examples/core/iterator-factory-example.ts for a comprehensive, runnable example demonstrating caching, structured output, and callbacks.

Field Mappings (FieldMapping)

Define what to transform and how:

interface FieldMapping {
    jsonPath: string;       // JSONPath expression identifying data to transform.
    targetPath: string | null; // Where to put the result. `null` for in-place, or a string for a new relative field name.
    options?: Partial<IKBotTask>; // LLM options (prompt, model, format, etc.) specific to this mapping.
}

Callbacks (onTransform, onTransformed)

Inject custom logic before and after the core transformation (e.g., the LLM call):

  • onTransform(jsonPath, originalValue, options): Called just before the value is sent to the transformer (LLM). You can modify the originalValue before it's processed.
    • Note: For non-string values (like arrays), originalValue will be the original non-string data type. Ensure your callback handles this or stringifies if necessary before returning.
  • onTransformed(jsonPath, transformedValue, options): Called after the transformer returns a result. You can modify the transformedValue before it's written back to the data object.

Structured Output (format Option)

The format option within a FieldMapping's options ensures the LLM's output conforms to a specific JSON schema. This is crucial for reliable data extraction and processing.

// Example FieldMapping using 'format'
{
    jsonPath: '$.reviewText',
    targetPath: 'analysis',
    options: {
        prompt: 'Analyze sentiment, pros, and cons from this review.',
        format: {
            type: "object",
            properties: {
                sentiment: { type: "string", enum: ["positive", "neutral", "negative"] },
                pros: { type: "array", items: { type: "string" } },
                cons: { type: "array", items: { type: "string" } }
            },
            required: ["sentiment", "pros", "cons"]
        }
    }
}
  • The LLM is instructed to return a JSON object matching the schema.
  • The result assigned to data.analysis will typically be a string containing the JSON. You'll likely need to JSON.parse() it.
  • See the iterator-factory-example.ts for a full example of defining and handling formatted output.

Caching (CacheConfig)

Improve performance and reduce costs by caching transformation results.

interface CacheConfig {
    enabled: boolean;          // Master switch for caching
    namespace?: string;        // Optional prefix for cache keys (recommended)
    expiration?: number;       // Cache duration in seconds
    // implementation?: CacheInterface; // Advanced: Provide a custom cache backend
}
  • Caching is configured within the IOptions passed to createIterator.
  • The default cache uses @polymech/cache.
  • Cache keys are generated based on the input value and transformation options.
  • The iterator-factory-example.ts includes logic to demonstrate caching and how to clear specific cache entries using rm_cached_object from @polymech/cache for testing purposes.

Network Configuration (INetworkOptions)

Fine-tune network behavior for API calls:

interface INetworkOptions {
    throttleDelay?: number; // ms delay between requests
    concurrentTasks?: number; // Max parallel requests
    maxRetries?: number;    // Retries on failure
    retryDelay?: number;    // Base delay (ms) between retries (exponential backoff applied)
}

Examples

Explore the source code for detailed, runnable examples:

  • src/examples/core/iterator-factory-example.ts: Demonstrates createIterator, transform, callbacks, caching, structured output (format), and handling different data types (strings, number arrays).

Overview

The Iterator module allows you to:

  1. Define mappings between JSON paths and transformations
  2. Apply transformations in place or to new fields
  3. Customize filtering, error handling, and concurrency
  4. Chain multiple transformations together

Example Implementations

For complete working examples, see:

These examples demonstrate transforming a sample product dataset with various JSONPath expressions and LLM-powered transformations.

Core Components

AsyncTransformer

The fundamental unit that applies transformations:

type AsyncTransformer = (input: string, path: string) => Promise<string>

Every transformer takes a string input and its path in the JSON structure, then returns a transformed string.

Field Mappings

Field mappings define which parts of the data to transform and how:

interface FieldMapping {
    jsonPath: string       // JSONPath expression to find values
    targetPath?: string    // Optional target field for transformed values
    options?: IKBotTask    // Options for the transformation
}

Basic Usage

Creating an Iterator

import { createIterator } from '@polymech/kbot'

// Create an iterator instance
const iterator = createIterator(
    data,                  // The data to transform
    globalOptionsMixin,    // Global options for all transformations
    {
        throttleDelay: 1000,
        concurrentTasks: 1,
        errorCallback: (path, value, error) => console.error(`Error at ${path}: ${error.message}`),
        filterCallback: async () => true,
        transformerFactory: createCustomTransformer
    }
)

// Define field mappings
const mappings = [
    {
        jsonPath: '$.products.*.name',
        targetPath: null,  // Transform in place
        options: {
            prompt: 'Make this product name more appealing'
        }
    }
]

// Apply transformations
await iterator.transform(mappings)

JSONPath Patterns

The Iterator uses JSONPath to identify fields for transformation:

  • $..name - All name fields at any level
  • $.products..name - All name fields under the products key
  • $.products.*.*.name - Names of items in product categories
  • $[*].description - All description fields at the first level

Advanced Usage

Custom Transformers

Create custom transformers for specific transformation logic:

const createCustomTransformer = (options: IKBotTask): AsyncTransformer => {
    return async (input: string, jsonPath: string): Promise<string> => {
        // Transform the input string based on options
        return transformedValue
    }
}

In-Place vs. Target Field Transformations

In-Place Transformation

To transform values in place, set targetPath to null:

{
    jsonPath: '$.products.*.*.description',
    targetPath: null,
    options: {
        prompt: 'Make this description more engaging'
    }
}

This will replace the original description with the transformed value.

Adding New Fields

To keep the original value and add a transformed version, specify a targetPath:

{
    jsonPath: '$.products.*.*.name',
    targetPath: 'marketingName',
    options: {
        prompt: 'Generate a marketing name based on this product'
    }
}

This keeps the original name and adds a new marketingName field.

Structured Output with Format Option

The format option allows you to define a JSON schema that the LLM output should conform to. This is extremely useful for ensuring consistent, structured responses that can be easily parsed and used in your application.

Basic Format Usage

To request structured output, add a format property to your field mapping options:

{
    jsonPath: '$.productReview.reviewText',
    targetPath: 'analysis',
    options: {
        prompt: 'Analyze this product review and extract key information',
        format: {
            type: "object",
            properties: {
                sentiment: {
                    type: "string",
                    enum: ["positive", "neutral", "negative"],
                    description: "The overall sentiment of the review"
                },
                pros: {
                    type: "array",
                    items: {
                        type: "string"
                    },
                    description: "Positive aspects mentioned in the review",
                    minItems: 1,
                    maxItems: 3
                },
                cons: {
                    type: "array",
                    items: {
                        type: "string"
                    },
                    description: "Negative aspects mentioned in the review",
                    minItems: 0,
                    maxItems: 3
                }
            },
            required: ["sentiment", "pros", "cons"]
        }
    }
}

Processing Structured Responses

The formatted response may be returned as a JSON string. When working with formatted responses, it's good practice to handle potential string parsing:

// After transformation
if (data.productReview && data.productReview.analysis) {
    try {
        // Parse the JSON string if needed
        const analysisJson = typeof data.productReview.analysis === 'string' 
            ? JSON.parse(data.productReview.analysis)
            : data.productReview.analysis;
        
        // Now you can work with the structured data
        console.log(`Sentiment: ${analysisJson.sentiment}`);
        console.log(`Pros: ${analysisJson.pros.join(', ')}`);
        console.log(`Cons: ${analysisJson.cons.join(', ')}`);
    } catch (e) {
        console.error("Error parsing structured output:", e);
    }
}

Best Practices for Formatted Output

  1. Clear Prompt Instructions: Include explicit instructions in your prompt about the expected format.
  2. Schema Validation: Use detailed JSON schemas with required fields and appropriate types.
  3. Parsing Handling: Always include error handling when parsing the output.
  4. Schema Examples: Consider including examples in your prompt for more complex schemas.

Format Option Example

Here's a complete example from the iterator-factory-example.ts file:

// Define a field mapping with format option
const fieldMappings = [
    {
        jsonPath: '$.productReview.reviewText',
        targetPath: 'analysis',
        options: {
            // Clear and explicit prompt that includes the schema format details
            prompt: `Analyze this product review and extract key information using EXACTLY the schema specified below.

The review: "Great selection of fruits with good prices and quality. Some items were out of stock."

Your response MUST be a valid JSON object following this exact schema:
{
  "sentiment": "positive" | "neutral" | "negative",
  "pros": ["string", "string"...],  // 1-3 items
  "cons": ["string"...]  // 0-3 items
}

Do not add any extra fields not in the schema, and make sure to use the exact field names as specified.`,
            // Schema validation ensures structured output format
            format: {
                type: "object",
                properties: {
                    sentiment: {
                        type: "string",
                        enum: ["positive", "neutral", "negative"],
                        description: "The overall sentiment of the review"
                    },
                    pros: {
                        type: "array",
                        items: {
                            type: "string"
                        },
                        description: "Positive aspects mentioned in the review",
                        minItems: 1,
                        maxItems: 3
                    },
                    cons: {
                        type: "array",
                        items: {
                            type: "string"
                        },
                        description: "Negative aspects mentioned in the review",
                        minItems: 0,
                        maxItems: 3
                    }
                },
                required: ["sentiment", "pros", "cons"]
            }
        }
    }
]

When run, this produces a structured output like:

{
  "sentiment": "positive",
  "pros": [
    "great selection of fruits",
    "good prices",
    "good quality"
  ],
  "cons": [
    "some items were out of stock"
  ]
}

This structured format is much easier to work with programmatically than free-form text responses.

Filtering

Filters determine which values should be transformed:

// Default filters that skip numbers, booleans, and empty strings
const defaultFilters = [isNumber, isBoolean, isValidString]

// Custom filter example
const skipFirstItem: FilterCallback = async (input, path) => {
    return !path.includes('[0]')
}

Throttling and Concurrency

Control API rate limits and parallel processing:

{
    throttleDelay: 1000,    // Milliseconds between requests
    concurrentTasks: 2      // Number of parallel transformations
}

Retry Mechanism

The Iterator includes a built-in retry mechanism for handling transient errors during transformations, particularly useful for API calls to external services like LLMs.

Configuration

Retry settings can be configured at both the global and individual field mapping levels:

// Global retry configuration
const iterator = createIterator(
    data,
    globalOptionsMixin,
    {
        maxRetries: 3,             // Maximum number of retry attempts
        retryDelay: 2000,          // Base delay in milliseconds between retries
        // ... other options
    }
)

// Field-specific retry configuration
const mappings = [
    {
        jsonPath: '$.products.*.description',
        targetPath: null,
        options: { /* ... */ },
        maxRetries: 5,             // Override global setting for this field
        retryDelay: 1000           // Override global setting for this field
    }
]

Behavior

When a transformation fails:

  1. The Iterator will wait for retryDelay milliseconds
  2. The delay increases exponentially with each retry attempt (backoff strategy)
  3. After maxRetries failed attempts, the error is passed to the errorCallback

This helps handle temporary issues like API rate limits, network connectivity problems, or service outages without failing the entire operation.

Caching Mechanism

The Iterator includes a powerful caching system to improve performance, reduce API costs, and speed up repeated operations.

Cache Configuration

Configure caching behavior when creating an iterator:

import { createIterator, DefaultCache, NoopCache } from '@polymech/kbot'

// Configure caching
const iterator = createIterator(
    data,
    globalOptionsMixin,
    {
        cache: {
            enabled: true,                    // Enable or disable caching
            implementation: new DefaultCache(), // Use default cache or provide custom
            namespace: 'my-custom-namespace'  // Custom namespace for cache entries
        },
        // ... other options
    }
)

Cache Implementations

The Iterator provides multiple cache implementations:

  1. DefaultCache: Uses the registered cache module to store and retrieve values
  2. NoopCache: A no-operation cache that doesn't actually cache anything
  3. Custom Implementation: Implement the CacheInterface to connect any caching system
// Custom cache implementation example
class RedisCache implements CacheInterface {
    constructor(private redisClient: any) {}
    
    async get(key: any): Promise<any> {
        const result = await this.redisClient.get(JSON.stringify(key));
        return result ? JSON.parse(result) : null;
    }
    
    async set(key: any, value: any): Promise<void> {
        await this.redisClient.set(JSON.stringify(key), JSON.stringify(value));
    }
    
    async delete(key: any): Promise<void> {
        await this.redisClient.del(JSON.stringify(key));
    }
}

// Use the custom cache
const iterator = createIterator(data, options, {
    cache: {
        enabled: true,
        implementation: new RedisCache(redisClient),
        namespace: 'product-transformations'
    }
})

Cache Behavior

  1. Caching Enabled: Before transformation, the Iterator checks if a result already exists in the cache. If found, it applies the cached result directly without calling the transformer. If not found, it performs the transformation and stores the result.

  2. Caching Disabled: When caching is explicitly disabled, the system will remove any existing cache entries for the current transformations. This ensures that stale data isn't accidentally used later if caching is re-enabled.

Registering a Cache Module

To use the DefaultCache implementation, you must register a cache module:

import { registerCacheModule } from '@polymech/kbot'

// Your cache module must implement these methods
const myCacheModule = {
    get_cached_object: async (key, namespace) => { /* ... */ },
    set_cached_object: async (key, namespace, value, options) => { /* ... */ },
    rm_cached_object: async (key, namespace) => { /* ... */ }
}

// Register the cache module
registerCacheModule(myCacheModule)

Cache Keys

Cache keys are automatically generated based on:

  • JSONPath expression
  • Target path (if any)
  • Transformation options
  • Retry settings

This ensures that different transformations produce different cache entries, while identical transformations reuse cached results.

Combined Example with Caching and Retry

Here's a complete example showcasing both caching and retry mechanisms:

import { createIterator, FieldMapping, DefaultCache, registerCacheModule } from '@polymech/kbot'

// Setup a simple mock cache module
const mockCacheModule = {
    storage: new Map(),
    get_cached_object: async (key, namespace) => {
        const cacheKey = `${namespace}:${JSON.stringify(key)}`;
        console.log(`Looking up cache: ${cacheKey}`);
        return mockCacheModule.storage.get(cacheKey);
    },
    set_cached_object: async (key, namespace, value) => {
        const cacheKey = `${namespace}:${JSON.stringify(key)}`;
        console.log(`Storing in cache: ${cacheKey}`);
        mockCacheModule.storage.set(cacheKey, value);
    },
    rm_cached_object: async (key, namespace) => {
        const cacheKey = `${namespace}:${JSON.stringify(key)}`;
        console.log(`Removing from cache: ${cacheKey}`);
        const exists = mockCacheModule.storage.has(cacheKey);
        mockCacheModule.storage.delete(cacheKey);
        return exists;
    }
};

// Register the cache module
registerCacheModule(mockCacheModule);

async function transformProductsWithCaching() {
    // Product data
    const data = {
        products: {
            fruits: [
                {
                    id: 'f1',
                    name: 'apple',
                    description: 'A sweet fruit',
                },
                {
                    id: 'f2',
                    name: 'banana',
                    description: 'A yellow fruit',
                }
            ]
        }
    };

    let requestCount = 0;
    
    // Create a transformer factory that simulates occasional failures
    const createLLMTransformer = (options): AsyncTransformer => {
        return async (input, path) => {
            requestCount++;
            
            // Simulate occasional failures to demonstrate retry
            if (requestCount % 3 === 0) {
                throw new Error('API rate limit exceeded');
            }
            
            console.log(`Transforming ${path}: ${input}`);
            return `Enhanced: ${input}`;
        }
    };

    // Create iterator with caching and retry
    const iterator = createIterator(
        data,
        { model: 'openai/gpt-4' },
        {
            throttleDelay: 1000,
            concurrentTasks: 1,
            transformerFactory: createLLMTransformer,
            maxRetries: 3,
            retryDelay: 1000,
            cache: {
                enabled: true,
                implementation: new DefaultCache(),
                namespace: 'product-transformations'
            },
            errorCallback: (path, value, error) => {
                console.error(`Failed to transform ${path}: ${error.message}`);
            }
        }
    );

    // Define transformations
    const mappings: FieldMapping[] = [
        {
            jsonPath: '$.products.fruits.*.description',
            targetPath: null,
            options: {
                prompt: 'Make this description more detailed'
            },
            maxRetries: 5  // Override global retry setting
        },
        {
            jsonPath: '$.products.fruits.*.name',
            targetPath: 'marketingName',
            options: {
                prompt: 'Generate a marketing name for this product'
            }
        }
    ];

    // First run - will perform transformations and cache results
    console.log("First run - transforming and caching:");
    await iterator.transform(mappings);
    
    // Second run - will use cached results
    console.log("\nSecond run - using cached results:");
    await iterator.transform(mappings);
    
    // Output the transformed data
    console.log("\nTransformed data:");
    console.log(JSON.stringify(data, null, 2));
}

Best Practices

  1. Be specific with JSONPaths: Use precise JSONPath expressions to target only the fields you want to transform.

  2. Handle errors gracefully: Provide an error callback to handle failed transformations without breaking the entire process.

  3. Respect rate limits: Set appropriate throttle delays when working with external APIs.

  4. Test with small datasets first: Validate your transformations on a smaller subset before processing large datasets.

  5. Prefer targeted transformations: Transform only what you need to minimize costs and processing time.

  6. Use caching for expensive operations: Enable caching for transformations that are computationally expensive or costly, such as LLM API calls.

  7. Configure appropriate retry settings: Set retry limits and delays based on the expected reliability of your transformers. More retries with longer delays for less reliable services.

  8. Use targetPath for non-destructive transformations: When generating new content related to existing fields, use targetPath to preserve the original data.

  9. Use format for structured outputs: When you need consistent, structured data from LLMs, use the format option with clear JSON schemas.

  10. Include schema details in prompts: For complex schemas, include the schema structure in your prompt to guide the LLM.

  11. Handle string parsing: Always add error handling when parsing structured responses, as they may be returned as string JSON.

  12. Implement custom cache for production: For production scenarios, implement a persistent cache solution rather than relying on in-memory caching.

  13. Use appropriate namespaces: When multiple parts of your application use the same cache implementation, use distinct namespaces to prevent collisions.

API Reference

Main Functions

  • createIterator(data, optionsMixin, globalOptions): Creates an iterator instance
  • transformObjectWithOptions(obj, transform, options): Low-level function to transform objects
  • transformObject(obj, transform, path, ...): Transforms matching paths in an object

Helper Functions

  • testFilters(filters): Creates a filter callback from filter functions
  • defaultFilters(): Returns commonly used filters
  • defaultError: Default error handler that logs to console

Types and Interfaces

  • AsyncTransformer: Function that transforms strings asynchronously
  • FilterCallback: Function that determines if a value should be transformed
  • ErrorCallback: Function that handles transformation errors
  • FieldMapping: Configuration for a transformation
    • jsonPath: JSONPath expression to select values
    • targetPath: Optional field to store transformed value (null for in-place)
    • options: Configuration for the transformation including:
      • prompt: The prompt for the LLM
      • format: Optional JSON schema for structured output
  • TransformOptions: Options for the transformation process
  • CacheConfig: Configuration for the caching mechanism
  • INetworkOptions: Configuration for throttling and concurrency

Limitations

  1. The Iterator works with string values; objects and arrays are traversed but not directly transformed.
  2. Large datasets might require pagination or chunking for efficient processing.
  3. External API rate limits might require careful throttling configuration.

Troubleshooting

Common issues and solutions:

  • No transformations occurring: Check your JSONPath expressions and filter conditions
  • Unexpected field structure: Examine the exact structure of your data
  • Rate limiting errors: Increase the throttleDelay between requests
  • Transformation errors: Implement a custom error callback for detailed logging

Running the Examples

To run the included examples:

# Run the basic async iterator example
npm run examples:async-iterator

# Run the iterator factory example
npm run examples:iterator-factory

# Run with debug logging
npm run examples:async-iterator -- --debug

# Run with caching disabled (forces fresh responses)
npm run examples:iterator-factory -- --no-cache

The examples will transform sample JSON data and save the results to the tests/test-data/core/ directory.

See Also