email worker

This commit is contained in:
babayaga 2025-12-26 11:28:43 +01:00
parent 16a541127c
commit 241b7a8712
7 changed files with 220 additions and 97 deletions

File diff suppressed because one or more lines are too long

View File

@ -17,7 +17,7 @@ export type PuppeteerWebBaseLoaderOptions = {
gotoOptions?: PuppeteerGotoOptions;
evaluate?: PuppeteerEvaluate;
};
export declare const getBrowser: () => Browser;
export declare const getBrowser: () => Promise<Browser>;
export declare const getPage: (browser: Browser) => Promise<Page>;
export declare class PuppeteerWebBaseLoader extends BaseDocumentLoader implements DocumentLoader {
webPath: string;

File diff suppressed because one or more lines are too long

View File

@ -32,6 +32,7 @@
"md5": "^2.3.0",
"node-html-parser": "^6.1.12",
"node-xlsx": "^0.23.0",
"p-limit": "^7.2.0",
"p-map": "^4.0.0",
"publish": "^0.6.0",
"puppeteer": "^19.11.1",
@ -12561,6 +12562,21 @@
"node": ">=4"
}
},
"node_modules/p-limit": {
"version": "7.2.0",
"resolved": "https://registry.npmjs.org/p-limit/-/p-limit-7.2.0.tgz",
"integrity": "sha512-ATHLtwoTNDloHRFFxFJdHnG6n2WUeFjaR8XQMFdKIv0xkXjrER8/iG9iu265jOM95zXHAfv9oTkqhrfbIzosrQ==",
"license": "MIT",
"dependencies": {
"yocto-queue": "^1.2.1"
},
"engines": {
"node": ">=20"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/p-map": {
"version": "4.0.0",
"license": "MIT",
@ -13917,6 +13933,18 @@
"fd-slicer": "~1.1.0"
}
},
"node_modules/yocto-queue": {
"version": "1.2.2",
"resolved": "https://registry.npmjs.org/yocto-queue/-/yocto-queue-1.2.2.tgz",
"integrity": "sha512-4LCcse/U2MHZ63HAJVE+v71o7yOdIe4cZ70Wpf8D/IyjDKYQLV5GD46B+hSTjJsvV5PztjvHoU580EftxjDZFQ==",
"license": "MIT",
"engines": {
"node": ">=12.20"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/zod": {
"version": "3.25.76",
"resolved": "https://registry.npmjs.org/zod/-/zod-3.25.76.tgz",

View File

@ -38,6 +38,7 @@
"md5": "^2.3.0",
"node-html-parser": "^6.1.12",
"node-xlsx": "^0.23.0",
"p-limit": "^7.2.0",
"p-map": "^4.0.0",
"publish": "^0.6.0",
"puppeteer": "^19.11.1",

View File

@ -1,4 +1,5 @@
import { logger } from '../index.js'
import pMap from 'p-map'
import { CheerioWebBaseLoader } from "langchain/document_loaders/web/cheerio"
import { RecursiveCharacterTextSplitter } from "langchain/text_splitter"
import { htmlToText } from "html-to-text"
@ -60,20 +61,19 @@ export const puppeteerLoader = async (url: string, headless: boolean, location:
url,
{
launchOptions: {
headless,
headless: headless,
ignoreHTTPSErrors: true
},
gotoOptions: {
timeout: 15000,
timeout: location.pageTimeout || 15000,
waitUntil: "networkidle0",
},
async evaluate(page, browser) {
if (checkCancelled && await checkCancelled()) {
debugger
const pid = browser.process()?.pid;
logger.warn(`Killing browser process ${pid} due to cancellation`);
await browser.close();
logger.warn(`Cancellation requested inside evaluate for process ${pid}`);
// Do not close browser, it is shared. Page will be closed by finally block in pupeteer.ts
throw new Error('CancelledByUser');
}
const result = await page.evaluate(() => document.body.innerHTML)
@ -83,20 +83,25 @@ export const puppeteerLoader = async (url: string, headless: boolean, location:
}
)
// Race load against cancellation
const loadPromise = loaderWithOptions.load();
let isFinished = false;
const loadPromise = loaderWithOptions.load().finally(() => {
isFinished = true;
});
const cancelPromise = new Promise<never>(async (_, reject) => {
if (!checkCancelled) return;
// Poll for cancellation
while (true) {
while (!isFinished) {
await new Promise(r => setTimeout(r, 1000));
logger.info('Checking cancellation for ' + url);
if (await checkCancelled()) {
const browser = await getBrowser();
if (browser) {
const pid = browser.process()?.pid;
logger.info(`Killing browser process ${pid} due to cancellation`);
await browser.close();
logger.info(`Cancellation confirmed for process ${pid}`);
// await browser.close(); // Do not close shared browser
}
reject(new Error('CancelledByUser'));
break;
}
@ -148,7 +153,7 @@ export const findEMail = async (question: string, url: string, opts: { headless?
return false
}
let pageUrl = url
let docs = await puppeteerLoader(pageUrl, opts.headless, location, opts.checkCancelled) as any
let docs = await puppeteerLoader(pageUrl, opts.headless, { ...location, pageTimeout: opts.pageTimeout }, opts.checkCancelled) as any
let emails: string[] = []
docs.forEach((d: any) => {
if (d.pageContent && d.pageContent.indexOf('@') !== -1) {
@ -176,24 +181,38 @@ export const findEmailEach = async (location: LocalResult, opts: { headless?: bo
const emails: string[] = []
const abortAfter = opts.abortAfter ?? 1
for (const page of location.meta.pages) {
const concurrency = opts.concurrency || 2
const maxPages = opts.maxPages || 15
const contactKeywords = ['contact', 'kontakt', 'contacto', 'contatto', 'info', 'imprint', 'impressum', 'help', 'support', 'about'];
// Sort pages: prioritize contact pages
const pagesToSearch = location.meta.pages.sort((a, b) => {
const urlA = a.url.toLowerCase();
const urlB = b.url.toLowerCase();
const scoreA = contactKeywords.some(k => urlA.includes(k)) ? 1 : 0;
const scoreB = contactKeywords.some(k => urlB.includes(k)) ? 1 : 0;
return scoreB - scoreA; // Descending order (contact pages first)
}).slice(0, maxPages)
await pMap(pagesToSearch, async (page: Page) => {
if (opts.checkCancelled && await opts.checkCancelled()) {
debugger
logger.info(`[findEmailEach] Cancellation requested for ${location.title}`);
break;
// logger.info(`[findEmailEach] Cancellation requested for ${location.title}`);
return
}
if (emails.length >= abortAfter) {
break
return
}
if (page.status !== 'PENDING') {
continue
return
}
page.status = 'SEARCHING_EMAIL'
try {
logger.info(`Scraping email from ${page.url}`);
logger.info(`Scraping email from ${page.url}`)
const pageEmails = await findEMail('find email', page.url, opts, location)
if (pageEmails && Array.isArray(pageEmails)) {
emails.push(...pageEmails)
@ -201,7 +220,6 @@ export const findEmailEach = async (location: LocalResult, opts: { headless?: bo
page.status = 'SEARCHED_EMAIL'
} catch (error) {
if (error.message === 'CancelledByUser') {
debugger
throw error;
}
page.status = 'FAILED'
@ -212,7 +230,7 @@ export const findEmailEach = async (location: LocalResult, opts: { headless?: bo
if (onProgress) {
await onProgress(page)
}
}
}, { concurrency, stopOnError: false })
// Update location emails
if (emails.length > 0) {

View File

@ -8,6 +8,7 @@ import type {
import { Document } from "@langchain/core/documents"
import { BaseDocumentLoader, DocumentLoader } from "langchain/document_loaders/base"
import pLimit from "p-limit"
export { Page, Browser }
export type PuppeteerGotoOptions = WaitForOptions & {
@ -48,27 +49,46 @@ export type PuppeteerWebBaseLoaderOptions = {
* const screenshot = await loader.screenshot();
* ```
*/
let browser: Browser = null
let page: Page = null
// Singleton browser promise to prevent race conditions
let browserPromise: Promise<Browser> | null = null;
let idleTimer: NodeJS.Timeout | null = null;
const limit = pLimit(parseInt(process.env.EMAIL_SEARCH_MAX_PUPETEER_PAGES || '10'))
const IDLE_TIMEOUT_SECONDS = parseInt(process.env.EMAIL_SEARCH_PUPETEER_IDLE_TIMEOUT_SECONDS || '60');
const launchBrowser = async (options?: PuppeteerWebBaseLoaderOptions): Promise<Browser> => {
if (browser) return browser
const { launch } = await PuppeteerWebBaseLoader.imports();
browser = await launch({
headless: true,
defaultViewport: null,
ignoreDefaultArgs: ["--disable-extensions"],
...options?.launchOptions,
})
return browser
const resetIdleTimer = () => {
if (idleTimer) clearTimeout(idleTimer);
idleTimer = setTimeout(async () => {
if (browserPromise) {
console.log(`[Puppeteer] Browser idle timeout (${IDLE_TIMEOUT_SECONDS}s) reached, closing browser`);
const browser = await browserPromise;
await browser.close();
browserPromise = null;
}
}, IDLE_TIMEOUT_SECONDS * 1000);
}
export const getBrowser = () => browser
export const getPage = async (browser: Browser) => {
if(page)
return page
const launchBrowser = async (options?: PuppeteerWebBaseLoaderOptions): Promise<Browser> => {
resetIdleTimer();
if (browserPromise) return browserPromise;
page = await browser.newPage()
browserPromise = (async () => {
const { launch } = await PuppeteerWebBaseLoader.imports();
const b = await launch({
headless: process.env.EMAIL_SEARCH_HEADLESS === 'false' ? false : true,
defaultViewport: null,
ignoreDefaultArgs: ["--disable-extensions"],
...options?.launchOptions,
});
return b;
})();
return browserPromise;
}
export const getBrowser = () => browserPromise;
export const getPage = async (browser: Browser) => {
// Always create a new page for concurrency
const page = await browser.newPage()
return page
}
@ -91,21 +111,32 @@ export class PuppeteerWebBaseLoader
const browser = await launchBrowser(options)
PuppeteerWebBaseLoader.browser = browser
const page = await getPage(browser)
await page.goto(url, {
timeout: 5000,
waitUntil: "domcontentloaded",
...options?.gotoOptions,
});
// PuppeteerWebBaseLoader.browser = browser // Static property usage is deprecated/incorrect with this pattern
const bodyHTML = options?.evaluate
? await options?.evaluate(page, browser)
: await page.evaluate(() => document.body.innerHTML);
return limit(async () => {
console.log(`[Puppeteer] Entering limit (Active: ${limit.activeCount}, Pending: ${limit.pendingCount}) for ${url}`);
try {
const page = await getPage(browser)
try {
await page.goto(url, {
timeout: 5000,
waitUntil: "domcontentloaded",
...options?.gotoOptions,
});
//await browser.close()
const bodyHTML = options?.evaluate
? await options?.evaluate(page, browser)
: await page.evaluate(() => document.body.innerHTML);
return bodyHTML
return bodyHTML
} finally {
await page.close()
}
} finally {
console.log(`[Puppeteer] Exiting limit (Active: ${limit.activeCount}, Pending: ${limit.pendingCount}) for ${url}`);
resetIdleTimer();
}
})
}
/**