# Data Transformation with Iterators

The Iterator module provides a powerful and flexible way to transform complex data structures (like JSON objects) using asynchronous operations, especially useful for applying LLM transformations to specific fields.

## Key Concepts

* **Targeted Transformation:** Use JSONPath expressions to select specific fields or elements within your data for transformation.
* **LLM Integration:** Seamlessly integrate Large Language Models (LLMs) to modify text, generate content, or analyze data based on prompts.
* **Structured Output:** Define JSON schemas using the `format` option to ensure LLM outputs conform to a required structure.
* **In-Place or New Fields:** Choose whether to modify data directly or add transformed results to new fields.
* **Customization:** Control concurrency, rate limiting, error handling, filtering, and caching.
* **Callbacks:** Hook into the transformation process using `onTransform` (before LLM call) and `onTransformed` (after LLM call) callbacks.

## Core Components & Usage

There are two main ways to use the transformation capabilities:

1. **`transform` Helper Function:** A simplified approach for common use cases.
2. **`createIterator` Factory:** Provides more control and customization options.

### 1. Simplified Usage: `transform` Function

For straightforward transformations, the `transform` function offers a minimal setup:

```typescript
import { transform, FieldMapping, E_Mode } from '@polymech/kbot';

// Sample Data
const data = { product: { description: "Old description" } };

// Field mapping definition
const mappings: FieldMapping[] = [
  {
    jsonPath: '$.product.description',
    targetPath: null, // Transform in-place
    options: { prompt: 'Rewrite this description to be more exciting' }
  }
];

// Global LLM options
const llmOptions = {
  model: 'openai/gpt-4o',
  router: 'openai',
  mode: E_Mode.COMPLETION
};

// Optional callbacks
const callbacks = {
  onTransform: async (path, value) => {
    console.log(`Transforming: ${path}`);
    return value;
  },
  onTransformed: async (path, newValue) => {
    console.log(`Transformed: ${path}`);
    return newValue;
  }
};

// Perform the transformation
await transform(data, mappings, llmOptions, callbacks);
console.log(data.product.description); // Output: The exciting new description
```

See the `simpleTransformExample` function within `src/examples/core/iterator-factory-example.ts` for a runnable demonstration.

### 2. Advanced Usage: `createIterator` Factory

The `createIterator` factory provides maximum flexibility for complex scenarios, including fine-grained control over network options, caching, logging, and transformer creation.
```typescript
import {
  createIterator,
  createLLMTransformer,
  FieldMapping,
  IOptions,
  CacheConfig,
  INetworkOptions,
  E_Mode
} from '@polymech/kbot';
import { getLogger } from '@polymech/kbot'; // Assuming logger setup

const logger = getLogger({ logLevel: 4 });

// Sample Data
const data = {
  products: [{ id: 'p1', name: 'Apple', details: { nutrition: 'Rich in fiber' } }]
};

// Global LLM options mixin
const globalOptionsMixin = {
  model: 'anthropic/claude-3.5-sonnet',
  router: 'openrouter',
  mode: E_Mode.COMPLETION
};

// Field Mappings
const fieldMappings: FieldMapping[] = [
  {
    jsonPath: '$.products[*].name',
    targetPath: 'marketingName', // Add a new field
    options: { prompt: 'Create a catchy marketing name' }
  },
  {
    jsonPath: '$.products[*].details.nutrition',
    targetPath: null, // Transform in-place
    options: { prompt: 'Expand nutrition info with health benefits (20 words)' }
  }
];

// Network Configuration
const networkOptions: INetworkOptions = {
  throttleDelay: 500,
  concurrentTasks: 2,
  maxRetries: 3,
  retryDelay: 1000
};

// Cache Configuration
const cacheConfig: CacheConfig = {
  enabled: true,
  namespace: 'product-info-transforms',
  expiration: 3600 // 1 hour in seconds
};

// Iterator Options
const iteratorOptions: IOptions = {
  network: networkOptions,
  errorCallback: (path, value, error) => logger.error(`Error at ${path}: ${error.message}`),
  filterCallback: async () => true, // Accepts every value; replace with a predicate to skip values
  transformerFactory: (opts) => createLLMTransformer(opts, logger, cacheConfig),
  logger,
  cacheConfig,
  onTransform: async (path, value, opts) => {
    logger.debug(`About to transform ${path}`);
    return value;
  },
  onTransformed: async (path, transformedValue, opts) => {
    logger.debug(`Finished transforming ${path}`);
    return transformedValue;
  }
};

// Create the iterator instance
const iterator = createIterator(
  data,
  globalOptionsMixin,
  iteratorOptions
);

// Apply transformations
await iterator.transform(fieldMappings);

console.log(JSON.stringify(data, null, 2));
/* Output might look like:
{
  "products": [
    {
      "id": "p1",
      "name": "Apple",
      "details": {
        "nutrition": "Rich in fiber, supporting digestion and heart health. Apples provide essential vitamins for overall well-being."
      },
      "marketingName": "Orchard Crisp Delight"
    }
  ]
}
*/
```

Refer to the `factoryExample` function in `src/examples/core/iterator-factory-example.ts` for a comprehensive, runnable example demonstrating caching, structured output, and callbacks.

## Field Mappings (`FieldMapping`)

Define *what* to transform and *how*:

```typescript
interface FieldMapping {
  jsonPath: string;             // JSONPath expression identifying data to transform.
  targetPath: string | null;    // Where to put the result. `null` for in-place, or a string for a new relative field name.
  options?: Partial<IKBotTask>; // LLM options (prompt, model, format, etc.) specific to this mapping.
}
```

## Callbacks (`onTransform`, `onTransformed`)

Inject custom logic before and after the core transformation (e.g., the LLM call):

* **`onTransform(jsonPath, originalValue, options)`:** Called just before the value is sent to the transformer (LLM). You can modify the `originalValue` before it's processed.
    * **Note:** For non-string values (like arrays), `originalValue` keeps its original data type. Make sure your callback handles this, or stringify the value before returning it.
* **`onTransformed(jsonPath, transformedValue, options)`:** Called after the transformer returns a result. You can modify the `transformedValue` before it's written back to the data object.

## Structured Output (`format` Option)

The `format` option within a `FieldMapping`'s `options` ensures the LLM's output conforms to a specific JSON schema. This is crucial for reliable data extraction and processing.
```typescript
// Example FieldMapping using 'format'
{
  jsonPath: '$.reviewText',
  targetPath: 'analysis',
  options: {
    prompt: 'Analyze sentiment, pros, and cons from this review.',
    format: {
      type: "object",
      properties: {
        sentiment: { type: "string", enum: ["positive", "neutral", "negative"] },
        pros: { type: "array", items: { type: "string" } },
        cons: { type: "array", items: { type: "string" } }
      },
      required: ["sentiment", "pros", "cons"]
    }
  }
}
```

* The LLM is instructed to return a JSON object matching the schema.
* The result assigned to `data.analysis` will typically be a *string* containing the JSON. You'll likely need to `JSON.parse()` it.
* See the `iterator-factory-example.ts` for a full example of defining and handling formatted output.

## Caching (`CacheConfig`)

Improve performance and reduce costs by caching transformation results.

```typescript
interface CacheConfig {
  enabled: boolean;     // Master switch for caching
  namespace?: string;   // Optional prefix for cache keys (recommended)
  expiration?: number;  // Cache duration in seconds
  // implementation?: CacheInterface; // Advanced: Provide a custom cache backend
}
```

* Caching is configured within the `IOptions` passed to `createIterator`.
* The default cache uses `@polymech/cache`.
* Cache keys are generated based on the input value and transformation options.
* The `iterator-factory-example.ts` includes logic to demonstrate caching and how to clear specific cache entries using `rm_cached_object` from `@polymech/cache` for testing purposes.
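
To make the idea of keys "based on the input value and transformation options" concrete, here is a minimal sketch of a cache wrapper around a transformer. It is not the library's actual key algorithm: `makeCacheKey`, `withCache`, and the in-memory `Map` store are hypothetical names chosen for illustration, and only top-level option keys are sorted.

```typescript
// Local copy of the transformer signature so the sketch is self-contained.
type AsyncTransformer = (input: string, path: string) => Promise<string>;

// Hypothetical key builder: serializes the input together with the options.
// Top-level option keys are sorted so { a, b } and { b, a } yield the same key.
function makeCacheKey(input: string, options: Record<string, unknown>): string {
  const sorted = Object.keys(options).sort().map((k) => [k, options[k]]);
  return JSON.stringify({ input, options: sorted });
}

// Hypothetical wrapper: returns the stored result when the key is known,
// otherwise calls the wrapped transformer and stores its result.
function withCache(
  transformer: AsyncTransformer,
  options: Record<string, unknown>,
  store: Map<string, string> = new Map()
): AsyncTransformer {
  return async (input, path) => {
    const key = makeCacheKey(input, options);
    const hit = store.get(key);
    if (hit !== undefined) return hit;
    const result = await transformer(input, path);
    store.set(key, result);
    return result;
  };
}
```

With a wrapper like this, repeating a transformation with the same input and options skips the expensive call, while changing either the input or the options produces a different key and therefore a fresh call.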
## Network Configuration (`INetworkOptions`)

Fine-tune network behavior for API calls:

```typescript
interface INetworkOptions {
  throttleDelay?: number;   // ms delay between requests
  concurrentTasks?: number; // Max parallel requests
  maxRetries?: number;      // Retries on failure
  retryDelay?: number;      // Base delay (ms) between retries (exponential backoff applied)
}
```

## Examples

Explore the source code for detailed, runnable examples:

* **`src/examples/core/iterator-factory-example.ts`**: Demonstrates `createIterator`, `transform`, callbacks, caching, structured output (`format`), and handling different data types (strings, number arrays).

## Overview

The Iterator module allows you to:

1. Define mappings between JSON paths and transformations
2. Apply transformations in place or to new fields
3. Customize filtering, error handling, and concurrency
4. Chain multiple transformations together

## Example Implementations

For complete working examples, see:

- [`async-iterator-example.ts`](../src/examples/core/async-iterator-example.ts): Shows basic data transformation with LLM and targetPath usage
- [`iterator-factory-example.ts`](../src/examples/core/iterator-factory-example.ts): Demonstrates factory pattern with multiple field transformations

These examples demonstrate transforming a sample product dataset with various JSONPath expressions and LLM-powered transformations.

## Core Components

### AsyncTransformer

The fundamental unit that applies transformations:

```typescript
type AsyncTransformer = (input: string, path: string) => Promise<string>
```

Every transformer takes a string input and its path in the JSON structure, then returns a promise that resolves to the transformed string.
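
As a small illustration of that signature, the sketch below defines two toy transformers and composes them. The type is redeclared locally so the snippet stands alone; `upperCaseTransformer`, `exclaim`, and `chain` are hypothetical helpers for this example, not part of the library.

```typescript
// Local copy of the signature described above.
type AsyncTransformer = (input: string, path: string) => Promise<string>;

// Toy transformer: trims and upper-cases the input (no LLM involved).
const upperCaseTransformer: AsyncTransformer = async (input) =>
  input.trim().toUpperCase();

// Toy transformer: appends an exclamation mark.
const exclaim: AsyncTransformer = async (input) => `${input}!`;

// Hypothetical helper: feeds the output of `first` into `second`,
// passing the same JSON path to both.
function chain(first: AsyncTransformer, second: AsyncTransformer): AsyncTransformer {
  return async (input, path) => second(await first(input, path), path);
}

const shout = chain(upperCaseTransformer, exclaim);
```

Because every transformer shares the same string-in, string-out shape, composition of this kind needs no adapter code.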
### Field Mappings

Field mappings define which parts of the data to transform and how:

```typescript
interface FieldMapping {
  jsonPath: string            // JSONPath expression to find values
  targetPath?: string | null  // Optional target field for transformed values (null for in-place)
  options?: IKBotTask         // Options for the transformation
}
```

## Basic Usage

### Creating an Iterator

```typescript
import { createIterator } from '@polymech/kbot'

// Create an iterator instance
const iterator = createIterator(
  data,               // The data to transform
  globalOptionsMixin, // Global options for all transformations
  {
    throttleDelay: 1000,
    concurrentTasks: 1,
    errorCallback: (path, value, error) => console.error(`Error at ${path}: ${error.message}`),
    filterCallback: async () => true,
    transformerFactory: createCustomTransformer // See "Custom Transformers" below
  }
)

// Define field mappings
const mappings = [
  {
    jsonPath: '$.products.*.name',
    targetPath: null, // Transform in place
    options: { prompt: 'Make this product name more appealing' }
  }
]

// Apply transformations
await iterator.transform(mappings)
```

### JSONPath Patterns

The Iterator uses JSONPath to identify fields for transformation:

- `$..name` - All name fields at any level
- `$.products..name` - All name fields under the products key
- `$.products.*.*.name` - Names of items in product categories
- `$[*].description` - All description fields at the first level

## Advanced Usage

### Custom Transformers

Create custom transformers for specific transformation logic:

```typescript
const createCustomTransformer = (options: IKBotTask): AsyncTransformer => {
  return async (input: string, jsonPath: string): Promise<string> => {
    // Transform the input string based on options.
    // Placeholder: returns the input unchanged; replace with real logic.
    const transformedValue = input
    return transformedValue
  }
}
```

### In-Place vs. Target Field Transformations

#### In-Place Transformation

To transform values in place, set `targetPath` to `null`:

```typescript
{
  jsonPath: '$.products.*.*.description',
  targetPath: null,
  options: { prompt: 'Make this description more engaging' }
}
```

This will replace the original description with the transformed value.

#### Adding New Fields

To keep the original value and add a transformed version, specify a `targetPath`:

```typescript
{
  jsonPath: '$.products.*.*.name',
  targetPath: 'marketingName',
  options: { prompt: 'Generate a marketing name based on this product' }
}
```

This keeps the original `name` and adds a new `marketingName` field.

### Structured Output with Format Option

The `format` option allows you to define a JSON schema that the LLM output should conform to. This is extremely useful for ensuring consistent, structured responses that can be easily parsed and used in your application.

#### Basic Format Usage

To request structured output, add a `format` property to your field mapping options:

```typescript
{
  jsonPath: '$.productReview.reviewText',
  targetPath: 'analysis',
  options: {
    prompt: 'Analyze this product review and extract key information',
    format: {
      type: "object",
      properties: {
        sentiment: {
          type: "string",
          enum: ["positive", "neutral", "negative"],
          description: "The overall sentiment of the review"
        },
        pros: {
          type: "array",
          items: { type: "string" },
          description: "Positive aspects mentioned in the review",
          minItems: 1,
          maxItems: 3
        },
        cons: {
          type: "array",
          items: { type: "string" },
          description: "Negative aspects mentioned in the review",
          minItems: 0,
          maxItems: 3
        }
      },
      required: ["sentiment", "pros", "cons"]
    }
  }
}
```

#### Processing Structured Responses

The formatted response may be returned as a JSON string.
When working with formatted responses, it's good practice to handle potential string parsing:

```typescript
// After transformation
if (data.productReview && data.productReview.analysis) {
  try {
    // Parse the JSON string if needed
    const analysisJson = typeof data.productReview.analysis === 'string'
      ? JSON.parse(data.productReview.analysis)
      : data.productReview.analysis;

    // Now you can work with the structured data
    console.log(`Sentiment: ${analysisJson.sentiment}`);
    console.log(`Pros: ${analysisJson.pros.join(', ')}`);
    console.log(`Cons: ${analysisJson.cons.join(', ')}`);
  } catch (e) {
    console.error("Error parsing structured output:", e);
  }
}
```

#### Best Practices for Formatted Output

1. **Clear Prompt Instructions**: Include explicit instructions in your prompt about the expected format.
2. **Schema Validation**: Use detailed JSON schemas with required fields and appropriate types.
3. **Parsing Handling**: Always include error handling when parsing the output.
4. **Schema Examples**: Consider including examples in your prompt for more complex schemas.

#### Format Option Example

Here's a complete example from the iterator-factory-example.ts file:

```typescript
// Define a field mapping with format option
const fieldMappings = [
  {
    jsonPath: '$.productReview.reviewText',
    targetPath: 'analysis',
    options: {
      // Clear and explicit prompt that includes the schema format details
      prompt: `Analyze this product review and extract key information using EXACTLY the schema specified below.

The review: "Great selection of fruits with good prices and quality. Some items were out of stock."

Your response MUST be a valid JSON object following this exact schema:
{
  "sentiment": "positive" | "neutral" | "negative",
  "pros": ["string", "string"...], // 1-3 items
  "cons": ["string"...] // 0-3 items
}

Do not add any extra fields not in the schema, and make sure to use the exact field names as specified.`,
      // Schema validation ensures structured output format
      format: {
        type: "object",
        properties: {
          sentiment: {
            type: "string",
            enum: ["positive", "neutral", "negative"],
            description: "The overall sentiment of the review"
          },
          pros: {
            type: "array",
            items: { type: "string" },
            description: "Positive aspects mentioned in the review",
            minItems: 1,
            maxItems: 3
          },
          cons: {
            type: "array",
            items: { type: "string" },
            description: "Negative aspects mentioned in the review",
            minItems: 0,
            maxItems: 3
          }
        },
        required: ["sentiment", "pros", "cons"]
      }
    }
  }
]
```

When run, this produces a structured output like:

```json
{
  "sentiment": "positive",
  "pros": [
    "great selection of fruits",
    "good prices",
    "good quality"
  ],
  "cons": [
    "some items were out of stock"
  ]
}
```

This structured format is much easier to work with programmatically than free-form text responses.

## Filtering

Filters determine which values should be transformed:

```typescript
// Default filters that skip numbers, booleans, and empty strings
const defaultFilters = [isNumber, isBoolean, isValidString]

// Custom filter example
const skipFirstItem: FilterCallback = async (input, path) => {
  return !path.includes('[0]')
}
```

## Throttling and Concurrency

Control API rate limits and parallel processing:

```typescript
{
  throttleDelay: 1000, // Milliseconds between requests
  concurrentTasks: 2   // Number of parallel transformations
}
```

## Retry Mechanism

The Iterator includes a built-in retry mechanism for handling transient errors during transformations, particularly useful for API calls to external services like LLMs.
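
For readers unfamiliar with the pattern, a generic retry loop with exponential backoff might look like the sketch below. This is an illustration of the general technique, not the Iterator's internal code: `backoffDelay`, `sleep`, and `withRetry` are hypothetical names, and the doubling factor is an assumption, since the exact growth rate used by the library is not stated here.

```typescript
// Assumed schedule: the delay doubles with each attempt (attempt is 0-based).
function backoffDelay(retryDelay: number, attempt: number): number {
  return retryDelay * 2 ** attempt;
}

// Resolves after the given number of milliseconds.
function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// Runs `task`; on failure waits and retries, up to `maxRetries` retries.
// After the final failed attempt the last error is rethrown to the caller.
async function withRetry<T>(
  task: () => Promise<T>,
  maxRetries: number,
  retryDelay: number
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await task();
    } catch (error) {
      lastError = error;
      if (attempt < maxRetries) {
        await sleep(backoffDelay(retryDelay, attempt));
      }
    }
  }
  throw lastError;
}
```

Under this assumed schedule, a base delay of 1000 ms gives waits of 1000, 2000, and 4000 ms before the first three retries.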
### Configuration

Retry settings can be configured at both the global and individual field mapping levels:

```typescript
// Global retry configuration
const iterator = createIterator(
  data,
  globalOptionsMixin,
  {
    maxRetries: 3,    // Maximum number of retry attempts
    retryDelay: 2000, // Base delay in milliseconds between retries
    // ... other options
  }
)

// Field-specific retry configuration
const mappings = [
  {
    jsonPath: '$.products.*.description',
    targetPath: null,
    options: { /* ... */ },
    maxRetries: 5,   // Override global setting for this field
    retryDelay: 1000 // Override global setting for this field
  }
]
```

### Behavior

When a transformation fails:

1. The Iterator waits for `retryDelay` milliseconds before retrying.
2. The delay increases exponentially with each retry attempt (backoff strategy).
3. After `maxRetries` failed attempts, the error is passed to the `errorCallback`.

This helps handle temporary issues like API rate limits, network connectivity problems, or service outages without failing the entire operation.

## Caching Mechanism

The Iterator includes a powerful caching system to improve performance, reduce API costs, and speed up repeated operations.

### Cache Configuration

Configure caching behavior when creating an iterator:

```typescript
import { createIterator, DefaultCache, NoopCache } from '@polymech/kbot'

// Configure caching
const iterator = createIterator(
  data,
  globalOptionsMixin,
  {
    cache: {
      enabled: true,                      // Enable or disable caching
      implementation: new DefaultCache(), // Use default cache or provide custom
      namespace: 'my-custom-namespace'    // Custom namespace for cache entries
    },
    // ... other options
  }
)
```

### Cache Implementations

The Iterator provides multiple cache implementations:

1. **DefaultCache**: Uses the registered cache module to store and retrieve values
2. **NoopCache**: A no-operation cache that doesn't actually cache anything
3. **Custom Implementation**: Implement the `CacheInterface` to connect any caching system

```typescript
// Custom cache implementation example
class RedisCache implements CacheInterface {
  constructor(private redisClient: any) {}

  async get(key: any): Promise<any> {
    const result = await this.redisClient.get(JSON.stringify(key));
    return result ? JSON.parse(result) : null;
  }

  async set(key: any, value: any): Promise<void> {
    await this.redisClient.set(JSON.stringify(key), JSON.stringify(value));
  }

  async delete(key: any): Promise<void> {
    await this.redisClient.del(JSON.stringify(key));
  }
}

// Use the custom cache
const iterator = createIterator(data, options, {
  cache: {
    enabled: true,
    implementation: new RedisCache(redisClient),
    namespace: 'product-transformations'
  }
})
```

### Cache Behavior

1. **Caching Enabled**: Before transformation, the Iterator checks if a result already exists in the cache. If found, it applies the cached result directly without calling the transformer. If not found, it performs the transformation and stores the result.
2. **Caching Disabled**: When caching is explicitly disabled, the system will remove any existing cache entries for the current transformations. This ensures that stale data isn't accidentally used later if caching is re-enabled.

### Registering a Cache Module

To use the DefaultCache implementation, you must register a cache module:

```typescript
import { registerCacheModule } from '@polymech/kbot'

// Your cache module must implement these methods
const myCacheModule = {
  get_cached_object: async (key, namespace) => { /* ... */ },
  set_cached_object: async (key, namespace, value, options) => { /* ... */ },
  rm_cached_object: async (key, namespace) => { /* ... */ }
}

// Register the cache module
registerCacheModule(myCacheModule)
```

### Cache Keys

Cache keys are automatically generated based on:

- JSONPath expression
- Target path (if any)
- Transformation options
- Retry settings

This ensures that different transformations produce different cache entries, while identical transformations reuse cached results.

## Combined Example with Caching and Retry

Here's a complete example showcasing both caching and retry mechanisms:

```typescript
import {
  createIterator,
  AsyncTransformer,
  FieldMapping,
  DefaultCache,
  registerCacheModule
} from '@polymech/kbot'

// Setup a simple mock cache module
const mockCacheModule = {
  storage: new Map(),
  get_cached_object: async (key, namespace) => {
    const cacheKey = `${namespace}:${JSON.stringify(key)}`;
    console.log(`Looking up cache: ${cacheKey}`);
    return mockCacheModule.storage.get(cacheKey);
  },
  set_cached_object: async (key, namespace, value) => {
    const cacheKey = `${namespace}:${JSON.stringify(key)}`;
    console.log(`Storing in cache: ${cacheKey}`);
    mockCacheModule.storage.set(cacheKey, value);
  },
  rm_cached_object: async (key, namespace) => {
    const cacheKey = `${namespace}:${JSON.stringify(key)}`;
    console.log(`Removing from cache: ${cacheKey}`);
    const exists = mockCacheModule.storage.has(cacheKey);
    mockCacheModule.storage.delete(cacheKey);
    return exists;
  }
};

// Register the cache module
registerCacheModule(mockCacheModule);

async function transformProductsWithCaching() {
  // Product data
  const data = {
    products: {
      fruits: [
        { id: 'f1', name: 'apple', description: 'A sweet fruit', },
        { id: 'f2', name: 'banana', description: 'A yellow fruit', }
      ]
    }
  };

  let requestCount = 0;

  // Create a transformer factory that simulates occasional failures
  const createLLMTransformer = (options): AsyncTransformer => {
    return async (input, path) => {
      requestCount++;
      // Simulate occasional failures to demonstrate retry
      if (requestCount % 3 === 0) {
        throw new Error('API rate limit exceeded');
      }
      console.log(`Transforming ${path}: ${input}`);
      return `Enhanced: ${input}`;
    }
  };

  // Create iterator with caching and retry
  const iterator = createIterator(
    data,
    { model: 'openai/gpt-4' },
    {
      throttleDelay: 1000,
      concurrentTasks: 1,
      transformerFactory: createLLMTransformer,
      maxRetries: 3,
      retryDelay: 1000,
      cache: {
        enabled: true,
        implementation: new DefaultCache(),
        namespace: 'product-transformations'
      },
      errorCallback: (path, value, error) => {
        console.error(`Failed to transform ${path}: ${error.message}`);
      }
    }
  );

  // Define transformations
  const mappings: FieldMapping[] = [
    {
      jsonPath: '$.products.fruits.*.description',
      targetPath: null,
      options: { prompt: 'Make this description more detailed' },
      maxRetries: 5 // Override global retry setting
    },
    {
      jsonPath: '$.products.fruits.*.name',
      targetPath: 'marketingName',
      options: { prompt: 'Generate a marketing name for this product' }
    }
  ];

  // First run - will perform transformations and cache results
  console.log("First run - transforming and caching:");
  await iterator.transform(mappings);

  // Second run - will use cached results
  console.log("\nSecond run - using cached results:");
  await iterator.transform(mappings);

  // Output the transformed data
  console.log("\nTransformed data:");
  console.log(JSON.stringify(data, null, 2));
}

// Run the example and report any unhandled failure
transformProductsWithCaching().catch(console.error);
```

## Best Practices

1. **Be specific with JSONPaths**: Use precise JSONPath expressions to target only the fields you want to transform.
2. **Handle errors gracefully**: Provide an error callback to handle failed transformations without breaking the entire process.
3. **Respect rate limits**: Set appropriate throttle delays when working with external APIs.
4. **Test with small datasets first**: Validate your transformations on a smaller subset before processing large datasets.
5. **Prefer targeted transformations**: Transform only what you need to minimize costs and processing time.
6. **Use caching for expensive operations**: Enable caching for transformations that are computationally expensive or costly, such as LLM API calls.
7. **Configure appropriate retry settings**: Set retry limits and delays based on the expected reliability of your transformers. Use more retries with longer delays for less reliable services.
8. **Use targetPath for non-destructive transformations**: When generating new content related to existing fields, use targetPath to preserve the original data.
9. **Use format for structured outputs**: When you need consistent, structured data from LLMs, use the format option with clear JSON schemas.
10. **Include schema details in prompts**: For complex schemas, include the schema structure in your prompt to guide the LLM.
11. **Handle string parsing**: Always add error handling when parsing structured responses, as they may be returned as string JSON.
12. **Implement custom cache for production**: For production scenarios, implement a persistent cache solution rather than relying on in-memory caching.
13. **Use appropriate namespaces**: When multiple parts of your application use the same cache implementation, use distinct namespaces to prevent collisions.
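
Practices 9 to 11 can be combined into one small validation step. The sketch below assumes the review-analysis schema used earlier in this document; `ReviewAnalysis`, `isStringArray`, and `parseReviewAnalysis` are hypothetical names introduced only for illustration, and the checks cover field types but not the `minItems`/`maxItems` limits.

```typescript
// Shape expected from the review-analysis schema shown earlier.
interface ReviewAnalysis {
  sentiment: "positive" | "neutral" | "negative";
  pros: string[];
  cons: string[];
}

// Type guard: true only for arrays whose items are all strings.
function isStringArray(value: unknown): value is string[] {
  return Array.isArray(value) && value.every((item) => typeof item === "string");
}

// Accepts a JSON string or an already-parsed value.
// Returns the validated object, or null if parsing or validation fails.
function parseReviewAnalysis(raw: unknown): ReviewAnalysis | null {
  let candidate: unknown = raw;
  if (typeof raw === "string") {
    try {
      candidate = JSON.parse(raw);
    } catch {
      return null;
    }
  }
  if (typeof candidate !== "object" || candidate === null) return null;
  const { sentiment, pros, cons } = candidate as Record<string, unknown>;
  if (sentiment !== "positive" && sentiment !== "neutral" && sentiment !== "negative") {
    return null;
  }
  if (!isStringArray(pros) || !isStringArray(cons)) return null;
  return { sentiment: sentiment as ReviewAnalysis["sentiment"], pros, cons };
}
```

Returning `null` instead of throwing keeps the calling code to a single check, which is convenient when many fields are processed in a loop.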
## API Reference

### Main Functions

- `createIterator(data, optionsMixin, globalOptions)`: Creates an iterator instance
- `transformObjectWithOptions(obj, transform, options)`: Low-level function to transform objects
- `transformObject(obj, transform, path, ...)`: Transforms matching paths in an object

### Helper Functions

- `testFilters(filters)`: Creates a filter callback from filter functions
- `defaultFilters()`: Returns commonly used filters
- `defaultError`: Default error handler that logs to console

### Types and Interfaces

- `AsyncTransformer`: Function that transforms strings asynchronously
- `FilterCallback`: Function that determines if a value should be transformed
- `ErrorCallback`: Function that handles transformation errors
- `FieldMapping`: Configuration for a transformation
  - `jsonPath`: JSONPath expression to select values
  - `targetPath`: Optional field to store transformed value (null for in-place)
  - `options`: Configuration for the transformation including:
    - `prompt`: The prompt for the LLM
    - `format`: Optional JSON schema for structured output
- `TransformOptions`: Options for the transformation process
- `CacheConfig`: Configuration for the caching mechanism
- `INetworkOptions`: Configuration for throttling and concurrency

## Limitations

1. The Iterator works with string values; objects and arrays are traversed but not directly transformed.
2. Large datasets might require pagination or chunking for efficient processing.
3. External API rate limits might require careful throttling configuration.
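
For the second limitation, one simple approach is to split a large array into fixed-size batches and process them one after another. The sketch below is a generic helper, not part of the module; `chunk` and `processInChunks` are hypothetical names, and the `handler` is where a call such as `iterator.transform(...)` on a batch-sized data object would go.

```typescript
// Splits an array into consecutive chunks of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError("size must be a positive integer");
  }
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Processes the chunks sequentially so only one batch is in flight at a time.
async function processInChunks<T>(
  items: T[],
  size: number,
  handler: (batch: T[]) => Promise<void>
): Promise<void> {
  for (const batch of chunk(items, size)) {
    await handler(batch);
  }
}
```

Sequential batches keep memory use and request bursts bounded; the batch size can then be tuned alongside `throttleDelay` and `concurrentTasks`.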
## Troubleshooting

Common issues and solutions:

- **No transformations occurring**: Check your JSONPath expressions and filter conditions
- **Unexpected field structure**: Examine the exact structure of your data
- **Rate limiting errors**: Increase the throttleDelay between requests
- **Transformation errors**: Implement a custom error callback for detailed logging

## Running the Examples

To run the included examples:

```bash
# Run the basic async iterator example
npm run examples:async-iterator

# Run the iterator factory example
npm run examples:iterator-factory

# Run with debug logging
npm run examples:async-iterator -- --debug

# Run with caching disabled (forces fresh responses)
npm run examples:iterator-factory -- --no-cache
```

The examples will transform sample JSON data and save the results to the `tests/test-data/core/` directory.

## See Also

- [JSON Path Syntax](https://goessner.net/articles/JsonPath/) - Reference for JSONPath expressions
- [p-throttle](https://github.com/sindresorhus/p-throttle) - The throttling library used internally
- [p-map](https://github.com/sindresorhus/p-map) - For concurrent asynchronous mapping