diff --git a/packages/shared/src/server/AbstractProduct.ts b/packages/shared/src/server/AbstractProduct.ts new file mode 100644 index 00000000..6c6f55e4 --- /dev/null +++ b/packages/shared/src/server/AbstractProduct.ts @@ -0,0 +1,275 @@ +import EventEmitter from 'events'; +import { PgBoss } from 'pg-boss'; +import { createHash } from 'crypto'; +import { streamSSE } from 'hono/streaming'; +import { EventBus } from './EventBus.js'; +import { ProductErrorCode } from './enums.js'; +import { ProductError } from './errors.js'; +import { logger } from '../commons/logger.js'; + +export interface JobCreationEvent { + queue: string; + data: any; + options: any; +} + +export interface StreamOptions { + data: TData; + userId: string; + forceRefresh?: boolean; + fetcher: (data: TData, userId: string) => Promise; + cacheChecker?: (hash: string) => Promise; +} + +export abstract class AbstractProduct extends EventEmitter { + abstract readonly id: string; + abstract readonly jobOptions: any; + abstract readonly actions: Record; + abstract readonly workers: any[]; + abstract readonly routes: any[]; + + protected boss: PgBoss | null = null; + protected workerSubscriptions: string[] = []; + + async start(boss: PgBoss) { + try { + this.boss = boss; + await this.registerWorkers(boss); + await this.onStart(boss); + } catch (error: any) { + throw new ProductError(ProductErrorCode.START_FAILED, { + message: `Failed to start product ${this.id}: ${error.message}`, + originalError: error + }); + } + } + + protected async onStart(boss: PgBoss) { + // Optional hook for subclasses + } + + async stop() { + try { + await this.unregisterWorkers(); + await this.onStop(); + } catch (error: any) { + throw new ProductError(ProductErrorCode.STOP_FAILED, { + message: `Failed to stop product ${this.id}: ${error.message}`, + originalError: error + }); + } + } + + protected async onStop() { + // Optional hook + } + + async pause() { + try { + await this.unregisterWorkers(); + } catch (error: any) { + throw new ProductError(ProductErrorCode.PAUSE_FAILED, { + message: `Failed to pause product ${this.id}: ${error.message}`, + originalError: error + }); + } + } + + async resume() { + if (!this.boss) { + throw new ProductError(ProductErrorCode.RESUME_FAILED, 'PgBoss not initialized'); + } + try { + await this.registerWorkers(this.boss); + } catch (error: any) { + throw new ProductError(ProductErrorCode.RESUME_FAILED, { + message: `Failed to resume product ${this.id}: ${error.message}`, + originalError: error + }); + } + } + + protected async registerWorkers(boss: PgBoss) { + if (!this.workers) return; + + for (const WorkerClass of this.workers) { + try { + // @ts-ignore + const workerInstance = new WorkerClass(); + // Inject the EventBus so the worker can emit job events + (workerInstance as any).emitter = EventBus; + // Inject boss instance for advanced operations like cancellation check + (workerInstance as any).boss = boss; + + logger.info(`[${this.id}] Registering worker for queue: ${workerInstance.queueName}`); + + await boss.createQueue(workerInstance.queueName, workerInstance.queueOptions); + + const workOptions = (workerInstance as any).teamSize ? 
{ teamSize: (workerInstance as any).teamSize } : {}; + const subscriptionId = await boss.work(workerInstance.queueName, workOptions as any, (job: any) => workerInstance.handler(job)); + this.workerSubscriptions.push(subscriptionId); + } catch (error: any) { + throw new ProductError(ProductErrorCode.WORKER_REGISTRATION_FAILED, { + message: `Failed to register worker for ${this.id}: ${error.message}`, + worker: WorkerClass.name + }); + } + } + } + + protected async unregisterWorkers() { + if (!this.boss) return; + + for (const subId of this.workerSubscriptions) { + try { + // @ts-ignore - Assuming offWork exists in PgBoss type or at runtime + await this.boss.offWork(subId); + } catch (error: any) { + logger.warn(`[${this.id}] Failed to unregister worker subscription ${subId}: ${error.message}`); + } + } + this.workerSubscriptions = []; + } + + async sendJob(queue: string, data: TJobData, options: any = {}) { + if (!this.boss) { + throw new ProductError(ProductErrorCode.JOB_SUBMISSION_FAILED, 'PgBoss not initialized'); + } + const event: JobCreationEvent = { queue, data, options }; + // Emit event to allow subscribers to modify data/options + EventBus.emit('job:create', event); + + try { + return await this.boss.send(queue, event.data, event.options); + } catch (error: any) { + throw new ProductError(ProductErrorCode.JOB_SUBMISSION_FAILED, { + message: `Failed to send job to ${queue}: ${error.message}`, + queue + }); + } + } + + async waitForJob(jobId: string, timeoutMs: number = 60000): Promise { + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + cleanup(); + reject(new ProductError(ProductErrorCode.JOB_TIMEOUT, { message: 'Job timeout', jobId })); + }, timeoutMs); + + const onComplete = (event: any) => { + if (event.jobId === jobId) { + cleanup(); + resolve(event.result); + } + }; + + const cleanup = () => { + clearTimeout(timer); + EventBus.off('job:complete', onComplete); + }; + + EventBus.on('job:complete', onComplete); + }); + } + + async waitForHash(targetHash: string, timeoutMs: number = 60000): Promise { + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + cleanup(); + reject(new ProductError(ProductErrorCode.JOB_TIMEOUT, { message: 'Job timeout (hash wait)', hash: targetHash })); + }, timeoutMs); + + const onComplete = (event: any) => { + if (!event.data) return; + try { + const eventHash = this.hash(event.data); + if (eventHash === targetHash) { + cleanup(); + resolve(event.result); + } + } catch (e) { + // Ignore hashing errors (mismatched data types from other queues) + } + }; + + const cleanup = () => { + clearTimeout(timer); + EventBus.off('job:complete', onComplete); + }; + + EventBus.on('job:complete', onComplete); + }); + } + + protected async handleStream(c: any, options: StreamOptions) { + const { data, userId, forceRefresh, fetcher, cacheChecker } = options; + + const inputHash = this.generateHash(data); + + return streamSSE(c, async (stream) => { + try { + await stream.writeSSE({ + event: 'progress', + data: JSON.stringify({ stage: 'starting', percent: 0 }) + }); + + if (!forceRefresh && cacheChecker) { + await stream.writeSSE({ + event: 'progress', + data: JSON.stringify({ stage: 'checking_cache', percent: 10 }) + }); + + const cached = await cacheChecker(inputHash); + if (cached) { + for (let i = 0; i < cached.length; i++) { + await stream.writeSSE({ + event: 'result', + data: JSON.stringify(cached[i]) + }); + } + await stream.writeSSE({ + event: 'complete', + data: JSON.stringify({ total: cached.length, cached: 
true }) + }); + return; + } + } + + await stream.writeSSE({ + event: 'progress', + data: JSON.stringify({ stage: 'fetching_from_api', percent: 20 }) + }); + + const results = await fetcher(data, userId); + + for (let i = 0; i < results.length; i++) { + await stream.writeSSE({ + event: 'result', + data: JSON.stringify(results[i]) + }); + } + + await stream.writeSSE({ + event: 'complete', + data: JSON.stringify({ total: results.length, cached: false }) + }); + + } catch (error: any) { + logger.error(error, `[${this.id}] Stream error`); + await stream.writeSSE({ + event: 'error', + data: JSON.stringify({ error: error.message || 'Internal Server Error' }) + }); + } + }); + } + + // Helper for hashing + protected generateHash(params: any) { + const normalizedInput = JSON.stringify(params, Object.keys(params).sort()); + return createHash('sha256').update(normalizedInput).digest('hex'); + } + + abstract hash(data: TJobData): string; + abstract meta(userId: string): any; +} diff --git a/packages/shared/src/server/EventBus.ts b/packages/shared/src/server/EventBus.ts new file mode 100644 index 00000000..b532e439 --- /dev/null +++ b/packages/shared/src/server/EventBus.ts @@ -0,0 +1,3 @@ +import EventEmitter from 'events'; + +export const EventBus = new EventEmitter(); diff --git a/packages/shared/src/server/enums.ts b/packages/shared/src/server/enums.ts new file mode 100644 index 00000000..fe66ec2b --- /dev/null +++ b/packages/shared/src/server/enums.ts @@ -0,0 +1,22 @@ +export enum ProductErrorCode { + // Lifecycle Errors + START_FAILED = 'PRODUCT_START_FAILED', + STOP_FAILED = 'PRODUCT_STOP_FAILED', + PAUSE_FAILED = 'PRODUCT_PAUSE_FAILED', + RESUME_FAILED = 'PRODUCT_RESUME_FAILED', + + // Worker Errors + WORKER_REGISTRATION_FAILED = 'WORKER_REGISTRATION_FAILED', + WORKER_NOT_FOUND = 'WORKER_NOT_FOUND', + + // Job Errors + JOB_SUBMISSION_FAILED = 'JOB_SUBMISSION_FAILED', + JOB_TIMEOUT = 'JOB_TIMEOUT', + + // Configuration Errors + INVALID_CONFIG = 'INVALID_CONFIG', + MISSING_DEPENDENCY = 'MISSING_DEPENDENCY', + + // Generic + UNKNOWN_ERROR = 'UNKNOWN_ERROR' +} diff --git a/packages/shared/src/server/errors.ts b/packages/shared/src/server/errors.ts new file mode 100644 index 00000000..a9f5644b --- /dev/null +++ b/packages/shared/src/server/errors.ts @@ -0,0 +1,29 @@ +import { ProductErrorCode } from './enums.js'; + +export interface ProductErrorPayload { + message: string; + [key: string]: any; +} + +export class ProductError extends Error { + public readonly code: ProductErrorCode; + public readonly payload: ProductErrorPayload; + + constructor(code: ProductErrorCode, payload: ProductErrorPayload | string) { + const message = typeof payload === 'string' ? payload : payload.message; + super(message); + this.code = code; + this.payload = typeof payload === 'string' ? 
{ message: payload } : payload; + + // Restore prototype chain + Object.setPrototypeOf(this, new.target.prototype); + } + + toJSON() { + return { + code: this.code, + message: this.message, + payload: this.payload + }; + } +} diff --git a/packages/shared/src/server/locations/__tests__/e2e-gadm.test.ts b/packages/shared/src/server/locations/__tests__/e2e-gadm.test.ts new file mode 100644 index 00000000..774a6d6e --- /dev/null +++ b/packages/shared/src/server/locations/__tests__/e2e-gadm.test.ts @@ -0,0 +1,94 @@ +import { describe, it, expect, beforeAll } from 'vitest'; +import * as dotenv from 'dotenv'; +import path from 'path'; + +// Load env from server root +dotenv.config({ path: path.resolve(__dirname, '../../../../.env') }); + +describe('PyGADM Integration', () => { + let LocationsProduct: any; + + beforeAll(async () => { + const productModule = await import('../index.js'); + LocationsProduct = new productModule.LocationsProduct(); + }); + + it('should search for regions using python wrapper', async () => { + const c = { + req: { + valid: () => ({ query: 'France' }) + }, + json: (data: any, status: number) => ({ data, status }) + }; + + const result = await LocationsProduct.handleGetRegionSearch(c); + if (result.status !== 200) { + console.error('PyGADM Search Error:', JSON.stringify(result, null, 2)); + } + expect(result.status).toBe(200); + expect(result.data).toBeDefined(); + // console.log('PyGADM Search Result:', JSON.stringify(result.data, null, 2)); + }); + + it('should search for regions with content_level', async () => { + const c = { + req: { + valid: () => ({ query: 'France', content_level: '1' }) + }, + json: (data: any, status: number) => ({ data, status }) + }; + + const result = await LocationsProduct.handleGetRegionSearch(c); + expect(result.status).toBe(200); + expect(result.data).toBeDefined(); + }); + + it('should search for regions with geojson output', async () => { + const c = { + req: { + valid: () => ({ query: 'France', content_level: '1', geojson: 'true' }) + }, + json: (data: any, status: number) => ({ data, status }) + }; + + const result = await LocationsProduct.handleGetRegionSearch(c); + // console.log('PyGADM GeoJSON Search Result:', JSON.stringify(result.data, null, 2)); + expect(result.status).toBe(200); + expect(result.data).toBeDefined(); + expect(result.data.type).toBe('FeatureCollection'); + expect(result.data.features).toBeDefined(); + expect(Array.isArray(result.data.features)).toBe(true); + }); + + it('should get boundary using python wrapper', async () => { + const c = { + req: { + valid: () => ({ id: 'FRA.1_1' }) // Region for Auvergne-Rhône-Alpes + }, + json: (data: any, status: number) => ({ data, status }) + }; + + const result = await LocationsProduct.handleGetRegionBoundary(c); + if (result.status === 200) { + expect(result.data).toBeDefined(); + } else { + console.log('PyGADM Boundary Result Status:', result.status, result.data); + } + }); + it('should get region names using python wrapper', async () => { + const c = { + req: { + valid: () => ({ admin: 'FRA', content_level: '2' }) + }, + json: (data: any, status: number) => ({ data, status }) + }; + + const result = await LocationsProduct.handleGetRegionNames(c); + if (result.status !== 200) { + console.error('PyGADM Names Error:', JSON.stringify(result, null, 2)); + } + expect(result.status).toBe(200); + expect(result.data).toBeDefined(); + console.log('PyGADM Names Result (Subset):', JSON.stringify(result.data.data.slice(0, 2), null, 2)); + }); +}); diff --git 
a/packages/shared/src/server/locations/__tests__/e2e.test.ts b/packages/shared/src/server/locations/__tests__/e2e.test.ts new file mode 100644 index 00000000..698b35e6 --- /dev/null +++ b/packages/shared/src/server/locations/__tests__/e2e.test.ts @@ -0,0 +1,313 @@ + +import { describe, it, expect, beforeAll, afterAll } from 'vitest'; +import * as dotenv from 'dotenv'; + +import path from 'path'; +// Load env from server root +dotenv.config({ path: path.resolve(__dirname, '../../../../.env') }); +import { PgBoss } from 'pg-boss'; +// import { submitJob } from '../../events.js'; +// Ensure registry is loaded to register subscriber +import '../../registry.js'; +import { LOCATIONS_JOB_OPTIONS } from '../constants.js'; + +// Dynamic imports to ensure env vars are loaded before modules that use them +let boss: PgBoss; +let startBoss: any; +let SearchWorker: any; +let supabase: any; +let logger: any; + +describe('Locations Product E2E', () => { + let worker: any; + let TEST_USER_ID: string; + const TEST_QUERY = 'plastichub'; + const TEST_LOCATION = 'Barcelona, Spain'; + + beforeAll(async () => { + // Import modules now that env vars are loaded + // Adjust paths relative to this file: src/products/locations/__tests__/e2e.test.ts + const clientModule = await import('../../../jobs/boss/client.js'); + boss = clientModule.boss as PgBoss; + startBoss = clientModule.startBoss; + + // Import the worker from the product + const workerModule = await import('../pgboss.js'); + SearchWorker = workerModule.LocationsWorker; + + const supabaseModule = await import('../../../commons/supabase.js'); + supabase = supabaseModule.supabase; + + const loggerModule = await import('../../../commons/logger.js'); + logger = loggerModule.logger; + + + // Ensure boss is connected + if (!boss) { + throw new Error('PgBoss not initialized (check DATABASE_URL)'); + } + + await startBoss(); + + // Initialize product (lifecycle start) + // We need to import LocationsProduct to call start + const productModule = await import('../index.js'); + const LocationsProduct = productModule.LocationsProduct; + if ((LocationsProduct as any).start) { + await (LocationsProduct as any).start(boss); + } + + // Authenticate as the test user + const { data, error } = await supabase.auth.signInWithPassword({ + email: 'sales@plastic-hub.com', + password: 'customer', + }); + + if (error || !data.session) { + console.error('Failed to sign in test user:', error); + throw new Error('Failed to sign in test user'); + } + + TEST_USER_ID = data.user.id; + logger.info(`E2E Test: Authenticated as ${TEST_USER_ID}`); + + // Clean up existing data + const { error: deleteError } = await supabase + .from('locations') + .delete() + .eq('user_id', TEST_USER_ID) + .eq('title', 'Plastic Hub'); + + if (deleteError) { + logger.warn('Failed to clean up existing locations:', deleteError); + } else { + logger.info('Cleaned up existing locations for test'); + } + + // Register the worker in this process + worker = new SearchWorker(); + logger.info(`Worker Queue Name: ${worker.queueName}`); + await boss.createQueue(worker.queueName); + await boss.work(worker.queueName, worker.handler.bind(worker)); + + logger.info('E2E Test: Worker started'); + }); + + afterAll(async () => { + if (boss) { + // Delete jobs from the queue to clean up + // Note: pg-boss doesn't have a simple "delete all from queue" method exposed easily on the instance + // but we can try to stop the boss which stops processing. 
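+    // Illustrative alternative (not part of this change): if the installed pg-boss
+    // version exposes purgeQueue(name), the test queue could be drained explicitly,
+    // e.g. `await boss.purgeQueue(worker.queueName)`, before stopping.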
+ // The user asked: "make sure that all jobs are being deleted after the test" + // We can use raw SQL via supabase or just stop the boss. + // Let's try to clear the queue using SQL if possible, or just rely on stop. + // Actually, we can use boss.fetch() to drain the queue or just leave it. + // But to be "clean", maybe we can delete the specific job we created if we had the ID. + + await boss.stop(); + } + }); + + it('should process a real search job and update Supabase', async () => { + if (!boss) return; + + // 1. Send Job + const jobData = { + query: TEST_QUERY, + location: TEST_LOCATION, + userId: TEST_USER_ID, + filters: { + concurrency: 1 + } + }; + + let jobId: string | null = null; + + try { + // Use submitJob helper to trigger event-based metadata injection + // Use submitJob helper to trigger event-based metadata injection + // jobId = await submitJob(worker.queueName, jobData, {}, boss); + jobId = await boss.send(worker.queueName, jobData); + expect(jobId).toBeDefined(); + logger.info(`Job submitted: ${jobId}`); + + // Wait for job to be processed. + // Since we are running the worker in the same process, it should pick it up. + // We can poll the database for the result. + + let attempts = 0; + const maxAttempts = 20; // 20 * 1s = 20s + let found = false; + + while (attempts < maxAttempts && !found) { + await new Promise(resolve => setTimeout(resolve, 1000)); + const { data: locations } = await supabase + .from('locations') + .select('place_id, title') + .eq('user_id', TEST_USER_ID) + .eq('title', 'Plastic Hub'); + + if (locations && locations.length > 0) { + found = true; + logger.info('Locations found:', locations.length); + expect(locations).toBeDefined(); + expect(locations.length).toBeGreaterThan(0); + expect(locations[0].title).toBe('Plastic Hub'); + } + attempts++; + } + + if (!found) { + throw new Error('Timeout waiting for job to process and update DB'); + } + + } catch (err: any) { + logger.error({ err }, 'Error sending/processing job'); + throw err; + } finally { + if (jobId) { + // Try to cancel/delete the job to clean up + try { + await boss.deleteJob(worker.queueName, jobId); + logger.info(`Cleaned up job ${jobId}`); + } catch (e) { + logger.warn(`Failed to clean up job ${jobId}: ${e}`); + } + } + } + }, 30000); // Increased timeout + + it('should prevent duplicate jobs using singleton check and verify metadata', async () => { + if (!boss) return; + + const jobData = { + query: 'duplicate-check', + userId: '39d5210c-23b0-430e-bbfc-00b48f67c899', + filters: { + concurrency: 1 + }, + location: 'Nowhere' + }; + + // First submission should succeed + // First submission should succeed + // const jobId1 = await submitJob(worker.queueName, jobData, {}, boss); + const jobId1 = await boss.send(worker.queueName, jobData); + expect(jobId1).toBeDefined(); + logger.info(`Singleton Test: First job submitted: ${jobId1}`); + + // Verify metadata + const job = await boss.getJobById(worker.queueName, jobId1!); + expect(job).not.toBeNull(); + if (job) { + expect(job.data).toBeDefined(); + // Check if metadata was injected (cost, timestamp) + // Note: userId is already in jobData, but getMetadata adds cost and timestamp + expect((job.data as any).cost).toBeDefined(); + expect((job.data as any).timestamp).toBeDefined(); + expect((job.data as any).userId).toBe(jobData.userId); + + // Verify job options from constants + // Priority Low = 10 + expect(job.priority).toBe(10); + expect(job.retryLimit).toBe(5); + + logger.info('Singleton Test: Metadata verification successful'); + } 
else {
+        logger.warn('Singleton Test: Job not found (might be completed/archived)');
+      }
+
+      // Second submission should fail (return null) because of singletonKey
+      // const jobId2 = await submitJob(worker.queueName, jobData, {}, boss);
+      const jobId2 = await boss.send(worker.queueName, jobData);
+      expect(jobId2).toBeNull();
+      logger.info('Singleton Test: Duplicate job correctly rejected (returned null)');
+
+      // Clean up
+      if (jobId1) {
+        try {
+          await boss.deleteJob(worker.queueName, jobId1);
+          logger.info(`Singleton Test: Cleaned up job ${jobId1}`);
+        } catch (e) {
+          logger.warn(`Singleton Test: Failed to clean up job ${jobId1}: ${e}`);
+        }
+      }
+    });
+
+    it('should resume cancelled jobs on product start', async () => {
+
+      const options = { retryDelay: 1, retryBackoff: true, retryDelayMax: 10 }
+
+      // 1. Submit a job with a start delay so we can cancel it before the worker
+      //    registered in beforeAll picks it up
+      const jobData = {
+        query: TEST_QUERY,
+        location: TEST_LOCATION,
+        userId: TEST_USER_ID
+      };
+
+      // Use a unique query to avoid the singleton check (or ensure previous jobs are gone)
+      jobData.query = `${TEST_QUERY}-${Date.now()}`;
+
+      // const jobId = await submitJob(boss, worker.queueName, jobData, { startAfter: 10 });
+      const jobId = await boss.send(worker.queueName, jobData, { ...LOCATIONS_JOB_OPTIONS, ...options, startAfter: 10 });
+      expect(jobId).toBeDefined();
+      logger.info(`Resume Test: Job submitted: ${jobId}`);
+
+      // 2. Cancel the job
+      await boss.cancel(worker.queueName, jobId!);
+
+      // 3. Verify it is cancelled
+      let job = await boss.getJobById(worker.queueName, jobId!);
+      expect(job).not.toBeNull();
+      expect(job!.state).toBe('cancelled');
+      logger.info(`Resume Test: Job cancelled`);
+
+      // 4. Call LocationsProduct.start to resume
+      const productModule = await import('../index.js');
+      const LocationsProduct = productModule.LocationsProduct;
+      if ((LocationsProduct as any).start) {
+        await (LocationsProduct as any).start(boss);
+      }
+
+      // 5. Verify it is resumed (state should no longer be 'cancelled')
+      // Wait a bit for the resume to happen (it's async inside start)
+      await new Promise(resolve => setTimeout(resolve, 1000));
+
+      job = await boss.getJobById(worker.queueName, jobId!);
+      expect(job).not.toBeNull();
+      logger.info(`Resume Test: Job state after resume: ${job!.state}`);
+      expect(job!.state).not.toBe('cancelled');
+      // pg-boss resume typically returns the job to 'created'; depending on timing it
+      // may already be 'active' or even 'completed'.
+ expect(['created', 'active', 'completed']).toContain(job!.state); + + // Clean up + if (jobId) { + try { + await boss.deleteJob(worker.queueName, jobId); + } catch (e) { + // ignore + } + } + }); +}); diff --git a/packages/shared/src/server/locations/cache.ts b/packages/shared/src/server/locations/cache.ts new file mode 100644 index 00000000..ed566ed1 --- /dev/null +++ b/packages/shared/src/server/locations/cache.ts @@ -0,0 +1,19 @@ +import { createHash } from 'crypto'; +import { getSearchByHash, getLocationsByPlaceIds } from './db.js'; + +export const params_hash = (params: any) => { + // Normalize input for hashing + // Sort keys to ensure consistent hash + const normalizedInput = JSON.stringify(params, Object.keys(params).sort()); + return createHash('sha256').update(normalizedInput).digest('hex'); +}; + +export const get_cached = async (inputHash: string) => { + const searchData = await getSearchByHash(inputHash); + + if (searchData && searchData.result_place_ids) { + const locations = await getLocationsByPlaceIds(searchData.result_place_ids); + return locations; + } + return null; +}; diff --git a/packages/shared/src/server/locations/constants.ts b/packages/shared/src/server/locations/constants.ts new file mode 100644 index 00000000..8e7a7ccd --- /dev/null +++ b/packages/shared/src/server/locations/constants.ts @@ -0,0 +1,37 @@ +// Locations Constants +export const COST_PER_SEARCH = parseInt(process.env.LOCATIONS_COST || '1'); +export const TIMEOUT_MS_LOCATIONS = parseInt(process.env.LOCATIONS_TIMEOUT_MS || '300000'); +export const MAX_CONCURRENCY_LOCATIONS = 1; // Kept hardcoded as it wasn't requested to be moved, but could be. + +export const LOCATIONS_JOB_OPTIONS = { + expireInSeconds: parseInt(process.env.LOCATIONS_JOB_EXPIRE_SECONDS || '600'), + retryBackoff: true, + retryDelayMax: parseInt(process.env.LOCATIONS_JOB_RETRY_DELAY_MAX || '1000') +}; + +// Email Constants +export const EMAIL_SEARCH_COST = parseInt(process.env.EMAIL_SEARCH_COST || '5'); +export const EMAIL_MAX_BATCH_SIZE = parseInt(process.env.MAX_PLACES_PER_REQUEST || '100'); + +export const EMAIL_META_TIMEOUT_MS = parseInt(process.env.EMAIL_META_TIMEOUT_SECONDS || '30') * 1000; +export const EMAIL_SEARCH_TIMEOUT_MS = parseInt(process.env.EMAIL_SEARCH_TIMEOUT_SECONDS || '60') * 1000; +export const EMAIL_SEARCH_PAGE_TIMEOUT_MS = parseInt(process.env.EMAIL_SEARCH_PAGE_TIMEOUT_SECONDS || '10') * 1000; +export const EMAIL_SEARCH_MAX_PAGES = parseInt(process.env.EMAIL_SEARCH_MAX_PAGES || '15'); +export const EMAIL_SEARCH_PAGE_CONCURRENCY = parseInt(process.env.EMAIL_SEARCH_PAGE_CONCURRENCY || '2'); +export const EMAIL_STREAM_TIMEOUT_MS = parseInt(process.env.EMAIL_STREAM_TIMEOUT_SECONDS || '300') * 1000; +export const EMAIL_MAX_CONCURRENT_JOBS = parseInt(process.env.EMAIL_MAX_CONCURRENT_JOBS || '5'); + +export const JOB_NAME = 'locations-find'; +export const EMAIL_JOB_NAME = 'email-find'; + +export enum JobPriority { + Low = 10, + Medium = 20, + High = 30 +} + +export const EMAIL_JOB_OPTIONS = { + priority: JobPriority.Medium, + retryLimit: parseInt(process.env.EMAIL_JOB_RETRY_LIMIT || '1'), + expireInSeconds: parseInt(process.env.EMAIL_JOB_EXPIRE_SECONDS || '60'), +}; diff --git a/packages/shared/src/server/locations/db.ts b/packages/shared/src/server/locations/db.ts new file mode 100644 index 00000000..7d35f54d --- /dev/null +++ b/packages/shared/src/server/locations/db.ts @@ -0,0 +1,71 @@ +import { supabase } from '@/commons/supabase.js'; +import { logger } from './logger.js'; + + +export const getSearchByHash = 
async (inputHash: string) => { + const { data, error } = await supabase + .from('searches') + .select('result_place_ids') + .eq('input_hash', inputHash) + .single(); + + if (error && error.code !== 'PGRST116') { // PGRST116 is "The result contains 0 rows" + logger.error({ err: error }, 'Error fetching search by hash'); + } + return data; +}; + +export const getLocationsByPlaceIds = async (placeIds: string[]) => { + if (placeIds.length === 0) return []; + const { data, error } = await supabase + .from('locations') + .select('*') + .in('place_id', placeIds); + + if (error) { + logger.error({ err: error }, 'Error fetching locations'); + throw error; + } + return data; +}; + +export const getLocationsMeta = async (placeIds: string[]) => { + if (placeIds.length === 0) return []; + const { data, error } = await supabase + .from('locations') + .select('place_id, meta') + .in('place_id', placeIds); + + if (error) { + logger.error({ err: error }, 'Error fetching locations meta'); + return []; + } + return data; +} + +export const upsertLocations = async (locations: any[]) => { + if (locations.length === 0) return; + const { error } = await supabase + .from('locations') + .upsert(locations, { onConflict: 'place_id' }); + + if (error) { + logger.error({ err: error }, 'Error upserting locations '); + throw error; + } +}; + +export const storeSearch = async (inputHash: string, inputParams: any, placeIds: string[]) => { + const { error } = await supabase + .from('searches') + .upsert({ + input_hash: inputHash, + input_params: inputParams, + result_place_ids: placeIds, + created_at: new Date().toISOString() + }, { onConflict: 'input_hash' }); + + if (error) { + logger.error({ err: error }, 'Error storing search'); + } +}; diff --git a/packages/shared/src/server/locations/enrichers/__tests__/meta.test.ts b/packages/shared/src/server/locations/enrichers/__tests__/meta.test.ts new file mode 100644 index 00000000..87f2dc98 --- /dev/null +++ b/packages/shared/src/server/locations/enrichers/__tests__/meta.test.ts @@ -0,0 +1,27 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { MetaEnricher } from '../meta.js'; + +// Mock axios and puppeteer if necessary, but for basic unit testing we might just want to +// check the interface or mock the private methods if we could. +// Since parseHtml is private, we test enrich(). + +describe('MetaEnricher', () => { + let enricher: MetaEnricher; + + beforeEach(() => { + enricher = new MetaEnricher(); + }); + + it('should have correct name and type', () => { + expect(enricher.name).toBe('meta'); + expect(enricher.type).toBe('meta'); + }); + + it('should return empty object if no website', async () => { + const result = await enricher.enrich({ place_id: '1', title: 'test' } as any, { userId: 'u1' }); + expect(result).toEqual({}); + }); + + // Deeper testing requires mocking parseHtml logic which uses external libs. + // For this scope, ensuring it handles basic input covers the wiring. 
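+  // Illustrative sketch only (not part of this change): a deeper test could stub the
+  // private parseHtml via an `any` cast and assert the parsed result is written to
+  // raw_data.meta. The stubbed return value below is hypothetical.
+  //
+  // it('should store parsed meta under raw_data.meta', async () => {
+  //   const fakeMeta = { title: 'Example', social: [] };
+  //   vi.spyOn(enricher as any, 'parseHtml').mockResolvedValue(fakeMeta);
+  //   const result = await enricher.enrich(
+  //     { place_id: '1', title: 'test', website: 'https://example.com' } as any,
+  //     { userId: 'u1' }
+  //   );
+  //   expect((result as any).raw_data?.meta).toEqual(fakeMeta);
+  // });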
+}); diff --git a/packages/shared/src/server/locations/enrichers/__tests__/registry.test.ts b/packages/shared/src/server/locations/enrichers/__tests__/registry.test.ts new file mode 100644 index 00000000..ec45076a --- /dev/null +++ b/packages/shared/src/server/locations/enrichers/__tests__/registry.test.ts @@ -0,0 +1,33 @@ +import { describe, it, expect, beforeEach } from 'vitest'; +import { EnricherRegistry, IEnricher, EnrichmentContext } from '../registry.js'; +import { CompetitorFull } from '@polymech/shared'; + +class MockEnricher implements IEnricher { + name = 'mock'; + type = 'test'; + async enrich(location: CompetitorFull, context: EnrichmentContext) { + return { title: 'enriched' }; + } +} + +describe('EnricherRegistry', () => { + // Clear registry before each test if possible, but map is static. + // We can rely on unique names or just testing existence. + + it('should register and retrieve an enricher', () => { + const enricher = new MockEnricher(); + EnricherRegistry.register('test-mock', enricher); + + expect(EnricherRegistry.get('test-mock')).toBe(enricher); + }); + + it('should return undefined for unknown enricher', () => { + expect(EnricherRegistry.get('unknown')).toBeUndefined(); + }); + + it('should list all enrichers', () => { + const initialCount = EnricherRegistry.getAll().length; + EnricherRegistry.register('test-mock-2', new MockEnricher()); + expect(EnricherRegistry.getAll().length).toBeGreaterThanOrEqual(1); + }); +}); diff --git a/packages/shared/src/server/locations/enrichers/meta.ts b/packages/shared/src/server/locations/enrichers/meta.ts new file mode 100644 index 00000000..614cb54b --- /dev/null +++ b/packages/shared/src/server/locations/enrichers/meta.ts @@ -0,0 +1,306 @@ +import { CompetitorFull } from '@polymech/shared'; +import { IEnricher, EnrichmentContext } from './registry.js'; +import axios, { AxiosRequestConfig } from 'axios'; +import * as cheerio from 'cheerio'; +import { URL } from 'url'; +import puppeteer from 'puppeteer'; +import puppeteerExtra from 'puppeteer-extra'; +import StealthPlugin from 'puppeteer-extra-plugin-stealth'; +import https from 'https'; +import { logger } from '@polymech/search'; + +const puppeteerExtraAny = puppeteerExtra as any; +puppeteerExtraAny.use(StealthPlugin()); + +interface Og { + [key: string]: string | undefined; +} + +interface Meta { + [key: string]: string | undefined; +} + +interface Image { + src: string; +} + +interface PageType { + url: string; + source: string; + status: string; +} + +interface Structured { + [key: string]: any; +} + +interface LocationSiteMeta { + title?: string; + description?: string; + image?: string; + url?: string; + social?: PageType[]; + seo?: { + keywords?: string[]; + structured?: Structured[]; + og?: Og; + metaTags?: Meta; + }; + pages?: PageType[]; + externalLinks?: PageType[]; + images?: Image[]; +} + +export class MetaEnricher implements IEnricher { + name = 'meta'; + type = 'meta'; + + async enrich(location: CompetitorFull, context: EnrichmentContext): Promise> { + if (!context.logger) { + context.logger = logger; + } + if (!location.website) { + if (context.logger) context.logger.debug({ placeId: location.place_id }, 'No website to enrich'); + return {}; + } + + try { + if (context.logger) context.logger.info({ website: location.website }, 'Starting meta enrichment'); + const meta = await this.parseHtml(location.website, null, { headless: true }); // Default to headless puppeteer for better success rate + + const updates: Partial = { + raw_data: { + ...location.raw_data, + 
meta: meta + } + }; + + if (context.logger) { + const socialCount = meta.social?.length || 0; + const emailCount = (meta.seo?.structured || []).length; // structured data count, rough proxy + context.logger.info({ + website: location.website, + socials: meta.social?.map(s => s.source), + title: meta.title ? 'Found' : 'Missing', + emails: meta.seo?.metaTags?.['email'] || 'None' + }, 'Meta enrichment complete'); + } + + // Extract social links to main object if not present + if (meta.social) { + // We can't directly assign to unmapped fields if schema doesn't exist yet, + // but implementation plan said 'schemas.ts' might be updated. + // For now, sticking to raw_data.meta is safe, but we can also return them + // and let the caller/schema handle it. + } + + return updates; + } catch (error: any) { + if (context.logger) { + context.logger.error(`Error enriching meta for ${location.website}: ${error.message}`); + } + return {}; // Return empty update on failure + } + } + + private isValidUrl(url: string) { + try { + new URL(url); + return true; + } catch (error) { + return false; + } + } + + private readMetaTags($: cheerio.CheerioAPI, name: string) { + return $(`meta[name="${name}"]`).attr('content') || $(`meta[property="${name}"]`).attr('content') || null; + } + + private static browserPromise: Promise | null = null; + private static idleTimer: NodeJS.Timeout | null = null; + private static IDLE_TIMEOUT_SECONDS = parseInt(process.env.ENRICHER_META_IDLE_TIMEOUT || '60'); + + private static resetIdleTimer() { + if (MetaEnricher.idleTimer) clearTimeout(MetaEnricher.idleTimer); + MetaEnricher.idleTimer = setTimeout(async () => { + if (MetaEnricher.browserPromise) { + // context.logger.info(`[Puppeteer] Browser idle timeout (${60}s) reached` // No Logger context here, use console or ignore + const browser = await MetaEnricher.browserPromise; + await browser.close(); + MetaEnricher.browserPromise = null; + } + }, MetaEnricher.IDLE_TIMEOUT_SECONDS * 1000); + } + + private static async getBrowser(): Promise { + MetaEnricher.resetIdleTimer(); + if (MetaEnricher.browserPromise) return MetaEnricher.browserPromise; + + logger.info(`[Puppeteer] Launching new browser`); + MetaEnricher.browserPromise = (async () => { + const browser = await puppeteerExtraAny.launch({ + headless: "new", + args: ['--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage'] + }); + return browser; + })(); + + return MetaEnricher.browserPromise; + } + + private async parseHtml(url: string, config: AxiosRequestConfig | null, options: any): Promise { + if (!/(^http(s?):\/\/[^\s$.?#].[^\s]*)/i.test(url)) return {} as LocationSiteMeta; + + let content = ''; + + if (options && options.headless) { + try { + const browser = await MetaEnricher.getBrowser(); + const page = await browser.newPage(); + try { + await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'); + + // Allow shorter timeout for meta tags + const timeout = options.timeout || 15000; + + await page.goto(url, { waitUntil: 'domcontentloaded', timeout }); + content = await page.content(); + } finally { + await page.close(); + MetaEnricher.resetIdleTimer(); + } + } catch (e: any) { + // Fallback to axios if puppeteer fails (or specific connection/timeout error) + // console.error(`Puppeteer failed for ${url}: ${e.message}`); + } + } + + if (!content) { + try { + const { data } = await axios(url, { + ...config, + httpsAgent: new https.Agent({ rejectUnauthorized: false }), + headers: { 
+ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36' + }, + timeout: 10000 + }); + content = data; + } catch (e: any) { + // logger.error(`Axios failed for ${url}: ${e.message}`); + return {} as LocationSiteMeta; + } + } + + const $ = cheerio.load(content); + const og: Og = {}; + const meta: Meta = {}; + const images: Image[] = []; + const links: string[] = []; + let allLinks: string[] = []; + + const title = $('title').text(); + if (title) meta.title = title; + + const canonical = $('link[rel=canonical]').attr('href'); + if (canonical) meta.url = canonical; + + ['title', 'description', 'image'].forEach(s => { + const val = this.readMetaTags($, s); + if (val) meta[s] = val; + }); + + ['og:title', 'og:description', 'og:image', 'og:url', 'og:site_name', 'og:type'].forEach(s => { + const val = this.readMetaTags($, s); + if (val) og[s.split(':')[1]] = val; + }); + + $('img').each((i: number, el: any) => { + let src = $(el).attr('src'); + if (src) { + try { + src = new URL(src, url).href; + images.push({ src }); + } catch (e) { + // ignore invalid urls + } + } + }); + + const jsonLdArray: Structured[] = []; + $('script[type="application/ld+json"]').each((_: number, element: any) => { + const jsonLdContent = $(element).html(); + if (jsonLdContent) { + try { + const jsonData = JSON.parse(jsonLdContent); + jsonLdArray.push(jsonData); + } catch (e) { + // logger.error(`Error parsing JSON-LD: ${e.message} @ ${url}`); + } + } + }); + + $('a').each((index: number, element: any) => { + let href = $(element).attr('href'); + if (href) { + try { + href = new URL(href, url).href; + + if (this.isValidUrl(href)) { + if (href.indexOf('contact') !== -1 && !links.includes(href)) { + links.push(href); + } + allLinks.push(href); + } + } catch (e) { + // Ignore invalid URLs + } + } + }); + allLinks = [...new Set(allLinks)]; + + const socialLinks: PageType[] = []; + const internalPages: PageType[] = []; + const externalLinks: PageType[] = []; + + allLinks.forEach(link => { + if (link.includes('instagram.com')) socialLinks.push({ url: link, source: 'instagram', status: 'PENDING' }); + else if (link.includes('facebook.com')) socialLinks.push({ url: link, source: 'facebook', status: 'PENDING' }); + else if (link.includes('linkedin.com')) socialLinks.push({ url: link, source: 'linkedin', status: 'PENDING' }); + else if (link.includes('youtube.com')) socialLinks.push({ url: link, source: 'youtube', status: 'PENDING' }); + else if (link.includes('twitter.com')) socialLinks.push({ url: link, source: 'twitter', status: 'PENDING' }); + else if (link.includes('mailto:')) { /* ignore mailto */ } + else { + try { + const baseUrl = new URL(url).hostname; + const linkUrl = new URL(link).hostname; + if (linkUrl === baseUrl || linkUrl.endsWith('.' 
+ baseUrl)) {
+            internalPages.push({ url: link, source: 'site', status: 'PENDING' });
+          } else {
+            externalLinks.push({ url: link, source: 'external', status: 'PENDING' });
+          }
+        } catch (e) {
+          externalLinks.push({ url: link, source: 'external', status: 'PENDING' });
+        }
+      }
+    });
+
+    return {
+      title: meta.title || og.title,
+      description: meta.description || og.description,
+      image: meta.image || og.image,
+      url: meta.url || og.url || url,
+      social: socialLinks,
+      seo: {
+        keywords: ($('meta[property="og:keywords"]').attr("content") ||
+          $('meta[name="keywords"]').attr("content") || "").split(',').map((s: any) => s.trim()).filter((s: any) => s),
+        structured: jsonLdArray,
+        og,
+        metaTags: meta
+      },
+      pages: internalPages,
+      externalLinks: externalLinks,
+      images
+    };
+  }
+}
diff --git a/packages/shared/src/server/locations/enrichers/registry.ts b/packages/shared/src/server/locations/enrichers/registry.ts
new file mode 100644
index 00000000..9171d4f6
--- /dev/null
+++ b/packages/shared/src/server/locations/enrichers/registry.ts
@@ -0,0 +1,34 @@
+import { CompetitorFull } from '@polymech/shared';
+
+export interface EnrichmentContext {
+  userId: string;
+  logger?: any;
+}
+
+export interface IEnricher {
+  name: string;
+  type: 'meta' | 'email' | string;
+
+  /**
+   * Enrich a single location.
+   * @param location The partial competitor data available
+   * @param context Execution context
+   */
+  enrich(location: CompetitorFull, context: EnrichmentContext): Promise<Partial<CompetitorFull>>;
+}
+
+export class EnricherRegistry {
+  private static enrichers: Map<string, IEnricher> = new Map();
+
+  static register(name: string, enricher: IEnricher) {
+    this.enrichers.set(name, enricher);
+  }
+
+  static get(name: string): IEnricher | undefined {
+    return this.enrichers.get(name);
+  }
+
+  static getAll(): IEnricher[] {
+    return Array.from(this.enrichers.values());
+  }
+}
diff --git a/packages/shared/src/server/locations/enrichers/service.ts b/packages/shared/src/server/locations/enrichers/service.ts
new file mode 100644
index 00000000..eeb175b6
--- /dev/null
+++ b/packages/shared/src/server/locations/enrichers/service.ts
@@ -0,0 +1,135 @@
+import { EnricherRegistry } from './registry.js';
+import { getLocationsByPlaceIds, upsertLocations } from '../db.js';
+import { logger } from '../logger.js';
+import pMap from 'p-map';
+
+export interface StreamInterface {
+  writeSSE(data: { event: string; data: string }): Promise<void>;
+}
+
+export class EnrichmentService {
+
+  /**
+   * Orchestrates the enrichment process for a list of places and enrichers.
+   * Optionally streams progress and updates if a stream is provided.
+   */
+  async enrichAndStream(
+    placeIds: string[],
+    enricherNames: string[],
+    userId: string,
+    stream?: StreamInterface
+  ) {
+    const requestedEnrichers = enricherNames.map(name => EnricherRegistry.get(name)).filter(e => e !== undefined);
+
+    if (requestedEnrichers.length === 0) {
+      throw new Error('No valid enrichers found');
+    }
+
+    if (stream) {
+      await stream.writeSSE({
+        event: 'progress',
+        data: JSON.stringify({
+          current: 0,
+          total: placeIds.length,
+          message: `Starting enrichment for ${placeIds.length} items`
+        })
+      });
+    }
+
+    // Fetch current data
+    const existingLocations = await getLocationsByPlaceIds(placeIds);
+    const locationMap = new Map(existingLocations.map((l: any) => [l.place_id, l]));
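+    // Each place below is processed independently: every requested enricher runs in
+    // sequence for that place, partial updates are merged, and p-map caps how many
+    // places are in flight at once (ENRICHER_META_CONCURRENCY, default 5).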
+ + let processed = 0; + let successCount = 0; + let failedCount = 0; + + await pMap(placeIds, async (placeId: string) => { + const location = locationMap.get(placeId); + if (!location) { + logger.warn({ placeId }, 'Location not found for enrichment'); + failedCount++; + return; + } + + let updates: any = {}; + for (const enricher of requestedEnrichers) { + try { + // Enricher handles its own timeouts/shared resources + const result = await enricher!.enrich(location, { userId, logger }); + if (Object.keys(result).length > 0) { + logger.info({ placeId, enricher: enricher!.name, keys: Object.keys(result) }, 'Enricher found data'); + } else { + logger.info({ placeId, enricher: enricher!.name }, 'Enricher found no new data'); + } + updates = { ...updates, ...result }; + } catch (e: any) { + logger.error({ placeId, enricher: enricher!.name, err: e }, 'Enricher failed'); + } + } + + if (Object.keys(updates).length > 0) { + const updatedLocation = { + ...location, + ...updates, + raw_data: { + ...(location.raw_data || {}), + ...(updates.raw_data || {}) + } + }; + + try { + logger.info({ placeId, updates }, 'Updating location in db'); + await upsertLocations([updatedLocation]); + successCount++; + + if (stream) { + await stream.writeSSE({ + event: 'enrichment-update', + data: JSON.stringify({ + place_id: placeId, + updates: updates + }) + }); + } + } catch (e: any) { + logger.error({ placeId, err: e }, 'Failed to persist enrichment updates'); + failedCount++; + } + } else { + // No updates found, but technically processed successfully without error + successCount++; + } + + processed++; + if (stream) { + await stream.writeSSE({ + event: 'progress', + data: JSON.stringify({ + current: processed, + total: placeIds.length, + message: `Enriched ${processed} of ${placeIds.length}` + }) + }); + } + }, { concurrency: parseInt(process.env.ENRICHER_META_CONCURRENCY || '5') }); + + + if (stream) { + await stream.writeSSE({ + event: 'complete', + data: JSON.stringify({ + processed, + success: successCount, + failed: failedCount + }) + }); + } + + return { processed, success: successCount, failed: failedCount }; + } +} + +export const enrichmentService = new EnrichmentService(); diff --git a/packages/shared/src/server/locations/gadm_wrapper.py b/packages/shared/src/server/locations/gadm_wrapper.py new file mode 100644 index 00000000..f2972547 --- /dev/null +++ b/packages/shared/src/server/locations/gadm_wrapper.py @@ -0,0 +1,302 @@ + +import sys +import json +import argparse +import pygadm +import traceback +import hashlib +import os + +CACHE_DIR = os.environ.get('GADM_CACHE', './cache/gadm') + +def get_cache_path(prefix, value): + hash_object = hashlib.md5(value.encode('utf-8')) + hash_hex = hash_object.hexdigest() + return os.path.join(CACHE_DIR, f"{prefix}_{hash_hex}.json") + +def read_cache(path): + if os.path.exists(path): + try: + with open(path, 'r', encoding='utf-8') as f: + return json.load(f) + except: + return None + return None + +def write_cache(path, data): + try: + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, 'w', encoding='utf-8') as f: + json.dump(data, f) + except Exception as e: + sys.stderr.write(f"Cache write failed: {e}\n") + +def search_regions(query, content_level=None, geojson=False, admin=None): + path = get_cache_path(f"search_{content_level if content_level else 'all'}_{'geo' if geojson else 'meta'}_{admin if admin else 'all'}", query) + cached = read_cache(path) + if cached: + print(json.dumps(cached, default=str)) + return + + try: + # If geojson is 
requested, we need pygadm.Items to get the geometry + if geojson: + # NOTE: This might still fail if name is ambiguous. + # Ideally the UI should search first (getting GIDs), then request boundary by ID. + # If a user requests search+geojson by name, they better be specific. + kwargs = {'name': [query]} + if content_level: + kwargs['content_level'] = int(content_level) + if admin: + kwargs['admin'] = admin + + try: + gdf = pygadm.Items(**kwargs) + except Exception as e: + # Check if query contains comma + if ',' in query: + first_part = query.split(',')[0].strip() + kwargs['name'] = [first_part] + try: + gdf = pygadm.Items(**kwargs) + except: + print(json.dumps({"data": []})) + return + else: + raise e + else: + # Metadata search -> Use pygadm.Names to find all matches (handles ambiguity) + kwargs = {'name': query} + if content_level: + kwargs['content_level'] = int(content_level) + if admin: + kwargs['admin'] = admin + + try: + gdf = pygadm.Names(**kwargs) + except Exception as e: + # Check if query contains comma + if ',' in query: + # Try searching for the first part + first_part = query.split(',')[0].strip() + kwargs['name'] = first_part + try: + gdf = pygadm.Names(**kwargs) + except: + # Still failed, raise original error or return empty? + # Let's print empty for consistent client handling + print(json.dumps({"data": []})) + return + else: + # No comma, propagate empty result + print(json.dumps({"data": []})) + return + + if gdf is None or gdf.empty: + print(json.dumps({"data": []})) + return + + # Convert to list of dicts for JSON serialization + # safe stringify helper + def safe_str(v): + return str(v) if v is not None else "" + + if geojson: + # Full GeoJSON conversion with manual fallback + features = [] + cols_props = [c for c in gdf.columns if c != 'geometry'] + + for index, row in gdf.iterrows(): + properties = {c: safe_str(row[c]) for c in cols_props} + try: + geometry = row['geometry'].__geo_interface__ if row['geometry'] else None + except: + geometry = None + + features.append({ + "type": "Feature", + "properties": properties, + "geometry": geometry + }) + + output = { + "type": "FeatureCollection", + "features": features + } + else: + # Metadata only (from Names or Items) + results = [] + cols = gdf.columns.tolist() + for index, row in gdf.iterrows(): + r = {} + for c in cols: + r[c] = safe_str(row[c]) + results.append(r) + output = {"data": results} + + write_cache(path, output) + print(json.dumps(output, default=str)) + + except Exception as e: + print(json.dumps({"error": traceback.format_exc()})) + sys.exit(1) + +def get_boundary(gadm_id, name, content_level): + path = get_cache_path("boundary", gadm_id) + cached = read_cache(path) + if cached: + print(json.dumps(cached, default=str)) + return + + try: + # Use admin code (GID) directly to avoid ambiguity + # pygadm.Items supports 'admin' to fetch by GID + kwargs = {'admin': [gadm_id]} + + # content_level is not strictly needed if GID is specific, + # but passing it doesn't hurt if we want to be explicit, + # however 'admin' usually overrides or implies level. + # Let's rely on GID as the primary key. 
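+        # Illustrative example: pygadm.Items(admin=["FRA.1_1"]) fetches the level-1
+        # region with that GID directly, with no name-based ambiguity.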
+ + gdf = pygadm.Items(**kwargs) + + if gdf is None or gdf.empty: + print(json.dumps({"error": "Region not found"})) + sys.exit(1) + + # Convert to GeoJSON manually to avoid pandas errors + features = [] + cols_props = [c for c in gdf.columns if c != 'geometry'] + + # Helper for safe string conversion + def safe_str(v): + return str(v) if v is not None else "" + + for index, row in gdf.iterrows(): + properties = {c: safe_str(row[c]) for c in cols_props} + try: + geometry = row['geometry'].__geo_interface__ if row['geometry'] else None + except: + geometry = None + + features.append({ + "type": "Feature", + "properties": properties, + "geometry": geometry + }) + + output = { + "type": "FeatureCollection", + "features": features + } + + write_cache(path, output) + print(json.dumps(output, default=str)) + + except Exception as e: + print(json.dumps({"error": str(e)})) + sys.exit(1) + +def get_names(admin, content_level=None, depth=1): + path = get_cache_path(f"names_{admin}_{content_level}_{depth}", "names") + cached = read_cache(path) + if cached: + print(json.dumps(cached, default=str)) + return + + try: + all_results = [] + + # Determine starting level + # If content_level is explicit, start there. + # Else derive from admin code: e.g. "ESP.6_1" (1 dot) -> Level 1. Next is 2. + # "ESP" (0 dots) -> Level 0. Next is 1. + if content_level: + start_level = int(content_level) + else: + # GID format: XXX.L1.L2_1 + # Count dots: + # ESP -> 0 -> start 1 + # ESP.1_1 -> 1 -> start 2 + start_level = admin.count('.') + 1 + + # Handle infinite depth (passed as -1 or similar string) + if str(depth).lower() in ['infinite', 'inf', '-1']: + limit = 5 # Reasonable cap for GADM (rarely goes beyond level 5) + else: + limit = int(depth) + + current_level = start_level + + for i in range(limit): + # Fetch names for this level + kwargs = {'admin': admin, 'content_level': current_level} + + try: + gdf = pygadm.Names(**kwargs) + except: + # Likely level doesn't exist + break + + if gdf is None or gdf.empty: + # No more levels + break + + # Convert to list of dicts + cols = gdf.columns.tolist() + def safe_str(v): + return str(v) if v is not None else "" + + for index, row in gdf.iterrows(): + r = {} + for c in cols: + r[c] = safe_str(row[c]) + all_results.append(r) + + current_level += 1 + + output = {"data": all_results} + write_cache(path, output) + print(json.dumps(output, default=str)) + + except Exception as e: + print(json.dumps({"error": traceback.format_exc()})) + sys.exit(1) + +def main(): + parser = argparse.ArgumentParser(description='PyGADM Wrapper') + subparsers = parser.add_subparsers(dest='command', help='Command to execute') + + # Search command + search_parser = subparsers.add_parser('search', help='Search for regions') + search_parser.add_argument('--query', required=True, help='Name to search for') + search_parser.add_argument('--content_level', required=False, help='Content level filter') + search_parser.add_argument('--geojson', action='store_true', help='Include GeoJSON geometry') + search_parser.add_argument('--country', required=False, help='Country filter (Admin code or name)') + + # Boundary command + boundary_parser = subparsers.add_parser('boundary', help='Get region boundary') + boundary_parser.add_argument('--id', required=True, help='GADM ID (for cache)') + boundary_parser.add_argument('--name', required=True, help='Region Name') + boundary_parser.add_argument('--content_level', required=False, help='Content Level') + + # Names command + names_parser = subparsers.add_parser('names', 
help='Get region names') + names_parser.add_argument('--admin', required=True, help='Admin code (e.g. FRA)') + names_parser.add_argument('--content_level', required=False, help='Content level') + names_parser.add_argument('--depth', required=False, default=1, help='Depth of recursion (1=default, -1=infinite)') + + args = parser.parse_args() + + if args.command == 'search': + search_regions(args.query, args.content_level, args.geojson, args.country) + elif args.command == 'boundary': + get_boundary(args.id, args.name, args.content_level) + elif args.command == 'names': + get_names(args.admin, args.content_level, args.depth) + else: + parser.print_help() + sys.exit(1) + +if __name__ == '__main__': + main() diff --git a/packages/shared/src/server/locations/index.ts b/packages/shared/src/server/locations/index.ts new file mode 100644 index 00000000..b63d8d91 --- /dev/null +++ b/packages/shared/src/server/locations/index.ts @@ -0,0 +1,844 @@ +import { getLocationsRoute, getLocationsStreamRoute, getCompetitorByIdRoute, getFindEmailStreamRoute, cancelJobRoute, getRegionSearchRoute, getRegionBoundaryRoute, getRegionNamesRoute, getRegionReverseRoute, getWikiSearchRoute, getLlmRegionInfoRoute, getEnrichStreamRoute } from './routes.js'; +import { run, IKBotTask } from '@polymech/kbot-d'; +import { EnricherRegistry } from './enrichers/registry.js'; +import { MetaEnricher } from './enrichers/meta.js'; +import { enrichmentService } from './enrichers/service.js'; +import { getLocationsByPlaceIds, upsertLocations } from './db.js'; +import { reverse } from '@polymech/search'; +import { LocationsWorker, LocationsJobData, EmailWorker, EmailJobData } from './pgboss.js'; +import { LOCATIONS_JOB_OPTIONS, COST_PER_SEARCH, JOB_NAME, EMAIL_JOB_NAME, EMAIL_JOB_OPTIONS, EMAIL_SEARCH_COST, EMAIL_STREAM_TIMEOUT_MS, EMAIL_MAX_BATCH_SIZE, TIMEOUT_MS_LOCATIONS } from './constants.js'; +import { logger } from '@/commons/logger.js'; +import { get_cached, params_hash } from './cache.js'; +import { streamSSE } from 'hono/streaming'; +import { EventBus } from '../EventBus.js'; +import { exec } from 'child_process'; +import { promisify } from 'util'; +import path from 'path'; +import { fileURLToPath } from 'url'; + +const execAsync = promisify(exec); +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); + +import { AbstractProduct } from '../AbstractProduct.js'; +import { CONFIG_DEFAULT } from '@polymech/commons'; + +function deduplicateResults(results: any[] | null | undefined) { + if (!Array.isArray(results)) return results; + const seen = new Set(); + return results.filter(item => { + const id = item.place_id || item.title; // Fallback to title if place_id missing + if (!id) return true; + if (seen.has(id)) return false; + seen.add(id); + return true; + }); +} + +export class LocationsProduct extends AbstractProduct { + id = 'locations'; + workers = [LocationsWorker, EmailWorker]; + jobOptions = LOCATIONS_JOB_OPTIONS; + + actions = { + search: { + costUnits: COST_PER_SEARCH, + cancellable: true, + description: 'Search for locations', + }, + email: { + costUnits: EMAIL_SEARCH_COST, + cancellable: true, + description: 'Find emails for locations' + } + }; + + routes!: any[]; + + constructor() { + super(); + this.routes = [ + { definition: getLocationsRoute, handler: this.handleGetLocations.bind(this) }, + { definition: getLocationsStreamRoute, handler: this.handleStreamGet.bind(this) }, + { definition: getCompetitorByIdRoute, handler: this.handleGetLocationById.bind(this) }, + { 
definition: getFindEmailStreamRoute, handler: this.handleStreamEmail.bind(this) },
+      { definition: cancelJobRoute, handler: this.handleCancelJob.bind(this) },
+      { definition: getRegionSearchRoute, handler: this.handleGetRegionSearch.bind(this) },
+      { definition: getRegionBoundaryRoute, handler: this.handleGetRegionBoundary.bind(this) },
+      { definition: getRegionReverseRoute, handler: this.handleGetRegionReverse.bind(this) },
+      { definition: getRegionNamesRoute, handler: this.handleGetRegionNames.bind(this) },
+      { definition: getWikiSearchRoute, handler: this.handleGetWikiSearch.bind(this) },
+      { definition: getLlmRegionInfoRoute, handler: this.handleGetLlmRegionInfo.bind(this) },
+      { definition: getEnrichStreamRoute, handler: this.handleStreamEnrich.bind(this) }
+    ];
+
+    // Register default enrichers
+    EnricherRegistry.register('meta', new MetaEnricher());
+  }
+
+  async handleGetLlmRegionInfo(c: any) {
+    const { location } = c.req.valid('query');
+    try {
+      const promptContent = `Provide a concise summary for ${location} strictly in valid JSON format with the following keys:
+- population: string (current estimate)
+- industry: string (key industries)
+- status: string (current geopolitical status, e.g. "Stable", "Conflict Zone", "Developing")
+- currency: string (currency name and code)
+- description: string (brief general description)
+
+Do not include markdown formatting (like \`\`\`json). Just the raw JSON object.`;
+
+      const task: IKBotTask = {
+        router: 'openai',
+        // 'gpt-5' is the requested model; fall back to 'gpt-4o' if it is unavailable
+        // in the target environment or not present in the kbot model enum.
+        model: 'gpt-5',
+        prompt: promptContent,
+        mode: 'completion',
+        path: '.', // unused for completion tasks, but kept to satisfy IKBotTask
+      };
+
+      const results = await run(task);
+      const result = results.length > 0 ?
results[0] : null; + + // KBot completion mode usually returns string or the completion object + let jsonStr = ''; + if (typeof result === 'string') { + jsonStr = result; + } else if (result && typeof result === 'object' && 'content' in result) { + jsonStr = (result as any).content as string; + } else { + // Fallback or check structure + jsonStr = JSON.stringify(result); + } + + // Cleanup jsonStr if it has markdown code blocks + jsonStr = jsonStr.replace(/```json\n?|```/g, '').trim(); + + let data; + try { + data = JSON.parse(jsonStr); + } catch (e) { + logger.error({ jsonStr }, 'Failed to parse LLM response'); + return c.json({ error: 'Failed to parse LLM response' }, 500); + } + + return c.json({ data }, 200); + + } catch (error: any) { + logger.error(error, 'Error fetching LLM info'); + return c.json({ error: error.message || 'Internal Server Error' }, 500); + } + } + + async handleGetWikiSearch(c: any) { + const { lat, lon, radius, limit } = c.req.valid('query'); + try { + // Wikipedia API: https://en.wikipedia.org/w/api.php?action=query&list=geosearch&gscoord=37.7891838|-122.4033522&gsradius=10000&gslimit=10&format=json + // We need a user agent: https://meta.wikimedia.org/wiki/User-Agent_policy + + const apiUrl = `https://en.wikipedia.org/w/api.php`; + const params = new URLSearchParams({ + action: 'query', + list: 'geosearch', + gscoord: `${lat}|${lon}`, + gsradius: radius || '10000', + gslimit: limit || '10', + format: 'json' + }); + + const headers = { + 'User-Agent': 'PolymechSaaS/1.0 (test@example.com)' // Replace with real contact if/when production + }; + + const res = await fetch(`${apiUrl}?${params.toString()}`, { headers }); + + if (!res.ok) { + return c.json({ error: 'Failed to fetch from Wikipedia' }, res.status); + } + + const json = await res.json(); + + if (json.error) { + logger.error({ wikiError: json.error }, 'Wikipedia API Error'); + return c.json({ error: json.error.info || 'Wikipedia API Error' }, 500); + } + + const results = json.query?.geosearch || []; + + return c.json({ data: results }, 200); + + } catch (error: any) { + logger.error(error, 'Error fetching Wiki data'); + return c.json({ error: error.message || 'Internal Server Error' }, 500); + } + } + + async handleCancelJob(c: any) { + console.log('[LocationsProduct] handleCancelJob called', c.req.param('jobId')); + const { jobId } = c.req.valid('param'); + try { + if (!this.boss) { + logger.error('[LocationsProduct] Job system not initialized'); + return c.json({ error: 'Job system not initialized' }, 503); + } + await (this.boss as any).cancel(EMAIL_JOB_NAME, jobId); + logger.info({ jobId }, '[LocationsProduct] Job cancellation requested via API'); + return c.json({ message: 'Job cancellation requested' }, 200); + } catch (e: any) { + logger.error({ err: e, jobId }, '[LocationsProduct] Failed to cancel job'); + return c.json({ error: 'Failed to cancel job' }, 500); + } + } + + async handleGetLocationById(c: any) { + const { place_id } = c.req.valid('param'); + try { + const results = await getLocationsByPlaceIds([place_id]); + const location = results && results.length > 0 ? 
results[0] : null; + + if (!location) { + return c.json({ error: 'Competitor not found' }, 404); + } + + return c.json({ + message: 'Get competitor details success', + data: location + }, 200); + + } catch (error: any) { + logger.error(error, 'Error fetching competitor details'); + return c.json({ error: error.message || 'Internal Server Error' }, 500); + } + } + + async handleGetRegionSearch(c: any) { + const { query, content_level, geojson, country } = c.req.valid('query'); + try { + const wrapperPath = path.join(__dirname, 'gadm_wrapper.py'); + // Ensure python is in path + let cmd = `python "${wrapperPath}" search --query "${query}"`; + if (content_level) { + cmd += ` --content_level "${content_level}"`; + } + if (geojson === 'true') { + cmd += ` --geojson`; + } + if (country) { + cmd += ` --country "${country}"`; + } + const { stdout, stderr } = await execAsync(cmd, { maxBuffer: 1024 * 1024 * 50 }); // 50MB buffer + + if (stderr) { + console.error('[PyGADM] Stderr:', stderr); + } + + try { + const json = JSON.parse(stdout); + if (json.error) { + return c.json({ error: json.error }, 500); + } + return c.json(json, 200); + } catch (e) { + logger.error({ stdout }, 'Failed to parse python output'); + return c.json({ error: 'Failed to parse GADM results' }, 500); + } + + } catch (error: any) { + // Check if it's a python script error that outputted JSON + if (error.stdout) { + try { + const json = JSON.parse(error.stdout); + if (json.error) { + logger.warn({ error: json.error }, 'GADM script returned error'); + return c.json({ error: json.error }, 400); // Bad request / Search failed + } + } catch (e) { + // ignore parsing error, fall through to generic error + } + } + logger.error(error, 'Error searching regions'); + return c.json({ error: error.message || 'Internal Server Error' }, 500); + } + } + + async handleGetRegionBoundary(c: any) { + const { id } = c.req.valid('param'); + const { name, level } = c.req.valid('query'); + + try { + const wrapperPath = path.join(__dirname, 'gadm_wrapper.py'); + // We need name to fetch. If not provided, we can't do much with PyGADM Items class unless cached. + // But we can try to rely on cache by ID if it exists. + // If not cached and name missing -> error. + if (!name) { + // Check cache only? Or just require name. + // For now, let's require name from client. 
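+        // A stricter variant (sketch only, not enabled) would fail fast here instead of
+        // invoking the wrapper with an empty --name and relying on its own error output:
+        // return c.json({ error: 'name query parameter is required when the region is not cached' }, 400);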
+ } + + const cmd = `python "${wrapperPath}" boundary --id "${id}" --name "${name || ''}" --content_level "${level || ''}"`; + const { stdout, stderr } = await execAsync(cmd, { maxBuffer: 1024 * 1024 * 50 }); // 50MB buffer + + if (stderr) { + console.error('[PyGADM Boundary] Stderr:', stderr); + } + + try { + const json = JSON.parse(stdout); + if (json.error) { + return c.json({ error: json.error }, 404); + } + return c.json(json, 200); + } catch (e) { + logger.error({ stdout }, 'Failed to parse python output'); + return c.json({ error: 'Failed to parse GADM boundary' }, 500); + } + + } catch (error: any) { + // Check if it's a python script error that outputted JSON + if (error.stdout) { + try { + const json = JSON.parse(error.stdout); + if (json.error) { + logger.warn({ error: json.error }, 'GADM boundary script returned error'); + return c.json({ error: json.error }, 404); + } + } catch (e) { + // ignore parsing error + } + } + logger.error(error, 'Error fetching boundary'); + return c.json({ error: error.message || 'Internal Server Error' }, 500); + } + } + + async handleGetRegionNames(c: any) { + const { admin, content_level, depth } = c.req.valid('query'); + try { + const wrapperPath = path.join(__dirname, 'gadm_wrapper.py'); + let cmd = `python "${wrapperPath}" names --admin "${admin}"`; + if (content_level) cmd += ` --content_level "${content_level}"`; + if (depth) cmd += ` --depth "${depth}"`; + + const { stdout, stderr } = await execAsync(cmd, { maxBuffer: 1024 * 1024 * 50 }); // 50MB buffer + + if (stderr) { + console.error('[PyGADM] Stderr:', stderr); + } + + try { + const json = JSON.parse(stdout); + if (json.error) { + return c.json({ error: json.error }, 500); + } + return c.json(json, 200); + } catch (e) { + logger.error({ stdout }, 'Failed to parse python output'); + return c.json({ error: 'Failed to parse GADM names results' }, 500); + } + + } catch (error: any) { + // Check if it's a python script error that outputted JSON + if (error.stdout) { + try { + const json = JSON.parse(error.stdout); + if (json.error) { + logger.warn({ error: json.error }, 'GADM names script returned error'); + return c.json({ error: json.error }, 400); + } + } catch (e) { + // ignore parsing error + } + } + logger.error(error, 'Error fetching region names'); + return c.json({ error: error.message || 'Internal Server Error' }, 500); + } + } + + async handleGetRegionReverse(c: any) { + const { lat, lon } = c.req.valid('query'); + try { + // Mock LocalResult structure expected by reverse() + const loc: any = { + gps_coordinates: { + latitude: parseFloat(lat), + longitude: parseFloat(lon) + } + }; + + // Use 'reverse' from search package + // It expects opts.bigdata.key. We try to read it from env. 
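+      // Assumed contract (inferred from how it is used here): reverse() mutates `loc`
+      // in place and, on success, attaches something like
+      //   loc.geo = { continent, countryName, city, ... }
+      // which is what the success branch below returns as `data`.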
+ + const config = CONFIG_DEFAULT() as any + await reverse(loc, { bigdata: { key: config.bigdata.key } }); + if (loc.geo) { + return c.json({ data: loc.geo }, 200); + } else { + return c.json({ error: 'Reverse geocoding failed' }, 404); + } + } catch (error: any) { + logger.error(error, 'Error reverse geocoding'); + return c.json({ error: error.message || 'Internal Server Error' }, 500); + } + } + + hash(data: LocationsJobData) { + const { userId, ...params } = data; + return this.generateHash(params); + } + + meta(userId: string) { + return { + cost: COST_PER_SEARCH, + userId, + timestamp: new Date().toISOString() + }; + } + + async handleGetLocations(c: any) { + const query = c.req.valid('query'); + const { refresh } = query; + const forceRefresh = refresh === 'true'; + const userId = c.get('userId'); + const jobData = { + query: query.query, + location: query.location, + filterCity: query.filterCity, + filterContinent: query.filterContinent, + filterCountry: query.filterCountry, + filterType: query.filterType, + limit: query.limit ? parseInt(query.limit) : 250, + excludedTypes: query.excludedTypes, + userId: userId + }; + + try { + const inputHash = params_hash({ + query: query.query, + location: query.location, + filterCity: query.filterCity, + filterContinent: query.filterContinent, + filterCountry: query.filterCountry, + filterType: query.filterType, + limit: query.limit ? parseInt(query.limit) : 250, + excludedTypes: query.excludedTypes, + zoom: query.zoom ? parseInt(query.zoom) : undefined + }); + if (!forceRefresh) { + const cached = await get_cached(inputHash); + if (cached) { + return c.json({ + message: 'Get locations success (cached)', + data: deduplicateResults(cached) + }, 200); + } + } + const jobId = await this.sendJob(JOB_NAME, jobData, this.jobOptions); + if (!jobId) { + const cached = await get_cached(inputHash); + if (cached) { + return c.json({ message: 'Get locations success (cached)', data: deduplicateResults(cached) }, 200); + } + return c.json({ message: 'Job already submitted', jobId: null }, 202); + } + + try { + await this.waitForJob(jobId, TIMEOUT_MS_LOCATIONS); + const cached = await get_cached(inputHash); + if (cached) { + return c.json({ message: 'Get locations success (fresh)', data: deduplicateResults(cached) }, 200); + } + } catch (e) { + if ((e as Error).message === 'Job timeout') { + // Fallthrough to return 202 + } else { + throw e; + } + } + + } catch (error: any) { + logger.error(error, 'Error fetching locations'); + return c.json({ message: error.message || 'Internal Server Error' }, 500); + } + + return c.json({ message: 'Job submitted' }, 202); + } + + async handleStreamGet(c: any) { + const query = c.req.valid('query'); + const { refresh } = query; + const forceRefresh = refresh === 'true'; + const userId = c.get('userId'); + + if (!userId) { + return c.json({ message: 'Unauthorized' }, 401); + } + + return this.handleStream(c, { + data: { + query: query.query, + location: query.location, + filterCity: query.filterCity, + filterContinent: query.filterContinent, + filterCountry: query.filterCountry, + filterType: query.filterType, + limit: query.limit ? parseInt(query.limit) : 250, + zoom: query.zoom ? 
parseInt(query.zoom) : undefined, + excludedTypes: query.excludedTypes, + }, + userId, + forceRefresh, + fetcher: async (data: any, uid: string) => { + + const inputHash = this.generateHash(data); + + const existing = await get_cached(inputHash); + if (existing) return deduplicateResults(existing) as any[]; + + const jobData = { ...data, userId: uid }; + const options = { ...this.jobOptions, singletonKey: this.generateHash(data) }; + + logger.info(`Fetching locations for ${inputHash}`, jobData); + const jobId = await this.sendJob(JOB_NAME, jobData, options); + if (jobId) { + await this.waitForJob(jobId, TIMEOUT_MS_LOCATIONS); + } else { + const checkAgain = await get_cached(inputHash); + if (checkAgain) return deduplicateResults(checkAgain) as any[]; + + logger.info({ inputHash }, 'Waiting for debounced job...'); + await this.waitForHash(inputHash, TIMEOUT_MS_LOCATIONS); + } + + const result = await get_cached(inputHash); + return deduplicateResults(result) || []; + }, + cacheChecker: async (hash: string) => deduplicateResults(await get_cached(hash)) || null + }); + } + + async handleStreamEmail(c: any) { + const { place_ids } = c.req.valid('query'); + let userId = c.get('userId') as string | undefined; + + if (!userId) { + return c.json({ error: 'Unauthorized' }, 401); + } + + const placeIdArray = place_ids.split(',').map((id: string) => id.trim()).filter((id: string) => id.length > 0); + + if (placeIdArray.length === 0) { + return c.json({ error: 'No place IDs provided' }, 400); + } + + if (placeIdArray.length > EMAIL_MAX_BATCH_SIZE) { + return c.json({ error: `Maximum ${EMAIL_MAX_BATCH_SIZE} locations can be processed at once` }, 400); + } + + logger.info(`[LocationsProduct] Starting email search via worker for ${placeIdArray.length} locations`); + + return streamSSE(c, async (stream) => { + await stream.writeSSE({ + event: 'progress', + data: JSON.stringify({ + current: 0, + total: placeIdArray.length, + percent: 0, + message: `Queueing jobs for ${placeIdArray.length} location(s)` + }) + }); + + const pendingJobIds = new Set(); + const jobsMap = new Map(); + let completedCount = 0; + let successCount = 0; + let failedCount = 0; + let emailsFound = 0; + + const onJobComplete = async (event: any) => { + if (pendingJobIds.has(event.jobId)) { + const place_id = jobsMap.get(event.jobId); + pendingJobIds.delete(event.jobId); + + const result = event.result; + + await stream.writeSSE({ + event: 'location-start', + data: JSON.stringify({ + place_id, + title: 'Processing...', + index: completedCount + 1, + total: placeIdArray.length + }) + }); + + if (result && result.emails) { + successCount++; + emailsFound += result.emails.length; + + for (const emailMeta of result.emails) { + await stream.writeSSE({ + event: 'location-email', + data: JSON.stringify({ + place_id, + email: emailMeta.email, + source: emailMeta.source, + cached: false + }) + }); + } + } + + await stream.writeSSE({ + event: 'location-complete', + data: JSON.stringify({ + place_id, + emailCount: result?.emails?.length || 0, + cached: false + }) + }); + + completedCount++; + + const percent = Math.round((completedCount / placeIdArray.length) * 100); + await stream.writeSSE({ + event: 'progress', + data: JSON.stringify({ + current: completedCount, + total: placeIdArray.length, + percent, + message: `Processed ${completedCount} of ${placeIdArray.length}` + }) + }); + } + }; + + const onJobFailed = async (event: any) => { + if (pendingJobIds.has(event.jobId)) { + const place_id = jobsMap.get(event.jobId); + 
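+          // Assumed 'job:failed' payload mirrors 'job:complete': { jobId, error }.
+          // Failed jobs still advance completedCount below so the progress events
+          // reach 100% even when individual lookups error out.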
pendingJobIds.delete(event.jobId); + + failedCount++; + + await stream.writeSSE({ + event: 'location-error', + data: JSON.stringify({ + place_id, + error: event.error || 'Unknown error', + canRetry: true + }) + }); + + completedCount++; + const percent = Math.round((completedCount / placeIdArray.length) * 100); + await stream.writeSSE({ + event: 'progress', + data: JSON.stringify({ + current: completedCount, + total: placeIdArray.length, + percent, + message: `Processed ${completedCount} of ${placeIdArray.length}` + }) + }); + } + }; + + // Register Listeners BEFORE submitting jobs to capture fast completions + EventBus.on('job:complete', onJobComplete); + EventBus.on('job:failed', onJobFailed); + + stream.onAbort(async () => { + logger.info('[LocationsProduct] Stream aborted by client, cancelling pending jobs'); + for (const jobId of pendingJobIds) { + try { + if (this.boss) { + await this.boss.cancel(EMAIL_JOB_NAME, jobId); + } + } catch (e) { + logger.warn({ jobId, err: e }, 'Failed to cancel job on abort'); + } + } + clearTimeout(listTimeout); + EventBus.off('job:complete', onJobComplete); + EventBus.off('job:failed', onJobFailed); + }); + + const listTimeout = setTimeout(() => { + logger.warn('[LocationsProduct] Stream global timeout reached'); + EventBus.off('job:complete', onJobComplete); + EventBus.off('job:failed', onJobFailed); + stream.writeSSE({ event: 'error', data: JSON.stringify({ error: 'Global Timeout' }) }); + stream.close(); + }, EMAIL_STREAM_TIMEOUT_MS); + + try { + let submittedCount = 0; + for (const place_id of placeIdArray) { + const jobData = { place_id, userId }; + const options = { ...EMAIL_JOB_OPTIONS }; + + const jobId = await this.sendJob(EMAIL_JOB_NAME, jobData, options); + + if (jobId) { + pendingJobIds.add(jobId); + jobsMap.set(jobId, place_id); + + // Emit job info immediately for cancellation support + await stream.writeSSE({ + event: 'location-job', + data: JSON.stringify({ place_id, jobId }) + }); + } + submittedCount++; + } + + // Wait loop + while (pendingJobIds.size > 0) { + // logger.info(`[LocationsProduct] ${pendingJobIds.size} pending jobs`); + await new Promise(resolve => setTimeout(resolve, 1000)); + } + + } catch (error: any) { + logger.error({ err: error }, 'Error in email stream loop'); + // Attempt to send error event + try { + await stream.writeSSE({ + event: 'error', + data: JSON.stringify({ error: error.message || 'Stream processing error' }) + }); + } catch (e) { /* ignore stream write errors if closed */ } + } finally { + clearTimeout(listTimeout); + EventBus.off('job:complete', onJobComplete); + EventBus.off('job:failed', onJobFailed); + } + + await stream.writeSSE({ + event: 'complete', + data: JSON.stringify({ + total: placeIdArray.length, + successful: successCount, + failed: failedCount, + emailsFound + }) + }); + }); + } + + async handleStreamEnrich(c: any) { + const { place_ids, enrichers } = c.req.valid('query'); + const userId = c.get('userId'); + + if (!userId) { + return c.json({ error: 'Unauthorized' }, 401); + } + + const placeIdArray = place_ids.split(',').map((id: string) => id.trim()).filter((id: string) => id.length > 0); + const enricherNames = enrichers.split(',').map((e: string) => e.trim()).filter((e: string) => e.length > 0); + + if (placeIdArray.length === 0) { + return c.json({ error: 'No place IDs provided' }, 400); + } + + const requestedEnrichers = enricherNames.map((name: string) => EnricherRegistry.get(name)).filter((e: any) => e !== undefined); + + if (requestedEnrichers.length === 0) { + return c.json({ 
error: 'No valid enrichers requested' }, 400); + } + + logger.info({ count: placeIdArray.length, enrichers: enricherNames }, '[LocationsProduct] Starting enrichment stream'); + + return streamSSE(c, async (stream) => { + await stream.writeSSE({ + event: 'progress', + data: JSON.stringify({ + current: 0, + total: placeIdArray.length, + message: `Starting enrichment for ${placeIdArray.length} items` + }) + }); + + // Fetch current data for these places + // For now, we fetch them from DB to have the base object. + // If they don't exist in DB, we can't enrich them (unless we accept partials from client, but that's insecure/complex) + // Ideally we assume they exist or we just proceed. + const existingLocations = await getLocationsByPlaceIds(placeIdArray); + const locationMap = new Map(existingLocations.map(l => [l.place_id, l])); + + let processed = 0; + + for (const placeId of placeIdArray) { + const location = locationMap.get(placeId); + if (!location) { + logger.warn({ placeId }, 'Location not found for enrichment'); + // Emit error or skip + continue; + } + + // Run each enricher + let updates: any = {}; + for (const enricher of requestedEnrichers) { + try { + const result = await enricher!.enrich(location, { userId, logger }); + updates = { ...updates, ...result }; + + // Update our local copy for subsequent enrichers if needed + // Object.assign(location, result); + // Note: Deep merge might be safer, but for now top-level partials are expected. + } catch (e: any) { + logger.error({ placeId, enricher: enricher!.name, err: e }, 'Enricher failed'); + } + } + + if (Object.keys(updates).length > 0) { + // Persist to DB + // We need to merge updates into the location. + // upsertLocations expects arrays. + const updatedLocation = { + ...location, + ...updates, + // If deep merging is needed for raw_data or others, handle it. + // For now assume top level fields or replacement of raw_data chunks. 
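+              // Shallow merge: a top-level key present in updates.raw_data replaces the
+              // stored value wholesale (e.g. an updated raw_data.meta overwrites the old
+              // one rather than being deep-merged into it).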
+ raw_data: { + ...(location.raw_data || {}), + ...(updates.raw_data || {}) + } + }; + + try { + await upsertLocations([updatedLocation]); + + await stream.writeSSE({ + event: 'enrichment-update', + data: JSON.stringify({ + place_id: placeId, + updates: updates + }) + }); + } catch (e: any) { + logger.error({ placeId, err: e }, 'Failed to persist enrichment updates'); + } + } + + processed++; + await stream.writeSSE({ + event: 'progress', + data: JSON.stringify({ + current: processed, + total: placeIdArray.length, + message: `Enriched ${processed} of ${placeIdArray.length}` + }) + }); + } + + await stream.writeSSE({ event: 'complete', data: JSON.stringify({ processed }) }); + }); + } +} diff --git a/packages/shared/src/server/locations/jobs-find.ts b/packages/shared/src/server/locations/jobs-find.ts new file mode 100644 index 00000000..8a0e815e --- /dev/null +++ b/packages/shared/src/server/locations/jobs-find.ts @@ -0,0 +1,172 @@ +import { googleMaps, ResolveFlags } from '@polymech/search'; +import { upsertLocations, storeSearch, getLocationsMeta } from './db.js'; +import { supabase } from '../../commons/supabase.js'; +import { params_hash } from './cache.js'; +import { logger } from './logger.js'; + +export interface SearchParams { + query: string; + location: string; + filterCity?: string; + filterContinent?: string; + filterCountry?: string; + filterType?: string; + concurrency?: number; + limit?: number; + zoom?: number; + excludedTypes?: string[]; +} + +export const searchLocations = async (params: SearchParams, userId: string) => { + logger.info({ params }, 'Fetching locations'); + + const { query, location, filterCity, filterContinent, filterCountry, filterType, limit, zoom, excludedTypes } = params; + + // 1. Fetch from SerpApi + const results = await googleMaps({ + query: query, + searchFrom: location, + resolve: [ResolveFlags.PHOTOS], + filterCity, + filterContinent, + filterCountry, + filterType, + limit, + zoom, + excludedTypes + }); + + // Flatten results + const flatResults = results ? results.flat(Infinity) : []; + + // 2. Map Results + const locationsToUpsert = flatResults + .filter((r: any) => r.place_id) + .map((r: any) => ({ + place_id: r.place_id, + title: r.title, + description: r.description, + address: r.address, + gps_coordinates: r.gps_coordinates, + phone: r.phone, + website: r.website, + operating_hours: r.operating_hours, + thumbnail: r.thumbnail, + types: r.types, + raw_data: r, + continent: r.geo?.continent, + country: r.geo?.countryName, + city: r.geo?.city, + updated_at: new Date().toISOString(), + user_id: userId + })); + + // 2b. Update Known Types (Side Effect) + if (userId) { + try { + const { data: profile } = await supabase + .from('profiles') + .select('settings') + .eq('id', userId) + .maybeSingle(); + + const settings = profile?.settings || {}; + const knownTypes: string[] = Array.isArray(settings.known_types) ? settings.known_types : []; + const newTypesSet = new Set(knownTypes); + + locationsToUpsert.forEach((l: any) => { + if (l.types && Array.isArray(l.types)) { + l.types.forEach((t: string) => newTypesSet.add(t)); + } + }); + + if (newTypesSet.size > knownTypes.length) { + const updatedKnownTypes = Array.from(newTypesSet).sort(); + await supabase + .from('profiles') + .update({ + settings: { ...settings, known_types: updatedKnownTypes } + }) + .eq('id', userId); + } + } catch (err) { + logger.error(err, 'Error processing known types'); + } + } + + // 3. Preserve Meta (Emails) + // 2a. 
Deduplicate Locations (Fix 21000 error) + const uniqueLocationsMap = new Map(); + locationsToUpsert.forEach((l: any) => { + const pid = String(l.place_id).trim(); + // Ensure place_id is cleaned + l.place_id = pid; + + if (!uniqueLocationsMap.has(pid)) { + uniqueLocationsMap.set(pid, l); + } else { + logger.debug({ place_id: pid }, 'Duplicate place_id found in batch, skipping'); + } + }); + const uniqueLocations = Array.from(uniqueLocationsMap.values()); + logger.info({ + total: locationsToUpsert.length, + unique: uniqueLocations.length, + diff: locationsToUpsert.length - uniqueLocations.length + }, 'Deduplication stats'); + + // 3. Preserve Meta (Emails) + const placeIds = uniqueLocations.map((l: any) => l.place_id); + if (placeIds.length > 0) { + const existingLocations = await getLocationsMeta(placeIds); + if (existingLocations) { + const metaMap = new Map(existingLocations.map((l: any) => [l.place_id, l.meta])); + uniqueLocations.forEach((l: any) => { + const existingMeta = metaMap.get(l.place_id); + if (existingMeta) { + l.raw_data.meta = { + ...(l.raw_data.meta || {}), + ...existingMeta + }; + } + }); + } + } + + // 4. Upsert Locations & Store Search + try { + // Attempt single batch upsert for performance + logger.info({ count: uniqueLocations.length }, 'Attempting batch upsert'); + await upsertLocations(uniqueLocations); + logger.info({ count: uniqueLocations.length }, 'Batch upsert successful'); + } catch (err: any) { + // Fallback to serial processing if duplicates exist within batch (Postgres 21000) + // or any other batch-level error that might be resolvable row-by-row + logger.warn({ err }, 'Batch upsert failed, falling back to sequential upserts'); + + // Execute upserts sequentially to safely handle "ON CONFLICT ... row a second time" + // This effectively implements "Last Write Wins" for duplicates in the batch + for (const loc of uniqueLocations) { + try { + await upsertLocations([loc]); + } catch (innerErr) { + logger.error({ place_id: loc.place_id, err: innerErr }, 'Failed to upsert individual location'); + // Continue processing others + } + } + } + + await storeSearch(params_hash({ + query, + location, + filterCity, + filterContinent, + filterCountry, + filterType, + limit, + zoom, + excludedTypes + }), { query, location }, placeIds); + + return uniqueLocations; +}; diff --git a/packages/shared/src/server/locations/logger.ts b/packages/shared/src/server/locations/logger.ts new file mode 100644 index 00000000..7d699953 --- /dev/null +++ b/packages/shared/src/server/locations/logger.ts @@ -0,0 +1,2 @@ +import { logger as rootLogger } from '@/commons/logger.js'; +export const logger = rootLogger.child({ product: 'locations' }); diff --git a/packages/shared/src/server/locations/meta.ts b/packages/shared/src/server/locations/meta.ts new file mode 100644 index 00000000..e69de29b diff --git a/packages/shared/src/server/locations/pgboss.ts b/packages/shared/src/server/locations/pgboss.ts new file mode 100644 index 00000000..fbd7f10b --- /dev/null +++ b/packages/shared/src/server/locations/pgboss.ts @@ -0,0 +1,167 @@ +import { Job, PgBoss } from 'pg-boss'; +import { AbstractWorker } from '@/jobs/boss/AbstractWorker.js'; +import { Worker } from '@/commons/decorators.js'; +import { searchLocations, SearchParams } from './jobs-find.js'; +import { JOB_NAME, EMAIL_JOB_NAME, COST_PER_SEARCH, EMAIL_SEARCH_COST, EMAIL_META_TIMEOUT_MS, EMAIL_SEARCH_TIMEOUT_MS, EMAIL_SEARCH_MAX_PAGES, EMAIL_SEARCH_PAGE_TIMEOUT_MS, EMAIL_SEARCH_PAGE_CONCURRENCY, EMAIL_MAX_CONCURRENT_JOBS } from 
'./constants.js'; +import { params_hash } from './cache.js'; +import { findEmailEach, parseHtml, Page, Meta } from '@polymech/search'; +import { getLocationByPlaceId } from '../../endpoints/competitors/getLocationByPlaceId.js'; +import { supabase } from '../../commons/supabase.js'; +import { logger } from '@/commons/logger.js'; + +export interface LocationsJobData extends SearchParams { + userId: string; +} + +export interface EmailJobData { + place_id: string; + userId: string; +} + +@Worker(JOB_NAME) +export class LocationsWorker extends AbstractWorker { + readonly queueName = JOB_NAME; + + calculateCost(job: Job, result: any): number { + return COST_PER_SEARCH + (result?.length || 0) * 0.1; + } + protected async process(job: Job) { + const { userId, ...params } = job.data; + const results = await searchLocations(params, userId); + return { count: results.length, placeIds: results.map((r: any) => r.place_id) }; + } +} + +@Worker(EMAIL_JOB_NAME) +export class EmailWorker extends AbstractWorker { + readonly queueName = EMAIL_JOB_NAME; + readonly teamSize = EMAIL_MAX_CONCURRENT_JOBS; + + calculateCost(job: Job, result: any): number { + return EMAIL_SEARCH_COST; + } + + protected async process(job: Job) { + logger.info(`[EmailWorker] Processing job ${job.id} for place_id: ${job.data.place_id}`); + + const { place_id, userId } = job.data; + + // 1. Fetch location details + const location = await getLocationByPlaceId(place_id); + if (!location) { + throw new Error('Location not found'); + } + + // 2. Check for existing emails (maybe skip this if forceRefresh is handled by caller? assuming always fresh search requested or caller handles cache) + // For job simplicity, we always assume we want to enrich if we are here. + // But we should check if website exists + if (!location.website) { + throw new Error('No website available'); + } + + // 3. Ensure Meta Data + if (!location.meta || !location.meta.pages) { + logger.info(`[EmailWorker] Meta missing for ${place_id} : ${location}, fetching...`); + const meta = await parseHtml(location.website, null, { + headless: true, + timeout: EMAIL_META_TIMEOUT_MS + }); + if (meta) { + location.meta = { ...location.meta, ...meta } as any; + // Update DB with meta + await supabase.from('locations').update({ meta: location.meta }).eq('place_id', place_id); + } + } + + // 4. Execute Email Scraping + logger.info(`[EmailWorker] Starting email search for ${place_id} : ${location.title}`); + + let isCancelled = false; + const checkCancelled = async () => { + if (isCancelled) return true; + if (!this.boss) return false; + try { + const currentJob = await this.boss.getJobById(job.name, job.id); + const dbCancelled = !!(currentJob && (currentJob as any).state === 'cancelled'); + if (dbCancelled) isCancelled = true; + return isCancelled; + } catch (e) { + return false; + } + }; + + + + const searchPromise = findEmailEach(location, { + headless: process.env.EMAIL_SEARCH_HEADLESS === 'false' ? 
false : true, + searchFrom: 'api', + abortAfter: 1, + maxPages: EMAIL_SEARCH_MAX_PAGES, + pageTimeout: EMAIL_SEARCH_PAGE_TIMEOUT_MS, + concurrency: EMAIL_SEARCH_PAGE_CONCURRENCY, + checkCancelled + }, async (page: Page) => { + logger.info(`[EmailWorker] Found email for ${place_id}`); + if (await checkCancelled()) { + logger.info(`[EmailWorker] Job cancelled for ${place_id}`); + throw new Error('Cancelled'); + } + }); + + // We use a timeout wrapper similar to original code + let emails: string[] = []; + logger.info(`[EmailWorker] Starting search with timeout: ${EMAIL_SEARCH_TIMEOUT_MS}ms`); + try { + const result = await Promise.race([ + searchPromise, + new Promise((_, reject) => setTimeout(() => { + logger.warn(`[EmailWorker] Timeout reached for ${place_id}, signalling cancellation...`); + isCancelled = true; + reject(new Error('Timeout')); + }, EMAIL_SEARCH_TIMEOUT_MS)) + ]); + if (result) emails = result as string[]; + } catch (e: any) { + if (e.message === 'Timeout') { + logger.warn(`[EmailWorker] Timeout searching emails for ${place_id}`); + // Proceed with what we have (empty) + } else if (e.message === 'Cancelled' || e.message === 'CancelledByUser') { + logger.warn(`[EmailWorker] Job cancelled for ${place_id}`); + // Proceed with what we have (likely empty or partial) + } else { + throw e; + } + } + + // Format emails + const emailsWithMetadata = emails.map((email) => ({ + email, + source: location.website || '', + foundAt: new Date().toISOString(), + tool: 'puppeteer' + })); + + // Update location with emails + if (emailsWithMetadata.length > 0) { + logger.info(`[EmailWorker] Updating location ${place_id} with ${emailsWithMetadata.length} emails`); + await supabase + .from('locations') + .update({ + meta: { + ...location.meta, + emails: emailsWithMetadata // Replaces emails with fresh finding + }, + updated_at: new Date().toISOString() + }) + .eq('place_id', place_id); + } else { + logger.info(`[EmailWorker] Search finished for ${place_id}. 
No emails found.`); + } + + return { + place_id, + emailsCount: emailsWithMetadata.length, + emails: emailsWithMetadata + }; + } +} diff --git a/packages/shared/src/server/locations/routes.ts b/packages/shared/src/server/locations/routes.ts new file mode 100644 index 00000000..2512d667 --- /dev/null +++ b/packages/shared/src/server/locations/routes.ts @@ -0,0 +1,375 @@ +import { createRoute } from '@hono/zod-openapi'; +import { LocationRequestSchema, LocationResponseSchema, LocationDetailResponseSchema } from './schemas.js'; +import { Public } from '@/commons/decorators.js'; +import { z } from 'zod'; + +import { authMiddleware, optionalAuthMiddleware } from '../../middleware/auth.js'; + +const tags = ['Locations']; + +export const RequestSchema = z.object({ + query: z.string().default(''), + location: z.string().optional().default('barcelona, spain'), + refresh: z.string().optional().default('false'), + filterCountry: z.string().optional(), + filterCity: z.string().optional(), + filterContinent: z.string().optional(), + filterType: z.string().optional(), + limit: z.string().optional().default('250'), + zoom: z.string().optional().describe('Map zoom level'), + excludedTypes: z.union([z.string(), z.array(z.string())]).optional().transform((val) => { + if (!val) return undefined; + if (typeof val === 'string') return [val]; + return val; + }), +}).loose() + +export type CompetitorRequest = z.infer; + +export const cancelJobRoute = createRoute({ + method: 'post', + path: '/api/competitors/email/job/{jobId}/cancel', + tags, + middleware: [authMiddleware] as const, + request: { + params: z.object({ + jobId: z.string(), + }), + }, + responses: { + 200: { + content: { + 'application/json': { + schema: z.object({ + message: z.string(), + }), + }, + }, + description: 'Job cancelled successfully', + }, + 404: { + description: 'Job not found', + }, + 500: { + description: 'Server error', + }, + }, +}); + +export const getLocationsRoute = createRoute({ + method: 'get', + path: '/api/competitors', + tags, + middleware: [authMiddleware] as const, + request: { + query: RequestSchema, + }, + responses: { + 202: { + content: { + 'application/json': { + schema: z.object({ + message: z.string(), + jobId: z.string().nullable(), + }), + }, + }, + description: 'Job submitted', + }, + 200: { + content: { + 'application/json': { + schema: LocationResponseSchema, + }, + }, + description: 'Retrieve locations (cached)', + }, + 500: { + description: 'Server error', + }, + }, +}); + +export const getLocationsStreamRoute = createRoute({ + method: 'get', + path: '/api/competitors/stream', + tags, + middleware: [authMiddleware] as const, + request: { + query: LocationRequestSchema, + }, + responses: { + 200: { + description: 'Server-Sent Events stream of location results', + content: { + 'text/event-stream': { + schema: { + type: 'string' + } + } + } + }, + 500: { + description: 'Server error', + }, + }, +}) + +export const getCompetitorByIdRoute = Public(createRoute({ + method: 'get', + path: '/api/competitors/{place_id}', + tags, + request: { + params: z.object({ + place_id: z.string(), + }), + }, + responses: { + 200: { + content: { + 'application/json': { + schema: LocationDetailResponseSchema, + }, + }, + description: 'Retrieve competitor details', + }, + 404: { + description: 'Competitor not found', + }, + 500: { + description: 'Server error', + }, + }, +})); + +export const getLlmRegionInfoRoute = Public(createRoute({ + method: 'get', + path: '/api/locations/llm-info', + tags, + request: { + query: z.object({ + 
location: z.string().describe('Location name to get info for'), + }), + }, + responses: { + 200: { + description: 'LLM Region Info', + content: { + 'application/json': { + schema: z.object({ + data: z.any(), + }), + }, + }, + }, + 500: { + description: 'Server error', + }, + }, +})); + +export const getFindEmailStreamRoute = createRoute({ + method: 'get', + path: '/api/competitors/email/stream', + tags, + middleware: [authMiddleware] as const, + request: { + query: z.object({ + place_ids: z.string().describe('Comma-separated list of place IDs'), + token: z.string().optional().describe('Auth token for SSE'), + }), + }, + responses: { + 200: { + description: 'Server-Sent Events stream of email finding results', + content: { + 'text/event-stream': { + schema: { + type: 'string' + } + } + } + }, + 500: { + description: 'Server error', + }, + }, +}); + +export const getRegionSearchRoute = Public(createRoute({ + method: 'get', + path: '/api/regions/search', + tags, + request: { + query: z.object({ + query: z.string().describe('Region name to search for'), + content_level: z.string().optional().describe('Filter by content level (e.g. 1)'), + geojson: z.string().optional().describe('Include GeoJSON geometry (true/false)'), + }), + }, + responses: { + 200: { + description: 'Search results', + content: { + 'application/json': { + schema: z.object({ + data: z.array(z.any()), + }), + }, + }, + }, + 500: { + description: 'Server error', + }, + }, +})); + +export const getRegionBoundaryRoute = Public(createRoute({ + method: 'get', + path: '/api/regions/boundary/{id}', + tags, + request: { + params: z.object({ + id: z.string().describe('GADM ID of the region'), + }), + query: z.object({ + name: z.string().optional().describe('Region Name (Required for GADM fetch)'), + level: z.string().optional().describe('Content Level'), + }), + }, + responses: { + 200: { + description: 'GeoJSON boundary', + content: { + 'application/json': { + schema: z.object({ + data: z.any(), // GeoJSON object + }), + }, + }, + }, + 404: { + description: 'Region not found', + }, + 500: { + description: 'Server error', + }, + }, +})); + +export const getRegionReverseRoute = Public(createRoute({ + method: 'get', + path: '/api/regions/reverse', + tags, + request: { + query: z.object({ + lat: z.string().describe('Latitude'), + lon: z.string().describe('Longitude'), + }), + }, + responses: { + 200: { + content: { + 'application/json': { + schema: z.object({ + data: z.any().describe('Reverse geocoding result') + }), + }, + }, + description: 'Reverse geocoding successful', + }, + 500: { + content: { + 'application/json': { + schema: z.object({ + error: z.string(), + }), + }, + }, + description: 'Server error', + }, + }, +})); + +export const getRegionNamesRoute = Public(createRoute({ + method: 'get', + path: '/api/regions/names', + tags, + request: { + query: z.object({ + admin: z.string().describe('Admin code (e.g. FRA, FRA.1_1)'), + content_level: z.string().describe('Content level (e.g. 
2)').default('2'), + }), + }, + responses: { + 200: { + description: 'Region names results', + content: { + 'application/json': { + schema: z.object({ + data: z.array(z.any()), + }), + }, + }, + }, + 500: { + description: 'Server error', + }, + }, +})); +export const getWikiSearchRoute = Public(createRoute({ + method: 'get', + path: '/api/locations/wiki', + tags, + request: { + query: z.object({ + lat: z.string().describe('Latitude'), + lon: z.string().describe('Longitude'), + radius: z.string().optional().default('10000').describe('Radius in meters (max 10000)'), // WP limit is usually 10000 + limit: z.string().optional().default('10').describe('Limit results'), + }), + }, + responses: { + 200: { + description: 'Wiki geosearch results', + content: { + 'application/json': { + schema: z.object({ + data: z.array(z.any()), + }), + }, + }, + }, + 500: { + description: 'Server error', + }, + }, +})); + +export const getEnrichStreamRoute = createRoute({ + method: 'get', + path: '/api/competitors/enrich/stream', + tags, + middleware: [authMiddleware] as const, + request: { + query: z.object({ + place_ids: z.string().describe('Comma-separated list of place IDs'), + enrichers: z.string().describe('Comma-separated list of enrichers (e.g. meta)'), + }), + }, + responses: { + 200: { + description: 'Server-Sent Events stream of enrichment results', + content: { + 'text/event-stream': { + schema: { + type: 'string' + } + } + } + }, + 500: { + description: 'Server error', + }, + }, +}); diff --git a/packages/shared/src/server/locations/schemas.ts b/packages/shared/src/server/locations/schemas.ts new file mode 100644 index 00000000..858e8f1d --- /dev/null +++ b/packages/shared/src/server/locations/schemas.ts @@ -0,0 +1,28 @@ +import { z } from '@hono/zod-openapi'; +import { CompetitorResponseSchema, CompetitorDetailResponseSchema } from '@polymech/shared'; + +// Define Request Schema locally for server/OpenAPI compatibility +export const LocationRequestSchema = z.object({ + query: z.string().default('plastichub'), + location: z.string().optional().default('barcelona, spain'), + refresh: z.string().optional().default('false'), + filterCountry: z.string().optional(), + filterCity: z.string().optional(), + filterContinent: z.string().optional(), + filterType: z.string().optional(), + concurrency: z.string().optional().default('5'), + limit: z.string().optional().default('250'), + zoom: z.string().optional().describe('Map zoom level'), + excludedTypes: z.union([z.string(), z.array(z.string())]).optional().transform((val) => { + if (!val) return undefined; + if (typeof val === 'string') return [val]; + return val; + }), +}); + +export const LocationResponseSchema = CompetitorResponseSchema; +export const LocationDetailResponseSchema = CompetitorDetailResponseSchema; + +export type LocationRequest = z.infer; +export type LocationResponse = z.infer; +export type LocationDetailResponse = z.infer; diff --git a/packages/shared/src/server/locations/stream.ts b/packages/shared/src/server/locations/stream.ts new file mode 100644 index 00000000..be40422d --- /dev/null +++ b/packages/shared/src/server/locations/stream.ts @@ -0,0 +1,107 @@ +import { createRoute, RouteHandler } from '@hono/zod-openapi'; +import { streamSSE } from 'hono/streaming'; +import { LocationRequestSchema } from './schemas.js'; +import { searchLocations } from './jobs-find.js'; +import { get_cached, params_hash } from './cache.js'; +import { HonoEnv } from '@/commons/types.js'; +import { logger } from './logger.js'; + +const tags = ['Locations']; + 
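+
+// Hypothetical client-side consumer of this stream (illustrative only; assumes
+// auth is carried by a cookie, since EventSource cannot set custom headers):
+//
+//   const es = new EventSource('/api/locations/stream?query=plastichub&location=barcelona,%20spain');
+//   es.addEventListener('progress', (e) => console.log(JSON.parse(e.data)));   // { stage, percent }
+//   es.addEventListener('result',   (e) => render(JSON.parse(e.data)));        // one location per event
+//   es.addEventListener('complete', (e) => es.close());                        // { total, cached }
+//   es.addEventListener('error',    (e) => es.close());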
+export const getLocationsStreamRoute = createRoute({ + method: 'get', + path: '/api/locations/stream', + tags, + request: { + query: LocationRequestSchema, + }, + responses: { + 200: { + description: 'Server-Sent Events stream of location results', + content: { + 'text/event-stream': { + schema: { + type: 'string' + } + } + } + }, + 500: { + description: 'Server error', + }, + }, +}); + +export const stream_get: RouteHandler = async (c) => { + const query = c.req.valid('query'); + const { refresh } = query; + const forceRefresh = refresh === 'true'; + const userId = c.get('userId'); + + if (!userId) { + logger.error('Unauthorized : no userId'); + return c.json({ message: 'Unauthorized' }, 401); + } + + const inputHash = params_hash({ query: query.query, location: query.location }); + + return streamSSE(c, async (stream) => { + try { + await stream.writeSSE({ + event: 'progress', + data: JSON.stringify({ stage: 'starting', percent: 0 }) + }); + + if (!forceRefresh) { + await stream.writeSSE({ + event: 'progress', + data: JSON.stringify({ stage: 'checking_cache', percent: 10 }) + }); + + const cached = await get_cached(inputHash); + if (cached) { + for (let i = 0; i < cached.length; i++) { + await stream.writeSSE({ + event: 'result', + data: JSON.stringify(cached[i]) + }); + } + await stream.writeSSE({ + event: 'complete', + data: JSON.stringify({ total: cached.length, cached: true }) + }); + return; + } + } + + await stream.writeSSE({ + event: 'progress', + data: JSON.stringify({ stage: 'fetching_from_api', percent: 20 }) + }); + + // Note: searchLocations currently awaits all results. + // Ideally, we would refactor searchLocations to yield results or accept a callback. + // For now, we fetch all then stream. + const results = await searchLocations(query as any, userId); + + for (let i = 0; i < results.length; i++) { + await stream.writeSSE({ + event: 'result', + data: JSON.stringify(results[i]) + }); + } + + await stream.writeSSE({ + event: 'complete', + data: JSON.stringify({ total: results.length, cached: false }) + }); + + } catch (error: any) { + logger.error(error, 'Stream error'); + await stream.writeSSE({ + event: 'error', + data: JSON.stringify({ error: error.message || 'Internal Server Error' }) + }); + } + }); +}; diff --git a/packages/shared/src/server/registry.ts b/packages/shared/src/server/registry.ts new file mode 100644 index 00000000..a18d5167 --- /dev/null +++ b/packages/shared/src/server/registry.ts @@ -0,0 +1,34 @@ +import { PgBoss } from 'pg-boss'; +import { AbstractProduct } from './AbstractProduct.js'; +import { LocationsProduct } from './locations/index.js'; +import './subscriber.js'; + +export const ALL_PRODUCTS: AbstractProduct[] = + [ + new LocationsProduct() + ]; + + +// Helper to get all workers +export const getAllWorkers = () => { + return ALL_PRODUCTS.flatMap(p => p.workers || []); +}; + +// Helper to register routes +export const registerProductRoutes = (app: any) => { + ALL_PRODUCTS.forEach(product => { + product.routes.forEach(route => { + // @ts-ignore - Hono types might mismatch slightly + app.openapi(route.definition, route.handler); + }); + }); +}; + + + +// Helper to initialize products (lifecycle: start) +export const startProducts = async (boss: PgBoss) => { + for (const product of ALL_PRODUCTS) { + await product.start(boss); + } +}; diff --git a/packages/shared/src/server/subscriber.ts b/packages/shared/src/server/subscriber.ts new file mode 100644 index 00000000..28cf8d30 --- /dev/null +++ b/packages/shared/src/server/subscriber.ts @@ -0,0 
+1,42 @@ +import { ALL_PRODUCTS } from './registry.js'; +import { EventBus } from './EventBus.js'; + +const findProductByQueue = (queue: string) => { + return ALL_PRODUCTS.find(p => + p.workers?.some(w => { + try { + const worker = new (w as any)(); + return worker.queueName === queue; + } catch (e) { + return false; + } + }) + ); +}; + +EventBus.on('job:create', (event: any) => { + const product = findProductByQueue(event.queue); + + if (!product) return; + + // Apply default job options from product if available + if (product.jobOptions) { + event.options = { ...product.jobOptions, ...event.options }; + } + + const singletonKey = product.hash(event.data); + if (singletonKey) { + event.options.singletonKey = singletonKey; + // Default to 5 minutes if not specified + if (!event.options.singletonSeconds) { + event.options.singletonSeconds = 300; + } + } + + const { userId } = event.data; + if (userId) { + const metadata = product.meta(userId); + event.data = { ...event.data, ...metadata }; + } + +});
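+
+// Net effect of the singleton handling above: two identical payloads sent to the
+// same queue within `singletonSeconds` collapse into a single pg-boss job. The
+// second sendJob() call then resolves to null, and callers such as
+// LocationsProduct.handleStreamGet fall back to waitForHash() on the shared
+// params hash instead of waiting on a job id.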