# Iterator Documentation

The Iterator module transforms data structures with asynchronous operations and is particularly suited to applying LLM-based transformations to JSON data. This document covers the core functionality, usage patterns, and examples.

## Overview

The Iterator module allows you to:

1. Define mappings between JSON paths and transformations
2. Apply transformations in place or to new fields
3. Customize filtering, error handling, and concurrency
4. Chain multiple transformations together

## Example Implementations

For complete working examples, see:

- [`async-iterator-example.ts`](../src/examples/core/async-iterator-example.ts): Shows basic data transformation with an LLM and `targetPath` usage
- [`iterator-factory-example.ts`](../src/examples/core/iterator-factory-example.ts): Demonstrates the factory pattern with multiple field transformations

These examples transform a sample product dataset with various JSONPath expressions and LLM-powered transformations.

## Core Components

### AsyncTransformer

The fundamental unit that applies transformations:

```typescript
type AsyncTransformer = (input: string, path: string) => Promise<string>
```

Every transformer takes a string input and its path in the JSON structure, then returns a transformed string.
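
As a minimal sketch of that contract, the two transformers below are hypothetical (`upperCase` and `tagNames` are illustrative names, not part of the module). The type is repeated locally so the snippet stands on its own.

```typescript
// Local copy of the type shown above so the sketch is self-contained.
type AsyncTransformer = (input: string, path: string) => Promise<string>

// Hypothetical transformer: upper-cases the value and ignores the path.
const upperCase: AsyncTransformer = async (input, _path) => input.toUpperCase()

// Hypothetical transformer: uses the path to decide whether to touch the value.
const tagNames: AsyncTransformer = async (input, path) =>
  path.endsWith('name') ? `[name] ${input}` : input
```

A real transformer would typically call an LLM here; the path argument lets it vary the prompt or skip values by location.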

### Field Mappings

Field mappings define which parts of the data to transform and how:

```typescript
interface FieldMapping {
  jsonPath: string            // JSONPath expression to find values
  targetPath?: string | null  // Optional target field for transformed values; null transforms in place
  options?: IKBotTask         // Options for the transformation
}
```

A mapping can also carry per-field `maxRetries` and `retryDelay` overrides, described under Retry Mechanism.

## Basic Usage

### Creating an Iterator

```typescript
import { createIterator } from '@polymech/kbot'

// Create an iterator instance
const iterator = createIterator(
  data,               // The data to transform
  globalOptionsMixin, // Global options for all transformations
  {
    throttleDelay: 1000,
    concurrentTasks: 1,
    errorCallback: (path, value, error) => console.error(`Error at ${path}: ${error.message}`),
    filterCallback: async () => true,
    transformerFactory: createCustomTransformer // See "Custom Transformers" under Advanced Usage
  }
)

// Define field mappings
const mappings = [
  {
    jsonPath: '$.products.*.name',
    targetPath: null, // Transform in place
    options: {
      prompt: 'Make this product name more appealing'
    }
  }
]

// Apply transformations
await iterator.transform(mappings)
```

### JSONPath Patterns

The Iterator uses JSONPath to identify fields for transformation:

- `$..name` - All `name` fields at any level
- `$.products..name` - All `name` fields at any depth under the `products` key
- `$.products.*.*.name` - The `name` of each item in each product category
- `$[*].description` - The `description` field of each top-level element
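
To make the recursive-descent pattern `$..name` concrete, here is a small hand-rolled sketch that lists every path it would match. `findKeyPaths` and the `sample` object are hypothetical and stand in for a JSONPath engine; this is not the matcher the module itself uses.

```typescript
type Json = string | number | boolean | null | Json[] | { [key: string]: Json }

// Collect the path of every property called `key`, at any depth.
// Hand-rolled stand-in for the JSONPath expression `$..<key>`.
function findKeyPaths(node: Json, key: string, path = '$'): string[] {
  if (node === null || typeof node !== 'object') return []
  const found: string[] = []
  if (Array.isArray(node)) {
    node.forEach((child, i) => found.push(...findKeyPaths(child, key, `${path}[${i}]`)))
    return found
  }
  for (const [k, child] of Object.entries(node)) {
    const childPath = `${path}.${k}`
    if (k === key) found.push(childPath)
    found.push(...findKeyPaths(child, key, childPath))
  }
  return found
}

const sample: Json = {
  products: { fruits: [{ name: 'apple' }, { name: 'banana' }] },
  name: 'catalog'
}

console.log(findKeyPaths(sample, 'name'))
// [ '$.products.fruits[0].name', '$.products.fruits[1].name', '$.name' ]
```

Note that `$..name` also picks up the top-level `name`, which is why a more specific expression such as `$.products.*.*.name` is often the safer choice.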

## Advanced Usage

### Custom Transformers

Create custom transformers for specific transformation logic:

```typescript
const createCustomTransformer = (options: IKBotTask): AsyncTransformer => {
  return async (input: string, jsonPath: string): Promise<string> => {
    // Transform the input string based on options.
    // Placeholder logic: trim whitespace; replace with your own transformation.
    const transformedValue = input.trim()
    return transformedValue
  }
}
```

### In-Place vs. Target Field Transformations

#### In-Place Transformation

To transform values in place, set `targetPath` to `null`:

```typescript
{
  jsonPath: '$.products.*.*.description',
  targetPath: null,
  options: {
    prompt: 'Make this description more engaging'
  }
}
```

This replaces the original description with the transformed value.

#### Adding New Fields

To keep the original value and add a transformed version, specify a `targetPath`:

```typescript
{
  jsonPath: '$.products.*.*.name',
  targetPath: 'marketingName',
  options: {
    prompt: 'Generate a marketing name based on this product'
  }
}
```

This keeps the original `name` and adds a new `marketingName` field.
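
The difference between the two modes can be sketched as a single write step. `writeResult` is a hypothetical helper, and it assumes that `targetPath` names a sibling field on the object that owns the matched value, as the `marketingName` example suggests.

```typescript
// Write a transformed value either over the matched key or into a sibling field.
// Hypothetical helper; `parent` is the object that owns the matched value.
function writeResult(
  parent: Record<string, unknown>,
  key: string,
  transformed: string,
  targetPath: string | null = null
): void {
  parent[targetPath ?? key] = transformed
}

const inPlace = { name: 'apple' }
writeResult(inPlace, 'name', 'Crisp Orchard Apple')
// inPlace is now { name: 'Crisp Orchard Apple' }

const withTarget = { name: 'apple' }
writeResult(withTarget, 'name', 'Crisp Orchard Apple', 'marketingName')
// withTarget is now { name: 'apple', marketingName: 'Crisp Orchard Apple' }
```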

## Filtering

Filters determine which values should be transformed:

```typescript
// Default filters that skip numbers, booleans, and empty strings
const defaultFilters = [isNumber, isBoolean, isValidString]

// Custom filter example: skip any value whose path contains `[0]`
const skipFirstItem: FilterCallback = async (input, path) => {
  return !path.includes('[0]')
}
```
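
Several filters can be folded into one callback. The sketch below assumes that a filter returns `true` when the value should be transformed and that all filters must agree; `allOf`, `nonEmpty`, and the local `FilterCallback` type are illustrative and may not match the module's `testFilters` helper.

```typescript
// Local, assumed shape of a filter callback for this sketch.
type FilterCallback = (input: string, path: string) => Promise<boolean>

// Hypothetical combinator: a value is transformed only if every filter accepts it.
const allOf = (...filters: FilterCallback[]): FilterCallback =>
  async (input, path) => {
    for (const filter of filters) {
      if (!(await filter(input, path))) return false
    }
    return true
  }

const nonEmpty: FilterCallback = async (input) => input.trim().length > 0
const skipFirstItem: FilterCallback = async (_input, path) => !path.includes('[0]')

// Accepts non-empty values everywhere except under index 0.
const shouldTransform = allOf(nonEmpty, skipFirstItem)
```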

## Throttling and Concurrency

Control API rate limits and parallel processing:

```typescript
{
  throttleDelay: 1000, // Milliseconds between requests
  concurrentTasks: 2   // Number of parallel transformations
}
```
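
To show how the two settings interact, here is a dependency-free sketch of a small worker pool. `runThrottled` is a hypothetical helper and a simplification: each worker pauses for `throttleDelay` after finishing a task, which approximates, but is not identical to, a global rate limit.

```typescript
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

// Run `task` over `items` with at most `concurrentTasks` in flight,
// pausing `throttleDelay` ms after each task a worker completes.
async function runThrottled<T, R>(
  items: T[],
  task: (item: T) => Promise<R>,
  { concurrentTasks, throttleDelay }: { concurrentTasks: number; throttleDelay: number }
): Promise<R[]> {
  const results: R[] = new Array(items.length)
  let next = 0
  const worker = async () => {
    while (next < items.length) {
      const index = next++ // no await between the check and the increment
      results[index] = await task(items[index])
      await sleep(throttleDelay)
    }
  }
  const workers = Array.from({ length: Math.min(concurrentTasks, items.length) }, worker)
  await Promise.all(workers)
  return results // same order as `items`
}
```

With `concurrentTasks: 1` the tasks run strictly one after another, which is the most conservative setting for a rate-limited API.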

## Retry Mechanism

The Iterator includes a built-in retry mechanism for handling transient errors during transformations, particularly useful for API calls to external services like LLMs.

### Configuration

Retry settings can be configured at both the global and individual field mapping levels:

```typescript
// Global retry configuration
const iterator = createIterator(
  data,
  globalOptionsMixin,
  {
    maxRetries: 3,    // Maximum number of retry attempts
    retryDelay: 2000, // Base delay in milliseconds between retries
    // ... other options
  }
)

// Field-specific retry configuration
const mappings = [
  {
    jsonPath: '$.products.*.description',
    targetPath: null,
    options: { /* ... */ },
    maxRetries: 5,   // Override global setting for this field
    retryDelay: 1000 // Override global setting for this field
  }
]
```

### Behavior

When a transformation fails:

1. The Iterator waits for `retryDelay` milliseconds before retrying
2. The delay increases exponentially with each further attempt (backoff strategy)
3. After `maxRetries` failed attempts, the error is passed to the `errorCallback`

This handles temporary issues such as API rate limits, network connectivity problems, or service outages without failing the entire operation.
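
The retry steps can be sketched as follows. The doubling factor in `backoffDelay` is an assumption (the text only says the delay grows exponentially), and `withRetry` is a hypothetical wrapper rather than the module's own code.

```typescript
// Delay before retry number `attempt` (0-based), assuming the delay doubles each time.
const backoffDelay = (retryDelay: number, attempt: number): number =>
  retryDelay * 2 ** attempt

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

// Hypothetical wrapper: try once, then retry up to `maxRetries` times with backoff.
async function withRetry<T>(
  fn: () => Promise<T>,
  maxRetries: number,
  retryDelay: number
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn()
    } catch (error) {
      if (attempt >= maxRetries) throw error // the caller would hand this to errorCallback
      await sleep(backoffDelay(retryDelay, attempt))
    }
  }
}
```

Under this assumption, `retryDelay: 1000` with `maxRetries: 3` waits 1000, 2000, and 4000 ms before the three retries.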

## Caching Mechanism

The Iterator includes a caching system that reduces API costs and speeds up repeated operations by reusing earlier results.

### Cache Configuration

Configure caching behavior when creating an iterator:

```typescript
import { createIterator, DefaultCache, NoopCache } from '@polymech/kbot'

// Configure caching
const iterator = createIterator(
  data,
  globalOptionsMixin,
  {
    cache: {
      enabled: true,                      // Enable or disable caching
      implementation: new DefaultCache(), // Use default cache or provide custom
      namespace: 'my-custom-namespace'    // Custom namespace for cache entries
    },
    // ... other options
  }
)
```

### Cache Implementations

The Iterator provides multiple cache implementations:

1. **DefaultCache**: Uses the registered cache module to store and retrieve values
2. **NoopCache**: A no-operation cache that doesn't actually cache anything
3. **Custom Implementation**: Implement the `CacheInterface` to connect any caching system

```typescript
// Custom cache implementation example
class RedisCache implements CacheInterface {
  constructor(private redisClient: any) {}

  async get(key: any): Promise<any> {
    const result = await this.redisClient.get(JSON.stringify(key));
    return result ? JSON.parse(result) : null;
  }

  async set(key: any, value: any): Promise<void> {
    await this.redisClient.set(JSON.stringify(key), JSON.stringify(value));
  }

  async delete(key: any): Promise<void> {
    await this.redisClient.del(JSON.stringify(key));
  }
}

// Use the custom cache
const iterator = createIterator(data, globalOptionsMixin, {
  cache: {
    enabled: true,
    implementation: new RedisCache(redisClient),
    namespace: 'product-transformations'
  }
})
```

### Cache Behavior

1. **Caching Enabled**: Before transformation, the Iterator checks if a result already exists in the cache. If found, it applies the cached result directly without calling the transformer. If not found, it performs the transformation and stores the result.

2. **Caching Disabled**: When caching is explicitly disabled, the system will remove any existing cache entries for the current transformations. This ensures that stale data isn't accidentally used later if caching is re-enabled.
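
The two cases can be sketched as a read-through lookup. `SketchCache`, `MemoryCache`, and `transformWithCache` are hypothetical names for this illustration; the real `CacheInterface` may use different key and value types.

```typescript
// Minimal cache shape for this sketch, with get/set/delete methods.
interface SketchCache {
  get(key: string): Promise<string | null>
  set(key: string, value: string): Promise<void>
  delete(key: string): Promise<void>
}

class MemoryCache implements SketchCache {
  private store = new Map<string, string>()
  async get(key: string) { return this.store.get(key) ?? null }
  async set(key: string, value: string) { this.store.set(key, value) }
  async delete(key: string) { this.store.delete(key) }
}

// Hypothetical read-through helper following the two cases described above.
async function transformWithCache(
  cache: SketchCache,
  enabled: boolean,
  key: string,
  transform: () => Promise<string>
): Promise<string> {
  if (!enabled) {
    await cache.delete(key) // drop any stale entry when caching is off
    return transform()
  }
  const hit = await cache.get(key)
  if (hit !== null) return hit // cached: the transformer is never called
  const result = await transform()
  await cache.set(key, result)
  return result
}
```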

### Registering a Cache Module

To use the DefaultCache implementation, you must register a cache module:

```typescript
import { registerCacheModule } from '@polymech/kbot'

// Your cache module must implement these methods
const myCacheModule = {
  get_cached_object: async (key, namespace) => { /* ... */ },
  set_cached_object: async (key, namespace, value, options) => { /* ... */ },
  rm_cached_object: async (key, namespace) => { /* ... */ }
}

// Register the cache module
registerCacheModule(myCacheModule)
```

### Cache Keys

Cache keys are automatically generated based on:

- JSONPath expression
- Target path (if any)
- Transformation options
- Retry settings

This ensures that different transformations produce different cache entries, while identical transformations reuse cached results.
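
One way to derive such a key is to serialize the listed fields with sorted property names, so that two equivalent mappings yield the same string. This is only a sketch: `KeyParts`, `stableStringify`, and `cacheKey` are hypothetical, and the module's real key format may differ.

```typescript
interface KeyParts {
  jsonPath: string
  targetPath?: string | null
  options?: Record<string, unknown>
  maxRetries?: number
  retryDelay?: number
}

// Serialize with sorted object keys so that property order does not change the key.
function stableStringify(value: unknown): string {
  if (value === null || typeof value !== 'object') return JSON.stringify(value) ?? 'null'
  if (Array.isArray(value)) return `[${value.map(stableStringify).join(',')}]`
  const obj = value as Record<string, unknown>
  const entries = Object.keys(obj)
    .filter((k) => obj[k] !== undefined)
    .sort()
    .map((k) => `${JSON.stringify(k)}:${stableStringify(obj[k])}`)
  return `{${entries.join(',')}}`
}

// Hypothetical key builder over the fields listed above.
const cacheKey = (parts: KeyParts): string => stableStringify(parts)
```

Changing any listed field, for example the prompt inside `options`, produces a different key and therefore a fresh transformation.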

## Combined Example with Caching and Retry

Here's a complete example showcasing both caching and retry mechanisms:

```typescript
import { createIterator, FieldMapping, AsyncTransformer, DefaultCache, registerCacheModule } from '@polymech/kbot'

// Setup a simple mock cache module
const mockCacheModule = {
  storage: new Map(),
  get_cached_object: async (key, namespace) => {
    const cacheKey = `${namespace}:${JSON.stringify(key)}`;
    console.log(`Looking up cache: ${cacheKey}`);
    return mockCacheModule.storage.get(cacheKey);
  },
  set_cached_object: async (key, namespace, value) => {
    const cacheKey = `${namespace}:${JSON.stringify(key)}`;
    console.log(`Storing in cache: ${cacheKey}`);
    mockCacheModule.storage.set(cacheKey, value);
  },
  rm_cached_object: async (key, namespace) => {
    const cacheKey = `${namespace}:${JSON.stringify(key)}`;
    console.log(`Removing from cache: ${cacheKey}`);
    const exists = mockCacheModule.storage.has(cacheKey);
    mockCacheModule.storage.delete(cacheKey);
    return exists;
  }
};

// Register the cache module
registerCacheModule(mockCacheModule);

async function transformProductsWithCaching() {
  // Product data
  const data = {
    products: {
      fruits: [
        {
          id: 'f1',
          name: 'apple',
          description: 'A sweet fruit',
        },
        {
          id: 'f2',
          name: 'banana',
          description: 'A yellow fruit',
        }
      ]
    }
  };

  let requestCount = 0;

  // Create a transformer factory that simulates occasional failures
  const createLLMTransformer = (options): AsyncTransformer => {
    return async (input, path) => {
      requestCount++;

      // Simulate occasional failures to demonstrate retry:
      // every third request throws
      if (requestCount % 3 === 0) {
        throw new Error('API rate limit exceeded');
      }

      console.log(`Transforming ${path}: ${input}`);
      return `Enhanced: ${input}`;
    }
  };

  // Create iterator with caching and retry
  const iterator = createIterator(
    data,
    { model: 'openai/gpt-4' },
    {
      throttleDelay: 1000,
      concurrentTasks: 1,
      transformerFactory: createLLMTransformer,
      maxRetries: 3,
      retryDelay: 1000,
      cache: {
        enabled: true,
        implementation: new DefaultCache(),
        namespace: 'product-transformations'
      },
      errorCallback: (path, value, error) => {
        console.error(`Failed to transform ${path}: ${error.message}`);
      }
    }
  );

  // Define transformations
  const mappings: FieldMapping[] = [
    {
      jsonPath: '$.products.fruits.*.description',
      targetPath: null,
      options: {
        prompt: 'Make this description more detailed'
      },
      maxRetries: 5 // Override global retry setting
    },
    {
      jsonPath: '$.products.fruits.*.name',
      targetPath: 'marketingName',
      options: {
        prompt: 'Generate a marketing name for this product'
      }
    }
  ];

  // First run - will perform transformations and cache results
  console.log("First run - transforming and caching:");
  await iterator.transform(mappings);

  // Second run - will use cached results
  console.log("\nSecond run - using cached results:");
  await iterator.transform(mappings);

  // Output the transformed data
  console.log("\nTransformed data:");
  console.log(JSON.stringify(data, null, 2));
}

// Run the example and report any unhandled failure
transformProductsWithCaching().catch(console.error);
```

## Best Practices

1. **Be specific with JSONPaths**: Use precise JSONPath expressions to target only the fields you want to transform.

2. **Handle errors gracefully**: Provide an error callback to handle failed transformations without breaking the entire process.

3. **Respect rate limits**: Set appropriate throttle delays when working with external APIs.

4. **Test with small datasets first**: Validate your transformations on a smaller subset before processing large datasets.

5. **Prefer targeted transformations**: Transform only what you need to minimize costs and processing time.

6. **Use caching for expensive operations**: Enable caching for transformations that are computationally expensive or costly, such as LLM API calls.

7. **Configure appropriate retry settings**: Set retry limits and delays based on the expected reliability of your transformers. Use more retries with longer delays for less reliable services.

8. **Use targetPath for non-destructive transformations**: When generating new content related to existing fields, use `targetPath` to preserve the original data.

9. **Implement custom cache for production**: For production scenarios, implement a persistent cache solution rather than relying on in-memory caching.

10. **Use appropriate namespaces**: When multiple parts of your application use the same cache implementation, use distinct namespaces to prevent collisions.

## API Reference

### Main Functions

- `createIterator(data, optionsMixin, globalOptions)`: Creates an iterator instance
- `transformObjectWithOptions(obj, transform, options)`: Low-level function to transform objects
- `transformObject(obj, transform, path, ...)`: Transforms matching paths in an object

### Helper Functions

- `testFilters(filters)`: Creates a filter callback from filter functions
- `defaultFilters()`: Returns commonly used filters
- `defaultError`: Default error handler that logs to console

### Types and Interfaces

- `AsyncTransformer`: Function that transforms strings asynchronously
- `FilterCallback`: Function that determines if a value should be transformed
- `ErrorCallback`: Function that handles transformation errors
- `FieldMapping`: Configuration for a transformation
- `TransformOptions`: Options for the transformation process

## Limitations

1. The Iterator works with string values; objects and arrays are traversed but not directly transformed.
2. Large datasets might require pagination or chunking for efficient processing.
3. External API rate limits might require careful throttling configuration.
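
For the second limitation, one possible approach is to split a large array into chunks and process each chunk on its own. The `chunk` helper below is a generic sketch and not part of the module.

```typescript
// Split an array into consecutive chunks of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer')
  }
  const chunks: T[][] = []
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size))
  }
  return chunks
}

console.log(chunk([1, 2, 3, 4, 5], 2)) // [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ]
```

Because `slice` copies only the array and not the items, each chunk still references the original objects, so in-place transformations applied to a chunk remain visible in the full dataset.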

## Troubleshooting

Common issues and solutions:

- **No transformations occurring**: Check your JSONPath expressions and filter conditions
- **Unexpected field structure**: Examine the exact structure of your data
- **Rate limiting errors**: Increase the `throttleDelay` between requests
- **Transformation errors**: Implement a custom error callback for detailed logging

## Running the Examples

To run the included examples:

```bash
# Run the basic async iterator example
npm run examples:async-iterator

# Run the iterator factory example
npm run examples:iterator-factory

# Run with debug logging
npm run examples:async-iterator -- --debug
```

The examples will transform sample JSON data and save the results to the `tests/test-data/core/` directory.

## See Also

- [JSON Path Syntax](https://goessner.net/articles/JsonPath/) - Reference for JSONPath expressions
- [p-throttle](https://github.com/sindresorhus/p-throttle) - The throttling library used internally
- [p-map](https://github.com/sindresorhus/p-map) - For concurrent asynchronous mapping