Files
shop-server/server/src/routes/sse.js
T

141 lines
3.1 KiB
JavaScript

import { NOTIFICATION_EVENTS } from '../../../shared/constants/notification-events.js'
const {
ORDER_CREATED,
ORDER_STATUS_CHANGED,
ORDER_MESSAGE_SENT,
ORDER_MESSAGE_ADMIN_REPLY,
PAYMENT_STATUS_CHANGED,
DELIVERY_FEE_ADJUSTED,
} = NOTIFICATION_EVENTS
export function isAdminUser(user) {
return !!(process.env.ADMIN_EMAIL && user?.email === process.env.ADMIN_EMAIL)
}
export function formatSSE(event, data) {
const lines = [`event: ${event}`]
if (data !== undefined) {
lines.push(`data: ${JSON.stringify(data)}`)
}
return lines.join('\n') + '\n\n'
}
export function formatHeartbit() {
return ':heartbit\n\n'
}
export function buildSseListeners(userId, admin, eventBus, write) {
const listeners = []
function on(eventName, filterFn, sseEvent, dataFn) {
function handler(payload) {
if (!filterFn(payload)) return
write(formatSSE(sseEvent, dataFn(payload)))
}
listeners.push({ eventName, handler })
eventBus.on(eventName, handler)
}
on(
ORDER_MESSAGE_ADMIN_REPLY,
(p) => p.userId === userId,
'message:new',
(p) => ({ orderId: p.orderId, messageId: p.messageId, preview: p.preview }),
)
on(
ORDER_MESSAGE_SENT,
() => admin,
'message:new',
(p) => ({ orderId: p.orderId, messageId: p.messageId, preview: p.preview }),
)
on(
ORDER_STATUS_CHANGED,
(p) => p.userId === userId,
'order:statusChanged',
(p) => ({ orderId: p.orderId, newStatus: p.newStatus }),
)
on(
PAYMENT_STATUS_CHANGED,
(p) => p.userId === userId,
'order:statusChanged',
(p) => ({ orderId: p.orderId }),
)
on(
DELIVERY_FEE_ADJUSTED,
(p) => p.userId === userId,
'order:updated',
(p) => ({ orderId: p.orderId }),
)
on(
ORDER_CREATED,
() => admin,
'order:new',
(p) => ({ orderId: p.orderId }),
)
on(
'order:created:admin',
() => admin,
'order:new',
(p) => ({ orderId: p.orderId }),
)
return function cleanup() {
for (const { eventName, handler } of listeners) {
eventBus.off(eventName, handler)
}
}
}
export async function registerSseRoutes(fastify) {
fastify.get('/api/sse/stream', { preHandler: [fastify.authenticate] }, async (request, reply) => {
reply.hijack()
reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'X-Accel-Buffering': 'no',
})
let closed = false
function safeWrite(chunk) {
if (closed) return
try {
reply.raw.write(chunk)
} catch {
closed = true
cleanUp()
}
}
const userId = request.user.sub
const admin = isAdminUser(request.user)
safeWrite(formatHeartbit())
const heartbitTimer = setInterval(() => {
safeWrite(formatHeartbit())
}, 30_000)
const removeListeners = buildSseListeners(userId, admin, fastify.eventBus, safeWrite)
function cleanUp() {
if (closed) return
closed = true
clearInterval(heartbitTimer)
removeListeners()
}
request.raw.on('close', cleanUp)
request.raw.on('error', cleanUp)
})
}