Files
Spanglish/backend/src/lib/emailQueue.ts
Michilis b9f46b02cc Email queue + async sending; legal settings and placeholders
- Add in-memory email queue with rate limiting (MAX_EMAILS_PER_HOUR)
- Bulk send to event attendees now queues and returns immediately
- Frontend shows 'Emails are being sent in the background'
- Legal pages, settings, and placeholders updates

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-12 21:03:49 +00:00

195 lines
4.9 KiB
TypeScript

// In-memory email queue with rate limiting
// Processes emails asynchronously in the background without blocking the request thread
import { generateId } from './utils.js';
// ==================== Types ====================
/**
 * A single unit of work held in the in-memory email queue.
 */
export interface EmailJob {
  /** Identifier assigned when the job is enqueued (produced by generateId()). */
  id: string;

  /** Job discriminator; template emails are the only kind currently defined. */
  type: 'template';

  /** Payload handed unchanged to the email service's sendTemplateEmail(). */
  params: TemplateEmailJobParams;

  /** Date.now() value captured at the moment the job was enqueued. */
  addedAt: number;
}
/**
 * Arguments for one template-based email.
 *
 * The queue does not interpret these fields; the whole object is passed as-is
 * to the email service's sendTemplateEmail(). Only `to` is read locally, for
 * log messages.
 */
export interface TemplateEmailJobParams {
  /** Slug selecting the template (resolved by the email service, not here). */
  templateSlug: string;

  /** Recipient address; also written to the queue's log output. */
  to: string;

  /** Optional recipient display name. */
  toName?: string;

  /** Values made available to the template. */
  variables: Record<string, any>;

  /** Optional locale hint -- presumably selects the template language; confirm in the email service. */
  locale?: string;

  /** Optional related event id -- presumably used for logging/association; confirm in the email service. */
  eventId?: string;

  /** Optional identifier of whoever triggered the send -- confirm semantics in the email service. */
  sentBy?: string;
}
/**
 * Snapshot of the queue as returned by getQueueStatus().
 */
export interface QueueStatus {
  /** Number of jobs still waiting in the queue. */
  queued: number;

  /** Whether the background processing loop is currently marked active. */
  processing: boolean;

  /** Count of successful sends recorded within the last hour. */
  sentInLastHour: number;

  /** Hourly send cap currently in effect. */
  maxPerHour: number;
}
// ==================== Queue State ====================
/** Pending jobs in FIFO order: appended on enqueue, shifted off the front when processed. */
const queue: Array<EmailJob> = [];

/** Date.now() values of successful sends, appended in send order; pruned to the last hour. */
const sentTimestamps: Array<number> = [];

/** True while the background loop is considered active (set on schedule, cleared on drain/stop). */
let processing: boolean = false;

/** Handle of the most recently armed setTimeout, kept so stopQueue() can cancel it. */
let processTimer: ReturnType<typeof setTimeout> | null = null;
// Email service instance injected through initEmailQueue(). It is injected
// rather than imported so this module does not import the email service
// directly (avoids a circular import).
let _emailService: any = null;

/**
 * Return the injected email service.
 *
 * @throws Error when initEmailQueue() has not supplied a service yet.
 */
function getEmailService() {
  if (_emailService) {
    return _emailService;
  }
  throw new Error('[EmailQueue] Email service not initialized. Call initEmailQueue() first.');
}
/**
 * Inject the email service the queue will use for sending.
 *
 * Call this once during startup, before any job is processed: until a
 * service has been supplied, getEmailService() throws.
 *
 * @param emailService Object the queue sends through; processNext() calls its
 *   sendTemplateEmail(params) method.
 */
export function initEmailQueue(emailService: any): void {
  _emailService = emailService;
  console.log('[EmailQueue] Initialized');
}
// ==================== Rate Limiting ====================
/**
 * Read the hourly send cap from the MAX_EMAILS_PER_HOUR environment variable.
 *
 * The variable is re-read on every call, so changes take effect without a
 * restart. Falls back to 30 when the variable is unset, empty, not numeric,
 * or not a positive integer. Such values previously leaked through as NaN
 * (which disables the `>=` rate-limit comparison entirely) or as 0/negative
 * (which makes the processing loop compute an invalid wait and spin).
 *
 * @returns A positive integer number of emails allowed per hour.
 */
function getMaxPerHour(): number {
  const fallback = 30;
  const parsed = Number.parseInt(process.env.MAX_EMAILS_PER_HOUR ?? '', 10);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
}
/**
 * Drop send timestamps that fall outside the one-hour rate-limit window.
 *
 * Only the leading run of expired entries is removed; timestamps are appended
 * in send order, so scanning stops at the first entry still inside the window.
 */
function cleanOldTimestamps(): void {
  const cutoff = Date.now() - 3_600_000;

  // Count how many entries at the front are at or before the cutoff...
  let expired = 0;
  while (expired < sentTimestamps.length && sentTimestamps[expired] <= cutoff) {
    expired += 1;
  }

  // ...and remove them from the array in place.
  if (expired > 0) {
    sentTimestamps.splice(0, expired);
  }
}
// ==================== Queue Operations ====================
/**
 * Queue one template email for background delivery.
 *
 * The job is appended to the in-memory queue and the processing loop is
 * started if it is not already running; the send itself happens later.
 *
 * @param params Template email arguments, forwarded untouched to the email service.
 * @returns The id generated for the new job.
 */
