- Add in-memory email queue with rate limiting (MAX_EMAILS_PER_HOUR) - Bulk send to event attendees now queues and returns immediately - Frontend shows 'Emails are being sent in the background' - Legal pages, settings, and placeholders updates Co-authored-by: Cursor <cursoragent@cursor.com>
195 lines
4.9 KiB
TypeScript
195 lines
4.9 KiB
TypeScript
// In-memory email queue with rate limiting
|
|
// Processes emails asynchronously in the background without blocking the request thread
|
|
|
|
import { generateId } from './utils.js';
|
|
|
|
// ==================== Types ====================
|
|
|
|
export interface EmailJob {
|
|
id: string;
|
|
type: 'template';
|
|
params: TemplateEmailJobParams;
|
|
addedAt: number;
|
|
}
|
|
|
|
export interface TemplateEmailJobParams {
|
|
templateSlug: string;
|
|
to: string;
|
|
toName?: string;
|
|
variables: Record<string, any>;
|
|
locale?: string;
|
|
eventId?: string;
|
|
sentBy?: string;
|
|
}
|
|
|
|
export interface QueueStatus {
|
|
queued: number;
|
|
processing: boolean;
|
|
sentInLastHour: number;
|
|
maxPerHour: number;
|
|
}
|
|
|
|
// ==================== Queue State ====================
|
|
|
|
const queue: EmailJob[] = [];
|
|
const sentTimestamps: number[] = [];
|
|
let processing = false;
|
|
let processTimer: ReturnType<typeof setTimeout> | null = null;
|
|
|
|
// Lazy reference to emailService to avoid circular imports
|
|
let _emailService: any = null;
|
|
|
|
function getEmailService() {
|
|
if (!_emailService) {
|
|
// Dynamic import to avoid circular dependency
|
|
throw new Error('[EmailQueue] Email service not initialized. Call initEmailQueue() first.');
|
|
}
|
|
return _emailService;
|
|
}
|
|
|
|
/**
|
|
* Initialize the email queue with a reference to the email service.
|
|
* Must be called once at startup.
|
|
*/
|
|
export function initEmailQueue(emailService: any): void {
|
|
_emailService = emailService;
|
|
console.log('[EmailQueue] Initialized');
|
|
}
|
|
|
|
// ==================== Rate Limiting ====================
|
|
|
|
function getMaxPerHour(): number {
|
|
return parseInt(process.env.MAX_EMAILS_PER_HOUR || '30', 10);
|
|
}
|
|
|
|
/**
|
|
* Clean up timestamps older than 1 hour
|
|
*/
|
|
function cleanOldTimestamps(): void {
|
|
const oneHourAgo = Date.now() - 3_600_000;
|
|
while (sentTimestamps.length > 0 && sentTimestamps[0] <= oneHourAgo) {
|
|
sentTimestamps.shift();
|
|
}
|
|
}
|
|
|
|
// ==================== Queue Operations ====================
|
|
|
|
/**
|
|
* Add a single email job to the queue.
|
|
* Returns the job ID.
|
|
*/
|
|
export function enqueueEmail(params: TemplateEmailJobParams): string {
|
|
const id = generateId();
|
|
queue.push({
|
|
id,
|
|
type: 'template',
|
|
params,
|
|
addedAt: Date.now(),
|
|
});
|
|
scheduleProcessing();
|
|
return id;
|
|
}
|
|
|
|
/**
|
|
* Add multiple email jobs to the queue at once.
|
|
* Returns array of job IDs.
|
|
*/
|
|
export function enqueueBulkEmails(paramsList: TemplateEmailJobParams[]): string[] {
|
|
const ids: string[] = [];
|
|
for (const params of paramsList) {
|
|
const id = generateId();
|
|
queue.push({
|
|
id,
|
|
type: 'template',
|
|
params,
|
|
addedAt: Date.now(),
|
|
});
|
|
ids.push(id);
|
|
}
|
|
if (ids.length > 0) {
|
|
console.log(`[EmailQueue] Queued ${ids.length} emails for background processing`);
|
|
scheduleProcessing();
|
|
}
|
|
return ids;
|
|
}
|
|
|
|
/**
|
|
* Get current queue status
|
|
*/
|
|
export function getQueueStatus(): QueueStatus {
|
|
cleanOldTimestamps();
|
|
return {
|
|
queued: queue.length,
|
|
processing,
|
|
sentInLastHour: sentTimestamps.length,
|
|
maxPerHour: getMaxPerHour(),
|
|
};
|
|
}
|
|
|
|
// ==================== Processing ====================
|
|
|
|
function scheduleProcessing(): void {
|
|
if (processing) return;
|
|
processing = true;
|
|
// Start processing on next tick to not block the caller
|
|
setImmediate(() => processNext());
|
|
}
|
|
|
|
async function processNext(): Promise<void> {
|
|
if (queue.length === 0) {
|
|
processing = false;
|
|
console.log('[EmailQueue] Queue empty. Processing stopped.');
|
|
return;
|
|
}
|
|
|
|
// Rate limit check
|
|
cleanOldTimestamps();
|
|
const maxPerHour = getMaxPerHour();
|
|
|
|
if (sentTimestamps.length >= maxPerHour) {
|
|
// Calculate when the oldest timestamp in the window expires
|
|
const waitMs = sentTimestamps[0] + 3_600_000 - Date.now() + 500; // 500ms buffer
|
|
console.log(
|
|
`[EmailQueue] Rate limit reached (${maxPerHour}/hr). ` +
|
|
`Pausing for ${Math.ceil(waitMs / 1000)}s. ${queue.length} email(s) remaining.`
|
|
);
|
|
processTimer = setTimeout(() => processNext(), waitMs);
|
|
return;
|
|
}
|
|
|
|
// Dequeue and process
|
|
const job = queue.shift()!;
|
|
|
|
try {
|
|
const emailService = getEmailService();
|
|
await emailService.sendTemplateEmail(job.params);
|
|
sentTimestamps.push(Date.now());
|
|
console.log(
|
|
`[EmailQueue] Sent email ${job.id} to ${job.params.to}. ` +
|
|
`Queue: ${queue.length} remaining. Sent this hour: ${sentTimestamps.length}/${maxPerHour}`
|
|
);
|
|
} catch (error: any) {
|
|
console.error(
|
|
`[EmailQueue] Failed to send email ${job.id} to ${job.params.to}:`,
|
|
error?.message || error
|
|
);
|
|
// The sendTemplateEmail method already logs the failure in the email_logs table,
|
|
// so we don't need to retry here. The error is logged and we move on.
|
|
}
|
|
|
|
// Small delay between sends to be gentle on the email server
|
|
processTimer = setTimeout(() => processNext(), 200);
|
|
}
|
|
|
|
/**
|
|
* Stop processing (for graceful shutdown)
|
|
*/
|
|
export function stopQueue(): void {
|
|
if (processTimer) {
|
|
clearTimeout(processTimer);
|
|
processTimer = null;
|
|
}
|
|
processing = false;
|
|
console.log(`[EmailQueue] Stopped. ${queue.length} email(s) remaining in queue.`);
|
|
}
|