From 3f83a9be8e5e0b54e6a6587075264eb4fe3c5ef8 Mon Sep 17 00:00:00 2001 From: Kirill Date: Mon, 18 May 2026 11:33:06 +0500 Subject: [PATCH] feat: add notification queue with retry worker --- server/src/lib/notifications/queue.js | 130 ++++++++++++++++++++++++++ 1 file changed, 130 insertions(+) create mode 100644 server/src/lib/notifications/queue.js diff --git a/server/src/lib/notifications/queue.js b/server/src/lib/notifications/queue.js new file mode 100644 index 0000000..6658069 --- /dev/null +++ b/server/src/lib/notifications/queue.js @@ -0,0 +1,130 @@ +import { prisma } from '../prisma.js' +import { NOTIFICATION_STATUSES, MAX_RETRY_ATTEMPTS, RETRY_DELAYS_MS } from '../../../shared/constants/notification-events.js' +import { emailChannel } from './channels/email-channel.js' +import { telegramChannel } from './channels/telegram-channel.js' + +const { PENDING, SENT, FAILED } = NOTIFICATION_STATUSES + +const channels = { + email: emailChannel, + telegram: telegramChannel, +} + +class NotificationQueue { + constructor() { + this.tasks = [] + this.processing = 0 + this.maxConcurrent = 5 + this.intervalMs = 2000 + this.running = false + } + + enqueue(task) { + this.tasks.push({ ...task, enqueuedAt: Date.now() }) + } + + start() { + if (this.running) return + this.running = true + this._tick() + } + + stop() { + this.running = false + } + + _tick() { + if (!this.running) return + + this._processAvailable() + + setTimeout(() => this._tick(), this.intervalMs) + } + + _processAvailable() { + while (this.tasks.length > 0 && this.processing < this.maxConcurrent) { + const task = this.tasks.shift() + this.processing++ + this._execute(task).finally(() => { + this.processing-- + }) + } + } + + async _execute(task) { + const channel = channels[task.channel] + if (!channel) { + await this._markFailed(task.logId, `Unknown channel: ${task.channel}`) + return + } + + try { + const result = await channel.send({ + recipient: task.recipient, + eventType: task.eventType, + payload: task.payload, + }) + + if (result.success) { + await this._markSent(task.logId) + } else { + await this._handleFailure(task.logId, task, result.error) + } + } catch (err) { + await this._handleFailure(task.logId, task, err.message) + } + } + + async _markSent(logId) { + await prisma.notificationLog.update({ + where: { id: logId }, + data: { status: SENT }, + }) + } + + async _markFailed(logId, error) { + await prisma.notificationLog.update({ + where: { id: logId }, + data: { status: FAILED, error }, + }) + } + + async _handleFailure(logId, task, error) { + const log = await prisma.notificationLog.findUnique({ where: { id: logId } }) + const newAttempts = (log?.attempts || 0) + 1 + + if (newAttempts >= MAX_RETRY_ATTEMPTS) { + await this._markFailed(logId, error) + return + } + + await prisma.notificationLog.update({ + where: { id: logId }, + data: { attempts: newAttempts }, + }) + + const delay = RETRY_DELAYS_MS[newAttempts - 1] || RETRY_DELAYS_MS[RETRY_DELAYS_MS.length - 1] + setTimeout(() => { + this.enqueue({ ...task, logId }) + }, delay) + } + + async flushPendingOnStartup() { + const pending = await prisma.notificationLog.findMany({ + where: { status: PENDING }, + }) + for (const log of pending) { + await prisma.notificationLog.update({ + where: { id: log.id }, + data: { status: FAILED, error: 'Server restarted, pending notification lost' }, + }) + } + if (pending.length > 0) { + console.log(`[notifications] Marked ${pending.length} pending notifications as failed on startup`) + } + } +} + +export function createNotificationQueue() { + return new NotificationQueue() +}