pm server - shared - server products 1/3

babayaga 2026-01-29 18:03:36 +01:00
parent f6592bb292
commit e94b35194b
17 changed files with 1470 additions and 0 deletions

View File

@ -0,0 +1,153 @@
import { trackUsage, updateUsageRecord } from '../middleware/usageTracking.js';
import { FunctionRegistry, PublicEndpointRegistry, AdminEndpointRegistry } from './registry.js';
import { logger } from './logger.js';
/**
* Decorator/Wrapper to mark an endpoint as public
* Registers the route in PublicEndpointRegistry
*/
export function Public<T extends { method: string, path: string }>(route: T): T {
PublicEndpointRegistry.register(route.path, route.method);
return route;
}
/**
* Decorator/Wrapper to mark an endpoint as admin-only
* Registers the route in AdminEndpointRegistry
*/
export function Admin<T extends { method: string, path: string }>(route: T): T {
AdminEndpointRegistry.register(route.path, route.method);
return route;
}
export interface BillableContext {
userId: string;
jobId: string;
signal?: AbortSignal;
metadata?: Record<string, any>;
}
export interface BillableOptions {
productId: string;
actionId: string;
cancellable?: boolean;
}
/**
* Decorator to mark a method as billable
* Handles usage tracking, context injection, and cancellation
*/
export function Billable(options: BillableOptions) {
return function (
target: any,
propertyKey: string,
descriptor: PropertyDescriptor
) {
const originalMethod = descriptor.value;
descriptor.value = async function (...args: any[]) {
// 1. Extract context
// Assumes the first argument is BillableContext, or it's part of the first argument object
let context: BillableContext | undefined;
      if (args.length > 0 && typeof args[0] === 'object' && args[0] !== null) {
// Check if first arg is context
if ('userId' in args[0] && 'jobId' in args[0]) {
context = args[0] as BillableContext;
}
}
if (!context) {
// If no context provided, we can't track usage properly
// For now, we'll log a warning and proceed without tracking
// In strict mode, we might want to throw an error
logger.warn(`[Billable] No context provided for ${options.productId}:${options.actionId}`);
return originalMethod.apply(this, args);
}
// 2. Get config
const config = FunctionRegistry.get(options.productId, options.actionId);
if (!config) {
logger.warn(`[Billable] No config found for ${options.productId}:${options.actionId}`);
return originalMethod.apply(this, args);
}
// 3. Start tracking
const usageId = await trackUsage({
userId: context.userId,
endpoint: 'function', // Internal function call
method: 'CALL',
product: options.productId,
action: options.actionId,
costUnits: config.costUnits,
cancellable: options.cancellable || false,
jobId: context.jobId,
metadata: context.metadata
});
const startTime = Date.now();
let error: Error | null = null;
let result: any;
try {
// 4. Execute method
// If cancellable, we should ideally wrap the execution or check signal
if (options.cancellable && context.signal) {
if (context.signal.aborted) {
throw new Error('Operation cancelled');
}
// Add abort listener
context.signal.addEventListener('abort', () => {
logger.info(`[Billable] Job ${context?.jobId} aborted via signal`);
});
}
result = await originalMethod.apply(this, args);
return result;
} catch (err) {
error = err as Error;
throw err;
} finally {
// 5. End tracking
if (usageId) {
const endTime = Date.now();
await updateUsageRecord({
usageId,
responseStatus: error ? 500 : 200,
responseTimeMs: endTime - startTime,
error
});
}
}
};
return descriptor;
};
}
/**
* Class Decorator: Registers the worker queue name
*/
export function Worker(queueName: string) {
  return function <T extends { new(...args: any[]): any }>(constructor: T) {
    // pg-boss ultimately needs a handler function, but instantiating the class
    // here would be premature. Attach the queue name as a static property so a
    // bootstrap step (or the WorkerRegistry) can instantiate the class later and
    // bind its 'handler' method to the queue.
    (constructor as any).queueName = queueName;
  };
}
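A minimal usage sketch of the decorators (assuming legacy experimentalDecorators; the EmailService class and the 'find'/'email' IDs are hypothetical, not part of this commit):

// Hypothetical usage sketch — the class and the product/action IDs are illustrative.
import { Billable, BillableContext } from '../commons/decorators.js';

class EmailService {
  @Billable({ productId: 'find', actionId: 'email', cancellable: true })
  async findEmails(ctx: BillableContext, placeId: string): Promise<string[]> {
    // The decorator reads ctx (the first argument) to open and close a usage
    // record around this call; the business logic itself stays unchanged.
    if (ctx.signal?.aborted) return [];
    return ['info@example.com'];
  }
}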

View File

@ -0,0 +1,34 @@
import pino from 'pino';
import path from 'path';
const fileTransport = pino.transport({
target: 'pino/file',
options: { destination: path.join(process.cwd(), 'app.log') },
});
const consoleTransport = pino.transport({
target: 'pino-pretty',
options: {
colorize: true,
ignore: 'pid,hostname',
destination: 1,
},
});
export const logger = pino(
{
level: process.env.PINO_LOG_LEVEL || 'info',
formatters: {
level: (label) => {
return { level: label.toUpperCase() };
},
},
timestamp: pino.stdTimeFunctions.isoTime,
},
pino.multistream([
{ stream: fileTransport, level: 'info' },
{ stream: consoleTransport, level: 'info' },
])
);
export default logger;

View File

@ -0,0 +1,174 @@
import { ProductActionConfig, PRODUCT_ACTIONS } from '../config/products.js';
export interface BillableFunctionConfig extends ProductActionConfig {
productId: string;
actionId: string;
}
/**
* Central registry for all billable functions
* Manages configuration, costs, and metadata
*/
export class FunctionRegistry {
private static registry = new Map<string, BillableFunctionConfig>();
private static initialized = false;
/**
* Initialize the registry with default configurations
*/
static initialize() {
if (this.initialized) return;
// Load legacy PRODUCT_ACTIONS
for (const [productId, actions] of Object.entries(PRODUCT_ACTIONS)) {
for (const [actionId, config] of Object.entries(actions)) {
this.register({
productId,
actionId,
...config
});
}
}
this.initialized = true;
}
/**
* Register a new billable function
*/
static register(config: BillableFunctionConfig) {
const key = this.getKey(config.productId, config.actionId);
this.registry.set(key, config);
}
/**
* Get configuration for a specific function
*/
static get(productId: string, actionId: string): BillableFunctionConfig | null {
if (!this.initialized) this.initialize();
const key = this.getKey(productId, actionId);
return this.registry.get(key) || null;
}
/**
* Get all registered functions
*/
static getAll(): BillableFunctionConfig[] {
if (!this.initialized) this.initialize();
return Array.from(this.registry.values());
}
/**
* Find a configuration by matching route endpoint and method
* (Used for middleware backward compatibility)
*/
static findByRoute(path: string, method: string): BillableFunctionConfig | null {
if (!this.initialized) this.initialize();
for (const config of this.registry.values()) {
if (this.matchesRoute(path, config.endpoint) && method === config.method) {
return config;
}
}
return null;
}
private static getKey(productId: string, actionId: string): string {
return `${productId}:${actionId}`;
}
private static matchesRoute(path: string, pattern: string): boolean {
// Convert pattern to regex
// Handle both :param (Express/Hono style) and {param} (OpenAPI style)
// e.g., '/api/competitors/:place_id' or '/api/competitors/{place_id}' -> /^\/api\/competitors\/[^\/]+$/
const regexPattern = pattern
.replace(/:[^\\/]+/g, '[^/]+') // Replace :param with regex
.replace(/\{[^}]+\}/g, '[^/]+') // Replace {param} with regex
.replace(/\//g, '\\/');
// Allow optional trailing slash
const regex = new RegExp(`^${regexPattern}\\/?$`);
return regex.test(path);
}
}
/**
* Registry for public endpoints that don't require authentication
*/
export class PublicEndpointRegistry {
private static registry = new Set<string>();
static register(path: string, method: string) {
this.registry.add(`${method.toUpperCase()}:${path}`);
}
static isPublic(path: string, method: string): boolean {
const methodUpper = method.toUpperCase();
for (const registered of this.registry) {
      const sep = registered.indexOf(':'); // split on the first ':' only, since patterns may contain ':param'
      const regMethod = registered.slice(0, sep);
      const regPath = registered.slice(sep + 1);
if (regMethod !== methodUpper) continue;
// Check if path matches pattern
if (this.matchesRoute(path, regPath)) {
return true;
}
}
return false;
}
private static matchesRoute(path: string, pattern: string): boolean {
// Convert pattern to regex
// Handle both :param (Express/Hono style) and {param} (OpenAPI style)
// e.g., '/api/competitors/:place_id' or '/api/competitors/{place_id}' -> /^\/api\/competitors\/[^\/]+$/
const regexPattern = pattern
.replace(/:[^\\/]+/g, '[^/]+') // Replace :param with regex
.replace(/\{[^}]+\}/g, '[^/]+') // Replace {param} with regex
.replace(/\//g, '\\/');
// Allow optional trailing slash
const regex = new RegExp(`^${regexPattern}\\/?$`);
return regex.test(path);
}
}
/**
* Registry for admin-only endpoints
*/
export class AdminEndpointRegistry {
private static registry = new Set<string>();
static register(path: string, method: string) {
this.registry.add(`${method.toUpperCase()}:${path}`);
}
static isAdmin(path: string, method: string): boolean {
const methodUpper = method.toUpperCase();
for (const registered of this.registry) {
      const sep = registered.indexOf(':'); // split on the first ':' only, since patterns may contain ':param'
      const regMethod = registered.slice(0, sep);
      const regPath = registered.slice(sep + 1);
if (regMethod !== methodUpper) continue;
// Check if path matches pattern
if (this.matchesRoute(path, regPath)) {
return true;
}
}
return false;
}
private static matchesRoute(path: string, pattern: string): boolean {
// Convert pattern to regex
// Handle both :param (Express/Hono style) and {param} (OpenAPI style)
// e.g., '/api/competitors/:place_id' or '/api/competitors/{place_id}' -> /^\/api\/competitors\/[^\/]+$/
const regexPattern = pattern
.replace(/:[^\\/]+/g, '[^/]+') // Replace :param with regex
.replace(/\{[^}]+\}/g, '[^/]+') // Replace {param} with regex
.replace(/\//g, '\\/');
const regex = new RegExp(`^${regexPattern}\\/?$`);
return regex.test(path);
}
}
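A sketch of how an auth middleware could consult these registries (Hono-style; the middleware and its auth check are illustrative, not part of this commit):

// Illustrative middleware sketch — the authorization check is a placeholder.
import { OpenAPIHono } from '@hono/zod-openapi';
import { PublicEndpointRegistry, AdminEndpointRegistry } from './registry.js';

const app = new OpenAPIHono();
app.use('/api/*', async (c, next) => {
  if (PublicEndpointRegistry.isPublic(c.req.path, c.req.method)) {
    return next(); // no authentication required
  }
  const authorized = Boolean(c.req.header('authorization')); // placeholder check
  if (!authorized) return c.json({ error: 'Unauthorized' }, 401);
  if (AdminEndpointRegistry.isAdmin(c.req.path, c.req.method)) {
    // an admin-role check would go here
  }
  await next();
});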

View File

@ -0,0 +1,26 @@
import { createClient } from '@supabase/supabase-js'
import 'dotenv/config'
import { logger } from './logger.js'
const supabaseUrl = process.env.SUPABASE_URL
const supabaseKey = process.env.SUPABASE_SERVICE_KEY
if (!supabaseUrl || !supabaseKey) {
logger.fatal('Missing Supabase environment variables')
process.exit(1)
}
export const supabase = createClient(supabaseUrl, supabaseKey)
/**
* Test Supabase connection by attempting a simple query
*/
export async function testSupabaseConnection(): Promise<boolean> {
try {
const { error } = await supabase.from('products').select('id').limit(1)
return !error
} catch {
return false
}
}
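A startup check could use this to fail fast (sketch, assuming an ESM entry point with top-level await):

// Startup sketch — where this runs is an assumption.
import { testSupabaseConnection } from './supabase.js';
import { logger } from './logger.js';

if (!(await testSupabaseConnection())) {
  logger.fatal('Supabase connection test failed');
  process.exit(1);
}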

View File

@ -0,0 +1,10 @@
import { Env } from 'hono'
export interface HonoEnv extends Env {
Variables: {
jobId?: string;
userId?: string;
usageId?: string;
skipUsageStatusUpdate?: boolean;
}
}

View File

@ -0,0 +1,26 @@
import { createRoute } from '@hono/zod-openapi'
import { StatsSchema } from '../schemas/index.js'
import type { Context } from 'hono'
import { Admin } from '../commons/decorators.js'
export const getStatsRoute = Admin(createRoute({
method: 'get',
path: '/api/admin/stats',
responses: {
200: {
content: {
'application/json': {
schema: StatsSchema,
},
},
description: 'Retrieve admin stats',
},
},
}))
export function getStatsHandler(c: Context) {
return c.json({
users: 100,
revenue: 5000,
}, 200)
}
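Route/handler pairs like this are presumably mounted on the app elsewhere; a wiring sketch (the app instance and the import path are assumptions):

// Wiring sketch — app instance and file path are assumptions, not part of this commit.
import { OpenAPIHono } from '@hono/zod-openapi';
import { getStatsRoute, getStatsHandler } from './endpoints/adminStats.js'; // hypothetical path

const app = new OpenAPIHono();
app.openapi(getStatsRoute, getStatsHandler);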

View File

@ -0,0 +1,304 @@
import { createRoute, RouteHandler, z } from '@hono/zod-openapi';
import { boss } from '../jobs/boss/client.js';
import { QUEUE_MOCK_JOB } from '../jobs/boss/workers.js';
import { HonoEnv } from '../commons/types.js';
const tags = ['PgBoss'];
export const postBossJobRoute = createRoute({
method: 'post',
path: '/api/boss/job',
tags,
request: {
body: {
content: {
'application/json': {
schema: z.object({
delayMs: z.number().default(100),
shouldFail: z.boolean().default(false),
retryLimit: z.number().optional()
}),
},
},
},
},
responses: {
200: {
content: {
'application/json': {
schema: z.object({
jobId: z.string().nullable(),
message: z.string()
}),
},
},
description: 'PgBoss job started',
},
500: {
content: {
'application/json': {
schema: z.object({ error: z.string() }),
},
},
description: 'Server error',
},
},
});
export const postBossJobHandler: RouteHandler<typeof postBossJobRoute, HonoEnv> = async (c) => {
if (!boss) {
// Check if there was an initialization error we can report
const { bossInitError } = await import('../jobs/boss/client.js');
return c.json({ error: `PgBoss not initialized. Init error: ${bossInitError}` }, 500);
}
const { delayMs, shouldFail, retryLimit } = c.req.valid('json');
const payload = { delayMs, shouldFail };
const options = retryLimit !== undefined ? { retryLimit } : {};
try {
const jobId = await boss.send(QUEUE_MOCK_JOB, payload, options);
return c.json({ jobId, message: 'Job submitted to PgBoss' }, 200);
} catch (error: any) {
return c.json({ error: error.message }, 500);
}
};
export const getBossJobRoute = createRoute({
method: 'get',
path: '/api/boss/job/{id}',
tags,
request: {
params: z.object({
id: z.string(),
}),
},
responses: {
200: {
content: {
'application/json': {
schema: z.object({
id: z.string(),
name: z.string(),
data: z.any(),
state: z.string(),
createdOn: z.string().optional(),
startedOn: z.string().optional(),
              completedOn: z.string().optional(),
              output: z.any().optional(),
}),
},
},
description: 'Job status',
},
404: {
content: {
'application/json': {
schema: z.object({ error: z.string() }),
},
},
description: 'Job not found',
},
500: {
content: {
'application/json': {
schema: z.object({ error: z.string() }),
},
},
description: 'Server error',
},
},
});
export const getBossJobHandler: RouteHandler<typeof getBossJobRoute, HonoEnv> = async (c) => {
const { id } = c.req.valid('param');
// Use pg directly to bypass PostgREST schema permissions
const { Client } = await import('pg');
  const client = new Client({ connectionString: process.env.DATABASE_URL });
try {
await client.connect();
const result = await client.query('SELECT * FROM pgboss.job WHERE id = $1', [id]);
const job = result.rows[0];
if (!job) {
return c.json({ error: 'Job not found' }, 404);
}
return c.json({
id: job.id,
name: job.name,
data: job.data,
state: job.state,
createdOn: job.createdon,
startedOn: job.startedon,
completedOn: job.completedon,
output: job.output,
}, 200);
} catch (error: any) {
console.error('Error in getBossJobHandler:', error);
return c.json({ error: error.message }, 500);
} finally {
await client.end().catch(() => { });
}
};
export const cancelBossJobRoute = createRoute({
method: 'post',
path: '/api/boss/job/{id}/cancel',
tags,
request: {
params: z.object({
id: z.string(),
}),
},
responses: {
200: {
content: {
'application/json': {
schema: z.object({ message: z.string() }),
},
},
description: 'Job cancelled',
},
500: {
content: {
'application/json': {
schema: z.object({ error: z.string() }),
},
},
description: 'Server error',
},
},
});
export const cancelBossJobHandler: RouteHandler<typeof cancelBossJobRoute, HonoEnv> = async (c) => {
if (!boss) return c.json({ error: 'PgBoss not initialized' }, 500);
const { id } = c.req.valid('param');
try {
await boss.cancel(QUEUE_MOCK_JOB, id);
return c.json({ message: 'Job cancelled' }, 200);
} catch (error: any) {
return c.json({ error: error.message }, 500);
}
};
export const resumeBossJobRoute = createRoute({
method: 'post',
path: '/api/boss/job/{id}/resume',
tags,
request: {
params: z.object({
id: z.string(),
}),
},
responses: {
200: {
content: {
'application/json': {
schema: z.object({ message: z.string() }),
},
},
description: 'Job resumed',
},
500: {
content: {
'application/json': {
schema: z.object({ error: z.string() }),
},
},
description: 'Server error',
},
},
});
export const resumeBossJobHandler: RouteHandler<typeof resumeBossJobRoute, HonoEnv> = async (c) => {
if (!boss) return c.json({ error: 'PgBoss not initialized' }, 500);
const { id } = c.req.valid('param');
try {
await boss.resume(QUEUE_MOCK_JOB, id);
return c.json({ message: 'Job resumed' }, 200);
} catch (error: any) {
return c.json({ error: error.message }, 500);
}
};
export const completeBossJobRoute = createRoute({
method: 'post',
path: '/api/boss/job/{id}/complete',
tags,
request: {
params: z.object({
id: z.string(),
}),
},
responses: {
200: {
content: {
'application/json': {
schema: z.object({ message: z.string() }),
},
},
description: 'Job completed',
},
500: {
content: {
'application/json': {
schema: z.object({ error: z.string() }),
},
},
description: 'Server error',
},
},
});
export const completeBossJobHandler: RouteHandler<typeof completeBossJobRoute, HonoEnv> = async (c) => {
if (!boss) return c.json({ error: 'PgBoss not initialized' }, 500);
const { id } = c.req.valid('param');
try {
await boss.complete(QUEUE_MOCK_JOB, id);
return c.json({ message: 'Job completed' }, 200);
} catch (error: any) {
return c.json({ error: error.message }, 500);
}
};
export const failBossJobRoute = createRoute({
method: 'post',
path: '/api/boss/job/{id}/fail',
tags,
request: {
params: z.object({
id: z.string(),
}),
},
responses: {
200: {
content: {
'application/json': {
schema: z.object({ message: z.string() }),
},
},
description: 'Job failed',
},
500: {
content: {
'application/json': {
schema: z.object({ error: z.string() }),
},
},
description: 'Server error',
},
},
});
export const failBossJobHandler: RouteHandler<typeof failBossJobRoute, HonoEnv> = async (c) => {
if (!boss) return c.json({ error: 'PgBoss not initialized' }, 500);
const { id } = c.req.valid('param');
try {
await boss.fail(QUEUE_MOCK_JOB, id);
return c.json({ message: 'Job failed' }, 200);
} catch (error: any) {
return c.json({ error: error.message }, 500);
}
};
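An illustrative client flow against these endpoints (base URL, port, and polling interval are assumptions):

// Illustrative client flow — base URL is an assumption.
const base = 'http://localhost:3000';
const submitted = await fetch(`${base}/api/boss/job`, {
  method: 'POST',
  headers: { 'content-type': 'application/json' },
  body: JSON.stringify({ delayMs: 500, shouldFail: false }),
});
const { jobId } = await submitted.json();
// Poll until pg-boss marks the job completed or failed.
let state = 'created';
while (state !== 'completed' && state !== 'failed') {
  await new Promise((resolve) => setTimeout(resolve, 500));
  const status = await fetch(`${base}/api/boss/job/${jobId}`).then((r) => r.json());
  state = status.state;
}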

View File

@ -0,0 +1,21 @@
import { LocationType } from '@polymech/shared'
import { supabase } from '../../commons/supabase.js'
/**
* Shared utility to fetch location details by place_id
* Used by both competitor details and find email endpoints
*/
export async function getLocationByPlaceId(place_id: string): Promise<LocationType | null> {
const { data: location, error } = await supabase
.from('locations')
.select('*')
.eq('place_id', place_id)
.single()
if (error || !location) {
return null
}
return location
}

View File

@ -0,0 +1,233 @@
import { createRoute, RouteHandler, z } from '@hono/zod-openapi'
import { ErrorSchema } from '../../schemas/index.js'
import { supabase } from '../../commons/supabase.js'
import { getLocationByPlaceId } from '../competitors/getLocationByPlaceId.js'
import { logger } from '../../commons/logger.js'
import { findEmailEach, parseHtml, Page } from '@polymech/search'
const tags = ['Find']
// Email metadata schema
const EmailMetadataSchema = z.object({
email: z.string().email(),
source: z.string().url(),
foundAt: z.string().datetime(),
tool: z.string()
})
// Response schema
const FindEmailResponseSchema = z.object({
job_id: z.string().nullable(),
place_id: z.string(),
emails: z.array(EmailMetadataSchema),
status: z.enum(['cached', 'processing', 'completed', 'failed']),
message: z.string()
})
export const getFindEmailRoute = createRoute({
method: 'get',
path: '/api/find/email/{place_id}',
tags,
  request: {
    params: z.object({
      place_id: z.string(),
    }),
    // 'cache' forces a refresh; it is not part of the path, so expose it as a
    // query flag (query values arrive as strings).
    query: z.object({
      cache: z.enum(['true', 'false']).optional().default('false').transform((v) => v === 'true'),
    }),
  },
responses: {
200: {
content: {
'application/json': {
schema: FindEmailResponseSchema,
},
},
description: 'Email finding results',
},
401: {
content: {
'application/json': {
schema: ErrorSchema,
},
},
description: 'Unauthorized',
},
404: {
content: {
'application/json': {
schema: ErrorSchema,
},
},
description: 'Location not found',
},
429: {
content: {
'application/json': {
schema: ErrorSchema,
},
},
description: 'Too many active jobs',
},
500: {
content: {
'application/json': {
schema: ErrorSchema,
},
},
description: 'Server error',
},
},
})
export const getFindEmailHandler: any = async (c: any) => {
  const { place_id } = c.req.valid('param')
  const { cache } = c.req.valid('query')
const jobId = c.get('jobId') as string | undefined
const userId = c.get('userId') as string | undefined
if (!userId) {
return c.json({ error: 'Unauthorized' }, 401)
}
logger.info(`[FindEmail] Starting email finding for ${place_id} - User: ${userId} - JobId: ${jobId}`)
try {
// 1. Fetch location details
const location = await getLocationByPlaceId(place_id)
if (!location) {
return c.json({ error: 'Location not found' }, 404)
}
// 2. Check for existing emails in metadata
const existingEmails = location.meta?.emails || []
if (existingEmails.length > 0 && !cache) {
logger.info(`[FindEmail] Returning cached emails for ${place_id}`)
return c.json({
job_id: jobId || null,
place_id,
emails: existingEmails,
status: 'cached' as const,
message: 'Emails already found (cached)'
}, 200)
}
// 3. Check if location has a website
if (!location.website) {
return c.json({
job_id: jobId || null,
place_id,
emails: [],
status: 'completed' as const,
message: 'No website available for this location'
}, 200)
}
    // 4. Ensure site metadata exists (refetch when missing or when a refresh is forced)
if (!location.meta || !location.meta.pages || !cache) {
logger.info(`[FindEmail] Meta missing for ${place_id}, fetching...`)
try {
const meta = await parseHtml(location.website, null, {
headless: true,
timeout: 1000 * 60
})
if (meta) {
location.meta = { ...location.meta, ...meta } as any
// Update DB with initial meta
const { error: updateError } = await supabase.from('locations').update({ meta: location.meta }).eq('place_id', place_id)
if (updateError) {
logger.error({ err: updateError }, `[FindEmail] Error updating initial meta for ${place_id}`)
}
}
} catch (err) {
logger.error({ err }, `[FindEmail] Error fetching meta for ${place_id}`)
}
}
    // 5. Execute the iterative scraping with a hard timeout
    logger.info(`[FindEmail] Starting iterative email search for ${place_id}`)
    let isTimedOut = false
    const timeoutMs = 60 * 1000
    // Resolve (rather than reject) on timeout so the race below can fall back to
    // partial results without leaving an unhandled rejection behind.
    const timeoutPromise = new Promise<string[]>((resolve) => {
      setTimeout(() => {
        isTimedOut = true
        resolve([])
      }, timeoutMs)
    })
const searchPromise = findEmailEach(location, {
headless: true,
searchFrom: 'api',
}, async (page: Page) => {
if (isTimedOut) return
// Progress Callback: Update DB
logger.info(`[FindEmail] Scraped ${page.url}, status: ${page.status}`)
const { error: progressError } = await supabase.from('locations').update({
meta: location.meta
}).eq('place_id', place_id)
if (progressError) {
logger.error({ err: progressError }, `[FindEmail] Error updating progress meta for ${place_id}`)
}
})
    // Race the search against the timeout. Awaiting searchPromise again after a
    // timeout would block until the full search finished, defeating the timeout.
    const emails = (await Promise.race([searchPromise, timeoutPromise])) || []
    if (isTimedOut) {
      logger.warn(`[FindEmail] Timeout exceeded for ${place_id}, returning partial results`)
    }
// Format emails
const emailsWithMetadata = emails.map((email) => ({
email,
source: location.website || '',
foundAt: new Date().toISOString(),
tool: 'puppeteer'
}))
// Merge and Update
const allEmails = [...existingEmails, ...emailsWithMetadata]
// Deduplicate
const uniqueEmails = Array.from(new Map(allEmails.map(item => [item.email, item])).values());
const { error: finalUpdateError } = await supabase
.from('locations')
.update({
meta: {
...location.meta,
emails: uniqueEmails
},
updated_at: new Date().toISOString()
})
.eq('place_id', place_id)
if (finalUpdateError) {
logger.error({ err: finalUpdateError }, `[FindEmail] Error saving emails for ${place_id}`)
}
return c.json({
job_id: jobId || null,
place_id,
emails: uniqueEmails,
      status: 'completed' as const,
message: isTimedOut ? 'Search timed out, returning partial results' : `Found ${emails.length} new email(s)`
}, 200)
} catch (error: any) {
logger.error({ err: error }, '[FindEmail] Error')
return c.json({ error: error.message || 'Internal Server Error' }, 500)
}
}

View File

@ -0,0 +1,126 @@
import { createRoute } from '@hono/zod-openapi'
import { ImageResponseSchema, ErrorSchema } from '../../schemas/index.js'
import type { Context } from 'hono'
const tags = ['Images']
export const getImagesRoute = createRoute({
method: 'get',
path: '/api/images/get',
tags,
responses: {
200: {
content: {
'application/json': {
schema: ImageResponseSchema,
},
},
description: 'Retrieve images',
},
500: {
content: {
'application/json': {
schema: ErrorSchema,
},
},
description: 'Server error',
},
},
})
export const postImagesRoute = createRoute({
method: 'post',
path: '/api/images/post',
tags,
responses: {
200: {
content: {
'application/json': {
schema: ImageResponseSchema,
},
},
description: 'Create image',
},
},
})
export const putImagesRoute = createRoute({
method: 'put',
path: '/api/images/put',
tags,
responses: {
200: {
content: {
'application/json': {
schema: ImageResponseSchema,
},
},
description: 'Put image',
},
},
})
export const updateImagesRoute = createRoute({
  // 'update' is part of the path, not an HTTP method; PATCH is the closest
  // standard verb for a partial update.
  method: 'patch',
  path: '/api/images/update',
tags,
responses: {
200: {
content: {
'application/json': {
schema: ImageResponseSchema,
},
},
description: 'Update image',
},
},
})
// Stubbed data
const stubbedImages = [
{
idx: 0,
id: 6,
name: "images",
slug: "images",
description: "fcghdfgh",
price: "10.00",
variants: "[]",
created_at: "2025-11-22 10:46:09.77718+00",
updated_at: "2025-11-22 10:46:09.77718+00"
}
]
export async function getImagesHandler(c: Context) {
return c.json({
message: 'Get images success',
data: stubbedImages
}, 200)
}
export async function postImagesHandler(c: Context) {
return c.json({
message: 'Post images success',
data: stubbedImages
}, 200)
}
export async function putImagesHandler(c: Context) {
return c.json({
message: 'Put images success',
data: stubbedImages
}, 200)
}
export async function updateImagesHandler(c: Context) {
return c.json({
message: 'Update images success',
data: stubbedImages
}, 200)
}

View File

@ -0,0 +1,39 @@
import { createRoute, z } from '@hono/zod-openapi'
import { ProductSchema, ErrorSchema } from '../schemas/index.js'
import { supabase } from '../commons/supabase.js'
import type { Context } from 'hono'
export const getProductsRoute = createRoute({
method: 'get',
path: '/api/products',
responses: {
200: {
content: {
'application/json': {
schema: z.array(ProductSchema),
},
},
description: 'Retrieve all products',
},
500: {
content: {
'application/json': {
schema: ErrorSchema,
},
},
description: 'Server error',
},
},
})
export async function getProductsHandler(c: Context) {
const { data, error } = await supabase
.from('products')
.select('*')
if (error) {
return c.json({ error: error.message }, 500)
}
return c.json(data, 200)
}

View File

@ -0,0 +1,25 @@
import { createRoute, z } from '@hono/zod-openapi'
import { SubscriptionSchema } from '../schemas/index.js'
import type { Context } from 'hono'
export const getSubscriptionsRoute = createRoute({
method: 'get',
path: '/api/subscriptions',
responses: {
200: {
content: {
'application/json': {
schema: z.array(SubscriptionSchema),
},
},
description: 'Retrieve subscription plans',
},
},
})
export function getSubscriptionsHandler(c: Context) {
return c.json([
{ id: 1, name: 'Basic', price: 10 },
{ id: 2, name: 'Pro', price: 20 },
], 200)
}

View File

@ -0,0 +1,63 @@
import PgBoss, { Job } from 'pg-boss';
import EventEmitter from 'events';
import { logger } from '../../commons/logger.js';
import { EventBus } from '../../products/EventBus.js';
export abstract class AbstractWorker<TData> {
abstract readonly queueName: string;
readonly queueOptions?: any; // pg-boss QueueOptions
protected emitter?: EventEmitter;
public boss?: PgBoss;
readonly teamSize?: number;
// Cost calculation can be static or dynamic based on results
abstract calculateCost(job: Job<TData>, result?: any): number;
// The core business logic
protected abstract process(job: Job<TData>): Promise<any>;
// Main entry point for pg-boss
public async handler(jobOrJobs: Job<TData> | Job<TData>[]) {
const job = Array.isArray(jobOrJobs) ? jobOrJobs[0] : jobOrJobs;
// Safety check
if (!job) {
logger.error(`[${this.queueName}] Received null or empty job`);
return;
}
const jobId = job.id;
logger.info(`[${this.queueName}] Starting job ${jobId}`);
try {
      // 1. Execute business logic
      const result = await this.process(job);
      // 2. Calculate cost and include it in the completion event so listeners can record it
      const cost = this.calculateCost(job, result);
      // Emit completion on global EventBus
      EventBus.emit('job:complete', {
        jobId,
        result,
        cost,
        data: job.data
      });
return result;
} catch (error: any) {
logger.error({ err: error }, `[${this.queueName}] Job failed`);
EventBus.emit('job:failed', {
jobId,
error: error.message || 'Unknown error',
data: job.data
});
throw error; // Let pg-boss handle retry/failure
}
}
}
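A minimal concrete worker and its pg-boss wiring could look like this (EchoWorker, its queue name, and the bootstrap function are hypothetical):

// Illustrative sketch — EchoWorker and bindWorker are assumptions, not part of this commit.
import PgBoss, { Job } from 'pg-boss';
import { AbstractWorker } from './AbstractWorker.js';

class EchoWorker extends AbstractWorker<{ text: string }> {
  readonly queueName = 'echo';
  calculateCost(): number { return 1; }
  protected async process(job: Job<{ text: string }>) {
    return { echoed: job.data.text };
  }
}

async function bindWorker(boss: PgBoss) {
  const worker = new EchoWorker();
  worker.boss = boss;
  await boss.createQueue(worker.queueName);
  // pg-boss may hand the callback an array of jobs; handler() accepts either shape.
  await boss.work(worker.queueName, (jobs) => worker.handler(jobs as any));
}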

View File

@ -0,0 +1,44 @@
import PgBoss from 'pg-boss';
import { logger } from '../../commons/logger.js';
const connectionString = process.env.DATABASE_URL;
if (!connectionString) {
logger.warn('DATABASE_URL not found, PgBoss will not be initialized');
}
export const boss = connectionString ? new PgBoss({
connectionString,
__test__enableSpies: true
} as any) : null;
export let bossInitError: Error | null = null;
export async function startBoss() {
if (!boss) return;
boss.on('error', (error: Error) => logger.error({ error }, 'PgBoss error'));
try {
await boss.start();
logger.info('PgBoss started');
return boss;
} catch (error: any) {
bossInitError = error;
logger.error({ error }, 'Failed to start PgBoss');
const fs = await import('fs');
fs.writeFileSync('debug_pgboss_error.txt', JSON.stringify(error, Object.getOwnPropertyNames(error)));
}
}
export async function stopBoss() {
  if (!boss) {
    logger.info('PgBoss not initialized, skipping stop.');
    return;
  }
  try {
    await boss.stop({ timeout: 5000 }); // 5s timeout
    logger.info('PgBoss stopped');
  } catch (error) {
    logger.error({ error }, 'Failed to stop PgBoss');
  }
}
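The server entry point would typically call these around its own lifecycle (sketch; the signal handling is an assumption):

// Lifecycle sketch — where this runs is an assumption.
import { startBoss, stopBoss } from './client.js';

await startBoss();
process.on('SIGTERM', async () => {
  await stopBoss();
  process.exit(0);
});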

View File

@ -0,0 +1,25 @@
import { Job } from 'pg-boss';
type WorkerHandler = (job: Job<any>) => Promise<any>;
interface WorkerConfig {
queueName: string;
handler: WorkerHandler;
options?: any;
}
export class WorkerRegistry {
private static workers: Map<string, WorkerConfig> = new Map();
static register(queueName: string, handler: WorkerHandler, options?: any) {
this.workers.set(queueName, { queueName, handler, options });
}
static get(queueName: string): WorkerConfig | undefined {
return this.workers.get(queueName);
}
static getAll(): WorkerConfig[] {
return Array.from(this.workers.values());
}
}
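This registry lends itself to the bootstrap loop the Worker decorator's comment alludes to (sketch; how handlers get registered and which options are passed are assumptions):

// Bootstrap sketch — registration source and options are assumptions.
import PgBoss from 'pg-boss';
import { WorkerRegistry } from './registry.js';

export async function startWorkers(boss: PgBoss) {
  for (const { queueName, handler, options } of WorkerRegistry.getAll()) {
    await boss.createQueue(queueName);
    // pg-boss may hand the callback an array of jobs; fan out to the registered handler.
    await boss.work(queueName, options ?? {}, async (jobs: any) => {
      const list = Array.isArray(jobs) ? jobs : [jobs];
      return Promise.all(list.map((job) => handler(job)));
    });
  }
}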

View File

@ -0,0 +1,127 @@
import { Job } from 'pg-boss';
import { AbstractWorker } from '../AbstractWorker.js';
import { googleMaps, ResolveFlags } from '@polymech/search';
import { supabase } from '../../../commons/supabase.js';
import { logger } from '../../../commons/logger.js';
import { Worker } from '../../../commons/decorators.js';
export interface SearchJobData {
query: string;
location: string;
filters?: {
filterCity?: string;
filterContinent?: string;
filterType?: string;
concurrency?: number;
};
userId: string;
usageId?: string;
}
@Worker('search-worker')
export class SearchWorker extends AbstractWorker<SearchJobData> {
readonly queueName = 'search-worker';
  calculateCost(job: Job<SearchJobData>, result: any): number {
    // Example: 1 credit per search + 0.1 per upserted result (process() returns { count, placeIds })
    return 1 + (result?.count || 0) * 0.1;
  }
protected async process(job: Job<SearchJobData>) {
const { query, location, filters, userId } = job.data;
// Call existing logic (refactored from endpoints/competitors/index.ts)
const results = await googleMaps({
query,
searchFrom: location,
resolve: [ResolveFlags.PHOTOS],
filterCity: filters?.filterCity,
filterContinent: filters?.filterContinent,
filterType: filters?.filterType,
concurrency: filters?.concurrency || 5
});
// Flatten results
const flatResults = results ? results.flat(Infinity) : [];
// Map and Upsert Locations
const locationsToUpsert = flatResults
.filter((r: any) => r.place_id)
.map((r: any) => ({
place_id: r.place_id,
title: r.title,
description: r.description,
address: r.address,
gps_coordinates: r.gps_coordinates,
phone: r.phone,
website: r.website,
operating_hours: r.operating_hours,
thumbnail: r.thumbnail,
types: r.types,
raw_data: r,
continent: r.geo?.continent,
country: r.geo?.countryName,
city: r.geo?.city,
updated_at: new Date().toISOString(), // Update timestamp
user_id: userId
}));
// Fetch existing locations to preserve meta (emails)
const placeIds = locationsToUpsert.map(l => l.place_id);
if (placeIds.length > 0) {
const { data: existingLocations } = await supabase
.from('locations')
.select('place_id, meta')
.in('place_id', placeIds);
if (existingLocations) {
const metaMap = new Map(existingLocations.map(l => [l.place_id, l.meta]));
locationsToUpsert.forEach(l => {
const existingMeta = metaMap.get(l.place_id);
if (existingMeta) {
// Merge existing meta into raw_data for the client
l.raw_data.meta = {
...(l.raw_data.meta || {}),
...existingMeta
};
}
});
}
}
if (locationsToUpsert.length > 0) {
const { error: upsertError } = await supabase
.from('locations')
.upsert(locationsToUpsert, { onConflict: 'place_id' });
if (upsertError) {
logger.error(upsertError, 'Error upserting locations');
throw upsertError;
}
}
// Store Search (for caching)
// Re-create hash logic from handler
const { createHash } = await import('crypto');
const inputParams = { query, location };
const normalizedInput = JSON.stringify(inputParams, Object.keys(inputParams).sort());
const inputHash = createHash('sha256').update(normalizedInput).digest('hex');
const { error: searchStoreError } = await supabase
.from('searches')
.upsert({
input_hash: inputHash,
input_params: inputParams,
result_place_ids: placeIds,
created_at: new Date().toISOString()
}, { onConflict: 'input_hash' });
if (searchStoreError) {
logger.error(searchStoreError, `Error storing search ${searchStoreError.message}`);
// Don't fail the job just because caching failed
}
return { count: locationsToUpsert.length, placeIds };
}
}
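Enqueuing a job for this worker could then look like the following (payload values and the import paths are illustrative):

// Enqueue sketch — payload values and file paths are assumptions.
import { boss } from '../client.js';
import type { SearchJobData } from './SearchWorker.js';

const payload: SearchJobData = {
  query: 'coffee shops',
  location: 'Berlin, Germany',
  filters: { concurrency: 5 },
  userId: 'user-123',
};
const jobId = await boss?.send('search-worker', payload);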

View File

@ -0,0 +1,40 @@
import { boss } from './client.js';
import { logger } from '@/commons/logger.js';
export const QUEUE_MOCK_JOB = 'mock-job';
interface MockJobData {
subtasks: number;
delayMs: number;
shouldFail: boolean;
}
export async function registerMockWorkers() {
if (!boss) return;
// Product workers are now registered by the products themselves in AbstractProduct.start()
await boss.createQueue(QUEUE_MOCK_JOB);
await boss.work<MockJobData>(QUEUE_MOCK_JOB, async (jobs: any) => {
// PgBoss might pass an array of jobs or a single job depending on config/version
const job = Array.isArray(jobs) ? jobs[0] : jobs;
const data = job.data || {};
const { delayMs = 100, shouldFail = false } = data;
const jobId = job.id;
logger.info({ jobId, data }, 'Processing PgBoss mock job');
await new Promise(resolve => setTimeout(resolve, delayMs));
if (shouldFail) {
throw new Error('Simulated PgBoss job failure');
}
logger.info({ jobId }, 'PgBoss mock job completed');
return { success: true };
});
logger.info('PgBoss workers registered');
}