mono/packages/kbot/docs_/iterator.md
2025-04-07 16:01:06 +02:00

Iterator Documentation

The Iterator module transforms data structures using asynchronous operations and is particularly suited to applying LLM-based transformations to JSON data. This document covers the core functionality, usage patterns, and examples.

Overview

The Iterator module allows you to:

  1. Define mappings between JSON paths and transformations
  2. Apply transformations in place or to new fields
  3. Customize filtering, error handling, and concurrency
  4. Chain multiple transformations together

Example Implementations

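For complete working examples, see the async-iterator and iterator-factory examples (for instance, iterator-factory-example.ts); the commands for running them are listed under Running the Examples.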

These examples demonstrate transforming a sample product dataset with various JSONPath expressions and LLM-powered transformations.

Core Components

AsyncTransformer

The fundamental unit that applies transformations:

type AsyncTransformer = (input: string, path: string) => Promise<string>

Every transformer takes a string input and its path in the JSON structure, then returns a transformed string.
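
As a minimal sketch of this signature, the transformer below upper-cases its input and ignores the path. The type is redeclared locally so the snippet stands alone, and `upperCaseTransformer` is a hypothetical name used only for illustration, not part of the module.

```typescript
// Local copy of the type shown above, so the snippet is self-contained.
type AsyncTransformer = (input: string, path: string) => Promise<string>;

// Hypothetical transformer: upper-cases the value. A real transformer would
// typically call an LLM or another asynchronous service at this point.
const upperCaseTransformer: AsyncTransformer = async (input, _path) => {
    return input.toUpperCase();
};

upperCaseTransformer('apple', '$.products.fruits[0].name')
    .then(result => console.log(result)); // prints "APPLE"
```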

Field Mappings

Field mappings define which parts of the data to transform and how:

interface FieldMapping {
    jsonPath: string       // JSONPath expression to find values
    targetPath?: string | null    // Optional target field; null transforms in place
    options?: IKBotTask    // Options for the transformation
}

Basic Usage

Creating an Iterator

import { createIterator } from '@polymech/kbot'

// Create an iterator instance
const iterator = createIterator(
    data,                  // The data to transform
    globalOptionsMixin,    // Global options for all transformations
    {
        throttleDelay: 1000,
        concurrentTasks: 1,
        errorCallback: (path, value, error) => console.error(`Error at ${path}: ${error.message}`),
        filterCallback: async () => true,
        transformerFactory: createCustomTransformer
    }
)

// Define field mappings
const mappings = [
    {
        jsonPath: '$.products.*.name',
        targetPath: null,  // Transform in place
        options: {
            prompt: 'Make this product name more appealing'
        }
    }
]

// Apply transformations
await iterator.transform(mappings)

JSONPath Patterns

The Iterator uses JSONPath to identify fields for transformation:

  • $..name - All name fields at any level
  • $.products..name - All name fields under the products key
  • $.products.*.*.name - Names of items in product categories
  • $[*].description - All description fields at the first level
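
To make the recursive-descent pattern (`$..name`) concrete, here is a hand-rolled sketch that collects every value stored under a given key at any depth. It is not the JSONPath engine the Iterator uses; `collectByKey` and the sample data are assumptions for illustration, and a real JSONPath implementation may return matches in a different order.

```typescript
type Json = string | number | boolean | null | Json[] | { [key: string]: Json };

// Hypothetical helper: gathers every value stored under `key` at any depth,
// which is the set of values that `$..<key>` selects.
function collectByKey(node: Json, key: string, out: Json[] = []): Json[] {
    if (Array.isArray(node)) {
        for (const item of node) collectByKey(item, key, out);
    } else if (node !== null && typeof node === 'object') {
        for (const [k, v] of Object.entries(node)) {
            if (k === key) out.push(v);
            collectByKey(v, key, out);
        }
    }
    return out;
}

const sample: Json = {
    products: { fruits: [{ name: 'apple' }, { name: 'banana' }] },
    name: 'catalog'
};

const names = collectByKey(sample, 'name');
console.log(names); // [ 'apple', 'banana', 'catalog' ]
```

A narrower pattern such as `$.products..name` would exclude the top-level `catalog` entry, which is why precise paths matter when each match triggers a paid LLM call.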

Advanced Usage

Custom Transformers

Create custom transformers for specific transformation logic:

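const createCustomTransformer = (options: IKBotTask): AsyncTransformer => {
    return async (input: string, jsonPath: string): Promise<string> => {
        // Transform the input string based on options.
        // Placeholder logic: replace with your own (for example, an LLM call).
        const transformedValue = input.trim()
        return transformedValue
    }
}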

In-Place vs. Target Field Transformations

In-Place Transformation

To transform values in place, set targetPath to null:

{
    jsonPath: '$.products.*.*.description',
    targetPath: null,
    options: {
        prompt: 'Make this description more engaging'
    }
}

This will replace the original description with the transformed value.

Adding New Fields

To keep the original value and add a transformed version, specify a targetPath:

{
    jsonPath: '$.products.*.*.name',
    targetPath: 'marketingName',
    options: {
        prompt: 'Generate a marketing name based on this product'
    }
}

This keeps the original name and adds a new marketingName field.
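
The difference between the two modes can be sketched with a small helper. This is an illustration under the assumption that the target field is written next to the source field; `writeResult` is a hypothetical name, not the module's code.

```typescript
type Item = Record<string, string>;

// Hypothetical helper: writes a transformed value either over the source
// field (targetPath === null) or into a sibling field named by targetPath.
function writeResult(item: Item, sourceKey: string, value: string, targetPath: string | null): void {
    if (targetPath === null) {
        item[sourceKey] = value;   // in place: the original value is replaced
    } else {
        item[targetPath] = value;  // new field: the original value is preserved
    }
}

const inPlace: Item = { name: 'apple' };
writeResult(inPlace, 'name', 'Crisp Orchard Apple', null);
// inPlace is now { name: 'Crisp Orchard Apple' }

const withTarget: Item = { name: 'apple' };
writeResult(withTarget, 'name', 'Crisp Orchard Apple', 'marketingName');
// withTarget is now { name: 'apple', marketingName: 'Crisp Orchard Apple' }
```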

Structured Output with Format Option

The format option lets you define a JSON schema that the LLM output should conform to. This helps ensure consistent, structured responses that your application can parse reliably.

Basic Format Usage

To request structured output, add a format property to your field mapping options:

{
    jsonPath: '$.productReview.reviewText',
    targetPath: 'analysis',
    options: {
        prompt: 'Analyze this product review and extract key information',
        format: {
            type: "object",
            properties: {
                sentiment: {
                    type: "string",
                    enum: ["positive", "neutral", "negative"],
                    description: "The overall sentiment of the review"
                },
                pros: {
                    type: "array",
                    items: {
                        type: "string"
                    },
                    description: "Positive aspects mentioned in the review",
                    minItems: 1,
                    maxItems: 3
                },
                cons: {
                    type: "array",
                    items: {
                        type: "string"
                    },
                    description: "Negative aspects mentioned in the review",
                    minItems: 0,
                    maxItems: 3
                }
            },
            required: ["sentiment", "pros", "cons"]
        }
    }
}

Processing Structured Responses

The formatted response may be returned as a JSON string. When working with formatted responses, it's good practice to handle potential string parsing:

// After transformation
if (data.productReview && data.productReview.analysis) {
    try {
        // Parse the JSON string if needed
        const analysisJson = typeof data.productReview.analysis === 'string' 
            ? JSON.parse(data.productReview.analysis)
            : data.productReview.analysis;
        
        // Now you can work with the structured data
        console.log(`Sentiment: ${analysisJson.sentiment}`);
        console.log(`Pros: ${analysisJson.pros.join(', ')}`);
        console.log(`Cons: ${analysisJson.cons.join(', ')}`);
    } catch (e) {
        console.error("Error parsing structured output:", e);
    }
}

Best Practices for Formatted Output

  1. Clear Prompt Instructions: Include explicit instructions in your prompt about the expected format.
  2. Schema Validation: Use detailed JSON schemas with required fields and appropriate types.
  3. Parsing Handling: Always include error handling when parsing the output.
  4. Schema Examples: Consider including examples in your prompt for more complex schemas.
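
The parsing and validation practices above can be combined in one guard function. The sketch below checks a response against the review schema used in this section by hand; `ReviewAnalysis`, `isStringArray`, and `parseAnalysis` are hypothetical names, and a schema-validation library could replace the manual checks.

```typescript
interface ReviewAnalysis {
    sentiment: 'positive' | 'neutral' | 'negative';
    pros: string[];
    cons: string[];
}

// True if `v` is an array of strings whose length lies in [min, max].
function isStringArray(v: unknown, min: number, max: number): v is string[] {
    return Array.isArray(v) && v.length >= min && v.length <= max
        && v.every(x => typeof x === 'string');
}

// Accepts either a JSON string or an already-parsed object.
// Returns null instead of throwing when the value does not match the schema.
function parseAnalysis(raw: unknown): ReviewAnalysis | null {
    let value: unknown = raw;
    if (typeof raw === 'string') {
        try {
            value = JSON.parse(raw);
        } catch {
            return null; // not valid JSON
        }
    }
    if (typeof value !== 'object' || value === null) return null;
    const { sentiment, pros, cons } = value as Record<string, unknown>;
    const allowed = ['positive', 'neutral', 'negative'];
    if (typeof sentiment !== 'string' || !allowed.includes(sentiment)) return null;
    if (!isStringArray(pros, 1, 3) || !isStringArray(cons, 0, 3)) return null;
    return { sentiment: sentiment as ReviewAnalysis['sentiment'], pros, cons };
}
```

Returning `null` rather than throwing lets the caller decide whether to retry the transformation, log the raw response, or skip the record.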

Format Option Example

Here's a complete example from the iterator-factory-example.ts file:

// Define a field mapping with format option
const fieldMappings = [
    {
        jsonPath: '$.productReview.reviewText',
        targetPath: 'analysis',
        options: {
            // Clear and explicit prompt that includes the schema format details
            prompt: `Analyze this product review and extract key information using EXACTLY the schema specified below.

The review: "Great selection of fruits with good prices and quality. Some items were out of stock."

Your response MUST be a valid JSON object following this exact schema:
{
  "sentiment": "positive" | "neutral" | "negative",
  "pros": ["string", "string"...],  // 1-3 items
  "cons": ["string"...]  // 0-3 items
}

Do not add any extra fields not in the schema, and make sure to use the exact field names as specified.`,
            // Schema validation ensures structured output format
            format: {
                type: "object",
                properties: {
                    sentiment: {
                        type: "string",
                        enum: ["positive", "neutral", "negative"],
                        description: "The overall sentiment of the review"
                    },
                    pros: {
                        type: "array",
                        items: {
                            type: "string"
                        },
                        description: "Positive aspects mentioned in the review",
                        minItems: 1,
                        maxItems: 3
                    },
                    cons: {
                        type: "array",
                        items: {
                            type: "string"
                        },
                        description: "Negative aspects mentioned in the review",
                        minItems: 0,
                        maxItems: 3
                    }
                },
                required: ["sentiment", "pros", "cons"]
            }
        }
    }
]

When run, this produces structured output such as:

{
  "sentiment": "positive",
  "pros": [
    "great selection of fruits",
    "good prices",
    "good quality"
  ],
  "cons": [
    "some items were out of stock"
  ]
}

This structured format is much easier to work with programmatically than free-form text responses.

Filtering

Filters determine which values should be transformed:

// Default filters that skip numbers, booleans, and empty strings
const defaultFilters = [isNumber, isBoolean, isValidString]

// Custom filter example
const skipFirstItem: FilterCallback = async (input, path) => {
    return !path.includes('[0]')
}
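
Several filters can be combined into one callback. The sketch below assumes that a value is transformed only when every filter returns true; this document does not show how testFilters combines its filters, so `allOf` is a hypothetical stand-in, and the `FilterCallback` type is redeclared locally with a string input as an assumption.

```typescript
// Assumed shape of a filter, redeclared so the snippet stands alone.
type FilterCallback = (input: string, path: string) => Promise<boolean>;

// Hypothetical combinator: a value passes only if every filter accepts it.
const allOf = (filters: FilterCallback[]): FilterCallback =>
    async (input, path) => {
        for (const filter of filters) {
            if (!(await filter(input, path))) return false;
        }
        return true;
    };

const skipFirstItem: FilterCallback = async (_input, path) => !path.includes('[0]');
const nonEmpty: FilterCallback = async (input) => input.trim().length > 0;

const shouldTransform = allOf([skipFirstItem, nonEmpty]);
```

Evaluating filters in order and stopping at the first rejection keeps cheap checks ahead of expensive ones.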

Throttling and Concurrency

Control API rate limits and parallel processing:

{
    throttleDelay: 1000,    // Milliseconds between requests
    concurrentTasks: 2      // Number of parallel transformations
}
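
One way to picture how these two settings interact is a small worker pool: `concurrentTasks` workers pull tasks from a shared queue, and each worker pauses for `throttleDelay` milliseconds after finishing a task. This is a sketch under that assumption, not the module's scheduler; `runThrottled` and `sleep` are hypothetical helpers.

```typescript
const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

// Hypothetical scheduler: runs tasks with at most `concurrentTasks` in flight,
// pausing `throttleDelay` ms in each worker between consecutive tasks.
async function runThrottled<T>(
    tasks: Array<() => Promise<T>>,
    concurrentTasks: number,
    throttleDelay: number
): Promise<T[]> {
    const results: T[] = new Array(tasks.length);
    let next = 0; // index of the next unclaimed task

    const worker = async (): Promise<void> => {
        while (next < tasks.length) {
            const index = next++;
            results[index] = await tasks[index]();
            if (throttleDelay > 0) await sleep(throttleDelay);
        }
    };

    const workerCount = Math.max(1, Math.min(concurrentTasks, tasks.length));
    await Promise.all(Array.from({ length: workerCount }, () => worker()));
    return results; // results keep the order of the input tasks
}
```

With `concurrentTasks: 1` the calls run strictly one after another, which is the safest starting point when an API's rate limit is unknown.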

Retry Mechanism

The Iterator includes a built-in retry mechanism for handling transient errors during transformations, particularly useful for API calls to external services like LLMs.

Configuration

Retry settings can be configured at both the global and individual field mapping levels:

// Global retry configuration
const iterator = createIterator(
    data,
    globalOptionsMixin,
    {
        maxRetries: 3,             // Maximum number of retry attempts
        retryDelay: 2000,          // Base delay in milliseconds between retries
        // ... other options
    }
)

// Field-specific retry configuration
const mappings = [
    {
        jsonPath: '$.products.*.description',
        targetPath: null,
        options: { /* ... */ },
        maxRetries: 5,             // Override global setting for this field
        retryDelay: 1000           // Override global setting for this field
    }
]

Behavior

When a transformation fails:

  1. The Iterator will wait for retryDelay milliseconds
  2. The delay increases exponentially with each retry attempt (backoff strategy)
  3. After maxRetries failed attempts, the error is passed to the errorCallback

This helps handle temporary issues like API rate limits, network connectivity problems, or service outages without failing the entire operation.
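
The retry behavior can be sketched as a loop with exponential backoff. The doubling factor is an assumption, since this document only says that the delay increases exponentially; `withRetry`, `backoffDelay`, and `sleep` are hypothetical helpers, not the module's API.

```typescript
const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

// Assumed backoff: the delay doubles with each retry (attempt is 0-based).
const backoffDelay = (retryDelay: number, attempt: number): number =>
    retryDelay * 2 ** attempt;

// Hypothetical wrapper: runs `task`, retrying up to `maxRetries` times.
// When retries are exhausted, the last error goes to `onError`.
async function withRetry<T>(
    task: () => Promise<T>,
    maxRetries: number,
    retryDelay: number,
    onError: (error: unknown) => void
): Promise<T | undefined> {
    for (let attempt = 0; ; attempt++) {
        try {
            return await task();
        } catch (error) {
            if (attempt >= maxRetries) {
                onError(error); // retries exhausted: hand over to the error callback
                return undefined;
            }
            await sleep(backoffDelay(retryDelay, attempt));
        }
    }
}
```

With `retryDelay: 2000` and `maxRetries: 3`, this sketch would wait 2000, 4000, and 8000 ms before the three retries.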

Caching Mechanism

The Iterator includes a powerful caching system to improve performance, reduce API costs, and speed up repeated operations.

Cache Configuration

Configure caching behavior when creating an iterator:

import { createIterator, DefaultCache, NoopCache } from '@polymech/kbot'

// Configure caching
const iterator = createIterator(
    data,
    globalOptionsMixin,
    {
        cache: {
            enabled: true,                    // Enable or disable caching
            implementation: new DefaultCache(), // Use default cache or provide custom
            namespace: 'my-custom-namespace'  // Custom namespace for cache entries
        },
        // ... other options
    }
)

Cache Implementations

The Iterator provides multiple cache implementations:

  1. DefaultCache: Uses the registered cache module to store and retrieve values
  2. NoopCache: A no-operation cache that doesn't actually cache anything
  3. Custom Implementation: Implement the CacheInterface to connect any caching system

// Custom cache implementation example
class RedisCache implements CacheInterface {
    constructor(private redisClient: any) {}
    
    async get(key: any): Promise<any> {
        const result = await this.redisClient.get(JSON.stringify(key));
        return result ? JSON.parse(result) : null;
    }
    
    async set(key: any, value: any): Promise<void> {
        await this.redisClient.set(JSON.stringify(key), JSON.stringify(value));
    }
    
    async delete(key: any): Promise<void> {
        await this.redisClient.del(JSON.stringify(key));
    }
}

// Use the custom cache
const iterator = createIterator(data, options, {
    cache: {
        enabled: true,
        implementation: new RedisCache(redisClient),
        namespace: 'product-transformations'
    }
})

Cache Behavior

  1. Caching Enabled: Before transformation, the Iterator checks if a result already exists in the cache. If found, it applies the cached result directly without calling the transformer. If not found, it performs the transformation and stores the result.

  2. Caching Disabled: When caching is explicitly disabled, the system will remove any existing cache entries for the current transformations. This ensures that stale data isn't accidentally used later if caching is re-enabled.
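
The two behaviors can be sketched as a single lookup function. The interface below mirrors the get/set/delete methods used by the RedisCache example; `MemoryCache` and `transformWithCache` are hypothetical names, and the actual control flow inside the Iterator may differ.

```typescript
// Local copy of the cache contract, so the snippet is self-contained.
interface CacheInterface {
    get(key: unknown): Promise<unknown>;
    set(key: unknown, value: unknown): Promise<void>;
    delete(key: unknown): Promise<void>;
}

// Simple in-memory cache, for illustration only.
class MemoryCache implements CacheInterface {
    private store = new Map<string, unknown>();
    async get(key: unknown) { return this.store.get(JSON.stringify(key)) ?? null; }
    async set(key: unknown, value: unknown) { this.store.set(JSON.stringify(key), value); }
    async delete(key: unknown) { this.store.delete(JSON.stringify(key)); }
}

// Hypothetical wrapper around a transformer call.
async function transformWithCache(
    cache: CacheInterface,
    enabled: boolean,
    key: unknown,
    input: string,
    transform: (input: string) => Promise<string>
): Promise<string> {
    if (!enabled) {
        await cache.delete(key);          // caching disabled: drop any stale entry
        return transform(input);
    }
    const cached = await cache.get(key);
    if (typeof cached === 'string') return cached; // cache hit: skip the transformer
    const result = await transform(input);
    await cache.set(key, result);         // cache miss: store for next time
    return result;
}
```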

Registering a Cache Module

To use the DefaultCache implementation, you must register a cache module:

import { registerCacheModule } from '@polymech/kbot'

// Your cache module must implement these methods
const myCacheModule = {
    get_cached_object: async (key, namespace) => { /* ... */ },
    set_cached_object: async (key, namespace, value, options) => { /* ... */ },
    rm_cached_object: async (key, namespace) => { /* ... */ }
}

// Register the cache module
registerCacheModule(myCacheModule)

Cache Keys

Cache keys are automatically generated based on:

  • JSONPath expression
  • Target path (if any)
  • Transformation options
  • Retry settings

This ensures that different transformations produce different cache entries, while identical transformations reuse cached results.
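
As an illustration of why these inputs are enough to separate cache entries, the sketch below serializes them in a fixed order. The real key format is not documented here; `KeyParts` and `buildCacheKey` are hypothetical, and the options object is reduced to a prompt for brevity.

```typescript
interface KeyParts {
    jsonPath: string;
    targetPath: string | null;
    options: { prompt: string };
    maxRetries: number;
    retryDelay: number;
}

// Hypothetical key builder: serializes the inputs listed above in a fixed
// order, so identical mappings always yield identical keys.
function buildCacheKey(parts: KeyParts): string {
    return JSON.stringify([
        parts.jsonPath,
        parts.targetPath,
        parts.options.prompt,
        parts.maxRetries,
        parts.retryDelay
    ]);
}

const base: KeyParts = {
    jsonPath: '$.products.*.name',
    targetPath: null,
    options: { prompt: 'Make this product name more appealing' },
    maxRetries: 3,
    retryDelay: 2000
};

const sameKey = buildCacheKey(base) === buildCacheKey({ ...base });
const differentKey = buildCacheKey(base) !== buildCacheKey({ ...base, targetPath: 'marketingName' });
// sameKey and differentKey are both true
```

Serializing an array instead of the object itself avoids depending on property order in the caller's mapping.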

Combined Example with Caching and Retry

Here's a complete example showcasing both caching and retry mechanisms:

import { createIterator, FieldMapping, AsyncTransformer, DefaultCache, registerCacheModule } from '@polymech/kbot'

// Setup a simple mock cache module
const mockCacheModule = {
    storage: new Map(),
    get_cached_object: async (key, namespace) => {
        const cacheKey = `${namespace}:${JSON.stringify(key)}`;
        console.log(`Looking up cache: ${cacheKey}`);
        return mockCacheModule.storage.get(cacheKey);
    },
    set_cached_object: async (key, namespace, value) => {
        const cacheKey = `${namespace}:${JSON.stringify(key)}`;
        console.log(`Storing in cache: ${cacheKey}`);
        mockCacheModule.storage.set(cacheKey, value);
    },
    rm_cached_object: async (key, namespace) => {
        const cacheKey = `${namespace}:${JSON.stringify(key)}`;
        console.log(`Removing from cache: ${cacheKey}`);
        const exists = mockCacheModule.storage.has(cacheKey);
        mockCacheModule.storage.delete(cacheKey);
        return exists;
    }
};

// Register the cache module
registerCacheModule(mockCacheModule);

async function transformProductsWithCaching() {
    // Product data
    const data = {
        products: {
            fruits: [
                {
                    id: 'f1',
                    name: 'apple',
                    description: 'A sweet fruit',
                },
                {
                    id: 'f2',
                    name: 'banana',
                    description: 'A yellow fruit',
                }
            ]
        }
    };

    let requestCount = 0;
    
    // Create a transformer factory that simulates occasional failures
    const createLLMTransformer = (options): AsyncTransformer => {
        return async (input, path) => {
            requestCount++;
            
            // Simulate occasional failures to demonstrate retry
            if (requestCount % 3 === 0) {
                throw new Error('API rate limit exceeded');
            }
            
            console.log(`Transforming ${path}: ${input}`);
            return `Enhanced: ${input}`;
        }
    };

    // Create iterator with caching and retry
    const iterator = createIterator(
        data,
        { model: 'openai/gpt-4' },
        {
            throttleDelay: 1000,
            concurrentTasks: 1,
            transformerFactory: createLLMTransformer,
            maxRetries: 3,
            retryDelay: 1000,
            cache: {
                enabled: true,
                implementation: new DefaultCache(),
                namespace: 'product-transformations'
            },
            errorCallback: (path, value, error) => {
                console.error(`Failed to transform ${path}: ${error.message}`);
            }
        }
    );

    // Define transformations
    const mappings: FieldMapping[] = [
        {
            jsonPath: '$.products.fruits.*.description',
            targetPath: null,
            options: {
                prompt: 'Make this description more detailed'
            },
            maxRetries: 5  // Override global retry setting
        },
        {
            jsonPath: '$.products.fruits.*.name',
            targetPath: 'marketingName',
            options: {
                prompt: 'Generate a marketing name for this product'
            }
        }
    ];

    // First run - will perform transformations and cache results
    console.log("First run - transforming and caching:");
    await iterator.transform(mappings);
    
    // Second run - will use cached results
    console.log("\nSecond run - using cached results:");
    await iterator.transform(mappings);
    
    // Output the transformed data
    console.log("\nTransformed data:");
    console.log(JSON.stringify(data, null, 2));
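}

// Run the example and report any unhandled failure
transformProductsWithCaching().catch(console.error);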

Best Practices

  1. Be specific with JSONPaths: Use precise JSONPath expressions to target only the fields you want to transform.

  2. Handle errors gracefully: Provide an error callback to handle failed transformations without breaking the entire process.

  3. Respect rate limits: Set appropriate throttle delays when working with external APIs.

  4. Test with small datasets first: Validate your transformations on a smaller subset before processing large datasets.

  5. Prefer targeted transformations: Transform only what you need to minimize costs and processing time.

  6. Use caching for expensive operations: Enable caching for transformations that are computationally expensive or costly, such as LLM API calls.

  7. Configure appropriate retry settings: Set retry limits and delays based on the expected reliability of your transformers. Use more retries with longer delays for less reliable services.

  8. Use targetPath for non-destructive transformations: When generating new content related to existing fields, use targetPath to preserve the original data.

  9. Use format for structured outputs: When you need consistent, structured data from LLMs, use the format option with clear JSON schemas.

  10. Include schema details in prompts: For complex schemas, include the schema structure in your prompt to guide the LLM.

  11. Handle string parsing: Always add error handling when parsing structured responses, as they may be returned as JSON strings.

  12. Implement custom cache for production: For production scenarios, implement a persistent cache solution rather than relying on in-memory caching.

  13. Use appropriate namespaces: When multiple parts of your application use the same cache implementation, use distinct namespaces to prevent collisions.

API Reference

Main Functions

  • createIterator(data, optionsMixin, globalOptions): Creates an iterator instance
  • transformObjectWithOptions(obj, transform, options): Low-level function to transform objects
  • transformObject(obj, transform, path, ...): Transforms matching paths in an object

Helper Functions

  • testFilters(filters): Creates a filter callback from filter functions
  • defaultFilters(): Returns commonly used filters
  • defaultError: Default error handler that logs to console

Types and Interfaces

  • AsyncTransformer: Function that transforms strings asynchronously
  • FilterCallback: Function that determines if a value should be transformed
  • ErrorCallback: Function that handles transformation errors
  • FieldMapping: Configuration for a transformation
    • jsonPath: JSONPath expression to select values
    • targetPath: Optional field to store transformed value (null for in-place)
    • options: Configuration for the transformation including:
      • prompt: The prompt for the LLM
      • format: Optional JSON schema for structured output
  • TransformOptions: Options for the transformation process
  • CacheConfig: Configuration for the caching mechanism
  • INetworkOptions: Configuration for throttling and concurrency

Limitations

  1. The Iterator works with string values; objects and arrays are traversed but not directly transformed.
  2. Large datasets might require pagination or chunking for efficient processing.
  3. External API rate limits might require careful throttling configuration.
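
For the chunking mentioned in the second limitation, a generic batching helper is usually enough. This is a sketch with a hypothetical `chunk` function; how each batch is then handed to an iterator depends on your data layout.

```typescript
// Splits `items` into consecutive batches of at most `size` elements.
function chunk<T>(items: T[], size: number): T[][] {
    if (!Number.isInteger(size) || size < 1) {
        throw new RangeError('size must be a positive integer');
    }
    const chunks: T[][] = [];
    for (let i = 0; i < items.length; i += size) {
        chunks.push(items.slice(i, i + size));
    }
    return chunks;
}

const batches = chunk(['a', 'b', 'c', 'd', 'e'], 2);
console.log(batches); // [ [ 'a', 'b' ], [ 'c', 'd' ], [ 'e' ] ]
```

Processing one batch at a time also gives a natural checkpoint for saving intermediate results between runs.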

Troubleshooting

Common issues and solutions:

  • No transformations occurring: Check your JSONPath expressions and filter conditions
  • Unexpected field structure: Examine the exact structure of your data
  • Rate limiting errors: Increase the throttleDelay between requests
  • Transformation errors: Implement a custom error callback for detailed logging

Running the Examples

To run the included examples:

# Run the basic async iterator example
npm run examples:async-iterator

# Run the iterator factory example
npm run examples:iterator-factory

# Run with debug logging
npm run examples:async-iterator -- --debug

# Run with caching disabled (forces fresh responses)
npm run examples:iterator-factory -- --no-cache

The examples will transform sample JSON data and save the results to the tests/test-data/core/ directory.

See Also