Email queue + async sending; legal settings and placeholders
- Add in-memory email queue with rate limiting (MAX_EMAILS_PER_HOUR) - Bulk send to event attendees now queues and returns immediately - Frontend shows 'Emails are being sent in the background' - Legal pages, settings, and placeholders updates Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
194
backend/src/lib/emailQueue.ts
Normal file
194
backend/src/lib/emailQueue.ts
Normal file
@@ -0,0 +1,194 @@
|
||||
// In-memory email queue with rate limiting
|
||||
// Processes emails asynchronously in the background without blocking the request thread
|
||||
|
||||
import { generateId } from './utils.js';
|
||||
|
||||
// ==================== Types ====================
|
||||
|
||||
export interface EmailJob {
|
||||
id: string;
|
||||
type: 'template';
|
||||
params: TemplateEmailJobParams;
|
||||
addedAt: number;
|
||||
}
|
||||
|
||||
export interface TemplateEmailJobParams {
|
||||
templateSlug: string;
|
||||
to: string;
|
||||
toName?: string;
|
||||
variables: Record<string, any>;
|
||||
locale?: string;
|
||||
eventId?: string;
|
||||
sentBy?: string;
|
||||
}
|
||||
|
||||
export interface QueueStatus {
|
||||
queued: number;
|
||||
processing: boolean;
|
||||
sentInLastHour: number;
|
||||
maxPerHour: number;
|
||||
}
|
||||
|
||||
// ==================== Queue State ====================
|
||||
|
||||
const queue: EmailJob[] = [];
|
||||
const sentTimestamps: number[] = [];
|
||||
let processing = false;
|
||||
let processTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
|
||||
// Lazy reference to emailService to avoid circular imports
|
||||
let _emailService: any = null;
|
||||
|
||||
function getEmailService() {
|
||||
if (!_emailService) {
|
||||
// Dynamic import to avoid circular dependency
|
||||
throw new Error('[EmailQueue] Email service not initialized. Call initEmailQueue() first.');
|
||||
}
|
||||
return _emailService;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the email queue with a reference to the email service.
|
||||
* Must be called once at startup.
|
||||
*/
|
||||
export function initEmailQueue(emailService: any): void {
|
||||
_emailService = emailService;
|
||||
console.log('[EmailQueue] Initialized');
|
||||
}
|
||||
|
||||
// ==================== Rate Limiting ====================
|
||||
|
||||
function getMaxPerHour(): number {
|
||||
return parseInt(process.env.MAX_EMAILS_PER_HOUR || '30', 10);
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up timestamps older than 1 hour
|
||||
*/
|
||||
function cleanOldTimestamps(): void {
|
||||
const oneHourAgo = Date.now() - 3_600_000;
|
||||
while (sentTimestamps.length > 0 && sentTimestamps[0] <= oneHourAgo) {
|
||||
sentTimestamps.shift();
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== Queue Operations ====================
|
||||
|
||||
/**
|
||||
* Add a single email job to the queue.
|
||||
* Returns the job ID.
|
||||
*/
|
||||
export function enqueueEmail(params: TemplateEmailJobParams): string {
|
||||
const id = generateId();
|
||||
queue.push({
|
||||
id,
|
||||
type: 'template',
|
||||
params,
|
||||
addedAt: Date.now(),
|
||||
});
|
||||
scheduleProcessing();
|
||||
return id;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add multiple email jobs to the queue at once.
|
||||
* Returns array of job IDs.
|
||||
*/
|
||||
export function enqueueBulkEmails(paramsList: TemplateEmailJobParams[]): string[] {
|
||||
const ids: string[] = [];
|
||||
for (const params of paramsList) {
|
||||
const id = generateId();
|
||||
queue.push({
|
||||
id,
|
||||
type: 'template',
|
||||
params,
|
||||
addedAt: Date.now(),
|
||||
});
|
||||
ids.push(id);
|
||||
}
|
||||
if (ids.length > 0) {
|
||||
console.log(`[EmailQueue] Queued ${ids.length} emails for background processing`);
|
||||
scheduleProcessing();
|
||||
}
|
||||
return ids;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get current queue status
|
||||
*/
|
||||
export function getQueueStatus(): QueueStatus {
|
||||
cleanOldTimestamps();
|
||||
return {
|
||||
queued: queue.length,
|
||||
processing,
|
||||
sentInLastHour: sentTimestamps.length,
|
||||
maxPerHour: getMaxPerHour(),
|
||||
};
|
||||
}
|
||||
|
||||
// ==================== Processing ====================
|
||||
|
||||
function scheduleProcessing(): void {
|
||||
if (processing) return;
|
||||
processing = true;
|
||||
// Start processing on next tick to not block the caller
|
||||
setImmediate(() => processNext());
|
||||
}
|
||||
|
||||
async function processNext(): Promise<void> {
|
||||
if (queue.length === 0) {
|
||||
processing = false;
|
||||
console.log('[EmailQueue] Queue empty. Processing stopped.');
|
||||
return;
|
||||
}
|
||||
|
||||
// Rate limit check
|
||||
cleanOldTimestamps();
|
||||
const maxPerHour = getMaxPerHour();
|
||||
|
||||
if (sentTimestamps.length >= maxPerHour) {
|
||||
// Calculate when the oldest timestamp in the window expires
|
||||
const waitMs = sentTimestamps[0] + 3_600_000 - Date.now() + 500; // 500ms buffer
|
||||
console.log(
|
||||
`[EmailQueue] Rate limit reached (${maxPerHour}/hr). ` +
|
||||
`Pausing for ${Math.ceil(waitMs / 1000)}s. ${queue.length} email(s) remaining.`
|
||||
);
|
||||
processTimer = setTimeout(() => processNext(), waitMs);
|
||||
return;
|
||||
}
|
||||
|
||||
// Dequeue and process
|
||||
const job = queue.shift()!;
|
||||
|
||||
try {
|
||||
const emailService = getEmailService();
|
||||
await emailService.sendTemplateEmail(job.params);
|
||||
sentTimestamps.push(Date.now());
|
||||
console.log(
|
||||
`[EmailQueue] Sent email ${job.id} to ${job.params.to}. ` +
|
||||
`Queue: ${queue.length} remaining. Sent this hour: ${sentTimestamps.length}/${maxPerHour}`
|
||||
);
|
||||
} catch (error: any) {
|
||||
console.error(
|
||||
`[EmailQueue] Failed to send email ${job.id} to ${job.params.to}:`,
|
||||
error?.message || error
|
||||
);
|
||||
// The sendTemplateEmail method already logs the failure in the email_logs table,
|
||||
// so we don't need to retry here. The error is logged and we move on.
|
||||
}
|
||||
|
||||
// Small delay between sends to be gentle on the email server
|
||||
processTimer = setTimeout(() => processNext(), 200);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop processing (for graceful shutdown)
|
||||
*/
|
||||
export function stopQueue(): void {
|
||||
if (processTimer) {
|
||||
clearTimeout(processTimer);
|
||||
processTimer = null;
|
||||
}
|
||||
processing = false;
|
||||
console.log(`[EmailQueue] Stopped. ${queue.length} email(s) remaining in queue.`);
|
||||
}
|
||||
Reference in New Issue
Block a user