import { prisma } from '../prisma.js'
import {
  NOTIFICATION_STATUSES,
  MAX_RETRY_ATTEMPTS,
  RETRY_DELAYS_MS,
} from '../../../../shared/constants/notification-events.js'
import { emailChannel } from './channels/email-channel.js'
import { telegramChannel } from './channels/telegram-channel.js'

const { PENDING, SENT, FAILED } = NOTIFICATION_STATUSES

// Delivery channels, keyed by the `channel` field of a queued task.
const channels = {
  email: emailChannel,
  telegram: telegramChannel,
}

/**
 * In-memory queue that delivers notification tasks through the configured
 * channels and records the outcome on the matching `notificationLog` row.
 *
 * Tasks live only in `this.tasks`; nothing here persists them, which is why
 * `flushPendingOnStartup` exists to reconcile rows left PENDING by a restart.
 */
class NotificationQueue {
  constructor() {
    this.tasks = [] // tasks waiting to be executed, oldest first
    this.processing = 0 // number of tasks currently executing
    this.maxConcurrent = 5 // upper bound on simultaneously executing tasks
    this.intervalMs = 2000 // delay between two polling ticks
    this.running = false
    this._timer = null // handle of the pending tick, so stop() can cancel it
  }

  /**
   * Appends a task to the queue, stamping it with the enqueue time.
   *
   * @param {object} task - Fields read later by `_execute`: `channel`,
   *   `recipient`, `eventType`, `payload`, `logId`.
   */
  enqueue(task) {
    this.tasks.push({ ...task, enqueuedAt: Date.now() })
  }

  /** Starts the polling loop; does nothing if it is already running. */
  start() {
    if (this.running) return
    this.running = true
    this._tick()
  }

  /**
   * Stops the polling loop. Tasks already executing are not interrupted.
   */
  stop() {
    this.running = false
    // Cancel the scheduled tick: otherwise a start() issued before it fires
    // would leave two tick loops running side by side.
    if (this._timer !== null) {
      clearTimeout(this._timer)
      this._timer = null
    }
  }

  /** Runs one polling pass and schedules the next one. */
  _tick() {
    if (!this.running) return
    this._processAvailable()
    this._timer = setTimeout(() => this._tick(), this.intervalMs)
  }

  /**
   * Starts queued tasks until the queue is empty or `maxConcurrent` tasks are
   * executing. Does not wait for the started tasks to finish.
   */
  _processAvailable() {
    while (this.tasks.length > 0 && this.processing < this.maxConcurrent) {
      const task = this.tasks.shift()
      this.processing++
      this._execute(task)
        .catch((err) => {
          // _execute can still reject (e.g. a database error while recording
          // the outcome). Nobody awaits this promise, so handle it here
          // instead of letting it surface as an unhandled rejection.
          console.error(`[notifications] Failed to process task for log ${task.logId}:`, err)
        })
        .finally(() => {
          this.processing--
        })
    }
  }

  /**
   * Sends one task through its channel and records the outcome.
   *
   * @param {object} task - Task previously passed to `enqueue`.
   * @returns {Promise<void>} Rejects if recording the outcome fails.
   */
  async _execute(task) {
    // Own-property lookup so that names inherited from Object.prototype
    // (e.g. "constructor") are treated as unknown channels.
    const channel = Object.prototype.hasOwnProperty.call(channels, task.channel)
      ? channels[task.channel]
      : undefined

    if (!channel) {
      await this._markFailed(task.logId, `Unknown channel: ${task.channel}`)
      return
    }

    let result
    try {
      result = await channel.send({
        recipient: task.recipient,
        eventType: task.eventType,
        payload: task.payload,
      })
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err)
      await this._handleFailure(task.logId, task, message)
      return
    }

    // Bookkeeping is deliberately outside the try block: a database error
    // while recording a successful send must not be mistaken for a delivery
    // failure, which would schedule a retry and send the notification twice.
    if (result?.success) {
      await this._markSent(task.logId)
    } else {
      await this._handleFailure(task.logId, task, result?.error)
    }
  }

  /**
   * Sets the log row's status to SENT.
   *
   * @param {*} logId - `id` of the `notificationLog` row.
   */
  async _markSent(logId) {
    await prisma.notificationLog.update({
      where: { id: logId },
      data: { status: SENT },
    })
  }

  /**
   * Sets the log row's status to FAILED and stores the error text.
   *
   * @param {*} logId - `id` of the `notificationLog` row.
   * @param {string} [error] - Failure description to store on the row.
   */
  async _markFailed(logId, error) {
    await prisma.notificationLog.update({
      where: { id: logId },
      data: { status: FAILED, error },
    })
  }

  /**
   * Records a failed delivery attempt. Once the attempt count reaches
   * `MAX_RETRY_ATTEMPTS` the row is marked FAILED; otherwise the new count is
   * stored and the task is re-enqueued after the matching retry delay.
   *
   * @param {*} logId - `id` of the `notificationLog` row.
   * @param {object} task - The task that failed.
   * @param {string} [error] - Failure description.
   */
  async _handleFailure(logId, task, error) {
    const log = await prisma.notificationLog.findUnique({ where: { id: logId } })
    const newAttempts = (log?.attempts ?? 0) + 1

    if (newAttempts >= MAX_RETRY_ATTEMPTS) {
      await this._markFailed(logId, error)
      return
    }

    await prisma.notificationLog.update({
      where: { id: logId },
      data: { attempts: newAttempts },
    })

    // Fall back to the last configured delay when there are more attempts
    // than entries. `??` rather than `||` so a configured delay of 0 is kept.
    const delay = RETRY_DELAYS_MS[newAttempts - 1] ?? RETRY_DELAYS_MS[RETRY_DELAYS_MS.length - 1]
    setTimeout(() => {
      this.enqueue({ ...task, logId })
    }, delay)
  }

  /**
   * Marks every log row still in PENDING status as FAILED and logs how many
   * rows were affected (only when at least one was).
   */
  async flushPendingOnStartup() {
    // One bulk statement instead of a query plus one update per row.
    const { count } = await prisma.notificationLog.updateMany({
      where: { status: PENDING },
      data: { status: FAILED, error: 'Server restarted, pending notification lost' },
    })

    if (count > 0) {
      console.log(`[notifications] Marked ${count} pending notifications as failed on startup`)
    }
  }
}

/**
 * Creates a new, stopped notification queue.
 *
 * @returns {NotificationQueue}
 */
export function createNotificationQueue() {
  return new NotificationQueue()
}