From 241b7a87129eb1a8ba0601dc7f3bb88cc9415405 Mon Sep 17 00:00:00 2001 From: babayaga Date: Fri, 26 Dec 2025 11:28:43 +0100 Subject: [PATCH] email worker --- packages/search/dist-in/lib/email.js | 51 ++++++++----- packages/search/dist-in/lib/pupeteer.d.ts | 2 +- packages/search/dist-in/lib/pupeteer.js | 90 +++++++++++++++-------- packages/search/package-lock.json | 28 +++++++ packages/search/package.json | 1 + packages/search/src/lib/email.ts | 56 +++++++++----- packages/search/src/lib/pupeteer.ts | 89 ++++++++++++++-------- 7 files changed, 220 insertions(+), 97 deletions(-) diff --git a/packages/search/dist-in/lib/email.js b/packages/search/dist-in/lib/email.js index 1058f45e..240f63eb 100644 --- a/packages/search/dist-in/lib/email.js +++ b/packages/search/dist-in/lib/email.js @@ -1,4 +1,5 @@ import { logger } from '../index.js'; +import pMap from 'p-map'; import { CheerioWebBaseLoader } from "langchain/document_loaders/web/cheerio"; import { RecursiveCharacterTextSplitter } from "langchain/text_splitter"; import { htmlToText } from "html-to-text"; @@ -49,19 +50,18 @@ export const puppeteerLoader = async (url, headless, location, checkCancelled) = try { // Function to detect a valid URL loaderWithOptions = new loader(url, { launchOptions: { - headless, + headless: headless, ignoreHTTPSErrors: true }, gotoOptions: { - timeout: 15000, + timeout: location.pageTimeout || 15000, waitUntil: "networkidle0", }, async evaluate(page, browser) { if (checkCancelled && await checkCancelled()) { - debugger; const pid = browser.process()?.pid; - logger.warn(`Killing browser process ${pid} due to cancellation`); - await browser.close(); + logger.warn(`Cancellation requested inside evaluate for process ${pid}`); + // Do not close browser, it is shared. Page will be closed by finally block in pupeteer.ts throw new Error('CancelledByUser'); } const result = await page.evaluate(() => document.body.innerHTML); @@ -70,19 +70,23 @@ export const puppeteerLoader = async (url, headless, location, checkCancelled) = } }); // Race load against cancellation - const loadPromise = loaderWithOptions.load(); + let isFinished = false; + const loadPromise = loaderWithOptions.load().finally(() => { + isFinished = true; + }); const cancelPromise = new Promise(async (_, reject) => { if (!checkCancelled) return; // Poll for cancellation - while (true) { + while (!isFinished) { await new Promise(r => setTimeout(r, 1000)); + logger.info('Checking cancellation for ' + url); if (await checkCancelled()) { const browser = await getBrowser(); if (browser) { const pid = browser.process()?.pid; - logger.info(`Killing browser process ${pid} due to cancellation`); - await browser.close(); + logger.info(`Cancellation confirmed for process ${pid}`); + // await browser.close(); // Do not close shared browser } reject(new Error('CancelledByUser')); break; @@ -132,7 +136,7 @@ export const findEMail = async (question, url, opts, location) => { return false; } let pageUrl = url; - let docs = await puppeteerLoader(pageUrl, opts.headless, location, opts.checkCancelled); + let docs = await puppeteerLoader(pageUrl, opts.headless, { ...location, pageTimeout: opts.pageTimeout }, opts.checkCancelled); let emails = []; docs.forEach((d) => { if (d.pageContent && d.pageContent.indexOf('@') !== -1) { @@ -156,17 +160,27 @@ export const findEmailEach = async (location, opts, onProgress) => { } const emails = []; const abortAfter = opts.abortAfter ?? 1; - for (const page of location.meta.pages) { + const concurrency = opts.concurrency || 2; + const maxPages = opts.maxPages || 15; + const contactKeywords = ['contact', 'kontakt', 'contacto', 'contatto', 'info', 'imprint', 'impressum', 'help', 'support', 'about']; + // Sort pages: prioritize contact pages + const pagesToSearch = location.meta.pages.sort((a, b) => { + const urlA = a.url.toLowerCase(); + const urlB = b.url.toLowerCase(); + const scoreA = contactKeywords.some(k => urlA.includes(k)) ? 1 : 0; + const scoreB = contactKeywords.some(k => urlB.includes(k)) ? 1 : 0; + return scoreB - scoreA; // Descending order (contact pages first) + }).slice(0, maxPages); + await pMap(pagesToSearch, async (page) => { if (opts.checkCancelled && await opts.checkCancelled()) { - debugger; - logger.info(`[findEmailEach] Cancellation requested for ${location.title}`); - break; + // logger.info(`[findEmailEach] Cancellation requested for ${location.title}`); + return; } if (emails.length >= abortAfter) { - break; + return; } if (page.status !== 'PENDING') { - continue; + return; } page.status = 'SEARCHING_EMAIL'; try { @@ -179,7 +193,6 @@ export const findEmailEach = async (location, opts, onProgress) => { } catch (error) { if (error.message === 'CancelledByUser') { - debugger; throw error; } page.status = 'FAILED'; @@ -189,7 +202,7 @@ export const findEmailEach = async (location, opts, onProgress) => { if (onProgress) { await onProgress(page); } - } + }, { concurrency, stopOnError: false }); // Update location emails if (emails.length > 0) { const uniqueEmails = [...new Set([...(location.emails || []), ...emails])]; @@ -200,4 +213,4 @@ export const findEmailEach = async (location, opts, onProgress) => { } return emails; }; -//# sourceMappingURL=data:application/json;base64, \ No newline at end of file +//# sourceMappingURL=data:application/json;base64, \ No newline at end of file diff --git a/packages/search/dist-in/lib/pupeteer.d.ts b/packages/search/dist-in/lib/pupeteer.d.ts index abe01d54..0109509a 100644 --- a/packages/search/dist-in/lib/pupeteer.d.ts +++ b/packages/search/dist-in/lib/pupeteer.d.ts @@ -17,7 +17,7 @@ export type PuppeteerWebBaseLoaderOptions = { gotoOptions?: PuppeteerGotoOptions; evaluate?: PuppeteerEvaluate; }; -export declare const getBrowser: () => Browser; +export declare const getBrowser: () => Promise; export declare const getPage: (browser: Browser) => Promise; export declare class PuppeteerWebBaseLoader extends BaseDocumentLoader implements DocumentLoader { webPath: string; diff --git a/packages/search/dist-in/lib/pupeteer.js b/packages/search/dist-in/lib/pupeteer.js index d4457706..ad8630d2 100644 --- a/packages/search/dist-in/lib/pupeteer.js +++ b/packages/search/dist-in/lib/pupeteer.js @@ -1,5 +1,6 @@ import { Document } from "@langchain/core/documents"; import { BaseDocumentLoader } from "langchain/document_loaders/base"; +import pLimit from "p-limit"; /** * Class that extends the BaseDocumentLoader class and implements the * DocumentLoader interface. It represents a document loader for scraping @@ -17,25 +18,43 @@ import { BaseDocumentLoader } from "langchain/document_loaders/base"; * const screenshot = await loader.screenshot(); * ``` */ -let browser = null; -let page = null; -const launchBrowser = async (options) => { - if (browser) - return browser; - const { launch } = await PuppeteerWebBaseLoader.imports(); - browser = await launch({ - headless: true, - defaultViewport: null, - ignoreDefaultArgs: ["--disable-extensions"], - ...options?.launchOptions, - }); - return browser; +// Singleton browser promise to prevent race conditions +let browserPromise = null; +let idleTimer = null; +const limit = pLimit(parseInt(process.env.EMAIL_SEARCH_MAX_PUPETEER_PAGES || '10')); +const IDLE_TIMEOUT_SECONDS = parseInt(process.env.EMAIL_SEARCH_PUPETEER_IDLE_TIMEOUT_SECONDS || '60'); +const resetIdleTimer = () => { + if (idleTimer) + clearTimeout(idleTimer); + idleTimer = setTimeout(async () => { + if (browserPromise) { + console.log(`[Puppeteer] Browser idle timeout (${IDLE_TIMEOUT_SECONDS}s) reached, closing browser`); + const browser = await browserPromise; + await browser.close(); + browserPromise = null; + } + }, IDLE_TIMEOUT_SECONDS * 1000); }; -export const getBrowser = () => browser; +const launchBrowser = async (options) => { + resetIdleTimer(); + if (browserPromise) + return browserPromise; + browserPromise = (async () => { + const { launch } = await PuppeteerWebBaseLoader.imports(); + const b = await launch({ + headless: process.env.EMAIL_SEARCH_HEADLESS === 'false' ? false : true, + defaultViewport: null, + ignoreDefaultArgs: ["--disable-extensions"], + ...options?.launchOptions, + }); + return b; + })(); + return browserPromise; +}; +export const getBrowser = () => browserPromise; export const getPage = async (browser) => { - if (page) - return page; - page = await browser.newPage(); + // Always create a new page for concurrency + const page = await browser.newPage(); return page; }; export class PuppeteerWebBaseLoader extends BaseDocumentLoader { @@ -49,18 +68,31 @@ export class PuppeteerWebBaseLoader extends BaseDocumentLoader { static browser; static async _scrape(url, options) { const browser = await launchBrowser(options); - PuppeteerWebBaseLoader.browser = browser; - const page = await getPage(browser); - await page.goto(url, { - timeout: 5000, - waitUntil: "domcontentloaded", - ...options?.gotoOptions, + // PuppeteerWebBaseLoader.browser = browser // Static property usage is deprecated/incorrect with this pattern + return limit(async () => { + console.log(`[Puppeteer] Entering limit (Active: ${limit.activeCount}, Pending: ${limit.pendingCount}) for ${url}`); + try { + const page = await getPage(browser); + try { + await page.goto(url, { + timeout: 5000, + waitUntil: "domcontentloaded", + ...options?.gotoOptions, + }); + const bodyHTML = options?.evaluate + ? await options?.evaluate(page, browser) + : await page.evaluate(() => document.body.innerHTML); + return bodyHTML; + } + finally { + await page.close(); + } + } + finally { + console.log(`[Puppeteer] Exiting limit (Active: ${limit.activeCount}, Pending: ${limit.pendingCount}) for ${url}`); + resetIdleTimer(); + } }); - const bodyHTML = options?.evaluate - ? await options?.evaluate(page, browser) - : await page.evaluate(() => document.body.innerHTML); - //await browser.close() - return bodyHTML; } /** * Method that calls the _scrape method to perform the scraping of the web @@ -135,4 +167,4 @@ export class PuppeteerWebBaseLoader extends BaseDocumentLoader { } } } -//# sourceMappingURL=data:application/json;base64,eyJ2ZXJzaW9uIjozLCJmaWxlIjoicHVwZXRlZXIuanMiLCJzb3VyY2VSb290IjoiIiwic291cmNlcyI6WyIuLi8uLi9zcmMvbGliL3B1cGV0ZWVyLnRzIl0sIm5hbWVzIjpbXSwibWFwcGluZ3MiOiJBQVFBLE9BQU8sRUFBRSxRQUFRLEVBQUUsTUFBTSwyQkFBMkIsQ0FBQTtBQUNwRCxPQUFPLEVBQUUsa0JBQWtCLEVBQWtCLE1BQU0saUNBQWlDLENBQUE7QUF3QnBGOzs7Ozs7Ozs7Ozs7Ozs7O0dBZ0JHO0FBQ0gsSUFBSSxPQUFPLEdBQVksSUFBSSxDQUFBO0FBQzNCLElBQUksSUFBSSxHQUFTLElBQUksQ0FBQTtBQUVyQixNQUFNLGFBQWEsR0FBRyxLQUFLLEVBQUUsT0FBdUMsRUFBb0IsRUFBRTtJQUN4RixJQUFJLE9BQU87UUFBRSxPQUFPLE9BQU8sQ0FBQTtJQUMzQixNQUFNLEVBQUUsTUFBTSxFQUFFLEdBQUcsTUFBTSxzQkFBc0IsQ0FBQyxPQUFPLEVBQUUsQ0FBQztJQUMxRCxPQUFPLEdBQUcsTUFBTSxNQUFNLENBQUM7UUFDckIsUUFBUSxFQUFFLElBQUk7UUFDZCxlQUFlLEVBQUUsSUFBSTtRQUNyQixpQkFBaUIsRUFBRSxDQUFDLHNCQUFzQixDQUFDO1FBQzNDLEdBQUcsT0FBTyxFQUFFLGFBQWE7S0FDMUIsQ0FBQyxDQUFBO0lBQ0YsT0FBTyxPQUFPLENBQUE7QUFDaEIsQ0FBQyxDQUFBO0FBRUQsTUFBTSxDQUFDLE1BQU0sVUFBVSxHQUFHLEdBQUcsRUFBRSxDQUFDLE9BQU8sQ0FBQTtBQUN2QyxNQUFNLENBQUMsTUFBTSxPQUFPLEdBQUcsS0FBSyxFQUFFLE9BQWdCLEVBQUUsRUFBRTtJQUNoRCxJQUFHLElBQUk7UUFDTCxPQUFPLElBQUksQ0FBQTtJQUViLElBQUksR0FBRyxNQUFNLE9BQU8sQ0FBQyxPQUFPLEVBQUUsQ0FBQTtJQUM5QixPQUFPLElBQUksQ0FBQTtBQUNiLENBQUMsQ0FBQTtBQUVELE1BQU0sT0FBTyxzQkFDWCxTQUFRLGtCQUFrQjtJQUlQO0lBRm5CLE9BQU8sQ0FBNEM7SUFFbkQsWUFBbUIsT0FBZSxFQUFFLE9BQXVDO1FBQ3pFLEtBQUssRUFBRSxDQUFDO1FBRFMsWUFBTyxHQUFQLE9BQU8sQ0FBUTtRQUVoQyxJQUFJLENBQUMsT0FBTyxHQUFHLE9BQU8sSUFBSSxTQUFTLENBQUM7SUFDdEMsQ0FBQztJQUVELE1BQU0sQ0FBQyxPQUFPLENBQVU7SUFFeEIsTUFBTSxDQUFDLEtBQUssQ0FBQyxPQUFPLENBQ2xCLEdBQVcsRUFDWCxPQUF1QztRQUd2QyxNQUFNLE9BQU8sR0FBRyxNQUFNLGFBQWEsQ0FBQyxPQUFPLENBQUMsQ0FBQTtRQUU1QyxzQkFBc0IsQ0FBQyxPQUFPLEdBQUcsT0FBTyxDQUFBO1FBQ3hDLE1BQU0sSUFBSSxHQUFHLE1BQU0sT0FBTyxDQUFDLE9BQU8sQ0FBQyxDQUFBO1FBQ25DLE1BQU0sSUFBSSxDQUFDLElBQUksQ0FBQyxHQUFHLEVBQUU7WUFDbkIsT0FBTyxFQUFFLElBQUk7WUFDYixTQUFTLEVBQUUsa0JBQWtCO1lBQzdCLEdBQUcsT0FBTyxFQUFFLFdBQVc7U0FDeEIsQ0FBQyxDQUFDO1FBRUgsTUFBTSxRQUFRLEdBQUcsT0FBTyxFQUFFLFFBQVE7WUFDaEMsQ0FBQyxDQUFDLE1BQU0sT0FBTyxFQUFFLFFBQVEsQ0FBQyxJQUFJLEVBQUUsT0FBTyxDQUFDO1lBQ3hDLENBQUMsQ0FBQyxNQUFNLElBQUksQ0FBQyxRQUFRLENBQUMsR0FBRyxFQUFFLENBQUMsUUFBUSxDQUFDLElBQUksQ0FBQyxTQUFTLENBQUMsQ0FBQztRQUV2RCx1QkFBdUI7UUFFdkIsT0FBTyxRQUFRLENBQUE7SUFDakIsQ0FBQztJQUVEOzs7O09BSUc7SUFDSCxLQUFLLENBQUMsTUFBTTtRQUNWLE9BQU8sc0JBQXNCLENBQUMsT0FBTyxDQUFDLElBQUksQ0FBQyxPQUFPLEVBQUUsSUFBSSxDQUFDLE9BQU8sQ0FBQyxDQUFDO0lBQ3BFLENBQUM7SUFFRDs7OztPQUlHO0lBQ0gsS0FBSyxDQUFDLElBQUk7UUFDUixNQUFNLElBQUksR0FBRyxNQUFNLElBQUksQ0FBQyxNQUFNLEVBQUUsQ0FBQztRQUVqQyxNQUFNLFFBQVEsR0FBRyxFQUFFLE1BQU0sRUFBRSxJQUFJLENBQUMsT0FBTyxFQUFFLENBQUM7UUFDMUMsT0FBTyxDQUFDLElBQUksUUFBUSxDQUFDLEVBQUUsV0FBVyxFQUFFLElBQUksRUFBRSxRQUFRLEVBQUUsQ0FBQyxDQUFDLENBQUM7SUFDekQsQ0FBQztJQUVEOzs7Ozs7OztPQVFHO0lBQ0gsTUFBTSxDQUFDLEtBQUssQ0FBQyxXQUFXLENBQ3RCLEdBQVcsRUFDWCxPQUF1QztRQUV2QyxNQUFNLEVBQUUsTUFBTSxFQUFFLEdBQUcsTUFBTSxzQkFBc0IsQ0FBQyxPQUFPLEVBQUUsQ0FBQztRQUUxRCxNQUFNLE9BQU8sR0FBRyxNQUFNLE1BQU0sQ0FBQztZQUMzQixRQUFRLEVBQUUsSUFBSTtZQUNkLGVBQWUsRUFBRSxJQUFJO1lBQ3JCLElBQUksRUFBRSxDQUFDLGlCQUFpQixDQUFDO1lBQ3pCLGlCQUFpQixFQUFFLENBQUMsc0JBQXNCLENBQUM7WUFDM0MsR0FBRyxPQUFPLEVBQUUsYUFBYTtTQUMxQixDQUFDLENBQUM7UUFDSCxNQUFNLElBQUksR0FBRyxNQUFNLE9BQU8sQ0FBQyxPQUFPLEVBQUUsQ0FBQztRQUVyQyxNQUFNLElBQUksQ0FBQyxJQUFJLENBQUMsR0FBRyxFQUFFO1lBQ25CLE9BQU8sRUFBRSxNQUFNO1lBQ2YsU0FBUyxFQUFFLGtCQUFrQjtZQUM3QixHQUFHLE9BQU8sRUFBRSxXQUFXO1NBQ3hCLENBQUMsQ0FBQztRQUNILE1BQU0sVUFBVSxHQUFHLE1BQU0sSUFBSSxDQUFDLFVBQVUsRUFBRSxDQUFDO1FBQzNDLE1BQU0sTUFBTSxHQUFHLFVBQVUsQ0FBQyxRQUFRLENBQUMsUUFBUSxDQUFDLENBQUM7UUFDN0MsTUFBTSxRQUFRLEdBQUcsRUFBRSxNQUFNLEVBQUUsR0FBRyxFQUFFLENBQUM7UUFDakMsT0FBTyxJQUFJLFFBQVEsQ0FBQyxFQUFFLFdBQVcsRUFBRSxNQUFNLEVBQUUsUUFBUSxFQUFFLENBQUMsQ0FBQztJQUN6RCxDQUFDO0lBRUQ7Ozs7O09BS0c7SUFDSCxLQUFLLENBQUMsVUFBVTtRQUNkLE9BQU8sc0JBQXNCLENBQUMsV0FBVyxDQUFDLElBQUksQ0FBQyxPQUFPLEVBQUUsSUFBSSxDQUFDLE9BQU8sQ0FBQyxDQUFDO0lBQ3hFLENBQUM7SUFFRDs7OztPQUlHO0lBQ0gsTUFBTSxDQUFDLEtBQUssQ0FBQyxPQUFPO1FBR2xCLElBQUksQ0FBQztZQUNILDZEQUE2RDtZQUM3RCxNQUFNLEVBQUUsTUFBTSxFQUFFLEdBQUcsTUFBTSxNQUFNLENBQUMsV0FBVyxDQUFDLENBQUE7WUFDNUMsT0FBTyxFQUFFLE1BQU0sRUFBRSxDQUFDO1FBQ3BCLENBQUM7UUFBQyxPQUFPLENBQUMsRUFBRSxDQUFDO1lBQ1gsT0FBTyxDQUFDLEtBQUssQ0FBQyxDQUFDLENBQUMsQ0FBQztZQUNqQixNQUFNLElBQUksS0FBSyxDQUNiLDBFQUEwRSxDQUMzRSxDQUFDO1FBQ0osQ0FBQztJQUNILENBQUM7Q0FDRiJ9 \ No newline at end of file +//# sourceMappingURL=data:application/json;base64,eyJ2ZXJzaW9uIjozLCJmaWxlIjoicHVwZXRlZXIuanMiLCJzb3VyY2VSb290IjoiIiwic291cmNlcyI6WyIuLi8uLi9zcmMvbGliL3B1cGV0ZWVyLnRzIl0sIm5hbWVzIjpbXSwibWFwcGluZ3MiOiJBQVFBLE9BQU8sRUFBRSxRQUFRLEVBQUUsTUFBTSwyQkFBMkIsQ0FBQTtBQUNwRCxPQUFPLEVBQUUsa0JBQWtCLEVBQWtCLE1BQU0saUNBQWlDLENBQUE7QUFDcEYsT0FBTyxNQUFNLE1BQU0sU0FBUyxDQUFBO0FBd0I1Qjs7Ozs7Ozs7Ozs7Ozs7OztHQWdCRztBQUNILHVEQUF1RDtBQUN2RCxJQUFJLGNBQWMsR0FBNEIsSUFBSSxDQUFDO0FBQ25ELElBQUksU0FBUyxHQUEwQixJQUFJLENBQUM7QUFDNUMsTUFBTSxLQUFLLEdBQUcsTUFBTSxDQUFDLFFBQVEsQ0FBQyxPQUFPLENBQUMsR0FBRyxDQUFDLCtCQUErQixJQUFJLElBQUksQ0FBQyxDQUFDLENBQUE7QUFDbkYsTUFBTSxvQkFBb0IsR0FBRyxRQUFRLENBQUMsT0FBTyxDQUFDLEdBQUcsQ0FBQywwQ0FBMEMsSUFBSSxJQUFJLENBQUMsQ0FBQztBQUV0RyxNQUFNLGNBQWMsR0FBRyxHQUFHLEVBQUU7SUFDMUIsSUFBSSxTQUFTO1FBQUUsWUFBWSxDQUFDLFNBQVMsQ0FBQyxDQUFDO0lBQ3ZDLFNBQVMsR0FBRyxVQUFVLENBQUMsS0FBSyxJQUFJLEVBQUU7UUFDaEMsSUFBSSxjQUFjLEVBQUUsQ0FBQztZQUNuQixPQUFPLENBQUMsR0FBRyxDQUFDLHFDQUFxQyxvQkFBb0IsNkJBQTZCLENBQUMsQ0FBQztZQUNwRyxNQUFNLE9BQU8sR0FBRyxNQUFNLGNBQWMsQ0FBQztZQUNyQyxNQUFNLE9BQU8sQ0FBQyxLQUFLLEVBQUUsQ0FBQztZQUN0QixjQUFjLEdBQUcsSUFBSSxDQUFDO1FBQ3hCLENBQUM7SUFDSCxDQUFDLEVBQUUsb0JBQW9CLEdBQUcsSUFBSSxDQUFDLENBQUM7QUFDbEMsQ0FBQyxDQUFBO0FBRUQsTUFBTSxhQUFhLEdBQUcsS0FBSyxFQUFFLE9BQXVDLEVBQW9CLEVBQUU7SUFDeEYsY0FBYyxFQUFFLENBQUM7SUFDakIsSUFBSSxjQUFjO1FBQUUsT0FBTyxjQUFjLENBQUM7SUFFMUMsY0FBYyxHQUFHLENBQUMsS0FBSyxJQUFJLEVBQUU7UUFDM0IsTUFBTSxFQUFFLE1BQU0sRUFBRSxHQUFHLE1BQU0sc0JBQXNCLENBQUMsT0FBTyxFQUFFLENBQUM7UUFDMUQsTUFBTSxDQUFDLEdBQUcsTUFBTSxNQUFNLENBQUM7WUFDckIsUUFBUSxFQUFFLE9BQU8sQ0FBQyxHQUFHLENBQUMscUJBQXFCLEtBQUssT0FBTyxDQUFDLENBQUMsQ0FBQyxLQUFLLENBQUMsQ0FBQyxDQUFDLElBQUk7WUFDdEUsZUFBZSxFQUFFLElBQUk7WUFDckIsaUJBQWlCLEVBQUUsQ0FBQyxzQkFBc0IsQ0FBQztZQUMzQyxHQUFHLE9BQU8sRUFBRSxhQUFhO1NBQzFCLENBQUMsQ0FBQztRQUNILE9BQU8sQ0FBQyxDQUFDO0lBQ1gsQ0FBQyxDQUFDLEVBQUUsQ0FBQztJQUVMLE9BQU8sY0FBYyxDQUFDO0FBQ3hCLENBQUMsQ0FBQTtBQUVELE1BQU0sQ0FBQyxNQUFNLFVBQVUsR0FBRyxHQUFHLEVBQUUsQ0FBQyxjQUFjLENBQUM7QUFDL0MsTUFBTSxDQUFDLE1BQU0sT0FBTyxHQUFHLEtBQUssRUFBRSxPQUFnQixFQUFFLEVBQUU7SUFDaEQsMkNBQTJDO0lBQzNDLE1BQU0sSUFBSSxHQUFHLE1BQU0sT0FBTyxDQUFDLE9BQU8sRUFBRSxDQUFBO0lBQ3BDLE9BQU8sSUFBSSxDQUFBO0FBQ2IsQ0FBQyxDQUFBO0FBRUQsTUFBTSxPQUFPLHNCQUNYLFNBQVEsa0JBQWtCO0lBSVA7SUFGbkIsT0FBTyxDQUE0QztJQUVuRCxZQUFtQixPQUFlLEVBQUUsT0FBdUM7UUFDekUsS0FBSyxFQUFFLENBQUM7UUFEUyxZQUFPLEdBQVAsT0FBTyxDQUFRO1FBRWhDLElBQUksQ0FBQyxPQUFPLEdBQUcsT0FBTyxJQUFJLFNBQVMsQ0FBQztJQUN0QyxDQUFDO0lBRUQsTUFBTSxDQUFDLE9BQU8sQ0FBVTtJQUV4QixNQUFNLENBQUMsS0FBSyxDQUFDLE9BQU8sQ0FDbEIsR0FBVyxFQUNYLE9BQXVDO1FBR3ZDLE1BQU0sT0FBTyxHQUFHLE1BQU0sYUFBYSxDQUFDLE9BQU8sQ0FBQyxDQUFBO1FBRTVDLDhHQUE4RztRQUU5RyxPQUFPLEtBQUssQ0FBQyxLQUFLLElBQUksRUFBRTtZQUN0QixPQUFPLENBQUMsR0FBRyxDQUFDLHVDQUF1QyxLQUFLLENBQUMsV0FBVyxjQUFjLEtBQUssQ0FBQyxZQUFZLFNBQVMsR0FBRyxFQUFFLENBQUMsQ0FBQztZQUNwSCxJQUFJLENBQUM7Z0JBQ0gsTUFBTSxJQUFJLEdBQUcsTUFBTSxPQUFPLENBQUMsT0FBTyxDQUFDLENBQUE7Z0JBQ25DLElBQUksQ0FBQztvQkFDSCxNQUFNLElBQUksQ0FBQyxJQUFJLENBQUMsR0FBRyxFQUFFO3dCQUNuQixPQUFPLEVBQUUsSUFBSTt3QkFDYixTQUFTLEVBQUUsa0JBQWtCO3dCQUM3QixHQUFHLE9BQU8sRUFBRSxXQUFXO3FCQUN4QixDQUFDLENBQUM7b0JBRUgsTUFBTSxRQUFRLEdBQUcsT0FBTyxFQUFFLFFBQVE7d0JBQ2hDLENBQUMsQ0FBQyxNQUFNLE9BQU8sRUFBRSxRQUFRLENBQUMsSUFBSSxFQUFFLE9BQU8sQ0FBQzt3QkFDeEMsQ0FBQyxDQUFDLE1BQU0sSUFBSSxDQUFDLFFBQVEsQ0FBQyxHQUFHLEVBQUUsQ0FBQyxRQUFRLENBQUMsSUFBSSxDQUFDLFNBQVMsQ0FBQyxDQUFDO29CQUV2RCxPQUFPLFFBQVEsQ0FBQTtnQkFDakIsQ0FBQzt3QkFBUyxDQUFDO29CQUNULE1BQU0sSUFBSSxDQUFDLEtBQUssRUFBRSxDQUFBO2dCQUNwQixDQUFDO1lBQ0gsQ0FBQztvQkFBUyxDQUFDO2dCQUNULE9BQU8sQ0FBQyxHQUFHLENBQUMsc0NBQXNDLEtBQUssQ0FBQyxXQUFXLGNBQWMsS0FBSyxDQUFDLFlBQVksU0FBUyxHQUFHLEVBQUUsQ0FBQyxDQUFDO2dCQUNuSCxjQUFjLEVBQUUsQ0FBQztZQUNuQixDQUFDO1FBQ0gsQ0FBQyxDQUFDLENBQUE7SUFDSixDQUFDO0lBRUQ7Ozs7T0FJRztJQUNILEtBQUssQ0FBQyxNQUFNO1FBQ1YsT0FBTyxzQkFBc0IsQ0FBQyxPQUFPLENBQUMsSUFBSSxDQUFDLE9BQU8sRUFBRSxJQUFJLENBQUMsT0FBTyxDQUFDLENBQUM7SUFDcEUsQ0FBQztJQUVEOzs7O09BSUc7SUFDSCxLQUFLLENBQUMsSUFBSTtRQUNSLE1BQU0sSUFBSSxHQUFHLE1BQU0sSUFBSSxDQUFDLE1BQU0sRUFBRSxDQUFDO1FBRWpDLE1BQU0sUUFBUSxHQUFHLEVBQUUsTUFBTSxFQUFFLElBQUksQ0FBQyxPQUFPLEVBQUUsQ0FBQztRQUMxQyxPQUFPLENBQUMsSUFBSSxRQUFRLENBQUMsRUFBRSxXQUFXLEVBQUUsSUFBSSxFQUFFLFFBQVEsRUFBRSxDQUFDLENBQUMsQ0FBQztJQUN6RCxDQUFDO0lBRUQ7Ozs7Ozs7O09BUUc7SUFDSCxNQUFNLENBQUMsS0FBSyxDQUFDLFdBQVcsQ0FDdEIsR0FBVyxFQUNYLE9BQXVDO1FBRXZDLE1BQU0sRUFBRSxNQUFNLEVBQUUsR0FBRyxNQUFNLHNCQUFzQixDQUFDLE9BQU8sRUFBRSxDQUFDO1FBRTFELE1BQU0sT0FBTyxHQUFHLE1BQU0sTUFBTSxDQUFDO1lBQzNCLFFBQVEsRUFBRSxJQUFJO1lBQ2QsZUFBZSxFQUFFLElBQUk7WUFDckIsSUFBSSxFQUFFLENBQUMsaUJBQWlCLENBQUM7WUFDekIsaUJBQWlCLEVBQUUsQ0FBQyxzQkFBc0IsQ0FBQztZQUMzQyxHQUFHLE9BQU8sRUFBRSxhQUFhO1NBQzFCLENBQUMsQ0FBQztRQUNILE1BQU0sSUFBSSxHQUFHLE1BQU0sT0FBTyxDQUFDLE9BQU8sRUFBRSxDQUFDO1FBRXJDLE1BQU0sSUFBSSxDQUFDLElBQUksQ0FBQyxHQUFHLEVBQUU7WUFDbkIsT0FBTyxFQUFFLE1BQU07WUFDZixTQUFTLEVBQUUsa0JBQWtCO1lBQzdCLEdBQUcsT0FBTyxFQUFFLFdBQVc7U0FDeEIsQ0FBQyxDQUFDO1FBQ0gsTUFBTSxVQUFVLEdBQUcsTUFBTSxJQUFJLENBQUMsVUFBVSxFQUFFLENBQUM7UUFDM0MsTUFBTSxNQUFNLEdBQUcsVUFBVSxDQUFDLFFBQVEsQ0FBQyxRQUFRLENBQUMsQ0FBQztRQUM3QyxNQUFNLFFBQVEsR0FBRyxFQUFFLE1BQU0sRUFBRSxHQUFHLEVBQUUsQ0FBQztRQUNqQyxPQUFPLElBQUksUUFBUSxDQUFDLEVBQUUsV0FBVyxFQUFFLE1BQU0sRUFBRSxRQUFRLEVBQUUsQ0FBQyxDQUFDO0lBQ3pELENBQUM7SUFFRDs7Ozs7T0FLRztJQUNILEtBQUssQ0FBQyxVQUFVO1FBQ2QsT0FBTyxzQkFBc0IsQ0FBQyxXQUFXLENBQUMsSUFBSSxDQUFDLE9BQU8sRUFBRSxJQUFJLENBQUMsT0FBTyxDQUFDLENBQUM7SUFDeEUsQ0FBQztJQUVEOzs7O09BSUc7SUFDSCxNQUFNLENBQUMsS0FBSyxDQUFDLE9BQU87UUFHbEIsSUFBSSxDQUFDO1lBQ0gsNkRBQTZEO1lBQzdELE1BQU0sRUFBRSxNQUFNLEVBQUUsR0FBRyxNQUFNLE1BQU0sQ0FBQyxXQUFXLENBQUMsQ0FBQTtZQUM1QyxPQUFPLEVBQUUsTUFBTSxFQUFFLENBQUM7UUFDcEIsQ0FBQztRQUFDLE9BQU8sQ0FBQyxFQUFFLENBQUM7WUFDWCxPQUFPLENBQUMsS0FBSyxDQUFDLENBQUMsQ0FBQyxDQUFDO1lBQ2pCLE1BQU0sSUFBSSxLQUFLLENBQ2IsMEVBQTBFLENBQzNFLENBQUM7UUFDSixDQUFDO0lBQ0gsQ0FBQztDQUNGIn0= \ No newline at end of file diff --git a/packages/search/package-lock.json b/packages/search/package-lock.json index 5d3b8dcb..90afc834 100644 --- a/packages/search/package-lock.json +++ b/packages/search/package-lock.json @@ -32,6 +32,7 @@ "md5": "^2.3.0", "node-html-parser": "^6.1.12", "node-xlsx": "^0.23.0", + "p-limit": "^7.2.0", "p-map": "^4.0.0", "publish": "^0.6.0", "puppeteer": "^19.11.1", @@ -12561,6 +12562,21 @@ "node": ">=4" } }, + "node_modules/p-limit": { + "version": "7.2.0", + "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-7.2.0.tgz", + "integrity": "sha512-ATHLtwoTNDloHRFFxFJdHnG6n2WUeFjaR8XQMFdKIv0xkXjrER8/iG9iu265jOM95zXHAfv9oTkqhrfbIzosrQ==", + "license": "MIT", + "dependencies": { + "yocto-queue": "^1.2.1" + }, + "engines": { + "node": ">=20" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/p-map": { "version": "4.0.0", "license": "MIT", @@ -13917,6 +13933,18 @@ "fd-slicer": "~1.1.0" } }, + "node_modules/yocto-queue": { + "version": "1.2.2", + "resolved": "https://registry.npmjs.org/yocto-queue/-/yocto-queue-1.2.2.tgz", + "integrity": "sha512-4LCcse/U2MHZ63HAJVE+v71o7yOdIe4cZ70Wpf8D/IyjDKYQLV5GD46B+hSTjJsvV5PztjvHoU580EftxjDZFQ==", + "license": "MIT", + "engines": { + "node": ">=12.20" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/zod": { "version": "3.25.76", "resolved": "https://registry.npmjs.org/zod/-/zod-3.25.76.tgz", diff --git a/packages/search/package.json b/packages/search/package.json index 60479221..7071158e 100644 --- a/packages/search/package.json +++ b/packages/search/package.json @@ -38,6 +38,7 @@ "md5": "^2.3.0", "node-html-parser": "^6.1.12", "node-xlsx": "^0.23.0", + "p-limit": "^7.2.0", "p-map": "^4.0.0", "publish": "^0.6.0", "puppeteer": "^19.11.1", diff --git a/packages/search/src/lib/email.ts b/packages/search/src/lib/email.ts index 4332eadd..dbb05572 100644 --- a/packages/search/src/lib/email.ts +++ b/packages/search/src/lib/email.ts @@ -1,4 +1,5 @@ import { logger } from '../index.js' +import pMap from 'p-map' import { CheerioWebBaseLoader } from "langchain/document_loaders/web/cheerio" import { RecursiveCharacterTextSplitter } from "langchain/text_splitter" import { htmlToText } from "html-to-text" @@ -60,20 +61,19 @@ export const puppeteerLoader = async (url: string, headless: boolean, location: url, { launchOptions: { - headless, + headless: headless, ignoreHTTPSErrors: true }, gotoOptions: { - timeout: 15000, + timeout: location.pageTimeout || 15000, waitUntil: "networkidle0", }, async evaluate(page, browser) { if (checkCancelled && await checkCancelled()) { - debugger const pid = browser.process()?.pid; - logger.warn(`Killing browser process ${pid} due to cancellation`); - await browser.close(); + logger.warn(`Cancellation requested inside evaluate for process ${pid}`); + // Do not close browser, it is shared. Page will be closed by finally block in pupeteer.ts throw new Error('CancelledByUser'); } const result = await page.evaluate(() => document.body.innerHTML) @@ -83,20 +83,25 @@ export const puppeteerLoader = async (url: string, headless: boolean, location: } ) // Race load against cancellation - const loadPromise = loaderWithOptions.load(); + let isFinished = false; + const loadPromise = loaderWithOptions.load().finally(() => { + isFinished = true; + }); const cancelPromise = new Promise(async (_, reject) => { if (!checkCancelled) return; // Poll for cancellation - while (true) { + while (!isFinished) { await new Promise(r => setTimeout(r, 1000)); + logger.info('Checking cancellation for ' + url); if (await checkCancelled()) { const browser = await getBrowser(); if (browser) { const pid = browser.process()?.pid; - logger.info(`Killing browser process ${pid} due to cancellation`); - await browser.close(); + logger.info(`Cancellation confirmed for process ${pid}`); + // await browser.close(); // Do not close shared browser } + reject(new Error('CancelledByUser')); break; } @@ -148,7 +153,7 @@ export const findEMail = async (question: string, url: string, opts: { headless? return false } let pageUrl = url - let docs = await puppeteerLoader(pageUrl, opts.headless, location, opts.checkCancelled) as any + let docs = await puppeteerLoader(pageUrl, opts.headless, { ...location, pageTimeout: opts.pageTimeout }, opts.checkCancelled) as any let emails: string[] = [] docs.forEach((d: any) => { if (d.pageContent && d.pageContent.indexOf('@') !== -1) { @@ -176,24 +181,38 @@ export const findEmailEach = async (location: LocalResult, opts: { headless?: bo const emails: string[] = [] const abortAfter = opts.abortAfter ?? 1 - for (const page of location.meta.pages) { + const concurrency = opts.concurrency || 2 + const maxPages = opts.maxPages || 15 + const contactKeywords = ['contact', 'kontakt', 'contacto', 'contatto', 'info', 'imprint', 'impressum', 'help', 'support', 'about']; + + // Sort pages: prioritize contact pages + const pagesToSearch = location.meta.pages.sort((a, b) => { + const urlA = a.url.toLowerCase(); + const urlB = b.url.toLowerCase(); + + const scoreA = contactKeywords.some(k => urlA.includes(k)) ? 1 : 0; + const scoreB = contactKeywords.some(k => urlB.includes(k)) ? 1 : 0; + + return scoreB - scoreA; // Descending order (contact pages first) + }).slice(0, maxPages) + + await pMap(pagesToSearch, async (page: Page) => { if (opts.checkCancelled && await opts.checkCancelled()) { - debugger - logger.info(`[findEmailEach] Cancellation requested for ${location.title}`); - break; + // logger.info(`[findEmailEach] Cancellation requested for ${location.title}`); + return } if (emails.length >= abortAfter) { - break + return } if (page.status !== 'PENDING') { - continue + return } page.status = 'SEARCHING_EMAIL' try { - logger.info(`Scraping email from ${page.url}`); + logger.info(`Scraping email from ${page.url}`) const pageEmails = await findEMail('find email', page.url, opts, location) if (pageEmails && Array.isArray(pageEmails)) { emails.push(...pageEmails) @@ -201,7 +220,6 @@ export const findEmailEach = async (location: LocalResult, opts: { headless?: bo page.status = 'SEARCHED_EMAIL' } catch (error) { if (error.message === 'CancelledByUser') { - debugger throw error; } page.status = 'FAILED' @@ -212,7 +230,7 @@ export const findEmailEach = async (location: LocalResult, opts: { headless?: bo if (onProgress) { await onProgress(page) } - } + }, { concurrency, stopOnError: false }) // Update location emails if (emails.length > 0) { diff --git a/packages/search/src/lib/pupeteer.ts b/packages/search/src/lib/pupeteer.ts index 5c15de51..472c3e24 100644 --- a/packages/search/src/lib/pupeteer.ts +++ b/packages/search/src/lib/pupeteer.ts @@ -8,6 +8,7 @@ import type { import { Document } from "@langchain/core/documents" import { BaseDocumentLoader, DocumentLoader } from "langchain/document_loaders/base" +import pLimit from "p-limit" export { Page, Browser } export type PuppeteerGotoOptions = WaitForOptions & { @@ -48,27 +49,46 @@ export type PuppeteerWebBaseLoaderOptions = { * const screenshot = await loader.screenshot(); * ``` */ -let browser: Browser = null -let page: Page = null +// Singleton browser promise to prevent race conditions +let browserPromise: Promise | null = null; +let idleTimer: NodeJS.Timeout | null = null; +const limit = pLimit(parseInt(process.env.EMAIL_SEARCH_MAX_PUPETEER_PAGES || '10')) +const IDLE_TIMEOUT_SECONDS = parseInt(process.env.EMAIL_SEARCH_PUPETEER_IDLE_TIMEOUT_SECONDS || '60'); -const launchBrowser = async (options?: PuppeteerWebBaseLoaderOptions): Promise => { - if (browser) return browser - const { launch } = await PuppeteerWebBaseLoader.imports(); - browser = await launch({ - headless: true, - defaultViewport: null, - ignoreDefaultArgs: ["--disable-extensions"], - ...options?.launchOptions, - }) - return browser +const resetIdleTimer = () => { + if (idleTimer) clearTimeout(idleTimer); + idleTimer = setTimeout(async () => { + if (browserPromise) { + console.log(`[Puppeteer] Browser idle timeout (${IDLE_TIMEOUT_SECONDS}s) reached, closing browser`); + const browser = await browserPromise; + await browser.close(); + browserPromise = null; + } + }, IDLE_TIMEOUT_SECONDS * 1000); } -export const getBrowser = () => browser +const launchBrowser = async (options?: PuppeteerWebBaseLoaderOptions): Promise => { + resetIdleTimer(); + if (browserPromise) return browserPromise; + + browserPromise = (async () => { + const { launch } = await PuppeteerWebBaseLoader.imports(); + const b = await launch({ + headless: process.env.EMAIL_SEARCH_HEADLESS === 'false' ? false : true, + defaultViewport: null, + ignoreDefaultArgs: ["--disable-extensions"], + ...options?.launchOptions, + }); + return b; + })(); + + return browserPromise; +} + +export const getBrowser = () => browserPromise; export const getPage = async (browser: Browser) => { - if(page) - return page - - page = await browser.newPage() + // Always create a new page for concurrency + const page = await browser.newPage() return page } @@ -91,21 +111,32 @@ export class PuppeteerWebBaseLoader const browser = await launchBrowser(options) - PuppeteerWebBaseLoader.browser = browser - const page = await getPage(browser) - await page.goto(url, { - timeout: 5000, - waitUntil: "domcontentloaded", - ...options?.gotoOptions, - }); + // PuppeteerWebBaseLoader.browser = browser // Static property usage is deprecated/incorrect with this pattern - const bodyHTML = options?.evaluate - ? await options?.evaluate(page, browser) - : await page.evaluate(() => document.body.innerHTML); + return limit(async () => { + console.log(`[Puppeteer] Entering limit (Active: ${limit.activeCount}, Pending: ${limit.pendingCount}) for ${url}`); + try { + const page = await getPage(browser) + try { + await page.goto(url, { + timeout: 5000, + waitUntil: "domcontentloaded", + ...options?.gotoOptions, + }); - //await browser.close() + const bodyHTML = options?.evaluate + ? await options?.evaluate(page, browser) + : await page.evaluate(() => document.body.innerHTML); - return bodyHTML + return bodyHTML + } finally { + await page.close() + } + } finally { + console.log(`[Puppeteer] Exiting limit (Active: ${limit.activeCount}, Pending: ${limit.pendingCount}) for ${url}`); + resetIdleTimer(); + } + }) } /**