From 5127d4a09396e96133f142f60450062ed4546dc3 Mon Sep 17 00:00:00 2001 From: Kirill Date: Fri, 22 May 2026 18:33:49 +0500 Subject: [PATCH] feat: add SSE route with EventBus bridge and tests --- server/src/routes/__tests__/sse.test.js | 7 +- server/src/routes/sse.js | 125 ++++++++++++++++++++++++ 2 files changed, 129 insertions(+), 3 deletions(-) create mode 100644 server/src/routes/sse.js diff --git a/server/src/routes/__tests__/sse.test.js b/server/src/routes/__tests__/sse.test.js index 7c0591d..082f34d 100644 --- a/server/src/routes/__tests__/sse.test.js +++ b/server/src/routes/__tests__/sse.test.js @@ -173,7 +173,7 @@ describe('GET /api/sse/stream (integration)', () => { }) it('returns 200 and event-stream headers for authenticated user', async () => { - const res = await app.inject({ method: 'GET', url: '/api/sse/stream?token=user-token' }) + const res = await app.inject({ method: 'GET', url: '/api/sse/stream?token=user-token', payloadAsStream: true }) expect(res.statusCode).toBe(200) expect(res.headers['content-type']).toBe('text/event-stream') expect(res.headers['cache-control']).toBe('no-cache') @@ -181,7 +181,8 @@ describe('GET /api/sse/stream (integration)', () => { }) it('sends initial heartbit', async () => { - const res = await app.inject({ method: 'GET', url: '/api/sse/stream?token=user-token' }) - expect(res.body).toContain(':heartbit') + const res = await app.inject({ method: 'GET', url: '/api/sse/stream?token=user-token', payloadAsStream: true }) + const body = res.stream().read().toString() + expect(body).toContain(':heartbit') }) }) diff --git a/server/src/routes/sse.js b/server/src/routes/sse.js new file mode 100644 index 0000000..e8b32dd --- /dev/null +++ b/server/src/routes/sse.js @@ -0,0 +1,125 @@ +import { NOTIFICATION_EVENTS } from '../../../shared/constants/notification-events.js' + +const { + ORDER_CREATED, + ORDER_STATUS_CHANGED, + ORDER_MESSAGE_SENT, + ORDER_MESSAGE_ADMIN_REPLY, + PAYMENT_STATUS_CHANGED, + DELIVERY_FEE_ADJUSTED, +} = NOTIFICATION_EVENTS + +export function isAdminUser(user) { + return !!(process.env.ADMIN_EMAIL && user?.email === process.env.ADMIN_EMAIL) +} + +export function formatSSE(event, data) { + const lines = [`event: ${event}`] + if (data !== undefined) { + lines.push(`data: ${JSON.stringify(data)}`) + } + return lines.join('\n') + '\n\n' +} + +export function formatHeartbit() { + return ':heartbit\n\n' +} + +export function buildSseListeners(userId, admin, eventBus, write) { + const listeners = [] + + function on(eventName, filterFn, sseEvent, dataFn) { + function handler(payload) { + if (!filterFn(payload)) return + write(formatSSE(sseEvent, dataFn(payload))) + } + listeners.push({ eventName, handler }) + eventBus.on(eventName, handler) + } + + on( + ORDER_MESSAGE_ADMIN_REPLY, + (p) => p.userId === userId, + 'message:new', + (p) => ({ orderId: p.orderId, messageId: p.messageId, preview: p.preview }), + ) + + on( + ORDER_MESSAGE_SENT, + () => admin, + 'message:new', + (p) => ({ orderId: p.orderId, messageId: p.messageId, preview: p.preview }), + ) + + on( + ORDER_STATUS_CHANGED, + (p) => p.userId === userId, + 'order:statusChanged', + (p) => ({ orderId: p.orderId, newStatus: p.newStatus }), + ) + + on( + PAYMENT_STATUS_CHANGED, + (p) => p.userId === userId, + 'order:statusChanged', + (p) => ({ orderId: p.orderId }), + ) + + on( + DELIVERY_FEE_ADJUSTED, + (p) => p.userId === userId, + 'order:updated', + (p) => ({ orderId: p.orderId }), + ) + + on( + ORDER_CREATED, + () => admin, + 'order:new', + (p) => ({ orderId: p.orderId }), + ) + + on( + 'order:created:admin', + () => admin, + 'order:new', + (p) => ({ orderId: p.orderId }), + ) + + return function cleanup() { + for (const { eventName, handler } of listeners) { + eventBus.off(eventName, handler) + } + } +} + +export async function registerSseRoutes(fastify) { + fastify.get('/api/sse/stream', { preHandler: [fastify.authenticate] }, async (request, reply) => { + reply.hijack() + + reply.raw.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + 'X-Accel-Buffering': 'no', + }) + + const userId = request.user.sub + const admin = isAdminUser(request.user) + + reply.raw.write(formatHeartbit()) + + const heartbitTimer = setInterval(() => { + reply.raw.write(formatHeartbit()) + }, 30_000) + + const cleanup = buildSseListeners(userId, admin, fastify.eventBus, (chunk) => { + reply.raw.write(chunk) + }) + + request.raw.on('close', () => { + clearInterval(heartbitTimer) + cleanup() + }) + }) +}