pm server - shared - server products 1/3

This commit is contained in:
babayaga 2026-01-29 18:02:26 +01:00
parent dff0a8a2d6
commit f6592bb292
25 changed files with 3471 additions and 0 deletions

View File

@ -0,0 +1,275 @@
import EventEmitter from 'events';
import { PgBoss } from 'pg-boss';
import { createHash } from 'crypto';
import { streamSSE } from 'hono/streaming';
import { EventBus } from './EventBus.js';
import { ProductErrorCode } from './enums.js';
import { ProductError } from './errors.js';
import { logger } from '../commons/logger.js';
export interface JobCreationEvent {
queue: string;
data: any;
options: any;
}
export interface StreamOptions<TData = any> {
data: TData;
userId: string;
forceRefresh?: boolean;
fetcher: (data: TData, userId: string) => Promise<any[]>;
cacheChecker?: (hash: string) => Promise<any[] | null>;
}
export abstract class AbstractProduct<TJobData = any> extends EventEmitter {
abstract readonly id: string;
abstract readonly jobOptions: any;
abstract readonly actions: Record<string, any>;
abstract readonly workers: any[];
abstract readonly routes: any[];
protected boss: PgBoss | null = null;
protected workerSubscriptions: string[] = [];
async start(boss: PgBoss) {
try {
this.boss = boss;
await this.registerWorkers(boss);
await this.onStart(boss);
} catch (error: any) {
throw new ProductError(ProductErrorCode.START_FAILED, {
message: `Failed to start product ${this.id}: ${error.message}`,
originalError: error
});
}
}
protected async onStart(boss: PgBoss) {
// Optional hook for subclasses
}
async stop() {
try {
await this.unregisterWorkers();
await this.onStop();
} catch (error: any) {
throw new ProductError(ProductErrorCode.STOP_FAILED, {
message: `Failed to stop product ${this.id}: ${error.message}`,
originalError: error
});
}
}
protected async onStop() {
// Optional hook
}
async pause() {
try {
await this.unregisterWorkers();
} catch (error: any) {
throw new ProductError(ProductErrorCode.PAUSE_FAILED, {
message: `Failed to pause product ${this.id}: ${error.message}`,
originalError: error
});
}
}
async resume() {
if (!this.boss) {
throw new ProductError(ProductErrorCode.RESUME_FAILED, 'PgBoss not initialized');
}
try {
await this.registerWorkers(this.boss);
} catch (error: any) {
throw new ProductError(ProductErrorCode.RESUME_FAILED, {
message: `Failed to resume product ${this.id}: ${error.message}`,
originalError: error
});
}
}
protected async registerWorkers(boss: PgBoss) {
if (!this.workers) return;
for (const WorkerClass of this.workers) {
try {
// @ts-ignore
const workerInstance = new WorkerClass();
// Inject the EventBus so the worker can emit job events
(workerInstance as any).emitter = EventBus;
// Inject boss instance for advanced operations like cancellation check
(workerInstance as any).boss = boss;
logger.info(`[${this.id}] Registering worker for queue: ${workerInstance.queueName}`);
await boss.createQueue(workerInstance.queueName, workerInstance.queueOptions);
const workOptions = (workerInstance as any).teamSize ? { teamSize: (workerInstance as any).teamSize } : {};
const subscriptionId = await boss.work(workerInstance.queueName, workOptions as any, (job: any) => workerInstance.handler(job));
this.workerSubscriptions.push(subscriptionId);
} catch (error: any) {
throw new ProductError(ProductErrorCode.WORKER_REGISTRATION_FAILED, {
message: `Failed to register worker for ${this.id}: ${error.message}`,
worker: WorkerClass.name
});
}
}
}
protected async unregisterWorkers() {
if (!this.boss) return;
for (const subId of this.workerSubscriptions) {
try {
// @ts-ignore - Assuming offWork exists in PgBoss type or at runtime
await this.boss.offWork(subId);
} catch (error: any) {
logger.warn(`[${this.id}] Failed to unregister worker subscription ${subId}: ${error.message}`);
}
}
this.workerSubscriptions = [];
}
async sendJob(queue: string, data: TJobData, options: any = {}) {
if (!this.boss) {
throw new ProductError(ProductErrorCode.JOB_SUBMISSION_FAILED, 'PgBoss not initialized');
}
const event: JobCreationEvent = { queue, data, options };
// Emit event to allow subscribers to modify data/options
EventBus.emit('job:create', event);
try {
return await this.boss.send(queue, event.data, event.options);
} catch (error: any) {
throw new ProductError(ProductErrorCode.JOB_SUBMISSION_FAILED, {
message: `Failed to send job to ${queue}: ${error.message}`,
queue
});
}
}
async waitForJob(jobId: string, timeoutMs: number = 60000): Promise<any> {
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
cleanup();
reject(new ProductError(ProductErrorCode.JOB_TIMEOUT, { message: 'Job timeout', jobId }));
}, timeoutMs);
const onComplete = (event: any) => {
if (event.jobId === jobId) {
cleanup();
resolve(event.result);
}
};
const cleanup = () => {
clearTimeout(timer);
EventBus.off('job:complete', onComplete);
};
EventBus.on('job:complete', onComplete);
});
}
async waitForHash(targetHash: string, timeoutMs: number = 60000): Promise<any> {
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
cleanup();
reject(new ProductError(ProductErrorCode.JOB_TIMEOUT, { message: 'Job timeout (hash wait)', hash: targetHash }));
}, timeoutMs);
const onComplete = (event: any) => {
if (!event.data) return;
try {
const eventHash = this.hash(event.data);
if (eventHash === targetHash) {
cleanup();
resolve(event.result);
}
} catch (e) {
// Ignore hashing errors (mismatched data types from other queues)
}
};
const cleanup = () => {
clearTimeout(timer);
EventBus.off('job:complete', onComplete);
};
EventBus.on('job:complete', onComplete);
});
}
protected async handleStream(c: any, options: StreamOptions) {
const { data, userId, forceRefresh, fetcher, cacheChecker } = options;
const inputHash = this.generateHash(data);
return streamSSE(c, async (stream) => {
try {
await stream.writeSSE({
event: 'progress',
data: JSON.stringify({ stage: 'starting', percent: 0 })
});
if (!forceRefresh && cacheChecker) {
await stream.writeSSE({
event: 'progress',
data: JSON.stringify({ stage: 'checking_cache', percent: 10 })
});
const cached = await cacheChecker(inputHash);
if (cached) {
for (let i = 0; i < cached.length; i++) {
await stream.writeSSE({
event: 'result',
data: JSON.stringify(cached[i])
});
}
await stream.writeSSE({
event: 'complete',
data: JSON.stringify({ total: cached.length, cached: true })
});
return;
}
}
await stream.writeSSE({
event: 'progress',
data: JSON.stringify({ stage: 'fetching_from_api', percent: 20 })
});
const results = await fetcher(data, userId);
for (let i = 0; i < results.length; i++) {
await stream.writeSSE({
event: 'result',
data: JSON.stringify(results[i])
});
}
await stream.writeSSE({
event: 'complete',
data: JSON.stringify({ total: results.length, cached: false })
});
} catch (error: any) {
logger.error(error, `[${this.id}] Stream error`);
await stream.writeSSE({
event: 'error',
data: JSON.stringify({ error: error.message || 'Internal Server Error' })
});
}
});
}
// Helper for hashing
protected generateHash(params: any) {
const normalizedInput = JSON.stringify(params, Object.keys(params).sort());
return createHash('sha256').update(normalizedInput).digest('hex');
}
abstract hash(data: TJobData): string;
abstract meta(userId: string): any;
}
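
For orientation, a minimal sketch of how a concrete product might extend AbstractProduct; the queue name, worker stub and cost values here are hypothetical, but the real products below (e.g. LocationsProduct) follow the same shape:

import { createHash } from 'crypto';
import { AbstractProduct } from './AbstractProduct.js';

interface DemoJobData {
  query: string;
  userId: string;
}

// Hypothetical worker stub purely for illustration.
class DemoWorker {
  queueName = 'demo-search';
  queueOptions = {};
  async handler(job: any) {
    // process job.data here
  }
}

export class DemoProduct extends AbstractProduct<DemoJobData> {
  id = 'demo';
  jobOptions = { retryLimit: 3 };
  actions = { search: { costUnits: 1, description: 'Demo search' } };
  workers = [DemoWorker];
  routes: any[] = [];

  // Stable hash used by waitForHash() to correlate job completions.
  hash(data: DemoJobData): string {
    return createHash('sha256').update(JSON.stringify(data)).digest('hex');
  }

  meta(userId: string) {
    return { userId, product: this.id };
  }
}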

View File

@ -0,0 +1,3 @@
import EventEmitter from 'events';
export const EventBus = new EventEmitter();
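
The 'job:create' hook emitted by AbstractProduct.sendJob lets a subscriber mutate a job's data or options before they reach pg-boss. A minimal sketch of such a subscriber, assuming illustrative cost/timestamp fields (the actual subscriber wiring lives in the products registry, not shown in this part):

import { EventBus } from './EventBus.js';
import type { JobCreationEvent } from './AbstractProduct.js';

// Illustrative: stamp every outgoing job with metadata before boss.send().
EventBus.on('job:create', (event: JobCreationEvent) => {
  event.data = {
    ...event.data,
    cost: event.data.cost ?? 1, // hypothetical default cost
    timestamp: new Date().toISOString()
  };
});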

View File

@ -0,0 +1,22 @@
export enum ProductErrorCode {
// Lifecycle Errors
START_FAILED = 'PRODUCT_START_FAILED',
STOP_FAILED = 'PRODUCT_STOP_FAILED',
PAUSE_FAILED = 'PRODUCT_PAUSE_FAILED',
RESUME_FAILED = 'PRODUCT_RESUME_FAILED',
// Worker Errors
WORKER_REGISTRATION_FAILED = 'WORKER_REGISTRATION_FAILED',
WORKER_NOT_FOUND = 'WORKER_NOT_FOUND',
// Job Errors
JOB_SUBMISSION_FAILED = 'JOB_SUBMISSION_FAILED',
JOB_TIMEOUT = 'JOB_TIMEOUT',
// Configuration Errors
INVALID_CONFIG = 'INVALID_CONFIG',
MISSING_DEPENDENCY = 'MISSING_DEPENDENCY',
// Generic
UNKNOWN_ERROR = 'UNKNOWN_ERROR'
}

View File

@ -0,0 +1,29 @@
import { ProductErrorCode } from './enums.js';
export interface ProductErrorPayload {
message: string;
[key: string]: any;
}
export class ProductError extends Error {
public readonly code: ProductErrorCode;
public readonly payload: ProductErrorPayload;
constructor(code: ProductErrorCode, payload: ProductErrorPayload | string) {
const message = typeof payload === 'string' ? payload : payload.message;
super(message);
this.code = code;
this.payload = typeof payload === 'string' ? { message: payload } : payload;
// Restore prototype chain
Object.setPrototypeOf(this, new.target.prototype);
}
toJSON() {
return {
code: this.code,
message: this.message,
payload: this.payload
};
}
}
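
A short illustration of how a caller can surface these errors; the jobId value is made up:

import { ProductError } from './errors.js';
import { ProductErrorCode } from './enums.js';

try {
  throw new ProductError(ProductErrorCode.JOB_TIMEOUT, { message: 'Job timeout', jobId: '123' });
} catch (err) {
  if (err instanceof ProductError) {
    // toJSON() preserves code, message and payload, so it can be returned to clients as-is.
    console.error(JSON.stringify(err.toJSON()));
  }
}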

View File

@ -0,0 +1,94 @@
import { describe, it, expect, beforeAll } from 'vitest';
import * as dotenv from 'dotenv';
import path from 'path';
// Load env from server root
dotenv.config({ path: path.resolve(__dirname, '../../../../.env') });
describe('PyGADM Integration', () => {
let LocationsProduct: any;
beforeAll(async () => {
const productModule = await import('../index.js');
LocationsProduct = new productModule.LocationsProduct();
});
it('should search for regions using python wrapper', async () => {
const c = {
req: {
valid: () => ({ query: 'France' })
},
json: (data: any, status: number) => ({ data, status })
};
const result = await LocationsProduct.handleGetRegionSearch(c);
if (result.status !== 200) {
console.error('PyGADM Search Error:', JSON.stringify(result, null, 2));
}
expect(result.status).toBe(200);
expect(result.data).toBeDefined();
// console.log('PyGADM Search Result:', JSON.stringify(result.data, null, 2));
});
it('should search for regions with content_level', async () => {
const c = {
req: {
valid: () => ({ query: 'France', content_level: '1' })
},
json: (data: any, status: number) => ({ data, status })
};
const result = await LocationsProduct.handleGetRegionSearch(c);
expect(result.status).toBe(200);
expect(result.data).toBeDefined();
});
it('should search for regions with geojson output', async () => {
const c = {
req: {
valid: () => ({ query: 'France', content_level: '1', geojson: 'true' })
},
json: (data: any, status: number) => ({ data, status })
};
const result = await LocationsProduct.handleGetRegionSearch(c);
// console.log('PyGADM GeoJSON Search Result:', JSON.stringify(result.data, null, 2));
expect(result.status).toBe(200);
expect(result.data).toBeDefined();
expect(result.data.type).toBe('FeatureCollection');
expect(result.data.features).toBeDefined();
expect(Array.isArray(result.data.features)).toBe(true);
});
it('should get boundary using python wrapper', async () => {
const c = {
req: {
valid: () => ({ id: 'FRA.1_1' }) // Region for Auvergne-Rhône-Alpes
},
json: (data: any, status: number) => ({ data, status })
};
const result = await LocationsProduct.handleGetRegionBoundary(c);
if (result.status === 200) {
expect(result.data).toBeDefined();
} else {
console.log('PyGADM Boundary Result Status:', result.status, result.data);
}
});
it('should get region names using python wrapper', async () => {
const c = {
req: {
valid: () => ({ admin: 'FRA', content_level: '2' })
},
json: (data: any, status: number) => ({ data, status })
};
const result = await LocationsProduct.handleGetRegionNames(c);
if (result.status !== 200) {
console.error('PyGADM Names Error:', JSON.stringify(result, null, 2));
}
expect(result.status).toBe(200);
expect(result.data).toBeDefined();
console.log('PyGADM Names Result (Subset):', JSON.stringify(result.data.data.slice(0, 2), null, 2));
});
});

View File

@ -0,0 +1,313 @@
import { describe, it, expect, beforeAll, afterAll } from 'vitest';
import * as dotenv from 'dotenv';
import path from 'path';
// Load env from server root
dotenv.config({ path: path.resolve(__dirname, '../../../../.env') });
import { PgBoss } from 'pg-boss';
// import { submitJob } from '../../events.js';
// Ensure registry is loaded to register subscriber
import '../../registry.js';
import { LOCATIONS_JOB_OPTIONS } from '../constants.js';
// Dynamic imports to ensure env vars are loaded before modules that use them
let boss: PgBoss;
let startBoss: any;
let SearchWorker: any;
let supabase: any;
let logger: any;
describe('Locations Product E2E', () => {
let worker: any;
let TEST_USER_ID: string;
const TEST_QUERY = 'plastichub';
const TEST_LOCATION = 'Barcelona, Spain';
beforeAll(async () => {
// Import modules now that env vars are loaded
// Adjust paths relative to this file: src/products/locations/__tests__/e2e.test.ts
const clientModule = await import('../../../jobs/boss/client.js');
boss = clientModule.boss as PgBoss;
startBoss = clientModule.startBoss;
// Import the worker from the product
const workerModule = await import('../pgboss.js');
SearchWorker = workerModule.LocationsWorker;
const supabaseModule = await import('../../../commons/supabase.js');
supabase = supabaseModule.supabase;
const loggerModule = await import('../../../commons/logger.js');
logger = loggerModule.logger;
// Ensure boss is connected
if (!boss) {
throw new Error('PgBoss not initialized (check DATABASE_URL)');
}
await startBoss();
// Initialize product (lifecycle start)
// LocationsProduct is a class, so instantiate it before calling start
const productModule = await import('../index.js');
const locationsProduct = new productModule.LocationsProduct();
await locationsProduct.start(boss);
// Authenticate as the test user
const { data, error } = await supabase.auth.signInWithPassword({
email: 'sales@plastic-hub.com',
password: 'customer',
});
if (error || !data.session) {
console.error('Failed to sign in test user:', error);
throw new Error('Failed to sign in test user');
}
TEST_USER_ID = data.user.id;
logger.info(`E2E Test: Authenticated as ${TEST_USER_ID}`);
// Clean up existing data
const { error: deleteError } = await supabase
.from('locations')
.delete()
.eq('user_id', TEST_USER_ID)
.eq('title', 'Plastic Hub');
if (deleteError) {
logger.warn('Failed to clean up existing locations:', deleteError);
} else {
logger.info('Cleaned up existing locations for test');
}
// Register the worker in this process
worker = new SearchWorker();
logger.info(`Worker Queue Name: ${worker.queueName}`);
await boss.createQueue(worker.queueName);
await boss.work(worker.queueName, worker.handler.bind(worker));
logger.info('E2E Test: Worker started');
});
afterAll(async () => {
if (boss) {
// pg-boss does not expose a simple "purge everything in a queue" call on the
// instance, so individual tests delete the jobs they create; stopping the boss
// here is enough to halt processing.
await boss.stop();
}
});
it('should process a real search job and update Supabase', async () => {
if (!boss) return;
// 1. Send Job
const jobData = {
query: TEST_QUERY,
location: TEST_LOCATION,
userId: TEST_USER_ID,
filters: {
concurrency: 1
}
};
let jobId: string | null = null;
try {
// Use submitJob helper to trigger event-based metadata injection
// jobId = await submitJob(worker.queueName, jobData, {}, boss);
jobId = await boss.send(worker.queueName, jobData);
expect(jobId).toBeDefined();
logger.info(`Job submitted: ${jobId}`);
// Wait for job to be processed.
// Since we are running the worker in the same process, it should pick it up.
// We can poll the database for the result.
let attempts = 0;
const maxAttempts = 20; // 20 * 1s = 20s
let found = false;
while (attempts < maxAttempts && !found) {
await new Promise(resolve => setTimeout(resolve, 1000));
const { data: locations } = await supabase
.from('locations')
.select('place_id, title')
.eq('user_id', TEST_USER_ID)
.eq('title', 'Plastic Hub');
if (locations && locations.length > 0) {
found = true;
logger.info('Locations found:', locations.length);
expect(locations).toBeDefined();
expect(locations.length).toBeGreaterThan(0);
expect(locations[0].title).toBe('Plastic Hub');
}
attempts++;
}
if (!found) {
throw new Error('Timeout waiting for job to process and update DB');
}
} catch (err: any) {
logger.error({ err }, 'Error sending/processing job');
throw err;
} finally {
if (jobId) {
// Try to cancel/delete the job to clean up
try {
await boss.deleteJob(worker.queueName, jobId);
logger.info(`Cleaned up job ${jobId}`);
} catch (e) {
logger.warn(`Failed to clean up job ${jobId}: ${e}`);
}
}
}
}, 30000); // Increased timeout
it('should prevent duplicate jobs using singleton check and verify metadata', async () => {
if (!boss) return;
const jobData = {
query: 'duplicate-check',
userId: '39d5210c-23b0-430e-bbfc-00b48f67c899',
filters: {
concurrency: 1
},
location: 'Nowhere'
};
// First submission should succeed
// const jobId1 = await submitJob(worker.queueName, jobData, {}, boss);
const jobId1 = await boss.send(worker.queueName, jobData);
expect(jobId1).toBeDefined();
logger.info(`Singleton Test: First job submitted: ${jobId1}`);
// Verify metadata
const job = await boss.getJobById(worker.queueName, jobId1!);
expect(job).not.toBeNull();
if (job) {
expect(job.data).toBeDefined();
// Check if metadata was injected (cost, timestamp)
// Note: userId is already in jobData, but getMetadata adds cost and timestamp
expect((job.data as any).cost).toBeDefined();
expect((job.data as any).timestamp).toBeDefined();
expect((job.data as any).userId).toBe(jobData.userId);
// Verify job options from constants
// Priority Low = 10
expect(job.priority).toBe(10);
expect(job.retryLimit).toBe(5);
logger.info('Singleton Test: Metadata verification successful');
} else {
logger.warn('Singleton Test: Job not found (might be completed/archived)');
}
// Second submission should fail (return null) because of singletonKey
// const jobId2 = await submitJob(worker.queueName, jobData, {}, boss);
const jobId2 = await boss.send(worker.queueName, jobData);
expect(jobId2).toBeNull();
logger.info('Singleton Test: Duplicate job correctly rejected (returned null)');
// Clean up
if (jobId1) {
try {
await boss.deleteJob(worker.queueName, jobId1);
logger.info(`Singleton Test: Cleaned up job ${jobId1}`);
} catch (e) {
logger.warn(`Singleton Test: Failed to clean up job ${jobId1}: ${e}`);
}
}
});
it('should resume cancelled jobs on product start', async () => {
// 1. Submit a job with a start delay so we can cancel it before it runs.
// Use a unique query to avoid the singleton check from earlier tests.
const jobData = {
query: `${TEST_QUERY}-${Date.now()}`,
location: TEST_LOCATION,
userId: TEST_USER_ID
};
const options = { ...LOCATIONS_JOB_OPTIONS, retryDelay: 1, retryBackoff: true, retryDelayMax: 10, startAfter: 10 };
const jobId = await boss.send(worker.queueName, jobData, options);
expect(jobId).toBeDefined();
logger.info(`Resume Test: Job submitted: ${jobId}`);
// 2. Cancel the job
await boss.cancel(worker.queueName, jobId!);
// 3. Verify it is cancelled
let job = await boss.getJobById(worker.queueName, jobId!);
expect(job).not.toBeNull();
expect(job!.state).toBe('cancelled');
logger.info(`Resume Test: Job cancelled`);
// 4. Call LocationsProduct.start to resume
const productModule = await import('../index.js');
const locationsProduct = new productModule.LocationsProduct();
await locationsProduct.start(boss);
// 5. Verify it is resumed (state should be created or active)
// Wait a bit for the resume to happen (it's async inside start)
await new Promise(resolve => setTimeout(resolve, 1000));
job = await boss.getJobById(worker.queueName, jobId!);
expect(job).not.toBeNull();
logger.info(`Resume Test: Job state after resume: ${job!.state}`);
expect(job!.state).not.toBe('cancelled');
// pg-boss resume typically resets the job to 'created'; depending on timing it
// may already be 'active' or even 'completed'.
expect(['created', 'active', 'completed']).toContain(job!.state);
// Clean up
if (jobId) {
try {
await boss.deleteJob(worker.queueName, jobId);
} catch (e) {
// ignore
}
}
});
});

View File

@ -0,0 +1,19 @@
import { createHash } from 'crypto';
import { getSearchByHash, getLocationsByPlaceIds } from './db.js';
export const params_hash = (params: any) => {
// Normalize input for hashing
// Sort keys to ensure consistent hash
const normalizedInput = JSON.stringify(params, Object.keys(params).sort());
return createHash('sha256').update(normalizedInput).digest('hex');
};
export const get_cached = async (inputHash: string) => {
const searchData = await getSearchByHash(inputHash);
if (searchData && searchData.result_place_ids) {
const locations = await getLocationsByPlaceIds(searchData.result_place_ids);
return locations;
}
return null;
};
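
Because the sorted key list is passed to JSON.stringify as the replacer, it fixes both which keys are serialized and their order, so logically equal inputs hash identically. A small illustration:

import { params_hash } from './cache.js';

// Same fields in a different order produce the same hash.
const a = params_hash({ query: 'plastichub', location: 'Barcelona, Spain' });
const b = params_hash({ location: 'Barcelona, Spain', query: 'plastichub' });
console.log(a === b); // true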

View File

@ -0,0 +1,37 @@
// Locations Constants
export const COST_PER_SEARCH = parseInt(process.env.LOCATIONS_COST || '1');
export const TIMEOUT_MS_LOCATIONS = parseInt(process.env.LOCATIONS_TIMEOUT_MS || '300000');
export const MAX_CONCURRENCY_LOCATIONS = 1; // Hardcoded for now; could be made configurable via env like the values below.
export const LOCATIONS_JOB_OPTIONS = {
expireInSeconds: parseInt(process.env.LOCATIONS_JOB_EXPIRE_SECONDS || '600'),
retryBackoff: true,
retryDelayMax: parseInt(process.env.LOCATIONS_JOB_RETRY_DELAY_MAX || '1000')
};
// Email Constants
export const EMAIL_SEARCH_COST = parseInt(process.env.EMAIL_SEARCH_COST || '5');
export const EMAIL_MAX_BATCH_SIZE = parseInt(process.env.MAX_PLACES_PER_REQUEST || '100');
export const EMAIL_META_TIMEOUT_MS = parseInt(process.env.EMAIL_META_TIMEOUT_SECONDS || '30') * 1000;
export const EMAIL_SEARCH_TIMEOUT_MS = parseInt(process.env.EMAIL_SEARCH_TIMEOUT_SECONDS || '60') * 1000;
export const EMAIL_SEARCH_PAGE_TIMEOUT_MS = parseInt(process.env.EMAIL_SEARCH_PAGE_TIMEOUT_SECONDS || '10') * 1000;
export const EMAIL_SEARCH_MAX_PAGES = parseInt(process.env.EMAIL_SEARCH_MAX_PAGES || '15');
export const EMAIL_SEARCH_PAGE_CONCURRENCY = parseInt(process.env.EMAIL_SEARCH_PAGE_CONCURRENCY || '2');
export const EMAIL_STREAM_TIMEOUT_MS = parseInt(process.env.EMAIL_STREAM_TIMEOUT_SECONDS || '300') * 1000;
export const EMAIL_MAX_CONCURRENT_JOBS = parseInt(process.env.EMAIL_MAX_CONCURRENT_JOBS || '5');
export const JOB_NAME = 'locations-find';
export const EMAIL_JOB_NAME = 'email-find';
export enum JobPriority {
Low = 10,
Medium = 20,
High = 30
}
export const EMAIL_JOB_OPTIONS = {
priority: JobPriority.Medium,
retryLimit: parseInt(process.env.EMAIL_JOB_RETRY_LIMIT || '1'),
expireInSeconds: parseInt(process.env.EMAIL_JOB_EXPIRE_SECONDS || '60'),
};

View File

@ -0,0 +1,71 @@
import { supabase } from '@/commons/supabase.js';
import { logger } from './logger.js';
export const getSearchByHash = async (inputHash: string) => {
const { data, error } = await supabase
.from('searches')
.select('result_place_ids')
.eq('input_hash', inputHash)
.single();
if (error && error.code !== 'PGRST116') { // PGRST116 is "The result contains 0 rows"
logger.error({ err: error }, 'Error fetching search by hash');
}
return data;
};
export const getLocationsByPlaceIds = async (placeIds: string[]) => {
if (placeIds.length === 0) return [];
const { data, error } = await supabase
.from('locations')
.select('*')
.in('place_id', placeIds);
if (error) {
logger.error({ err: error }, 'Error fetching locations');
throw error;
}
return data;
};
export const getLocationsMeta = async (placeIds: string[]) => {
if (placeIds.length === 0) return [];
const { data, error } = await supabase
.from('locations')
.select('place_id, meta')
.in('place_id', placeIds);
if (error) {
logger.error({ err: error }, 'Error fetching locations meta');
return [];
}
return data;
}
export const upsertLocations = async (locations: any[]) => {
if (locations.length === 0) return;
const { error } = await supabase
.from('locations')
.upsert(locations, { onConflict: 'place_id' });
if (error) {
logger.error({ err: error }, 'Error upserting locations ');
throw error;
}
};
export const storeSearch = async (inputHash: string, inputParams: any, placeIds: string[]) => {
const { error } = await supabase
.from('searches')
.upsert({
input_hash: inputHash,
input_params: inputParams,
result_place_ids: placeIds,
created_at: new Date().toISOString()
}, { onConflict: 'input_hash' });
if (error) {
logger.error({ err: error }, 'Error storing search');
}
};

View File

@ -0,0 +1,27 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { MetaEnricher } from '../meta.js';
// axios and puppeteer could be mocked for deeper coverage, but parseHtml is
// private, so these unit tests exercise the public enrich() interface.
describe('MetaEnricher', () => {
let enricher: MetaEnricher;
beforeEach(() => {
enricher = new MetaEnricher();
});
it('should have correct name and type', () => {
expect(enricher.name).toBe('meta');
expect(enricher.type).toBe('meta');
});
it('should return empty object if no website', async () => {
const result = await enricher.enrich({ place_id: '1', title: 'test' } as any, { userId: 'u1' });
expect(result).toEqual({});
});
// Deeper testing requires mocking parseHtml logic which uses external libs.
// For this scope, ensuring it handles basic input covers the wiring.
});

View File

@ -0,0 +1,33 @@
import { describe, it, expect, beforeEach } from 'vitest';
import { EnricherRegistry, IEnricher, EnrichmentContext } from '../registry.js';
import { CompetitorFull } from '@polymech/shared';
class MockEnricher implements IEnricher {
name = 'mock';
type = 'test';
async enrich(location: CompetitorFull, context: EnrichmentContext) {
return { title: 'enriched' };
}
}
describe('EnricherRegistry', () => {
// Clear registry before each test if possible, but map is static.
// We can rely on unique names or just testing existence.
it('should register and retrieve an enricher', () => {
const enricher = new MockEnricher();
EnricherRegistry.register('test-mock', enricher);
expect(EnricherRegistry.get('test-mock')).toBe(enricher);
});
it('should return undefined for unknown enricher', () => {
expect(EnricherRegistry.get('unknown')).toBeUndefined();
});
it('should list all enrichers', () => {
const initialCount = EnricherRegistry.getAll().length;
EnricherRegistry.register('test-mock-2', new MockEnricher());
expect(EnricherRegistry.getAll().length).toBe(initialCount + 1);
});
});

View File

@ -0,0 +1,306 @@
import { CompetitorFull } from '@polymech/shared';
import { IEnricher, EnrichmentContext } from './registry.js';
import axios, { AxiosRequestConfig } from 'axios';
import * as cheerio from 'cheerio';
import { URL } from 'url';
import puppeteer from 'puppeteer';
import puppeteerExtra from 'puppeteer-extra';
import StealthPlugin from 'puppeteer-extra-plugin-stealth';
import https from 'https';
import { logger } from '@polymech/search';
const puppeteerExtraAny = puppeteerExtra as any;
puppeteerExtraAny.use(StealthPlugin());
interface Og {
[key: string]: string | undefined;
}
interface Meta {
[key: string]: string | undefined;
}
interface Image {
src: string;
}
interface PageType {
url: string;
source: string;
status: string;
}
interface Structured {
[key: string]: any;
}
interface LocationSiteMeta {
title?: string;
description?: string;
image?: string;
url?: string;
social?: PageType[];
seo?: {
keywords?: string[];
structured?: Structured[];
og?: Og;
metaTags?: Meta;
};
pages?: PageType[];
externalLinks?: PageType[];
images?: Image[];
}
export class MetaEnricher implements IEnricher {
name = 'meta';
type = 'meta';
async enrich(location: CompetitorFull, context: EnrichmentContext): Promise<Partial<CompetitorFull>> {
if (!context.logger) {
context.logger = logger;
}
if (!location.website) {
if (context.logger) context.logger.debug({ placeId: location.place_id }, 'No website to enrich');
return {};
}
try {
if (context.logger) context.logger.info({ website: location.website }, 'Starting meta enrichment');
const meta = await this.parseHtml(location.website, null, { headless: true }); // Default to headless puppeteer for better success rate
const updates: Partial<CompetitorFull> = {
raw_data: {
...location.raw_data,
meta: meta
}
};
if (context.logger) {
context.logger.info({
website: location.website,
socialCount: meta.social?.length || 0,
socials: meta.social?.map(s => s.source),
title: meta.title ? 'Found' : 'Missing',
emails: meta.seo?.metaTags?.['email'] || 'None'
}, 'Meta enrichment complete');
}
// Social links stay under raw_data.meta for now: the schema does not yet expose
// dedicated social fields, so callers (or a later schema update) can map them out.
return updates;
} catch (error: any) {
if (context.logger) {
context.logger.error(`Error enriching meta for ${location.website}: ${error.message}`);
}
return {}; // Return empty update on failure
}
}
private isValidUrl(url: string) {
try {
new URL(url);
return true;
} catch (error) {
return false;
}
}
private readMetaTags($: cheerio.CheerioAPI, name: string) {
return $(`meta[name="${name}"]`).attr('content') || $(`meta[property="${name}"]`).attr('content') || null;
}
private static browserPromise: Promise<puppeteer.Browser> | null = null;
private static idleTimer: NodeJS.Timeout | null = null;
private static IDLE_TIMEOUT_SECONDS = parseInt(process.env.ENRICHER_META_IDLE_TIMEOUT || '60');
private static resetIdleTimer() {
if (MetaEnricher.idleTimer) clearTimeout(MetaEnricher.idleTimer);
MetaEnricher.idleTimer = setTimeout(async () => {
if (MetaEnricher.browserPromise) {
// Idle timeout reached; close the shared browser (no logger context available here).
const browser = await MetaEnricher.browserPromise;
await browser.close();
MetaEnricher.browserPromise = null;
}
}, MetaEnricher.IDLE_TIMEOUT_SECONDS * 1000);
}
private static async getBrowser(): Promise<puppeteer.Browser> {
MetaEnricher.resetIdleTimer();
if (MetaEnricher.browserPromise) return MetaEnricher.browserPromise;
logger.info(`[Puppeteer] Launching new browser`);
MetaEnricher.browserPromise = (async () => {
const browser = await puppeteerExtraAny.launch({
headless: "new",
args: ['--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage']
});
return browser;
})();
return MetaEnricher.browserPromise;
}
private async parseHtml(url: string, config: AxiosRequestConfig | null, options: any): Promise<LocationSiteMeta> {
if (!/(^http(s?):\/\/[^\s$.?#].[^\s]*)/i.test(url)) return {} as LocationSiteMeta;
let content = '';
if (options && options.headless) {
try {
const browser = await MetaEnricher.getBrowser();
const page = await browser.newPage();
try {
await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36');
// Allow shorter timeout for meta tags
const timeout = options.timeout || 15000;
await page.goto(url, { waitUntil: 'domcontentloaded', timeout });
content = await page.content();
} finally {
await page.close();
MetaEnricher.resetIdleTimer();
}
} catch (e: any) {
// Fallback to axios if puppeteer fails (or specific connection/timeout error)
// console.error(`Puppeteer failed for ${url}: ${e.message}`);
}
}
if (!content) {
try {
const { data } = await axios(url, {
...config,
httpsAgent: new https.Agent({ rejectUnauthorized: false }),
headers: {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
},
timeout: 10000
});
content = data;
} catch (e: any) {
// logger.error(`Axios failed for ${url}: ${e.message}`);
return {} as LocationSiteMeta;
}
}
const $ = cheerio.load(content);
const og: Og = {};
const meta: Meta = {};
const images: Image[] = [];
const links: string[] = [];
let allLinks: string[] = [];
const title = $('title').text();
if (title) meta.title = title;
const canonical = $('link[rel=canonical]').attr('href');
if (canonical) meta.url = canonical;
['title', 'description', 'image'].forEach(s => {
const val = this.readMetaTags($, s);
if (val) meta[s] = val;
});
['og:title', 'og:description', 'og:image', 'og:url', 'og:site_name', 'og:type'].forEach(s => {
const val = this.readMetaTags($, s);
if (val) og[s.split(':')[1]] = val;
});
$('img').each((i: number, el: any) => {
let src = $(el).attr('src');
if (src) {
try {
src = new URL(src, url).href;
images.push({ src });
} catch (e) {
// ignore invalid urls
}
}
});
const jsonLdArray: Structured[] = [];
$('script[type="application/ld+json"]').each((_: number, element: any) => {
const jsonLdContent = $(element).html();
if (jsonLdContent) {
try {
const jsonData = JSON.parse(jsonLdContent);
jsonLdArray.push(jsonData);
} catch (e) {
// logger.error(`Error parsing JSON-LD: ${e.message} @ ${url}`);
}
}
});
$('a').each((index: number, element: any) => {
let href = $(element).attr('href');
if (href) {
try {
href = new URL(href, url).href;
if (this.isValidUrl(href)) {
if (href.indexOf('contact') !== -1 && !links.includes(href)) {
links.push(href);
}
allLinks.push(href);
}
} catch (e) {
// Ignore invalid URLs
}
}
});
allLinks = [...new Set(allLinks)];
const socialLinks: PageType[] = [];
const internalPages: PageType[] = [];
const externalLinks: PageType[] = [];
allLinks.forEach(link => {
if (link.includes('instagram.com')) socialLinks.push({ url: link, source: 'instagram', status: 'PENDING' });
else if (link.includes('facebook.com')) socialLinks.push({ url: link, source: 'facebook', status: 'PENDING' });
else if (link.includes('linkedin.com')) socialLinks.push({ url: link, source: 'linkedin', status: 'PENDING' });
else if (link.includes('youtube.com')) socialLinks.push({ url: link, source: 'youtube', status: 'PENDING' });
else if (link.includes('twitter.com')) socialLinks.push({ url: link, source: 'twitter', status: 'PENDING' });
else if (link.includes('mailto:')) { /* ignore mailto */ }
else {
try {
const baseUrl = new URL(url).hostname;
const linkUrl = new URL(link).hostname;
if (linkUrl === baseUrl || linkUrl.endsWith('.' + baseUrl)) {
internalPages.push({ url: link, source: 'site', status: 'PENDING' });
} else {
externalLinks.push({ url: link, source: 'external', status: 'PENDING' });
}
} catch (e) {
externalLinks.push({ url: link, source: 'external', status: 'PENDING' });
}
}
});
return {
title: meta.title || og.title,
description: meta.description || og.description,
image: meta.image || og.image,
url: meta.url || og.url || url,
social: socialLinks,
seo: {
keywords: ($('meta[property="og:keywords"]').attr("content") ||
$('meta[name="keywords"]').attr("content") || "").split(',').map((s: any) => s.trim()).filter((s: any) => s),
structured: jsonLdArray,
og,
metaTags: meta
},
pages: internalPages,
externalLinks: externalLinks,
images
};
}
}

View File

@ -0,0 +1,34 @@
import { CompetitorFull } from '@polymech/shared';
export interface EnrichmentContext {
userId: string;
logger?: any;
}
export interface IEnricher {
name: string;
type: 'meta' | 'email' | string;
/**
* Enrich a single location.
* @param location The partial competitor data available
* @param context Execution context
*/
enrich(location: CompetitorFull, context: EnrichmentContext): Promise<Partial<CompetitorFull>>;
}
export class EnricherRegistry {
private static enrichers: Map<string, IEnricher> = new Map();
static register(name: string, enricher: IEnricher) {
this.enrichers.set(name, enricher);
}
static get(name: string): IEnricher | undefined {
return this.enrichers.get(name);
}
static getAll(): IEnricher[] {
return Array.from(this.enrichers.values());
}
}
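
A minimal sketch of a custom enricher built against this interface; the 'phone' name and the normalization it performs are hypothetical:

import { EnricherRegistry, IEnricher, EnrichmentContext } from './registry.js';
import { CompetitorFull } from '@polymech/shared';

// Hypothetical enricher: normalizes a phone number already present on the record,
// returning {} when there is nothing to add.
class PhoneEnricher implements IEnricher {
  name = 'phone';
  type = 'phone';
  async enrich(location: CompetitorFull, context: EnrichmentContext): Promise<Partial<CompetitorFull>> {
    const phone = (location as any).phone;
    if (!phone) return {};
    return { phone: phone.replace(/\s+/g, '') } as Partial<CompetitorFull>;
  }
}

EnricherRegistry.register('phone', new PhoneEnricher());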

View File

@ -0,0 +1,135 @@
import { EnricherRegistry } from './registry.js';
import { getLocationsByPlaceIds, upsertLocations } from '../db.js';
import { logger } from '../logger.js';
import pMap from 'p-map';
export interface StreamInterface {
writeSSE(data: { event: string; data: string }): Promise<void>;
}
export class EnrichmentService {
/**
* Orchestrates the enrichment process for a list of places and enrichers.
* Optionally streams progress and updates if a stream is provided.
*/
async enrichAndStream(
placeIds: string[],
enricherNames: string[],
userId: string,
stream?: StreamInterface
) {
const requestedEnrichers = enricherNames.map(name => EnricherRegistry.get(name)).filter(e => e !== undefined);
if (requestedEnrichers.length === 0) {
throw new Error('No valid enrichers found');
}
if (stream) {
await stream.writeSSE({
event: 'progress',
data: JSON.stringify({
current: 0,
total: placeIds.length,
message: `Starting enrichment for ${placeIds.length} items`
})
});
}
// Fetch current data
const existingLocations = await getLocationsByPlaceIds(placeIds);
const locationMap = new Map(existingLocations.map((l: any) => [l.place_id, l]));
let processed = 0;
let successCount = 0;
let failedCount = 0;
await pMap(placeIds, async (placeId: string) => {
const location = locationMap.get(placeId);
if (!location) {
logger.warn({ placeId }, 'Location not found for enrichment');
failedCount++;
return;
}
let updates: any = {};
for (const enricher of requestedEnrichers) {
try {
// Enricher handles its own timeouts/shared resources
const result = await enricher!.enrich(location, { userId, logger });
if (Object.keys(result).length > 0) {
logger.info({ placeId, enricher: enricher!.name, keys: Object.keys(result) }, 'Enricher found data');
} else {
logger.info({ placeId, enricher: enricher!.name }, 'Enricher found no new data');
}
updates = { ...updates, ...result };
} catch (e: any) {
logger.error({ placeId, enricher: enricher!.name, err: e }, 'Enricher failed');
}
}
if (Object.keys(updates).length > 0) {
const updatedLocation = {
...location,
...updates,
raw_data: {
...(location.raw_data || {}),
...(updates.raw_data || {})
}
};
try {
logger.info({ placeId, updates }, 'Updating location in db');
await upsertLocations([updatedLocation]);
successCount++;
if (stream) {
await stream.writeSSE({
event: 'enrichment-update',
data: JSON.stringify({
place_id: placeId,
updates: updates
})
});
}
} catch (e: any) {
logger.error({ placeId, err: e }, 'Failed to persist enrichment updates');
failedCount++;
}
} else {
// No updates found, but technically processed successfully without error
successCount++;
}
processed++;
if (stream) {
await stream.writeSSE({
event: 'progress',
data: JSON.stringify({
current: processed,
total: placeIds.length,
message: `Enriched ${processed} of ${placeIds.length}`
})
});
}
}, { concurrency: parseInt(process.env.ENRICHER_META_CONCURRENCY || '5') });
if (stream) {
await stream.writeSSE({
event: 'complete',
data: JSON.stringify({
processed,
success: successCount,
failed: failedCount
})
});
}
return { processed, success: successCount, failed: failedCount };
}
}
export const enrichmentService = new EnrichmentService();
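
A sketch of how a route handler might drive this service over SSE; the payload shape is illustrative and the real wiring sits in the product's routes:

import { streamSSE } from 'hono/streaming';
import { enrichmentService } from './enrichers/service.js';

// Hypothetical Hono handler streaming enrichment progress to the client.
export const handleEnrich = (c: any) => {
  const { place_ids, enrichers, userId } = c.req.valid('json'); // illustrative payload
  return streamSSE(c, async (stream) => {
    await enrichmentService.enrichAndStream(place_ids, enrichers, userId, stream);
  });
};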

View File

@ -0,0 +1,302 @@
import sys
import json
import argparse
import pygadm
import traceback
import hashlib
import os
CACHE_DIR = os.environ.get('GADM_CACHE', './cache/gadm')
def get_cache_path(prefix, value):
hash_object = hashlib.md5(value.encode('utf-8'))
hash_hex = hash_object.hexdigest()
return os.path.join(CACHE_DIR, f"{prefix}_{hash_hex}.json")
def read_cache(path):
if os.path.exists(path):
try:
with open(path, 'r', encoding='utf-8') as f:
return json.load(f)
except:
return None
return None
def write_cache(path, data):
try:
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, 'w', encoding='utf-8') as f:
json.dump(data, f)
except Exception as e:
sys.stderr.write(f"Cache write failed: {e}\n")
def search_regions(query, content_level=None, geojson=False, admin=None):
path = get_cache_path(f"search_{content_level if content_level else 'all'}_{'geo' if geojson else 'meta'}_{admin if admin else 'all'}", query)
cached = read_cache(path)
if cached:
print(json.dumps(cached, default=str))
return
try:
# If geojson is requested, we need pygadm.Items to get the geometry
if geojson:
# NOTE: This might still fail if name is ambiguous.
# Ideally the UI should search first (getting GIDs), then request boundary by ID.
# If a caller requests search+geojson by name, the query needs to be specific.
kwargs = {'name': [query]}
if content_level:
kwargs['content_level'] = int(content_level)
if admin:
kwargs['admin'] = admin
try:
gdf = pygadm.Items(**kwargs)
except Exception as e:
# Check if query contains comma
if ',' in query:
first_part = query.split(',')[0].strip()
kwargs['name'] = [first_part]
try:
gdf = pygadm.Items(**kwargs)
except:
print(json.dumps({"data": []}))
return
else:
raise e
else:
# Metadata search -> Use pygadm.Names to find all matches (handles ambiguity)
kwargs = {'name': query}
if content_level:
kwargs['content_level'] = int(content_level)
if admin:
kwargs['admin'] = admin
try:
gdf = pygadm.Names(**kwargs)
except Exception as e:
# Check if query contains comma
if ',' in query:
# Try searching for the first part
first_part = query.split(',')[0].strip()
kwargs['name'] = first_part
try:
gdf = pygadm.Names(**kwargs)
except:
# Still failed: return an empty result for consistent client handling
print(json.dumps({"data": []}))
return
else:
# No comma, propagate empty result
print(json.dumps({"data": []}))
return
if gdf is None or gdf.empty:
print(json.dumps({"data": []}))
return
# Convert to list of dicts for JSON serialization
# safe stringify helper
def safe_str(v):
return str(v) if v is not None else ""
if geojson:
# Full GeoJSON conversion with manual fallback
features = []
cols_props = [c for c in gdf.columns if c != 'geometry']
for index, row in gdf.iterrows():
properties = {c: safe_str(row[c]) for c in cols_props}
try:
geometry = row['geometry'].__geo_interface__ if row['geometry'] else None
except:
geometry = None
features.append({
"type": "Feature",
"properties": properties,
"geometry": geometry
})
output = {
"type": "FeatureCollection",
"features": features
}
else:
# Metadata only (from Names or Items)
results = []
cols = gdf.columns.tolist()
for index, row in gdf.iterrows():
r = {}
for c in cols:
r[c] = safe_str(row[c])
results.append(r)
output = {"data": results}
write_cache(path, output)
print(json.dumps(output, default=str))
except Exception as e:
print(json.dumps({"error": traceback.format_exc()}))
sys.exit(1)
def get_boundary(gadm_id, name, content_level):
path = get_cache_path("boundary", gadm_id)
cached = read_cache(path)
if cached:
print(json.dumps(cached, default=str))
return
try:
# Use admin code (GID) directly to avoid ambiguity
# pygadm.Items supports 'admin' to fetch by GID
kwargs = {'admin': [gadm_id]}
# content_level is not needed here: the GID already implies the level,
# so the GID alone is used as the lookup key.
gdf = pygadm.Items(**kwargs)
if gdf is None or gdf.empty:
print(json.dumps({"error": "Region not found"}))
sys.exit(1)
# Convert to GeoJSON manually to avoid pandas errors
features = []
cols_props = [c for c in gdf.columns if c != 'geometry']
# Helper for safe string conversion
def safe_str(v):
return str(v) if v is not None else ""
for index, row in gdf.iterrows():
properties = {c: safe_str(row[c]) for c in cols_props}
try:
geometry = row['geometry'].__geo_interface__ if row['geometry'] else None
except:
geometry = None
features.append({
"type": "Feature",
"properties": properties,
"geometry": geometry
})
output = {
"type": "FeatureCollection",
"features": features
}
write_cache(path, output)
print(json.dumps(output, default=str))
except Exception as e:
print(json.dumps({"error": str(e)}))
sys.exit(1)
def get_names(admin, content_level=None, depth=1):
path = get_cache_path(f"names_{admin}_{content_level}_{depth}", "names")
cached = read_cache(path)
if cached:
print(json.dumps(cached, default=str))
return
try:
all_results = []
# Determine starting level
# If content_level is explicit, start there.
# Else derive from admin code: e.g. "ESP.6_1" (1 dot) -> Level 1. Next is 2.
# "ESP" (0 dots) -> Level 0. Next is 1.
if content_level:
start_level = int(content_level)
else:
# GID format: XXX.L1.L2_1
# Count dots:
# ESP -> 0 -> start 1
# ESP.1_1 -> 1 -> start 2
start_level = admin.count('.') + 1
# Handle infinite depth (passed as -1 or similar string)
if str(depth).lower() in ['infinite', 'inf', '-1']:
limit = 5 # Reasonable cap for GADM (rarely goes beyond level 5)
else:
limit = int(depth)
current_level = start_level
for i in range(limit):
# Fetch names for this level
kwargs = {'admin': admin, 'content_level': current_level}
try:
gdf = pygadm.Names(**kwargs)
except:
# Likely level doesn't exist
break
if gdf is None or gdf.empty:
# No more levels
break
# Convert to list of dicts
cols = gdf.columns.tolist()
def safe_str(v):
return str(v) if v is not None else ""
for index, row in gdf.iterrows():
r = {}
for c in cols:
r[c] = safe_str(row[c])
all_results.append(r)
current_level += 1
output = {"data": all_results}
write_cache(path, output)
print(json.dumps(output, default=str))
except Exception as e:
print(json.dumps({"error": traceback.format_exc()}))
sys.exit(1)
def main():
parser = argparse.ArgumentParser(description='PyGADM Wrapper')
subparsers = parser.add_subparsers(dest='command', help='Command to execute')
# Search command
search_parser = subparsers.add_parser('search', help='Search for regions')
search_parser.add_argument('--query', required=True, help='Name to search for')
search_parser.add_argument('--content_level', required=False, help='Content level filter')
search_parser.add_argument('--geojson', action='store_true', help='Include GeoJSON geometry')
search_parser.add_argument('--country', required=False, help='Country filter (Admin code or name)')
# Boundary command
boundary_parser = subparsers.add_parser('boundary', help='Get region boundary')
boundary_parser.add_argument('--id', required=True, help='GADM ID (for cache)')
boundary_parser.add_argument('--name', required=True, help='Region Name')
boundary_parser.add_argument('--content_level', required=False, help='Content Level')
# Names command
names_parser = subparsers.add_parser('names', help='Get region names')
names_parser.add_argument('--admin', required=True, help='Admin code (e.g. FRA)')
names_parser.add_argument('--content_level', required=False, help='Content level')
names_parser.add_argument('--depth', required=False, default=1, help='Depth of recursion (1=default, -1=infinite)')
args = parser.parse_args()
if args.command == 'search':
search_regions(args.query, args.content_level, args.geojson, args.country)
elif args.command == 'boundary':
get_boundary(args.id, args.name, args.content_level)
elif args.command == 'names':
get_names(args.admin, args.content_level, args.depth)
else:
parser.print_help()
sys.exit(1)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,844 @@
import { getLocationsRoute, getLocationsStreamRoute, getCompetitorByIdRoute, getFindEmailStreamRoute, cancelJobRoute, getRegionSearchRoute, getRegionBoundaryRoute, getRegionNamesRoute, getRegionReverseRoute, getWikiSearchRoute, getLlmRegionInfoRoute, getEnrichStreamRoute } from './routes.js';
import { run, IKBotTask } from '@polymech/kbot-d';
import { EnricherRegistry } from './enrichers/registry.js';
import { MetaEnricher } from './enrichers/meta.js';
import { enrichmentService } from './enrichers/service.js';
import { getLocationsByPlaceIds, upsertLocations } from './db.js';
import { reverse } from '@polymech/search';
import { LocationsWorker, LocationsJobData, EmailWorker, EmailJobData } from './pgboss.js';
import { LOCATIONS_JOB_OPTIONS, COST_PER_SEARCH, JOB_NAME, EMAIL_JOB_NAME, EMAIL_JOB_OPTIONS, EMAIL_SEARCH_COST, EMAIL_STREAM_TIMEOUT_MS, EMAIL_MAX_BATCH_SIZE, TIMEOUT_MS_LOCATIONS } from './constants.js';
import { logger } from '@/commons/logger.js';
import { get_cached, params_hash } from './cache.js';
import { streamSSE } from 'hono/streaming';
import { EventBus } from '../EventBus.js';
import { exec } from 'child_process';
import { promisify } from 'util';
import path from 'path';
import { fileURLToPath } from 'url';
const execAsync = promisify(exec);
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
import { AbstractProduct } from '../AbstractProduct.js';
import { CONFIG_DEFAULT } from '@polymech/commons';
function deduplicateResults(results: any[] | null | undefined) {
if (!Array.isArray(results)) return results;
const seen = new Set();
return results.filter(item => {
const id = item.place_id || item.title; // Fallback to title if place_id missing
if (!id) return true;
if (seen.has(id)) return false;
seen.add(id);
return true;
});
}
export class LocationsProduct extends AbstractProduct<LocationsJobData | EmailJobData> {
id = 'locations';
workers = [LocationsWorker, EmailWorker];
jobOptions = LOCATIONS_JOB_OPTIONS;
actions = {
search: {
costUnits: COST_PER_SEARCH,
cancellable: true,
description: 'Search for locations',
},
email: {
costUnits: EMAIL_SEARCH_COST,
cancellable: true,
description: 'Find emails for locations'
}
};
routes!: any[];
constructor() {
super();
this.routes = [
{ definition: getLocationsRoute, handler: this.handleGetLocations.bind(this) },
{ definition: getLocationsStreamRoute, handler: this.handleStreamGet.bind(this) },
{ definition: getCompetitorByIdRoute, handler: this.handleGetLocationById.bind(this) },
{ definition: getFindEmailStreamRoute, handler: this.handleStreamEmail.bind(this) },
{ definition: cancelJobRoute, handler: this.handleCancelJob.bind(this) },
{ definition: getRegionSearchRoute, handler: this.handleGetRegionSearch.bind(this) },
{ definition: getRegionBoundaryRoute, handler: this.handleGetRegionBoundary.bind(this) },
{ definition: getRegionReverseRoute, handler: this.handleGetRegionReverse.bind(this) },
{ definition: getRegionNamesRoute, handler: this.handleGetRegionNames.bind(this) },
{ definition: getWikiSearchRoute, handler: this.handleGetWikiSearch.bind(this) },
{ definition: getLlmRegionInfoRoute, handler: this.handleGetLlmRegionInfo.bind(this) },
{ definition: getEnrichStreamRoute, handler: this.handleStreamEnrich.bind(this) }
];
// Register default enrichers
EnricherRegistry.register('meta', new MetaEnricher());
}
async handleGetLlmRegionInfo(c: any) {
const { location } = c.req.valid('query');
try {
const promptContent = `Provide a concise summary for ${location} strictly in valid JSON format with the following keys:
- population: string (current estimate)
- industry: string (key industries)
- status: string (current geopolitical status, e.g. "Stable", "Conflict Zone", "Developing")
- currency: string (currency name and code)
- description: string (brief general description)
Do not include markdown formatting (like \`\`\`json). Just the raw JSON object.`;
const task: IKBotTask = {
router: 'openai',
model: 'gpt-4o', // fallback; overridden to 'gpt-5' below
prompt: promptContent,
mode: 'completion',
path: '.', // unused here but required by the task interface
};
// 'gpt-5' is the requested model; if the kbot model enum rejects it, drop this
// override to fall back to 'gpt-4o'.
task.model = 'gpt-5';
const results = await run(task);
const result = results.length > 0 ? results[0] : null;
// KBot completion mode usually returns string or the completion object
let jsonStr = '';
if (typeof result === 'string') {
jsonStr = result;
} else if (result && typeof result === 'object' && 'content' in result) {
jsonStr = (result as any).content as string;
} else {
// Fallback or check structure
jsonStr = JSON.stringify(result);
}
// Cleanup jsonStr if it has markdown code blocks
jsonStr = jsonStr.replace(/```json\n?|```/g, '').trim();
let data;
try {
data = JSON.parse(jsonStr);
} catch (e) {
logger.error({ jsonStr }, 'Failed to parse LLM response');
return c.json({ error: 'Failed to parse LLM response' }, 500);
}
return c.json({ data }, 200);
} catch (error: any) {
logger.error(error, 'Error fetching LLM info');
return c.json({ error: error.message || 'Internal Server Error' }, 500);
}
}
async handleGetWikiSearch(c: any) {
const { lat, lon, radius, limit } = c.req.valid('query');
try {
// Wikipedia API: https://en.wikipedia.org/w/api.php?action=query&list=geosearch&gscoord=37.7891838|-122.4033522&gsradius=10000&gslimit=10&format=json
// We need a user agent: https://meta.wikimedia.org/wiki/User-Agent_policy
const apiUrl = `https://en.wikipedia.org/w/api.php`;
const params = new URLSearchParams({
action: 'query',
list: 'geosearch',
gscoord: `${lat}|${lon}`,
gsradius: radius || '10000',
gslimit: limit || '10',
format: 'json'
});
const headers = {
'User-Agent': 'PolymechSaaS/1.0 (test@example.com)' // Replace with a real contact address before production
};
const res = await fetch(`${apiUrl}?${params.toString()}`, { headers });
if (!res.ok) {
return c.json({ error: 'Failed to fetch from Wikipedia' }, res.status);
}
const json = await res.json();
if (json.error) {
logger.error({ wikiError: json.error }, 'Wikipedia API Error');
return c.json({ error: json.error.info || 'Wikipedia API Error' }, 500);
}
const results = json.query?.geosearch || [];
return c.json({ data: results }, 200);
} catch (error: any) {
logger.error(error, 'Error fetching Wiki data');
return c.json({ error: error.message || 'Internal Server Error' }, 500);
}
}
async handleCancelJob(c: any) {
console.log('[LocationsProduct] handleCancelJob called', c.req.param('jobId'));
const { jobId } = c.req.valid('param');
try {
if (!this.boss) {
logger.error('[LocationsProduct] Job system not initialized');
return c.json({ error: 'Job system not initialized' }, 503);
}
await (this.boss as any).cancel(EMAIL_JOB_NAME, jobId);
logger.info({ jobId }, '[LocationsProduct] Job cancellation requested via API');
return c.json({ message: 'Job cancellation requested' }, 200);
} catch (e: any) {
logger.error({ err: e, jobId }, '[LocationsProduct] Failed to cancel job');
return c.json({ error: 'Failed to cancel job' }, 500);
}
}
async handleGetLocationById(c: any) {
const { place_id } = c.req.valid('param');
try {
const results = await getLocationsByPlaceIds([place_id]);
const location = results && results.length > 0 ? results[0] : null;
if (!location) {
return c.json({ error: 'Competitor not found' }, 404);
}
return c.json({
message: 'Get competitor details success',
data: location
}, 200);
} catch (error: any) {
logger.error(error, 'Error fetching competitor details');
return c.json({ error: error.message || 'Internal Server Error' }, 500);
}
}
async handleGetRegionSearch(c: any) {
const { query, content_level, geojson, country } = c.req.valid('query');
try {
const wrapperPath = path.join(__dirname, 'gadm_wrapper.py');
// Ensure python is in path
let cmd = `python "${wrapperPath}" search --query "${query}"`;
if (content_level) {
cmd += ` --content_level "${content_level}"`;
}
if (geojson === 'true') {
cmd += ` --geojson`;
}
if (country) {
cmd += ` --country "${country}"`;
}
const { stdout, stderr } = await execAsync(cmd, { maxBuffer: 1024 * 1024 * 50 }); // 50MB buffer
if (stderr) {
console.error('[PyGADM] Stderr:', stderr);
}
try {
const json = JSON.parse(stdout);
if (json.error) {
return c.json({ error: json.error }, 500);
}
return c.json(json, 200);
} catch (e) {
logger.error({ stdout }, 'Failed to parse python output');
return c.json({ error: 'Failed to parse GADM results' }, 500);
}
} catch (error: any) {
// Check if it's a python script error that outputted JSON
if (error.stdout) {
try {
const json = JSON.parse(error.stdout);
if (json.error) {
logger.warn({ error: json.error }, 'GADM script returned error');
return c.json({ error: json.error }, 400); // Bad request / Search failed
}
} catch (e) {
// ignore parsing error, fall through to generic error
}
}
logger.error(error, 'Error searching regions');
return c.json({ error: error.message || 'Internal Server Error' }, 500);
}
}
async handleGetRegionBoundary(c: any) {
const { id } = c.req.valid('param');
const { name, level } = c.req.valid('query');
try {
const wrapperPath = path.join(__dirname, 'gadm_wrapper.py');
// The GADM fetch needs a region name; without one the wrapper can only serve a previously cached boundary for this ID.
if (!name) {
logger.warn({ id }, 'No region name provided; relying on a cached boundary for this ID');
}
const cmd = `python "${wrapperPath}" boundary --id "${id}" --name "${name || ''}" --content_level "${level || ''}"`;
const { stdout, stderr } = await execAsync(cmd, { maxBuffer: 1024 * 1024 * 50 }); // 50MB buffer
if (stderr) {
logger.warn({ stderr }, '[PyGADM Boundary] stderr output');
}
try {
const json = JSON.parse(stdout);
if (json.error) {
return c.json({ error: json.error }, 404);
}
return c.json(json, 200);
} catch (e) {
logger.error({ stdout }, 'Failed to parse python output');
return c.json({ error: 'Failed to parse GADM boundary' }, 500);
}
} catch (error: any) {
// Check if it's a python script error that outputted JSON
if (error.stdout) {
try {
const json = JSON.parse(error.stdout);
if (json.error) {
logger.warn({ error: json.error }, 'GADM boundary script returned error');
return c.json({ error: json.error }, 404);
}
} catch (e) {
// ignore parsing error
}
}
logger.error(error, 'Error fetching boundary');
return c.json({ error: error.message || 'Internal Server Error' }, 500);
}
}
async handleGetRegionNames(c: any) {
const { admin, content_level, depth } = c.req.valid('query');
try {
const wrapperPath = path.join(__dirname, 'gadm_wrapper.py');
let cmd = `python "${wrapperPath}" names --admin "${admin}"`;
if (content_level) cmd += ` --content_level "${content_level}"`;
if (depth) cmd += ` --depth "${depth}"`;
const { stdout, stderr } = await execAsync(cmd, { maxBuffer: 1024 * 1024 * 50 }); // 50MB buffer
if (stderr) {
logger.warn({ stderr }, '[PyGADM] stderr output');
}
try {
const json = JSON.parse(stdout);
if (json.error) {
return c.json({ error: json.error }, 500);
}
return c.json(json, 200);
} catch (e) {
logger.error({ stdout }, 'Failed to parse python output');
return c.json({ error: 'Failed to parse GADM names results' }, 500);
}
} catch (error: any) {
// Check if it's a python script error that outputted JSON
if (error.stdout) {
try {
const json = JSON.parse(error.stdout);
if (json.error) {
logger.warn({ error: json.error }, 'GADM names script returned error');
return c.json({ error: json.error }, 400);
}
} catch (e) {
// ignore parsing error
}
}
logger.error(error, 'Error fetching region names');
return c.json({ error: error.message || 'Internal Server Error' }, 500);
}
}
async handleGetRegionReverse(c: any) {
const { lat, lon } = c.req.valid('query');
try {
// Mock LocalResult structure expected by reverse()
const loc: any = {
gps_coordinates: {
latitude: parseFloat(lat),
longitude: parseFloat(lon)
}
};
// Use 'reverse' from search package
// It expects opts.bigdata.key. We try to read it from env.
const config = CONFIG_DEFAULT() as any;
await reverse(loc, { bigdata: { key: config.bigdata.key } });
if (loc.geo) {
return c.json({ data: loc.geo }, 200);
} else {
return c.json({ error: 'Reverse geocoding failed' }, 404);
}
} catch (error: any) {
logger.error(error, 'Error reverse geocoding');
return c.json({ error: error.message || 'Internal Server Error' }, 500);
}
}
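// Used by the job:create subscriber to derive a singletonKey so identical searches are debounced; userId is excluded from the hash.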
hash(data: LocationsJobData) {
const { userId, ...params } = data;
return this.generateHash(params);
}
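// Merged into the job data by the job:create subscriber so each job carries the per-search cost and the requesting user.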
meta(userId: string) {
return {
cost: COST_PER_SEARCH,
userId,
timestamp: new Date().toISOString()
};
}
async handleGetLocations(c: any) {
const query = c.req.valid('query');
const { refresh } = query;
const forceRefresh = refresh === 'true';
const userId = c.get('userId');
const jobData = {
query: query.query,
location: query.location,
filterCity: query.filterCity,
filterContinent: query.filterContinent,
filterCountry: query.filterCountry,
filterType: query.filterType,
limit: query.limit ? parseInt(query.limit) : 250,
zoom: query.zoom ? parseInt(query.zoom) : undefined,
excludedTypes: query.excludedTypes,
userId: userId
};
try {
const inputHash = params_hash({
query: query.query,
location: query.location,
filterCity: query.filterCity,
filterContinent: query.filterContinent,
filterCountry: query.filterCountry,
filterType: query.filterType,
limit: query.limit ? parseInt(query.limit) : 250,
excludedTypes: query.excludedTypes,
zoom: query.zoom ? parseInt(query.zoom) : undefined
});
if (!forceRefresh) {
const cached = await get_cached(inputHash);
if (cached) {
return c.json({
message: 'Get locations success (cached)',
data: deduplicateResults(cached)
}, 200);
}
}
const jobId = await this.sendJob(JOB_NAME, jobData, this.jobOptions);
if (!jobId) {
const cached = await get_cached(inputHash);
if (cached) {
return c.json({ message: 'Get locations success (cached)', data: deduplicateResults(cached) }, 200);
}
return c.json({ message: 'Job already submitted', jobId: null }, 202);
}
try {
await this.waitForJob(jobId, TIMEOUT_MS_LOCATIONS);
const cached = await get_cached(inputHash);
if (cached) {
return c.json({ message: 'Get locations success (fresh)', data: deduplicateResults(cached) }, 200);
}
} catch (e) {
if ((e as Error).message === 'Job timeout') {
// Fallthrough to return 202
} else {
throw e;
}
}
} catch (error: any) {
logger.error(error, 'Error fetching locations');
return c.json({ message: error.message || 'Internal Server Error' }, 500);
}
return c.json({ message: 'Job submitted' }, 202);
}
async handleStreamGet(c: any) {
const query = c.req.valid('query');
const { refresh } = query;
const forceRefresh = refresh === 'true';
const userId = c.get('userId');
if (!userId) {
return c.json({ message: 'Unauthorized' }, 401);
}
return this.handleStream(c, {
data: {
query: query.query,
location: query.location,
filterCity: query.filterCity,
filterContinent: query.filterContinent,
filterCountry: query.filterCountry,
filterType: query.filterType,
limit: query.limit ? parseInt(query.limit) : 250,
zoom: query.zoom ? parseInt(query.zoom) : undefined,
excludedTypes: query.excludedTypes,
},
userId,
forceRefresh,
fetcher: async (data: any, uid: string) => {
const inputHash = this.generateHash(data);
const existing = await get_cached(inputHash);
if (existing) return deduplicateResults(existing) as any[];
const jobData = { ...data, userId: uid };
const options = { ...this.jobOptions, singletonKey: this.generateHash(data) };
logger.info(`Fetching locations for ${inputHash}`, jobData);
const jobId = await this.sendJob(JOB_NAME, jobData, options);
if (jobId) {
await this.waitForJob(jobId, TIMEOUT_MS_LOCATIONS);
} else {
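// sendJob returned null: an identical job (same singletonKey) is already queued, so re-check the cache and then wait on the hash for that job's result.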
const checkAgain = await get_cached(inputHash);
if (checkAgain) return deduplicateResults(checkAgain) as any[];
logger.info({ inputHash }, 'Waiting for debounced job...');
await this.waitForHash(inputHash, TIMEOUT_MS_LOCATIONS);
}
const result = await get_cached(inputHash);
return deduplicateResults(result) || [];
},
cacheChecker: async (hash: string) => deduplicateResults(await get_cached(hash)) || null
});
}
async handleStreamEmail(c: any) {
const { place_ids } = c.req.valid('query');
let userId = c.get('userId') as string | undefined;
if (!userId) {
return c.json({ error: 'Unauthorized' }, 401);
}
const placeIdArray = place_ids.split(',').map((id: string) => id.trim()).filter((id: string) => id.length > 0);
if (placeIdArray.length === 0) {
return c.json({ error: 'No place IDs provided' }, 400);
}
if (placeIdArray.length > EMAIL_MAX_BATCH_SIZE) {
return c.json({ error: `Maximum ${EMAIL_MAX_BATCH_SIZE} locations can be processed at once` }, 400);
}
logger.info(`[LocationsProduct] Starting email search via worker for ${placeIdArray.length} locations`);
return streamSSE(c, async (stream) => {
await stream.writeSSE({
event: 'progress',
data: JSON.stringify({
current: 0,
total: placeIdArray.length,
percent: 0,
message: `Queueing jobs for ${placeIdArray.length} location(s)`
})
});
const pendingJobIds = new Set<string>();
const jobsMap = new Map<string, string>();
let completedCount = 0;
let successCount = 0;
let failedCount = 0;
let emailsFound = 0;
const onJobComplete = async (event: any) => {
if (pendingJobIds.has(event.jobId)) {
const place_id = jobsMap.get(event.jobId);
pendingJobIds.delete(event.jobId);
const result = event.result;
await stream.writeSSE({
event: 'location-start',
data: JSON.stringify({
place_id,
title: 'Processing...',
index: completedCount + 1,
total: placeIdArray.length
})
});
if (result && result.emails) {
successCount++;
emailsFound += result.emails.length;
for (const emailMeta of result.emails) {
await stream.writeSSE({
event: 'location-email',
data: JSON.stringify({
place_id,
email: emailMeta.email,
source: emailMeta.source,
cached: false
})
});
}
}
await stream.writeSSE({
event: 'location-complete',
data: JSON.stringify({
place_id,
emailCount: result?.emails?.length || 0,
cached: false
})
});
completedCount++;
const percent = Math.round((completedCount / placeIdArray.length) * 100);
await stream.writeSSE({
event: 'progress',
data: JSON.stringify({
current: completedCount,
total: placeIdArray.length,
percent,
message: `Processed ${completedCount} of ${placeIdArray.length}`
})
});
}
};
const onJobFailed = async (event: any) => {
if (pendingJobIds.has(event.jobId)) {
const place_id = jobsMap.get(event.jobId);
pendingJobIds.delete(event.jobId);
failedCount++;
await stream.writeSSE({
event: 'location-error',
data: JSON.stringify({
place_id,
error: event.error || 'Unknown error',
canRetry: true
})
});
completedCount++;
const percent = Math.round((completedCount / placeIdArray.length) * 100);
await stream.writeSSE({
event: 'progress',
data: JSON.stringify({
current: completedCount,
total: placeIdArray.length,
percent,
message: `Processed ${completedCount} of ${placeIdArray.length}`
})
});
}
};
// Register Listeners BEFORE submitting jobs to capture fast completions
EventBus.on('job:complete', onJobComplete);
EventBus.on('job:failed', onJobFailed);
stream.onAbort(async () => {
logger.info('[LocationsProduct] Stream aborted by client, cancelling pending jobs');
for (const jobId of pendingJobIds) {
try {
if (this.boss) {
await this.boss.cancel(EMAIL_JOB_NAME, jobId);
}
} catch (e) {
logger.warn({ jobId, err: e }, 'Failed to cancel job on abort');
}
}
clearTimeout(listTimeout);
EventBus.off('job:complete', onJobComplete);
EventBus.off('job:failed', onJobFailed);
});
const listTimeout = setTimeout(() => {
logger.warn('[LocationsProduct] Stream global timeout reached');
EventBus.off('job:complete', onJobComplete);
EventBus.off('job:failed', onJobFailed);
stream.writeSSE({ event: 'error', data: JSON.stringify({ error: 'Global Timeout' }) });
stream.close();
}, EMAIL_STREAM_TIMEOUT_MS);
try {
let submittedCount = 0;
for (const place_id of placeIdArray) {
const jobData = { place_id, userId };
const options = { ...EMAIL_JOB_OPTIONS };
const jobId = await this.sendJob(EMAIL_JOB_NAME, jobData, options);
if (jobId) {
pendingJobIds.add(jobId);
jobsMap.set(jobId, place_id);
// Emit job info immediately for cancellation support
await stream.writeSSE({
event: 'location-job',
data: JSON.stringify({ place_id, jobId })
});
}
submittedCount++;
}
// Wait loop
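// pendingJobIds is drained by the job:complete / job:failed handlers above; poll until it is empty or the global timeout closes the stream.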
while (pendingJobIds.size > 0) {
// logger.info(`[LocationsProduct] ${pendingJobIds.size} pending jobs`);
await new Promise(resolve => setTimeout(resolve, 1000));
}
} catch (error: any) {
logger.error({ err: error }, 'Error in email stream loop');
// Attempt to send error event
try {
await stream.writeSSE({
event: 'error',
data: JSON.stringify({ error: error.message || 'Stream processing error' })
});
} catch (e) { /* ignore stream write errors if closed */ }
} finally {
clearTimeout(listTimeout);
EventBus.off('job:complete', onJobComplete);
EventBus.off('job:failed', onJobFailed);
}
await stream.writeSSE({
event: 'complete',
data: JSON.stringify({
total: placeIdArray.length,
successful: successCount,
failed: failedCount,
emailsFound
})
});
});
}
async handleStreamEnrich(c: any) {
const { place_ids, enrichers } = c.req.valid('query');
const userId = c.get('userId');
if (!userId) {
return c.json({ error: 'Unauthorized' }, 401);
}
const placeIdArray = place_ids.split(',').map((id: string) => id.trim()).filter((id: string) => id.length > 0);
const enricherNames = enrichers.split(',').map((e: string) => e.trim()).filter((e: string) => e.length > 0);
if (placeIdArray.length === 0) {
return c.json({ error: 'No place IDs provided' }, 400);
}
const requestedEnrichers = enricherNames.map((name: string) => EnricherRegistry.get(name)).filter((e: any) => e !== undefined);
if (requestedEnrichers.length === 0) {
return c.json({ error: 'No valid enrichers requested' }, 400);
}
logger.info({ count: placeIdArray.length, enrichers: enricherNames }, '[LocationsProduct] Starting enrichment stream');
return streamSSE(c, async (stream) => {
await stream.writeSSE({
event: 'progress',
data: JSON.stringify({
current: 0,
total: placeIdArray.length,
message: `Starting enrichment for ${placeIdArray.length} items`
})
});
// Fetch the current records for these places from the DB; they are the base objects the enrichers extend.
// Places missing from the DB are skipped; accepting partial objects from the client would be insecure.
const existingLocations = await getLocationsByPlaceIds(placeIdArray);
const locationMap = new Map(existingLocations.map(l => [l.place_id, l]));
let processed = 0;
for (const placeId of placeIdArray) {
const location = locationMap.get(placeId);
if (!location) {
logger.warn({ placeId }, 'Location not found for enrichment');
// Emit error or skip
continue;
}
// Run each enricher
let updates: any = {};
for (const enricher of requestedEnrichers) {
try {
const result = await enricher!.enrich(location, { userId, logger });
updates = { ...updates, ...result };
// Subsequent enrichers see the original location; merge `result` into it (e.g. Object.assign(location, result)) if chaining is needed.
// Enrichers are expected to return top-level partials; switch to a deep merge if that assumption changes.
} catch (e: any) {
logger.error({ placeId, enricher: enricher!.name, err: e }, 'Enricher failed');
}
}
if (Object.keys(updates).length > 0) {
// Persist to DB
// We need to merge updates into the location.
// upsertLocations expects arrays.
const updatedLocation = {
...location,
...updates,
// If deep merging is needed for raw_data or others, handle it.
// For now assume top level fields or replacement of raw_data chunks.
raw_data: {
...(location.raw_data || {}),
...(updates.raw_data || {})
}
};
try {
await upsertLocations([updatedLocation]);
await stream.writeSSE({
event: 'enrichment-update',
data: JSON.stringify({
place_id: placeId,
updates: updates
})
});
} catch (e: any) {
logger.error({ placeId, err: e }, 'Failed to persist enrichment updates');
}
}
processed++;
await stream.writeSSE({
event: 'progress',
data: JSON.stringify({
current: processed,
total: placeIdArray.length,
message: `Enriched ${processed} of ${placeIdArray.length}`
})
});
}
await stream.writeSSE({ event: 'complete', data: JSON.stringify({ processed }) });
});
}
}

View File

@ -0,0 +1,172 @@
import { googleMaps, ResolveFlags } from '@polymech/search';
import { upsertLocations, storeSearch, getLocationsMeta } from './db.js';
import { supabase } from '../../commons/supabase.js';
import { params_hash } from './cache.js';
import { logger } from './logger.js';
export interface SearchParams {
query: string;
location: string;
filterCity?: string;
filterContinent?: string;
filterCountry?: string;
filterType?: string;
concurrency?: number;
limit?: number;
zoom?: number;
excludedTypes?: string[];
}
export const searchLocations = async (params: SearchParams, userId: string) => {
logger.info({ params }, 'Fetching locations');
const { query, location, filterCity, filterContinent, filterCountry, filterType, limit, zoom, excludedTypes } = params;
// 1. Fetch from SerpApi
const results = await googleMaps({
query: query,
searchFrom: location,
resolve: [ResolveFlags.PHOTOS],
filterCity,
filterContinent,
filterCountry,
filterType,
limit,
zoom,
excludedTypes
});
// Flatten results
const flatResults = results ? results.flat(Infinity) : [];
// 2. Map Results
const locationsToUpsert = flatResults
.filter((r: any) => r.place_id)
.map((r: any) => ({
place_id: r.place_id,
title: r.title,
description: r.description,
address: r.address,
gps_coordinates: r.gps_coordinates,
phone: r.phone,
website: r.website,
operating_hours: r.operating_hours,
thumbnail: r.thumbnail,
types: r.types,
raw_data: r,
continent: r.geo?.continent,
country: r.geo?.countryName,
city: r.geo?.city,
updated_at: new Date().toISOString(),
user_id: userId
}));
// 2b. Update Known Types (Side Effect)
if (userId) {
try {
const { data: profile } = await supabase
.from('profiles')
.select('settings')
.eq('id', userId)
.maybeSingle();
const settings = profile?.settings || {};
const knownTypes: string[] = Array.isArray(settings.known_types) ? settings.known_types : [];
const newTypesSet = new Set(knownTypes);
locationsToUpsert.forEach((l: any) => {
if (l.types && Array.isArray(l.types)) {
l.types.forEach((t: string) => newTypesSet.add(t));
}
});
if (newTypesSet.size > knownTypes.length) {
const updatedKnownTypes = Array.from(newTypesSet).sort();
await supabase
.from('profiles')
.update({
settings: { ...settings, known_types: updatedKnownTypes }
})
.eq('id', userId);
}
} catch (err) {
logger.error(err, 'Error processing known types');
}
}
// 2c. Deduplicate locations (avoids Postgres error 21000 when the same place_id appears twice in one upsert batch)
const uniqueLocationsMap = new Map();
locationsToUpsert.forEach((l: any) => {
const pid = String(l.place_id).trim();
// Ensure place_id is cleaned
l.place_id = pid;
if (!uniqueLocationsMap.has(pid)) {
uniqueLocationsMap.set(pid, l);
} else {
logger.debug({ place_id: pid }, 'Duplicate place_id found in batch, skipping');
}
});
const uniqueLocations = Array.from(uniqueLocationsMap.values());
logger.info({
total: locationsToUpsert.length,
unique: uniqueLocations.length,
diff: locationsToUpsert.length - uniqueLocations.length
}, 'Deduplication stats');
// 3. Preserve Meta (Emails)
const placeIds = uniqueLocations.map((l: any) => l.place_id);
if (placeIds.length > 0) {
const existingLocations = await getLocationsMeta(placeIds);
if (existingLocations) {
const metaMap = new Map(existingLocations.map((l: any) => [l.place_id, l.meta]));
uniqueLocations.forEach((l: any) => {
const existingMeta = metaMap.get(l.place_id);
if (existingMeta) {
l.raw_data.meta = {
...(l.raw_data.meta || {}),
...existingMeta
};
}
});
}
}
// 4. Upsert Locations & Store Search
try {
// Attempt single batch upsert for performance
logger.info({ count: uniqueLocations.length }, 'Attempting batch upsert');
await upsertLocations(uniqueLocations);
logger.info({ count: uniqueLocations.length }, 'Batch upsert successful');
} catch (err: any) {
// Fallback to serial processing if duplicates exist within batch (Postgres 21000)
// or any other batch-level error that might be resolvable row-by-row
logger.warn({ err }, 'Batch upsert failed, falling back to sequential upserts');
// Execute upserts sequentially to safely handle "ON CONFLICT ... row a second time"
// This effectively implements "Last Write Wins" for duplicates in the batch
for (const loc of uniqueLocations) {
try {
await upsertLocations([loc]);
} catch (innerErr) {
logger.error({ place_id: loc.place_id, err: innerErr }, 'Failed to upsert individual location');
// Continue processing others
}
}
}
await storeSearch(params_hash({
query,
location,
filterCity,
filterContinent,
filterCountry,
filterType,
limit,
zoom,
excludedTypes
}), { query, location }, placeIds);
return uniqueLocations;
};

View File

@ -0,0 +1,2 @@
import { logger as rootLogger } from '@/commons/logger.js';
export const logger = rootLogger.child({ product: 'locations' });

View File

@ -0,0 +1,167 @@
import { Job, PgBoss } from 'pg-boss';
import { AbstractWorker } from '@/jobs/boss/AbstractWorker.js';
import { Worker } from '@/commons/decorators.js';
import { searchLocations, SearchParams } from './jobs-find.js';
import { JOB_NAME, EMAIL_JOB_NAME, COST_PER_SEARCH, EMAIL_SEARCH_COST, EMAIL_META_TIMEOUT_MS, EMAIL_SEARCH_TIMEOUT_MS, EMAIL_SEARCH_MAX_PAGES, EMAIL_SEARCH_PAGE_TIMEOUT_MS, EMAIL_SEARCH_PAGE_CONCURRENCY, EMAIL_MAX_CONCURRENT_JOBS } from './constants.js';
import { params_hash } from './cache.js';
import { findEmailEach, parseHtml, Page, Meta } from '@polymech/search';
import { getLocationByPlaceId } from '../../endpoints/competitors/getLocationByPlaceId.js';
import { supabase } from '../../commons/supabase.js';
import { logger } from '@/commons/logger.js';
export interface LocationsJobData extends SearchParams {
userId: string;
}
export interface EmailJobData {
place_id: string;
userId: string;
}
@Worker(JOB_NAME)
export class LocationsWorker extends AbstractWorker<LocationsJobData> {
readonly queueName = JOB_NAME;
calculateCost(job: Job<LocationsJobData>, result: any): number {
return COST_PER_SEARCH + (result?.length || 0) * 0.1;
}
protected async process(job: Job<LocationsJobData>) {
const { userId, ...params } = job.data;
const results = await searchLocations(params, userId);
return { count: results.length, placeIds: results.map((r: any) => r.place_id) };
}
}
@Worker(EMAIL_JOB_NAME)
export class EmailWorker extends AbstractWorker<EmailJobData> {
readonly queueName = EMAIL_JOB_NAME;
readonly teamSize = EMAIL_MAX_CONCURRENT_JOBS;
calculateCost(job: Job<EmailJobData>, result: any): number {
return EMAIL_SEARCH_COST;
}
protected async process(job: Job<EmailJobData>) {
logger.info(`[EmailWorker] Processing job ${job.id} for place_id: ${job.data.place_id}`);
const { place_id, userId } = job.data;
// 1. Fetch location details
const location = await getLocationByPlaceId(place_id);
if (!location) {
throw new Error('Location not found');
}
// 2. Existing-email checks are left to the caller; once a job reaches this point we always run a fresh search.
// A website is required, though; without one there is nothing to crawl.
if (!location.website) {
throw new Error('No website available');
}
// 3. Ensure Meta Data
if (!location.meta || !location.meta.pages) {
logger.info(`[EmailWorker] Meta missing for ${place_id} (${location.title}), fetching...`);
const meta = await parseHtml(location.website, null, {
headless: true,
timeout: EMAIL_META_TIMEOUT_MS
});
if (meta) {
location.meta = { ...location.meta, ...meta } as any;
// Update DB with meta
await supabase.from('locations').update({ meta: location.meta }).eq('place_id', place_id);
}
}
// 4. Execute Email Scraping
logger.info(`[EmailWorker] Starting email search for ${place_id} : ${location.title}`);
let isCancelled = false;
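// Cooperative cancellation: checkCancelled is handed to findEmailEach and called again when an email is found; it polls pg-boss for the job state and latches the result.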
const checkCancelled = async () => {
if (isCancelled) return true;
if (!this.boss) return false;
try {
const currentJob = await this.boss.getJobById(job.name, job.id);
const dbCancelled = !!(currentJob && (currentJob as any).state === 'cancelled');
if (dbCancelled) isCancelled = true;
return isCancelled;
} catch (e) {
return false;
}
};
const searchPromise = findEmailEach(location, {
headless: process.env.EMAIL_SEARCH_HEADLESS !== 'false',
searchFrom: 'api',
abortAfter: 1,
maxPages: EMAIL_SEARCH_MAX_PAGES,
pageTimeout: EMAIL_SEARCH_PAGE_TIMEOUT_MS,
concurrency: EMAIL_SEARCH_PAGE_CONCURRENCY,
checkCancelled
}, async (page: Page) => {
logger.info(`[EmailWorker] Found email for ${place_id}`);
if (await checkCancelled()) {
logger.info(`[EmailWorker] Job cancelled for ${place_id}`);
throw new Error('Cancelled');
}
});
// Race the search against a hard timeout; on timeout we flag cancellation and keep whatever was found so far.
let emails: string[] = [];
logger.info(`[EmailWorker] Starting search with timeout: ${EMAIL_SEARCH_TIMEOUT_MS}ms`);
try {
const result = await Promise.race([
searchPromise,
new Promise((_, reject) => setTimeout(() => {
logger.warn(`[EmailWorker] Timeout reached for ${place_id}, signalling cancellation...`);
isCancelled = true;
reject(new Error('Timeout'));
}, EMAIL_SEARCH_TIMEOUT_MS))
]);
if (result) emails = result as string[];
} catch (e: any) {
if (e.message === 'Timeout') {
logger.warn(`[EmailWorker] Timeout searching emails for ${place_id}`);
// Proceed with what we have (empty)
} else if (e.message === 'Cancelled' || e.message === 'CancelledByUser') {
logger.warn(`[EmailWorker] Job cancelled for ${place_id}`);
// Proceed with what we have (likely empty or partial)
} else {
throw e;
}
}
// Format emails
const emailsWithMetadata = emails.map((email) => ({
email,
source: location.website || '',
foundAt: new Date().toISOString(),
tool: 'puppeteer'
}));
// Update location with emails
if (emailsWithMetadata.length > 0) {
logger.info(`[EmailWorker] Updating location ${place_id} with ${emailsWithMetadata.length} emails`);
await supabase
.from('locations')
.update({
meta: {
...location.meta,
emails: emailsWithMetadata // Replaces emails with fresh finding
},
updated_at: new Date().toISOString()
})
.eq('place_id', place_id);
} else {
logger.info(`[EmailWorker] Search finished for ${place_id}. No emails found.`);
}
return {
place_id,
emailsCount: emailsWithMetadata.length,
emails: emailsWithMetadata
};
}
}
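// Sketch (illustrative, not part of the handlers above): how the product enqueues a job for this worker.
// const jobId = await this.sendJob(EMAIL_JOB_NAME, { place_id, userId }, EMAIL_JOB_OPTIONS);
// Completion is then observed via the EventBus 'job:complete' / 'job:failed' events consumed by handleStreamEmail.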

View File

@ -0,0 +1,375 @@
import { createRoute } from '@hono/zod-openapi';
import { LocationRequestSchema, LocationResponseSchema, LocationDetailResponseSchema } from './schemas.js';
import { Public } from '@/commons/decorators.js';
import { z } from 'zod';
import { authMiddleware, optionalAuthMiddleware } from '../../middleware/auth.js';
const tags = ['Locations'];
export const RequestSchema = z.object({
query: z.string().default(''),
location: z.string().optional().default('barcelona, spain'),
refresh: z.string().optional().default('false'),
filterCountry: z.string().optional(),
filterCity: z.string().optional(),
filterContinent: z.string().optional(),
filterType: z.string().optional(),
limit: z.string().optional().default('250'),
zoom: z.string().optional().describe('Map zoom level'),
excludedTypes: z.union([z.string(), z.array(z.string())]).optional().transform((val) => {
if (!val) return undefined;
if (typeof val === 'string') return [val];
return val;
}),
}).loose();
export type CompetitorRequest = z.infer<typeof RequestSchema>;
export const cancelJobRoute = createRoute({
method: 'post',
path: '/api/competitors/email/job/{jobId}/cancel',
tags,
middleware: [authMiddleware] as const,
request: {
params: z.object({
jobId: z.string(),
}),
},
responses: {
200: {
content: {
'application/json': {
schema: z.object({
message: z.string(),
}),
},
},
description: 'Job cancelled successfully',
},
404: {
description: 'Job not found',
},
500: {
description: 'Server error',
},
},
});
export const getLocationsRoute = createRoute({
method: 'get',
path: '/api/competitors',
tags,
middleware: [authMiddleware] as const,
request: {
query: RequestSchema,
},
responses: {
202: {
content: {
'application/json': {
schema: z.object({
message: z.string(),
jobId: z.string().nullable(),
}),
},
},
description: 'Job submitted',
},
200: {
content: {
'application/json': {
schema: LocationResponseSchema,
},
},
description: 'Retrieve locations (cached)',
},
500: {
description: 'Server error',
},
},
});
export const getLocationsStreamRoute = createRoute({
method: 'get',
path: '/api/competitors/stream',
tags,
middleware: [authMiddleware] as const,
request: {
query: LocationRequestSchema,
},
responses: {
200: {
description: 'Server-Sent Events stream of location results',
content: {
'text/event-stream': {
schema: {
type: 'string'
}
}
}
},
500: {
description: 'Server error',
},
},
});
export const getCompetitorByIdRoute = Public(createRoute({
method: 'get',
path: '/api/competitors/{place_id}',
tags,
request: {
params: z.object({
place_id: z.string(),
}),
},
responses: {
200: {
content: {
'application/json': {
schema: LocationDetailResponseSchema,
},
},
description: 'Retrieve competitor details',
},
404: {
description: 'Competitor not found',
},
500: {
description: 'Server error',
},
},
}));
export const getLlmRegionInfoRoute = Public(createRoute({
method: 'get',
path: '/api/locations/llm-info',
tags,
request: {
query: z.object({
location: z.string().describe('Location name to get info for'),
}),
},
responses: {
200: {
description: 'LLM Region Info',
content: {
'application/json': {
schema: z.object({
data: z.any(),
}),
},
},
},
500: {
description: 'Server error',
},
},
}));
export const getFindEmailStreamRoute = createRoute({
method: 'get',
path: '/api/competitors/email/stream',
tags,
middleware: [authMiddleware] as const,
request: {
query: z.object({
place_ids: z.string().describe('Comma-separated list of place IDs'),
token: z.string().optional().describe('Auth token for SSE'),
}),
},
responses: {
200: {
description: 'Server-Sent Events stream of email finding results',
content: {
'text/event-stream': {
schema: {
type: 'string'
}
}
}
},
500: {
description: 'Server error',
},
},
});
export const getRegionSearchRoute = Public(createRoute({
method: 'get',
path: '/api/regions/search',
tags,
request: {
query: z.object({
query: z.string().describe('Region name to search for'),
content_level: z.string().optional().describe('Filter by content level (e.g. 1)'),
geojson: z.string().optional().describe('Include GeoJSON geometry (true/false)'),
country: z.string().optional().describe('Restrict the search to a country'),
}),
},
responses: {
200: {
description: 'Search results',
content: {
'application/json': {
schema: z.object({
data: z.array(z.any()),
}),
},
},
},
500: {
description: 'Server error',
},
},
}));
export const getRegionBoundaryRoute = Public(createRoute({
method: 'get',
path: '/api/regions/boundary/{id}',
tags,
request: {
params: z.object({
id: z.string().describe('GADM ID of the region'),
}),
query: z.object({
name: z.string().optional().describe('Region Name (Required for GADM fetch)'),
level: z.string().optional().describe('Content Level'),
}),
},
responses: {
200: {
description: 'GeoJSON boundary',
content: {
'application/json': {
schema: z.object({
data: z.any(), // GeoJSON object
}),
},
},
},
404: {
description: 'Region not found',
},
500: {
description: 'Server error',
},
},
}));
export const getRegionReverseRoute = Public(createRoute({
method: 'get',
path: '/api/regions/reverse',
tags,
request: {
query: z.object({
lat: z.string().describe('Latitude'),
lon: z.string().describe('Longitude'),
}),
},
responses: {
200: {
content: {
'application/json': {
schema: z.object({
data: z.any().describe('Reverse geocoding result')
}),
},
},
description: 'Reverse geocoding successful',
},
500: {
content: {
'application/json': {
schema: z.object({
error: z.string(),
}),
},
},
description: 'Server error',
},
},
}));
export const getRegionNamesRoute = Public(createRoute({
method: 'get',
path: '/api/regions/names',
tags,
request: {
query: z.object({
admin: z.string().describe('Admin code (e.g. FRA, FRA.1_1)'),
content_level: z.string().describe('Content level (e.g. 2)').default('2'),
depth: z.string().optional().describe('Depth passed through to the GADM names lookup'),
}),
},
responses: {
200: {
description: 'Region names results',
content: {
'application/json': {
schema: z.object({
data: z.array(z.any()),
}),
},
},
},
500: {
description: 'Server error',
},
},
}));
export const getWikiSearchRoute = Public(createRoute({
method: 'get',
path: '/api/locations/wiki',
tags,
request: {
query: z.object({
lat: z.string().describe('Latitude'),
lon: z.string().describe('Longitude'),
radius: z.string().optional().default('10000').describe('Radius in meters (max 10000)'), // WP limit is usually 10000
limit: z.string().optional().default('10').describe('Limit results'),
}),
},
responses: {
200: {
description: 'Wiki geosearch results',
content: {
'application/json': {
schema: z.object({
data: z.array(z.any()),
}),
},
},
},
500: {
description: 'Server error',
},
},
}));
export const getEnrichStreamRoute = createRoute({
method: 'get',
path: '/api/competitors/enrich/stream',
tags,
middleware: [authMiddleware] as const,
request: {
query: z.object({
place_ids: z.string().describe('Comma-separated list of place IDs'),
enrichers: z.string().describe('Comma-separated list of enrichers (e.g. meta)'),
}),
},
responses: {
200: {
description: 'Server-Sent Events stream of enrichment results',
content: {
'text/event-stream': {
schema: {
type: 'string'
}
}
}
},
500: {
description: 'Server error',
},
},
});
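// Note: each route exported here is expected to be paired with its handler in the product's `routes` array as
// { definition, handler }, which registerProductRoutes() passes to app.openapi().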

View File

@ -0,0 +1,28 @@
import { z } from '@hono/zod-openapi';
import { CompetitorResponseSchema, CompetitorDetailResponseSchema } from '@polymech/shared';
// Define Request Schema locally for server/OpenAPI compatibility
export const LocationRequestSchema = z.object({
query: z.string().default('plastichub'),
location: z.string().optional().default('barcelona, spain'),
refresh: z.string().optional().default('false'),
filterCountry: z.string().optional(),
filterCity: z.string().optional(),
filterContinent: z.string().optional(),
filterType: z.string().optional(),
concurrency: z.string().optional().default('5'),
limit: z.string().optional().default('250'),
zoom: z.string().optional().describe('Map zoom level'),
excludedTypes: z.union([z.string(), z.array(z.string())]).optional().transform((val) => {
if (!val) return undefined;
if (typeof val === 'string') return [val];
return val;
}),
});
export const LocationResponseSchema = CompetitorResponseSchema;
export const LocationDetailResponseSchema = CompetitorDetailResponseSchema;
export type LocationRequest = z.infer<typeof LocationRequestSchema>;
export type LocationResponse = z.infer<typeof LocationResponseSchema>;
export type LocationDetailResponse = z.infer<typeof LocationDetailResponseSchema>;

View File

@ -0,0 +1,107 @@
import { createRoute, RouteHandler } from '@hono/zod-openapi';
import { streamSSE } from 'hono/streaming';
import { LocationRequestSchema } from './schemas.js';
import { searchLocations } from './jobs-find.js';
import { get_cached, params_hash } from './cache.js';
import { HonoEnv } from '@/commons/types.js';
import { logger } from './logger.js';
const tags = ['Locations'];
export const getLocationsStreamRoute = createRoute({
method: 'get',
path: '/api/locations/stream',
tags,
request: {
query: LocationRequestSchema,
},
responses: {
200: {
description: 'Server-Sent Events stream of location results',
content: {
'text/event-stream': {
schema: {
type: 'string'
}
}
}
},
500: {
description: 'Server error',
},
},
});
export const stream_get: RouteHandler<typeof getLocationsStreamRoute, HonoEnv> = async (c) => {
const query = c.req.valid('query');
const { refresh } = query;
const forceRefresh = refresh === 'true';
const userId = c.get('userId');
if (!userId) {
logger.error('Unauthorized : no userId');
return c.json({ message: 'Unauthorized' }, 401);
}
const inputHash = params_hash({ query: query.query, location: query.location });
return streamSSE(c, async (stream) => {
try {
await stream.writeSSE({
event: 'progress',
data: JSON.stringify({ stage: 'starting', percent: 0 })
});
if (!forceRefresh) {
await stream.writeSSE({
event: 'progress',
data: JSON.stringify({ stage: 'checking_cache', percent: 10 })
});
const cached = await get_cached(inputHash);
if (cached) {
for (let i = 0; i < cached.length; i++) {
await stream.writeSSE({
event: 'result',
data: JSON.stringify(cached[i])
});
}
await stream.writeSSE({
event: 'complete',
data: JSON.stringify({ total: cached.length, cached: true })
});
return;
}
}
await stream.writeSSE({
event: 'progress',
data: JSON.stringify({ stage: 'fetching_from_api', percent: 20 })
});
// Note: searchLocations currently awaits all results.
// Ideally, we would refactor searchLocations to yield results or accept a callback.
// For now, we fetch all then stream.
const results = await searchLocations(query as any, userId);
for (let i = 0; i < results.length; i++) {
await stream.writeSSE({
event: 'result',
data: JSON.stringify(results[i])
});
}
await stream.writeSSE({
event: 'complete',
data: JSON.stringify({ total: results.length, cached: false })
});
} catch (error: any) {
logger.error(error, 'Stream error');
await stream.writeSSE({
event: 'error',
data: JSON.stringify({ error: error.message || 'Internal Server Error' })
});
}
});
};

View File

@ -0,0 +1,34 @@
import { PgBoss } from 'pg-boss';
import { AbstractProduct } from './AbstractProduct.js';
import { LocationsProduct } from './locations/index.js';
import './subscriber.js';
export const ALL_PRODUCTS: AbstractProduct[] = [
new LocationsProduct()
];
// Helper to get all workers
export const getAllWorkers = () => {
return ALL_PRODUCTS.flatMap(p => p.workers || []);
};
// Helper to register routes
export const registerProductRoutes = (app: any) => {
ALL_PRODUCTS.forEach(product => {
product.routes.forEach(route => {
// @ts-ignore - Hono types might mismatch slightly
app.openapi(route.definition, route.handler);
});
});
};
// Helper to initialize products (lifecycle: start)
export const startProducts = async (boss: PgBoss) => {
for (const product of ALL_PRODUCTS) {
await product.start(boss);
}
};
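// Boot-time wiring sketch (assumes an OpenAPIHono `app` and a configured PgBoss connection; names are illustrative):
// const boss = new PgBoss(connectionString);
// await boss.start();
// registerProductRoutes(app);
// await startProducts(boss);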

View File

@ -0,0 +1,42 @@
import { ALL_PRODUCTS } from './registry.js';
import { EventBus } from './EventBus.js';
const findProductByQueue = (queue: string) => {
return ALL_PRODUCTS.find(p =>
p.workers?.some(w => {
try {
const worker = new (w as any)();
return worker.queueName === queue;
} catch (e) {
return false;
}
})
);
};
EventBus.on('job:create', (event: any) => {
const product = findProductByQueue(event.queue);
if (!product) return;
// Apply default job options from product if available
if (product.jobOptions) {
event.options = { ...product.jobOptions, ...event.options };
}
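// pg-boss treats sends that share a singletonKey within singletonSeconds as one job, so identical requests are debounced per search hash.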
const singletonKey = product.hash(event.data);
if (singletonKey) {
event.options.singletonKey = singletonKey;
// Default to 5 minutes if not specified
if (!event.options.singletonSeconds) {
event.options.singletonSeconds = 300;
}
}
const { userId } = event.data;
if (userId) {
const metadata = product.meta(userId);
event.data = { ...event.data, ...metadata };
}
});