export function enqueueEmail(params: TemplateEmailJobParams): string {
  const jobId = generateId();
  const enqueuedAt = Date.now();

  queue.push({ id: jobId, type: 'template', params, addedAt: enqueuedAt });
  scheduleProcessing();

  return jobId;
}
/**
 * Queue a batch of template emails for background delivery.
 *
 * Jobs are appended in the order given. Nothing is logged and the processing
 * loop is not touched when the list is empty.
 *
 * @param paramsList Template email arguments, one entry per email.
 * @returns The generated job ids, in the same order as `paramsList`.
 */
export function enqueueBulkEmails(paramsList: TemplateEmailJobParams[]): string[] {
  const jobIds: string[] = [];

  for (const params of paramsList) {
    const jobId = generateId();
    queue.push({ id: jobId, type: 'template', params, addedAt: Date.now() });
    jobIds.push(jobId);
  }

  if (jobIds.length === 0) {
    return jobIds;
  }

  console.log(`[EmailQueue] Queued ${jobIds.length} emails for background processing`);
  scheduleProcessing();
  return jobIds;
}
/**
 * Report the current state of the queue.
 *
 * Expired send timestamps are pruned first so `sentInLastHour` only counts
 * sends inside the current one-hour window.
 *
 * @returns Pending job count, loop-active flag, sends in the last hour and the hourly cap.
 */
export function getQueueStatus(): QueueStatus {
  cleanOldTimestamps();

  const status: QueueStatus = {
    queued: queue.length,
    processing,
    sentInLastHour: sentTimestamps.length,
    maxPerHour: getMaxPerHour(),
  };
  return status;
}
// ==================== Processing ====================
/**
 * Start the background processing loop unless it is already marked active.
 *
 * The first processNext() run is deferred with setImmediate so the caller
 * (typically a request handler) returns before any email is sent.
 */
function scheduleProcessing(): void {
  if (!processing) {
    processing = true;
    setImmediate(() => {
      void processNext();
    });
  }
}
/**
 * True while processNext() is awaiting a send. Lets a queue that was stopped
 * and restarted mid-send rejoin the existing loop instead of starting a
 * second, concurrent one.
 */
let sendInFlight = false;

/**
 * Process at most one queued email, then arm a timer for the next run.
 *
 * Behavior per invocation:
 * - Returns immediately if the queue has been stopped (`processing` is false)
 *   or another invocation is still awaiting a send.
 * - Marks the loop inactive and returns when the queue is empty.
 * - When the hourly cap is reached, sleeps until the oldest recorded send
 *   leaves the one-hour window (plus a 500 ms buffer) and retries.
 * - Otherwise dequeues one job and sends it through the email service.
 *   Only successful sends are recorded against the rate limit. Failures are
 *   logged and the job is dropped; there is no retry.
 *
 * Never rejects: every send error (including a missing email service) is
 * caught and logged.
 */
async function processNext(): Promise<void> {
  // stopQueue() may have run after this invocation was scheduled (for example
  // between setImmediate being queued and firing). If a send is in flight,
  // that invocation re-arms the loop itself once it settles.
  if (!processing || sendInFlight) {
    return;
  }

  if (queue.length === 0) {
    processing = false;
    console.log('[EmailQueue] Queue empty. Processing stopped.');
    return;
  }

  // Rate limit check
  cleanOldTimestamps();
  const maxPerHour = getMaxPerHour();
  if (sentTimestamps.length >= maxPerHour) {
    // Wait until the oldest timestamp in the window expires (500ms buffer).
    // With no timestamp recorded (cap configured as 0 or less) there is
    // nothing to wait for, so re-check after a minute instead of handing
    // setTimeout a NaN delay, which it would treat as ~1ms and spin.
    const oldest = sentTimestamps[0];
    const waitMs =
      oldest === undefined ? 60_000 : Math.max(oldest + 3_600_000 - Date.now(), 0) + 500;
    console.log(
      `[EmailQueue] Rate limit reached (${maxPerHour}/hr). ` +
        `Pausing for ${Math.ceil(waitMs / 1000)}s. ${queue.length} email(s) remaining.`
    );
    processTimer = setTimeout(() => void processNext(), waitMs);
    return;
  }

  // Dequeue and process. Non-null: length was checked above and nothing has
  // awaited since, so the queue cannot have been drained in between.
  const job = queue.shift()!;
  sendInFlight = true;
  try {
    const emailService = getEmailService();
    await emailService.sendTemplateEmail(job.params);
    sentTimestamps.push(Date.now());
    console.log(
      `[EmailQueue] Sent email ${job.id} to ${job.params.to}. ` +
        `Queue: ${queue.length} remaining. Sent this hour: ${sentTimestamps.length}/${maxPerHour}`
    );
  } catch (error: unknown) {
    const detail = error instanceof Error && error.message ? error.message : error;
    console.error(`[EmailQueue] Failed to send email ${job.id} to ${job.params.to}:`, detail);
    // No retry here. Per the original author's note, sendTemplateEmail records
    // the failure in the email_logs table itself (not visible from this file).
  } finally {
    sendInFlight = false;
  }

  // stopQueue() may have been called while the send was awaited; do not
  // re-arm the loop in that case.
  if (!processing) {
    return;
  }

  // Small delay between sends to be gentle on the email server
  processTimer = setTimeout(() => void processNext(), 200);
}
/**
 * Halt background processing, e.g. during graceful shutdown.
 *
 * Cancels the pending timer (if any) and marks the loop inactive. Jobs still
 * in the queue are left in memory and are not sent; the number remaining is
 * logged.
 */
export function stopQueue(): void {
  if (processTimer !== null) {
    clearTimeout(processTimer);
    processTimer = null;
  }

  processing = false;

  const remaining = queue.length;
  console.log(`[EmailQueue] Stopped. ${remaining} email(s) remaining in queue.`);
}