feat: add notification queue with retry worker

This commit is contained in:
Kirill
2026-05-18 11:33:06 +05:00
parent 4a424b68a2
commit 3f83a9be8e
+130
View File
@@ -0,0 +1,130 @@
import { prisma } from '../prisma.js'
import { NOTIFICATION_STATUSES, MAX_RETRY_ATTEMPTS, RETRY_DELAYS_MS } from '../../../shared/constants/notification-events.js'
import { emailChannel } from './channels/email-channel.js'
import { telegramChannel } from './channels/telegram-channel.js'
const { PENDING, SENT, FAILED } = NOTIFICATION_STATUSES
const channels = {
email: emailChannel,
telegram: telegramChannel,
}
class NotificationQueue {
constructor() {
this.tasks = []
this.processing = 0
this.maxConcurrent = 5
this.intervalMs = 2000
this.running = false
}
enqueue(task) {
this.tasks.push({ ...task, enqueuedAt: Date.now() })
}
start() {
if (this.running) return
this.running = true
this._tick()
}
stop() {
this.running = false
}
_tick() {
if (!this.running) return
this._processAvailable()
setTimeout(() => this._tick(), this.intervalMs)
}
_processAvailable() {
while (this.tasks.length > 0 && this.processing < this.maxConcurrent) {
const task = this.tasks.shift()
this.processing++
this._execute(task).finally(() => {
this.processing--
})
}
}
async _execute(task) {
const channel = channels[task.channel]
if (!channel) {
await this._markFailed(task.logId, `Unknown channel: ${task.channel}`)
return
}
try {
const result = await channel.send({
recipient: task.recipient,
eventType: task.eventType,
payload: task.payload,
})
if (result.success) {
await this._markSent(task.logId)
} else {
await this._handleFailure(task.logId, task, result.error)
}
} catch (err) {
await this._handleFailure(task.logId, task, err.message)
}
}
async _markSent(logId) {
await prisma.notificationLog.update({
where: { id: logId },
data: { status: SENT },
})
}
async _markFailed(logId, error) {
await prisma.notificationLog.update({
where: { id: logId },
data: { status: FAILED, error },
})
}
async _handleFailure(logId, task, error) {
const log = await prisma.notificationLog.findUnique({ where: { id: logId } })
const newAttempts = (log?.attempts || 0) + 1
if (newAttempts >= MAX_RETRY_ATTEMPTS) {
await this._markFailed(logId, error)
return
}
await prisma.notificationLog.update({
where: { id: logId },
data: { attempts: newAttempts },
})
const delay = RETRY_DELAYS_MS[newAttempts - 1] || RETRY_DELAYS_MS[RETRY_DELAYS_MS.length - 1]
setTimeout(() => {
this.enqueue({ ...task, logId })
}, delay)
}
async flushPendingOnStartup() {
const pending = await prisma.notificationLog.findMany({
where: { status: PENDING },
})
for (const log of pending) {
await prisma.notificationLog.update({
where: { id: log.id },
data: { status: FAILED, error: 'Server restarted, pending notification lost' },
})
}
if (pending.length > 0) {
console.log(`[notifications] Marked ${pending.length} pending notifications as failed on startup`)
}
}
}
export function createNotificationQueue() {
return new NotificationQueue()
}