// In-memory email queue with rate limiting // Processes emails asynchronously in the background without blocking the request thread import { generateId } from './utils.js'; // ==================== Types ==================== export interface EmailJob { id: string; type: 'template'; params: TemplateEmailJobParams; addedAt: number; } export interface TemplateEmailJobParams { templateSlug: string; to: string; toName?: string; variables: Record; locale?: string; eventId?: string; sentBy?: string; } export interface QueueStatus { queued: number; processing: boolean; sentInLastHour: number; maxPerHour: number; } // ==================== Queue State ==================== const queue: EmailJob[] = []; const sentTimestamps: number[] = []; let processing = false; let processTimer: ReturnType | null = null; // Lazy reference to emailService to avoid circular imports let _emailService: any = null; function getEmailService() { if (!_emailService) { // Dynamic import to avoid circular dependency throw new Error('[EmailQueue] Email service not initialized. Call initEmailQueue() first.'); } return _emailService; } /** * Initialize the email queue with a reference to the email service. * Must be called once at startup. */ export function initEmailQueue(emailService: any): void { _emailService = emailService; console.log('[EmailQueue] Initialized'); } // ==================== Rate Limiting ==================== function getMaxPerHour(): number { return parseInt(process.env.MAX_EMAILS_PER_HOUR || '30', 10); } /** * Clean up timestamps older than 1 hour */ function cleanOldTimestamps(): void { const oneHourAgo = Date.now() - 3_600_000; while (sentTimestamps.length > 0 && sentTimestamps[0] <= oneHourAgo) { sentTimestamps.shift(); } } // ==================== Queue Operations ==================== /** * Add a single email job to the queue. * Returns the job ID. */ export function enqueueEmail(params: TemplateEmailJobParams): string { const id = generateId(); queue.push({ id, type: 'template', params, addedAt: Date.now(), }); scheduleProcessing(); return id; } /** * Add multiple email jobs to the queue at once. * Returns array of job IDs. */ export function enqueueBulkEmails(paramsList: TemplateEmailJobParams[]): string[] { const ids: string[] = []; for (const params of paramsList) { const id = generateId(); queue.push({ id, type: 'template', params, addedAt: Date.now(), }); ids.push(id); } if (ids.length > 0) { console.log(`[EmailQueue] Queued ${ids.length} emails for background processing`); scheduleProcessing(); } return ids; } /** * Get current queue status */ export function getQueueStatus(): QueueStatus { cleanOldTimestamps(); return { queued: queue.length, processing, sentInLastHour: sentTimestamps.length, maxPerHour: getMaxPerHour(), }; } // ==================== Processing ==================== function scheduleProcessing(): void { if (processing) return; processing = true; // Start processing on next tick to not block the caller setImmediate(() => processNext()); } async function processNext(): Promise { if (queue.length === 0) { processing = false; console.log('[EmailQueue] Queue empty. Processing stopped.'); return; } // Rate limit check cleanOldTimestamps(); const maxPerHour = getMaxPerHour(); if (sentTimestamps.length >= maxPerHour) { // Calculate when the oldest timestamp in the window expires const waitMs = sentTimestamps[0] + 3_600_000 - Date.now() + 500; // 500ms buffer console.log( `[EmailQueue] Rate limit reached (${maxPerHour}/hr). ` + `Pausing for ${Math.ceil(waitMs / 1000)}s. ${queue.length} email(s) remaining.` ); processTimer = setTimeout(() => processNext(), waitMs); return; } // Dequeue and process const job = queue.shift()!; try { const emailService = getEmailService(); await emailService.sendTemplateEmail(job.params); sentTimestamps.push(Date.now()); console.log( `[EmailQueue] Sent email ${job.id} to ${job.params.to}. ` + `Queue: ${queue.length} remaining. Sent this hour: ${sentTimestamps.length}/${maxPerHour}` ); } catch (error: any) { console.error( `[EmailQueue] Failed to send email ${job.id} to ${job.params.to}:`, error?.message || error ); // The sendTemplateEmail method already logs the failure in the email_logs table, // so we don't need to retry here. The error is logged and we move on. } // Small delay between sends to be gentle on the email server processTimer = setTimeout(() => processNext(), 200); } /** * Stop processing (for graceful shutdown) */ export function stopQueue(): void { if (processTimer) { clearTimeout(processTimer); processTimer = null; } processing = false; console.log(`[EmailQueue] Stopped. ${queue.length} email(s) remaining in queue.`); }