diff --git a/packages/shared/src/server/commons/decorators.ts b/packages/shared/src/server/commons/decorators.ts new file mode 100644 index 00000000..1aa55207 --- /dev/null +++ b/packages/shared/src/server/commons/decorators.ts @@ -0,0 +1,153 @@ +import { trackUsage, updateUsageRecord } from '../middleware/usageTracking.js'; +import { FunctionRegistry, PublicEndpointRegistry, AdminEndpointRegistry } from './registry.js'; +import { logger } from './logger.js'; +import { WorkerRegistry } from '../jobs/boss/registry.js'; +import { Job } from 'pg-boss'; + +/** + * Decorator/Wrapper to mark an endpoint as public + * Registers the route in PublicEndpointRegistry + */ +export function Public(route: T): T { + PublicEndpointRegistry.register(route.path, route.method); + return route; +} + +/** + * Decorator/Wrapper to mark an endpoint as admin-only + * Registers the route in AdminEndpointRegistry + */ +export function Admin(route: T): T { + AdminEndpointRegistry.register(route.path, route.method); + return route; +} + +export interface BillableContext { + userId: string; + jobId: string; + signal?: AbortSignal; + metadata?: Record; +} + +export interface BillableOptions { + productId: string; + actionId: string; + cancellable?: boolean; +} + +/** + * Decorator to mark a method as billable + * Handles usage tracking, context injection, and cancellation + */ +export function Billable(options: BillableOptions) { + return function ( + target: any, + propertyKey: string, + descriptor: PropertyDescriptor + ) { + const originalMethod = descriptor.value; + + descriptor.value = async function (...args: any[]) { + // 1. Extract context + // Assumes the first argument is BillableContext, or it's part of the first argument object + let context: BillableContext | undefined; + + if (args.length > 0 && typeof args[0] === 'object') { + // Check if first arg is context + if ('userId' in args[0] && 'jobId' in args[0]) { + context = args[0] as BillableContext; + } + } + + if (!context) { + // If no context provided, we can't track usage properly + // For now, we'll log a warning and proceed without tracking + // In strict mode, we might want to throw an error + logger.warn(`[Billable] No context provided for ${options.productId}:${options.actionId}`); + return originalMethod.apply(this, args); + } + + // 2. Get config + const config = FunctionRegistry.get(options.productId, options.actionId); + if (!config) { + logger.warn(`[Billable] No config found for ${options.productId}:${options.actionId}`); + return originalMethod.apply(this, args); + } + + // 3. Start tracking + const usageId = await trackUsage({ + userId: context.userId, + endpoint: 'function', // Internal function call + method: 'CALL', + product: options.productId, + action: options.actionId, + costUnits: config.costUnits, + cancellable: options.cancellable || false, + jobId: context.jobId, + metadata: context.metadata + }); + + const startTime = Date.now(); + let error: Error | null = null; + let result: any; + + try { + // 4. Execute method + // If cancellable, we should ideally wrap the execution or check signal + if (options.cancellable && context.signal) { + if (context.signal.aborted) { + throw new Error('Operation cancelled'); + } + + // Add abort listener + context.signal.addEventListener('abort', () => { + logger.info(`[Billable] Job ${context?.jobId} aborted via signal`); + }); + } + + result = await originalMethod.apply(this, args); + return result; + } catch (err) { + error = err as Error; + throw err; + } finally { + // 5. End tracking + if (usageId) { + const endTime = Date.now(); + await updateUsageRecord({ + usageId, + responseStatus: error ? 500 : 200, + responseTimeMs: endTime - startTime, + error + }); + } + } + }; + + return descriptor; + }; +} + +/** + * Class Decorator: Registers the worker queue name + */ +export function Worker(queueName: string) { + return function (constructor: T) { + // We can't easily access the instance method 'handler' here without instantiating + // So we assume the class has a 'handler' method or we register the class itself + // For simplicity, let's assume we'll instantiate it later or the registry handles it. + // But wait, pg-boss needs a function. + // Let's store the constructor in the registry, and the registry (or bootstrap) will instantiate and bind. + + // Actually, let's just attach the queue name to the class for now, + // and let a separate scanner or manual registration use it. + // OR, we can register a factory. + + // Better approach for now: Register the prototype's handler if it exists. + // But 'handler' is on the instance usually. + + // Let's just modify the class to have a static 'queueName' property + // and register it. + (constructor as any).queueName = queueName; + }; +} diff --git a/packages/shared/src/server/commons/logger.ts b/packages/shared/src/server/commons/logger.ts new file mode 100644 index 00000000..7ccf1424 --- /dev/null +++ b/packages/shared/src/server/commons/logger.ts @@ -0,0 +1,34 @@ +import pino from 'pino'; +import path from 'path'; + +const fileTransport = pino.transport({ + target: 'pino/file', + options: { destination: path.join(process.cwd(), 'app.log') }, +}); + +const consoleTransport = pino.transport({ + target: 'pino-pretty', + options: { + colorize: true, + ignore: 'pid,hostname', + destination: 1, + }, +}); + +export const logger = pino( + { + level: process.env.PINO_LOG_LEVEL || 'info', + formatters: { + level: (label) => { + return { level: label.toUpperCase() }; + }, + }, + timestamp: pino.stdTimeFunctions.isoTime, + }, + pino.multistream([ + { stream: fileTransport, level: 'info' }, + { stream: consoleTransport, level: 'info' }, + ]) +); + +export default logger; diff --git a/packages/shared/src/server/commons/registry.ts b/packages/shared/src/server/commons/registry.ts new file mode 100644 index 00000000..264e38ab --- /dev/null +++ b/packages/shared/src/server/commons/registry.ts @@ -0,0 +1,174 @@ +import { ProductActionConfig, PRODUCT_ACTIONS } from '../config/products.js'; + +export interface BillableFunctionConfig extends ProductActionConfig { + productId: string; + actionId: string; +} + +/** + * Central registry for all billable functions + * Manages configuration, costs, and metadata + */ +export class FunctionRegistry { + private static registry = new Map(); + private static initialized = false; + + /** + * Initialize the registry with default configurations + */ + static initialize() { + if (this.initialized) return; + + // Load legacy PRODUCT_ACTIONS + for (const [productId, actions] of Object.entries(PRODUCT_ACTIONS)) { + for (const [actionId, config] of Object.entries(actions)) { + this.register({ + productId, + actionId, + ...config + }); + } + } + + this.initialized = true; + } + + /** + * Register a new billable function + */ + static register(config: BillableFunctionConfig) { + const key = this.getKey(config.productId, config.actionId); + this.registry.set(key, config); + } + + /** + * Get configuration for a specific function + */ + static get(productId: string, actionId: string): BillableFunctionConfig | null { + if (!this.initialized) this.initialize(); + const key = this.getKey(productId, actionId); + return this.registry.get(key) || null; + } + + /** + * Get all registered functions + */ + static getAll(): BillableFunctionConfig[] { + if (!this.initialized) this.initialize(); + return Array.from(this.registry.values()); + } + + /** + * Find a configuration by matching route endpoint and method + * (Used for middleware backward compatibility) + */ + static findByRoute(path: string, method: string): BillableFunctionConfig | null { + if (!this.initialized) this.initialize(); + + for (const config of this.registry.values()) { + if (this.matchesRoute(path, config.endpoint) && method === config.method) { + return config; + } + } + return null; + } + + private static getKey(productId: string, actionId: string): string { + return `${productId}:${actionId}`; + } + + private static matchesRoute(path: string, pattern: string): boolean { + // Convert pattern to regex + // Handle both :param (Express/Hono style) and {param} (OpenAPI style) + // e.g., '/api/competitors/:place_id' or '/api/competitors/{place_id}' -> /^\/api\/competitors\/[^\/]+$/ + const regexPattern = pattern + .replace(/:[^\\/]+/g, '[^/]+') // Replace :param with regex + .replace(/\{[^}]+\}/g, '[^/]+') // Replace {param} with regex + .replace(/\//g, '\\/'); + + // Allow optional trailing slash + const regex = new RegExp(`^${regexPattern}\\/?$`); + return regex.test(path); + } +} + +/** + * Registry for public endpoints that don't require authentication + */ +export class PublicEndpointRegistry { + private static registry = new Set(); + + static register(path: string, method: string) { + this.registry.add(`${method.toUpperCase()}:${path}`); + } + + static isPublic(path: string, method: string): boolean { + const methodUpper = method.toUpperCase(); + + for (const registered of this.registry) { + const [regMethod, regPath] = registered.split(':'); + + if (regMethod !== methodUpper) continue; + + // Check if path matches pattern + if (this.matchesRoute(path, regPath)) { + return true; + } + } + return false; + } + + private static matchesRoute(path: string, pattern: string): boolean { + // Convert pattern to regex + // Handle both :param (Express/Hono style) and {param} (OpenAPI style) + // e.g., '/api/competitors/:place_id' or '/api/competitors/{place_id}' -> /^\/api\/competitors\/[^\/]+$/ + const regexPattern = pattern + .replace(/:[^\\/]+/g, '[^/]+') // Replace :param with regex + .replace(/\{[^}]+\}/g, '[^/]+') // Replace {param} with regex + .replace(/\//g, '\\/'); + + // Allow optional trailing slash + const regex = new RegExp(`^${regexPattern}\\/?$`); + return regex.test(path); + } +} + +/** + * Registry for admin-only endpoints + */ +export class AdminEndpointRegistry { + private static registry = new Set(); + + static register(path: string, method: string) { + this.registry.add(`${method.toUpperCase()}:${path}`); + } + + static isAdmin(path: string, method: string): boolean { + const methodUpper = method.toUpperCase(); + + for (const registered of this.registry) { + const [regMethod, regPath] = registered.split(':'); + + if (regMethod !== methodUpper) continue; + + // Check if path matches pattern + if (this.matchesRoute(path, regPath)) { + return true; + } + } + return false; + } + + private static matchesRoute(path: string, pattern: string): boolean { + // Convert pattern to regex + // Handle both :param (Express/Hono style) and {param} (OpenAPI style) + // e.g., '/api/competitors/:place_id' or '/api/competitors/{place_id}' -> /^\/api\/competitors\/[^\/]+$/ + const regexPattern = pattern + .replace(/:[^\\/]+/g, '[^/]+') // Replace :param with regex + .replace(/\{[^}]+\}/g, '[^/]+') // Replace {param} with regex + .replace(/\//g, '\\/'); + + const regex = new RegExp(`^${regexPattern}\\/?$`); + return regex.test(path); + } +} diff --git a/packages/shared/src/server/commons/supabase.ts b/packages/shared/src/server/commons/supabase.ts new file mode 100644 index 00000000..50a268df --- /dev/null +++ b/packages/shared/src/server/commons/supabase.ts @@ -0,0 +1,26 @@ +import { createClient } from '@supabase/supabase-js' +import 'dotenv/config' + +const supabaseUrl = process.env.SUPABASE_URL +const supabaseKey = process.env.SUPABASE_SERVICE_KEY + +import { logger } from './logger.js' + +if (!supabaseUrl || !supabaseKey) { + logger.fatal('Missing Supabase environment variables') + process.exit(1) +} + +export const supabase = createClient(supabaseUrl, supabaseKey) + +/** + * Test Supabase connection by attempting a simple query + */ +export async function testSupabaseConnection(): Promise { + try { + const { error } = await supabase.from('products').select('id').limit(1) + return !error + } catch { + return false + } +} diff --git a/packages/shared/src/server/commons/types.ts b/packages/shared/src/server/commons/types.ts new file mode 100644 index 00000000..ee7a4037 --- /dev/null +++ b/packages/shared/src/server/commons/types.ts @@ -0,0 +1,10 @@ +import { Env } from 'hono' + +export interface HonoEnv extends Env { + Variables: { + jobId?: string; + userId?: string; + usageId?: string; + skipUsageStatusUpdate?: boolean; + } +} diff --git a/packages/shared/src/server/endpoints/admin.ts b/packages/shared/src/server/endpoints/admin.ts new file mode 100644 index 00000000..366a16a2 --- /dev/null +++ b/packages/shared/src/server/endpoints/admin.ts @@ -0,0 +1,26 @@ +import { createRoute } from '@hono/zod-openapi' +import { StatsSchema } from '../schemas/index.js' +import type { Context } from 'hono' +import { Admin } from '../commons/decorators.js' + +export const getStatsRoute = Admin(createRoute({ + method: 'get', + path: '/api/admin/stats', + responses: { + 200: { + content: { + 'application/json': { + schema: StatsSchema, + }, + }, + description: 'Retrieve admin stats', + }, + }, +})) + +export function getStatsHandler(c: Context) { + return c.json({ + users: 100, + revenue: 5000, + }, 200) +} diff --git a/packages/shared/src/server/endpoints/boss.ts b/packages/shared/src/server/endpoints/boss.ts new file mode 100644 index 00000000..fee5cac9 --- /dev/null +++ b/packages/shared/src/server/endpoints/boss.ts @@ -0,0 +1,304 @@ +import { createRoute, OpenAPIHono, z } from '@hono/zod-openapi'; +import { RouteHandler } from '@hono/zod-openapi'; +import { boss } from '../jobs/boss/client.js'; +import { QUEUE_MOCK_JOB } from '../jobs/boss/workers.js'; +import { HonoEnv } from '../commons/types.js'; + +const tags = ['PgBoss']; + +export const postBossJobRoute = createRoute({ + method: 'post', + path: '/api/boss/job', + tags, + request: { + body: { + content: { + 'application/json': { + schema: z.object({ + delayMs: z.number().default(100), + shouldFail: z.boolean().default(false), + retryLimit: z.number().optional() + }), + }, + }, + }, + }, + responses: { + 200: { + content: { + 'application/json': { + schema: z.object({ + jobId: z.string().nullable(), + message: z.string() + }), + }, + }, + description: 'PgBoss job started', + }, + 500: { + content: { + 'application/json': { + schema: z.object({ error: z.string() }), + }, + }, + description: 'Server error', + }, + }, +}); + +export const postBossJobHandler: RouteHandler = async (c) => { + if (!boss) { + // Check if there was an initialization error we can report + const { bossInitError } = await import('../jobs/boss/client.js'); + return c.json({ error: `PgBoss not initialized. Init error: ${bossInitError}` }, 500); + } + + const { delayMs, shouldFail, retryLimit } = c.req.valid('json'); + const payload = { delayMs, shouldFail }; + const options = retryLimit !== undefined ? { retryLimit } : {}; + try { + const jobId = await boss.send(QUEUE_MOCK_JOB, payload, options); + return c.json({ jobId, message: 'Job submitted to PgBoss' }, 200); + } catch (error: any) { + return c.json({ error: error.message }, 500); + } +}; + +export const getBossJobRoute = createRoute({ + method: 'get', + path: '/api/boss/job/{id}', + tags, + request: { + params: z.object({ + id: z.string(), + }), + }, + responses: { + 200: { + content: { + 'application/json': { + schema: z.object({ + id: z.string(), + name: z.string(), + data: z.any(), + state: z.string(), + createdOn: z.string().optional(), + startedOn: z.string().optional(), + completedOn: z.string().optional(), + }), + }, + }, + description: 'Job status', + }, + 404: { + content: { + 'application/json': { + schema: z.object({ error: z.string() }), + }, + }, + description: 'Job not found', + }, + 500: { + content: { + 'application/json': { + schema: z.object({ error: z.string() }), + }, + }, + description: 'Server error', + }, + }, +}); + +export const getBossJobHandler: RouteHandler = async (c) => { + const { id } = c.req.valid('param'); + + // Use pg directly to bypass PostgREST schema permissions + const { Client } = await import('pg'); + const client = new Client({ connectionString: process.env.DATABASE_URL, }); + + try { + await client.connect(); + const result = await client.query('SELECT * FROM pgboss.job WHERE id = $1', [id]); + const job = result.rows[0]; + + if (!job) { + return c.json({ error: 'Job not found' }, 404); + } + + return c.json({ + id: job.id, + name: job.name, + data: job.data, + state: job.state, + createdOn: job.createdon, + startedOn: job.startedon, + completedOn: job.completedon, + output: job.output, + }, 200); + } catch (error: any) { + console.error('Error in getBossJobHandler:', error); + return c.json({ error: error.message }, 500); + } finally { + await client.end().catch(() => { }); + } +}; + +export const cancelBossJobRoute = createRoute({ + method: 'post', + path: '/api/boss/job/{id}/cancel', + tags, + request: { + params: z.object({ + id: z.string(), + }), + }, + responses: { + 200: { + content: { + 'application/json': { + schema: z.object({ message: z.string() }), + }, + }, + description: 'Job cancelled', + }, + 500: { + content: { + 'application/json': { + schema: z.object({ error: z.string() }), + }, + }, + description: 'Server error', + }, + }, +}); + +export const cancelBossJobHandler: RouteHandler = async (c) => { + if (!boss) return c.json({ error: 'PgBoss not initialized' }, 500); + const { id } = c.req.valid('param'); + try { + await boss.cancel(QUEUE_MOCK_JOB, id); + return c.json({ message: 'Job cancelled' }, 200); + } catch (error: any) { + return c.json({ error: error.message }, 500); + } +}; + +export const resumeBossJobRoute = createRoute({ + method: 'post', + path: '/api/boss/job/{id}/resume', + tags, + request: { + params: z.object({ + id: z.string(), + }), + }, + responses: { + 200: { + content: { + 'application/json': { + schema: z.object({ message: z.string() }), + }, + }, + description: 'Job resumed', + }, + 500: { + content: { + 'application/json': { + schema: z.object({ error: z.string() }), + }, + }, + description: 'Server error', + }, + }, +}); + +export const resumeBossJobHandler: RouteHandler = async (c) => { + if (!boss) return c.json({ error: 'PgBoss not initialized' }, 500); + const { id } = c.req.valid('param'); + try { + await boss.resume(QUEUE_MOCK_JOB, id); + return c.json({ message: 'Job resumed' }, 200); + } catch (error: any) { + return c.json({ error: error.message }, 500); + } +}; + +export const completeBossJobRoute = createRoute({ + method: 'post', + path: '/api/boss/job/{id}/complete', + tags, + request: { + params: z.object({ + id: z.string(), + }), + }, + responses: { + 200: { + content: { + 'application/json': { + schema: z.object({ message: z.string() }), + }, + }, + description: 'Job completed', + }, + 500: { + content: { + 'application/json': { + schema: z.object({ error: z.string() }), + }, + }, + description: 'Server error', + }, + }, +}); + +export const completeBossJobHandler: RouteHandler = async (c) => { + if (!boss) return c.json({ error: 'PgBoss not initialized' }, 500); + const { id } = c.req.valid('param'); + try { + await boss.complete(QUEUE_MOCK_JOB, id); + return c.json({ message: 'Job completed' }, 200); + } catch (error: any) { + return c.json({ error: error.message }, 500); + } +}; + +export const failBossJobRoute = createRoute({ + method: 'post', + path: '/api/boss/job/{id}/fail', + tags, + request: { + params: z.object({ + id: z.string(), + }), + }, + responses: { + 200: { + content: { + 'application/json': { + schema: z.object({ message: z.string() }), + }, + }, + description: 'Job failed', + }, + 500: { + content: { + 'application/json': { + schema: z.object({ error: z.string() }), + }, + }, + description: 'Server error', + }, + }, +}); + +export const failBossJobHandler: RouteHandler = async (c) => { + if (!boss) return c.json({ error: 'PgBoss not initialized' }, 500); + const { id } = c.req.valid('param'); + try { + await boss.fail(QUEUE_MOCK_JOB, id); + return c.json({ message: 'Job failed' }, 200); + } catch (error: any) { + return c.json({ error: error.message }, 500); + } +}; diff --git a/packages/shared/src/server/endpoints/competitors/getLocationByPlaceId.ts b/packages/shared/src/server/endpoints/competitors/getLocationByPlaceId.ts new file mode 100644 index 00000000..5b8b2b77 --- /dev/null +++ b/packages/shared/src/server/endpoints/competitors/getLocationByPlaceId.ts @@ -0,0 +1,21 @@ +import { CompetitorFull, LocationType } from '@polymech/shared' +import { supabase } from '../../commons/supabase.js' + +/** + * Shared utility to fetch location details by place_id + * Used by both competitor details and find email endpoints + */ +export async function getLocationByPlaceId(place_id: string): Promise { + + const { data: location, error } = await supabase + .from('locations') + .select('*') + .eq('place_id', place_id) + .single() + + if (error || !location) { + return null + } + + return location +} diff --git a/packages/shared/src/server/endpoints/find/email.ts b/packages/shared/src/server/endpoints/find/email.ts new file mode 100644 index 00000000..63ff37eb --- /dev/null +++ b/packages/shared/src/server/endpoints/find/email.ts @@ -0,0 +1,233 @@ +import { createRoute, RouteHandler, z } from '@hono/zod-openapi' +import { ErrorSchema } from '../../schemas/index.js' +import { supabase } from '../../commons/supabase.js' +import { getLocationByPlaceId } from '../competitors/getLocationByPlaceId.js' +import { logger } from '../../commons/logger.js' +import { findEmailEach, parseHtml, Page, LocationSiteMeta } from '@polymech/search' + +const tags = ['Find'] + +// Email metadata schema +const EmailMetadataSchema = z.object({ + email: z.string().email(), + source: z.string().url(), + foundAt: z.string().datetime(), + tool: z.string() +}) + +// Response schema +const FindEmailResponseSchema = z.object({ + job_id: z.string().nullable(), + place_id: z.string(), + emails: z.array(EmailMetadataSchema), + status: z.enum(['cached', 'processing', 'completed', 'failed']), + message: z.string() +}) + +export const getFindEmailRoute = createRoute({ + method: 'get', + path: '/api/find/email/{place_id}', + tags, + request: { + params: z.object({ + place_id: z.string(), + cache: z.boolean().optional().default(false), + }), + }, + responses: { + 200: { + content: { + 'application/json': { + schema: FindEmailResponseSchema, + }, + }, + description: 'Email finding results', + }, + 401: { + content: { + 'application/json': { + schema: ErrorSchema, + }, + }, + description: 'Unauthorized', + }, + 404: { + content: { + 'application/json': { + schema: ErrorSchema, + }, + }, + description: 'Location not found', + }, + 429: { + content: { + 'application/json': { + schema: ErrorSchema, + }, + }, + description: 'Too many active jobs', + }, + 500: { + content: { + 'application/json': { + schema: ErrorSchema, + }, + }, + description: 'Server error', + }, + }, +}) + +export const getFindEmailHandler: any = async (c: any) => { + const { place_id, cache } = c.req.valid('param') + const jobId = c.get('jobId') as string | undefined + const userId = c.get('userId') as string | undefined + + if (!userId) { + return c.json({ error: 'Unauthorized' }, 401) + } + + logger.info(`[FindEmail] Starting email finding for ${place_id} - User: ${userId} - JobId: ${jobId}`) + + try { + // 1. Fetch location details + const location = await getLocationByPlaceId(place_id) + + if (!location) { + return c.json({ error: 'Location not found' }, 404) + } + + // 2. Check for existing emails in metadata + const existingEmails = location.meta?.emails || [] + + if (existingEmails.length > 0 && !cache) { + logger.info(`[FindEmail] Returning cached emails for ${place_id}`) + return c.json({ + job_id: jobId || null, + place_id, + emails: existingEmails, + status: 'cached' as const, + message: 'Emails already found (cached)' + }, 200) + } + + // 3. Check if location has a website + if (!location.website) { + return c.json({ + job_id: jobId || null, + place_id, + emails: [], + status: 'completed' as const, + message: 'No website available for this location' + }, 200) + } + + // 4. Get timeout from environment (already fetched above) + + // 5. Ensure Meta Data + if (!location.meta || !location.meta.pages || !cache) { + logger.info(`[FindEmail] Meta missing for ${place_id}, fetching...`) + try { + const meta = await parseHtml(location.website, null, { + headless: true, + timeout: 1000 * 60 + }) + + if (meta) { + location.meta = { ...location.meta, ...meta } as any + // Update DB with initial meta + const { error: updateError } = await supabase.from('locations').update({ meta: location.meta }).eq('place_id', place_id) + if (updateError) { + logger.error({ err: updateError }, `[FindEmail] Error updating initial meta for ${place_id}`) + } + } + } catch (err) { + logger.error({ err }, `[FindEmail] Error fetching meta for ${place_id}`) + } + } + + // 6. Execute Iterative Scraping + logger.info(`[FindEmail] Starting iterative email search for ${place_id}`) + + const foundEmails: string[] = [] + let isTimedOut = false + const timeoutMs = 60 * 1000 + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => { + isTimedOut = true + reject(new Error('Timeout')) + }, timeoutMs) + }) + + const searchPromise = findEmailEach(location, { + headless: true, + searchFrom: 'api', + }, async (page: Page) => { + if (isTimedOut) return + + // Progress Callback: Update DB + logger.info(`[FindEmail] Scraped ${page.url}, status: ${page.status}`) + + const { error: progressError } = await supabase.from('locations').update({ + meta: location.meta + }).eq('place_id', place_id) + + if (progressError) { + logger.error({ err: progressError }, `[FindEmail] Error updating progress meta for ${place_id}`) + } + }) + + try { + await Promise.race([searchPromise, timeoutPromise]) + } catch (error: any) { + if (error.message === 'Timeout') { + logger.warn(`[FindEmail] Timeout exceeded for ${place_id}`) + } else { + throw error + } + } + + const emails = (await searchPromise) || [] + + // Format emails + const emailsWithMetadata = emails.map((email) => ({ + email, + source: location.website || '', + foundAt: new Date().toISOString(), + tool: 'puppeteer' + })) + + // Merge and Update + const allEmails = [...existingEmails, ...emailsWithMetadata] + + // Deduplicate + const uniqueEmails = Array.from(new Map(allEmails.map(item => [item.email, item])).values()); + + const { error: finalUpdateError } = await supabase + .from('locations') + .update({ + meta: { + ...location.meta, + emails: uniqueEmails + }, + updated_at: new Date().toISOString() + }) + .eq('place_id', place_id) + + if (finalUpdateError) { + logger.error({ err: finalUpdateError }, `[FindEmail] Error saving emails for ${place_id}`) + } + + return c.json({ + job_id: jobId || null, + place_id, + emails: uniqueEmails, + status: isTimedOut ? 'completed' : 'completed', + message: isTimedOut ? 'Search timed out, returning partial results' : `Found ${emails.length} new email(s)` + }, 200) + + } catch (error: any) { + logger.error({ err: error }, '[FindEmail] Error') + return c.json({ error: error.message || 'Internal Server Error' }, 500) + } +} \ No newline at end of file diff --git a/packages/shared/src/server/endpoints/images/index.ts b/packages/shared/src/server/endpoints/images/index.ts new file mode 100644 index 00000000..41204261 --- /dev/null +++ b/packages/shared/src/server/endpoints/images/index.ts @@ -0,0 +1,126 @@ +import { createRoute, z } from '@hono/zod-openapi' +import { ImageSchema, ImageResponseSchema, ErrorSchema } from '../../schemas/index.js' +import type { Context } from 'hono' + +const tags = ['Images'] + +export const getImagesRoute = createRoute({ + method: 'get', + path: '/api/images/get', + tags, + responses: { + 200: { + content: { + 'application/json': { + schema: ImageResponseSchema, + }, + }, + description: 'Retrieve images', + }, + 500: { + content: { + 'application/json': { + schema: ErrorSchema, + }, + }, + description: 'Server error', + }, + }, +}) + +export const postImagesRoute = createRoute({ + method: 'post', + path: '/api/images/post', + tags, + responses: { + 200: { + content: { + 'application/json': { + schema: ImageResponseSchema, + }, + }, + description: 'Create image', + }, + }, +}) + +export const putImagesRoute = createRoute({ + method: 'put', + path: '/api/images/put', + tags, + responses: { + 200: { + content: { + 'application/json': { + schema: ImageResponseSchema, + }, + }, + description: 'Put image', + }, + }, +}) + +export const updateImagesRoute = createRoute({ + method: 'patch', // Using patch for update as is common, though user said 'update' which isn't a method. + // Wait, user said endpoints: /api/images/get,put,post,update + // 'update' is likely the path, not the method. + // I will assume POST or PATCH to /api/images/update. Let's use POST for 'update' path to be safe or PATCH. + // Actually, let's stick to standard methods for the paths if possible, but the paths are explicit. + // Let's use PATCH method for /api/images/update path. + path: '/api/images/update', + tags, + responses: { + 200: { + content: { + 'application/json': { + schema: ImageResponseSchema, + }, + }, + description: 'Update image', + }, + }, +}) + + +// Stubbed data +const stubbedImages = [ + { + idx: 0, + id: 6, + name: "images", + slug: "images", + description: "fcghdfgh", + price: "10.00", + variants: "[]", + created_at: "2025-11-22 10:46:09.77718+00", + updated_at: "2025-11-22 10:46:09.77718+00" + } +] + +export async function getImagesHandler(c: Context) { + return c.json({ + message: 'Get images success', + data: stubbedImages + }, 200) +} + +export async function postImagesHandler(c: Context) { + return c.json({ + message: 'Post images success', + data: stubbedImages + }, 200) +} + +export async function putImagesHandler(c: Context) { + return c.json({ + message: 'Put images success', + data: stubbedImages + }, 200) +} + +export async function updateImagesHandler(c: Context) { + return c.json({ + message: 'Update images success', + data: stubbedImages + }, 200) +} diff --git a/packages/shared/src/server/endpoints/products.ts b/packages/shared/src/server/endpoints/products.ts new file mode 100644 index 00000000..a2b80745 --- /dev/null +++ b/packages/shared/src/server/endpoints/products.ts @@ -0,0 +1,39 @@ +import { createRoute, z } from '@hono/zod-openapi' +import { ProductSchema, ErrorSchema } from '../schemas/index.js' +import { supabase } from '../commons/supabase.js' +import type { Context } from 'hono' + +export const getProductsRoute = createRoute({ + method: 'get', + path: '/api/products', + responses: { + 200: { + content: { + 'application/json': { + schema: z.array(ProductSchema), + }, + }, + description: 'Retrieve all products', + }, + 500: { + content: { + 'application/json': { + schema: ErrorSchema, + }, + }, + description: 'Server error', + }, + }, +}) + +export async function getProductsHandler(c: Context) { + const { data, error } = await supabase + .from('products') + .select('*') + + if (error) { + return c.json({ error: error.message }, 500) + } + + return c.json(data, 200) +} diff --git a/packages/shared/src/server/endpoints/subscriptions.ts b/packages/shared/src/server/endpoints/subscriptions.ts new file mode 100644 index 00000000..f8185887 --- /dev/null +++ b/packages/shared/src/server/endpoints/subscriptions.ts @@ -0,0 +1,25 @@ +import { createRoute, z } from '@hono/zod-openapi' +import { SubscriptionSchema } from '../schemas/index.js' +import type { Context } from 'hono' + +export const getSubscriptionsRoute = createRoute({ + method: 'get', + path: '/api/subscriptions', + responses: { + 200: { + content: { + 'application/json': { + schema: z.array(SubscriptionSchema), + }, + }, + description: 'Retrieve subscription plans', + }, + }, +}) + +export function getSubscriptionsHandler(c: Context) { + return c.json([ + { id: 1, name: 'Basic', price: 10 }, + { id: 2, name: 'Pro', price: 20 }, + ], 200) +} diff --git a/packages/shared/src/server/jobs/boss/AbstractWorker.ts b/packages/shared/src/server/jobs/boss/AbstractWorker.ts new file mode 100644 index 00000000..096ef350 --- /dev/null +++ b/packages/shared/src/server/jobs/boss/AbstractWorker.ts @@ -0,0 +1,63 @@ +import { Job, PgBoss } from 'pg-boss'; +import EventEmitter from 'events'; + +import { logger } from '../../commons/logger.js'; +import { EventBus } from '../../products/EventBus.js'; + +export abstract class AbstractWorker { + abstract readonly queueName: string; + readonly queueOptions?: any; // pg-boss QueueOptions + protected emitter?: EventEmitter; + public boss?: PgBoss; + readonly teamSize?: number; + + // Cost calculation can be static or dynamic based on results + abstract calculateCost(job: Job, result?: any): number; + + // The core business logic + protected abstract process(job: Job): Promise; + + // Main entry point for pg-boss + public async handler(jobOrJobs: Job | Job[]) { + + const job = Array.isArray(jobOrJobs) ? jobOrJobs[0] : jobOrJobs; + + // Safety check + if (!job) { + logger.error(`[${this.queueName}] Received null or empty job`); + return; + } + + const jobId = job.id; + logger.info(`[${this.queueName}] Starting job ${jobId}`); + try { + // 2. Execute Business Logic + const result = await this.process(job); + + // 3. Calculate Cost + const cost = this.calculateCost(job, result); + + // Emit completion on global EventBus + EventBus.emit('job:complete', { + jobId, + result, + data: job.data + }); + + + return result; + + } catch (error: any) { + + logger.error({ err: error }, `[${this.queueName}] Job failed`); + + EventBus.emit('job:failed', { + jobId, + error: error.message || 'Unknown error', + data: job.data + }); + + throw error; // Let pg-boss handle retry/failure + } + } +} diff --git a/packages/shared/src/server/jobs/boss/client.ts b/packages/shared/src/server/jobs/boss/client.ts new file mode 100644 index 00000000..1fc3330a --- /dev/null +++ b/packages/shared/src/server/jobs/boss/client.ts @@ -0,0 +1,44 @@ +import { PgBoss } from 'pg-boss'; +import { logger } from '../../commons/logger.js'; + +const connectionString = process.env.DATABASE_URL; + +if (!connectionString) { + logger.warn('DATABASE_URL not found, PgBoss will not be initialized'); +} + +export const boss = connectionString ? new PgBoss({ + connectionString, + __test__enableSpies: true +} as any) : null; +export let bossInitError: Error | null = null; + +export async function startBoss() { + if (!boss) return; + + boss.on('error', (error: Error) => logger.error({ error }, 'PgBoss error')); + + try { + await boss.start(); + logger.info('PgBoss started'); + return boss; + } catch (error: any) { + bossInitError = error; + logger.error({ error }, 'Failed to start PgBoss'); + const fs = await import('fs'); + fs.writeFileSync('debug_pgboss_error.txt', JSON.stringify(error, Object.getOwnPropertyNames(error))); + } +} + +export async function stopBoss() { + if (!boss) { + console.info('PgBoss not initialized, skipping stop.') + return + } + try { + await boss.stop({ timeout: 5000 }); // 5s timeout + console.info('PgBoss stopped'); + } catch (error) { + console.error({ error }, 'Failed to stop PgBoss'); + } +} \ No newline at end of file diff --git a/packages/shared/src/server/jobs/boss/registry.ts b/packages/shared/src/server/jobs/boss/registry.ts new file mode 100644 index 00000000..e90a5319 --- /dev/null +++ b/packages/shared/src/server/jobs/boss/registry.ts @@ -0,0 +1,25 @@ +import { Job } from 'pg-boss'; + +type WorkerHandler = (job: Job) => Promise; + +interface WorkerConfig { + queueName: string; + handler: WorkerHandler; + options?: any; +} + +export class WorkerRegistry { + private static workers: Map = new Map(); + + static register(queueName: string, handler: WorkerHandler, options?: any) { + this.workers.set(queueName, { queueName, handler, options }); + } + + static get(queueName: string): WorkerConfig | undefined { + return this.workers.get(queueName); + } + + static getAll(): WorkerConfig[] { + return Array.from(this.workers.values()); + } +} diff --git a/packages/shared/src/server/jobs/boss/search/SearchWorker.ts b/packages/shared/src/server/jobs/boss/search/SearchWorker.ts new file mode 100644 index 00000000..a027ce97 --- /dev/null +++ b/packages/shared/src/server/jobs/boss/search/SearchWorker.ts @@ -0,0 +1,127 @@ +import { Job } from 'pg-boss'; +import { AbstractWorker } from '../AbstractWorker.js'; +import { googleMaps, ResolveFlags } from '@polymech/search'; +import { supabase } from '../../../commons/supabase.js'; +import { logger } from '../../../commons/logger.js'; +import { Worker } from '../../../commons/decorators.js'; +import { error } from 'console'; + +export interface SearchJobData { + query: string; + location: string; + filters?: { + filterCity?: string; + filterContinent?: string; + filterType?: string; + concurrency?: number; + }; + userId: string; + usageId?: string; +} + +@Worker('search-worker') +export class SearchWorker extends AbstractWorker { + readonly queueName = 'search-worker'; + + calculateCost(job: Job, result: any): number { + // Example: 1 credit per search + 0.1 per result + return 1 + (result?.length || 0) * 0.1; + } + + protected async process(job: Job) { + const { query, location, filters, userId } = job.data; + + // Call existing logic (refactored from endpoints/competitors/index.ts) + const results = await googleMaps({ + query, + searchFrom: location, + resolve: [ResolveFlags.PHOTOS], + filterCity: filters?.filterCity, + filterContinent: filters?.filterContinent, + filterType: filters?.filterType, + concurrency: filters?.concurrency || 5 + }); + + // Flatten results + const flatResults = results ? results.flat(Infinity) : []; + + // Map and Upsert Locations + const locationsToUpsert = flatResults + .filter((r: any) => r.place_id) + .map((r: any) => ({ + place_id: r.place_id, + title: r.title, + description: r.description, + address: r.address, + gps_coordinates: r.gps_coordinates, + phone: r.phone, + website: r.website, + operating_hours: r.operating_hours, + thumbnail: r.thumbnail, + types: r.types, + raw_data: r, + continent: r.geo?.continent, + country: r.geo?.countryName, + city: r.geo?.city, + updated_at: new Date().toISOString(), // Update timestamp + user_id: userId + })); + + // Fetch existing locations to preserve meta (emails) + const placeIds = locationsToUpsert.map(l => l.place_id); + if (placeIds.length > 0) { + const { data: existingLocations } = await supabase + .from('locations') + .select('place_id, meta') + .in('place_id', placeIds); + + if (existingLocations) { + const metaMap = new Map(existingLocations.map(l => [l.place_id, l.meta])); + locationsToUpsert.forEach(l => { + const existingMeta = metaMap.get(l.place_id); + if (existingMeta) { + // Merge existing meta into raw_data for the client + l.raw_data.meta = { + ...(l.raw_data.meta || {}), + ...existingMeta + }; + } + }); + } + } + + if (locationsToUpsert.length > 0) { + const { error: upsertError } = await supabase + .from('locations') + .upsert(locationsToUpsert, { onConflict: 'place_id' }); + + if (upsertError) { + logger.error(upsertError, 'Error upserting locations'); + throw upsertError; + } + } + + // Store Search (for caching) + // Re-create hash logic from handler + const { createHash } = await import('crypto'); + const inputParams = { query, location }; + const normalizedInput = JSON.stringify(inputParams, Object.keys(inputParams).sort()); + const inputHash = createHash('sha256').update(normalizedInput).digest('hex'); + + const { error: searchStoreError } = await supabase + .from('searches') + .upsert({ + input_hash: inputHash, + input_params: inputParams, + result_place_ids: placeIds, + created_at: new Date().toISOString() + }, { onConflict: 'input_hash' }); + + if (searchStoreError) { + logger.error(searchStoreError, `Error storing search ${searchStoreError.message}`); + // Don't fail the job just because caching failed + } + + return { count: locationsToUpsert.length, placeIds }; + } +} diff --git a/packages/shared/src/server/jobs/boss/workers.ts b/packages/shared/src/server/jobs/boss/workers.ts new file mode 100644 index 00000000..9463e935 --- /dev/null +++ b/packages/shared/src/server/jobs/boss/workers.ts @@ -0,0 +1,40 @@ +import { boss } from './client.js'; +import { getAllWorkers } from '@/products/registry.js'; +import { logger } from '@/commons/logger.js'; + +export const QUEUE_MOCK_JOB = 'mock-job'; + +interface MockJobData { + subtasks: number; + delayMs: number; + shouldFail: boolean; +} + +export async function registerMockWorkers() { + if (!boss) return; + + // Product workers are now registered by the products themselves in AbstractProduct.start() + + await boss.createQueue(QUEUE_MOCK_JOB); + await boss.work(QUEUE_MOCK_JOB, async (jobs: any) => { + // PgBoss might pass an array of jobs or a single job depending on config/version + const job = Array.isArray(jobs) ? jobs[0] : jobs; + + const data = job.data || {}; + const { delayMs = 100, shouldFail = false } = data; + const jobId = job.id; + + logger.info({ jobId, data }, 'Processing PgBoss mock job'); + + await new Promise(resolve => setTimeout(resolve, delayMs)); + + if (shouldFail) { + throw new Error('Simulated PgBoss job failure'); + } + + logger.info({ jobId }, 'PgBoss mock job completed'); + return { success: true }; + }); + + logger.info('PgBoss workers registered'); +}