feat: add SSE route with EventBus bridge and tests
This commit is contained in:
@@ -173,7 +173,7 @@ describe('GET /api/sse/stream (integration)', () => {
|
||||
})
|
||||
|
||||
it('returns 200 and event-stream headers for authenticated user', async () => {
|
||||
const res = await app.inject({ method: 'GET', url: '/api/sse/stream?token=user-token' })
|
||||
const res = await app.inject({ method: 'GET', url: '/api/sse/stream?token=user-token', payloadAsStream: true })
|
||||
expect(res.statusCode).toBe(200)
|
||||
expect(res.headers['content-type']).toBe('text/event-stream')
|
||||
expect(res.headers['cache-control']).toBe('no-cache')
|
||||
@@ -181,7 +181,8 @@ describe('GET /api/sse/stream (integration)', () => {
|
||||
})
|
||||
|
||||
it('sends initial heartbit', async () => {
|
||||
const res = await app.inject({ method: 'GET', url: '/api/sse/stream?token=user-token' })
|
||||
expect(res.body).toContain(':heartbit')
|
||||
const res = await app.inject({ method: 'GET', url: '/api/sse/stream?token=user-token', payloadAsStream: true })
|
||||
const body = res.stream().read().toString()
|
||||
expect(body).toContain(':heartbit')
|
||||
})
|
||||
})
|
||||
|
||||
@@ -0,0 +1,125 @@
|
||||
import { NOTIFICATION_EVENTS } from '../../../shared/constants/notification-events.js'
|
||||
|
||||
const {
|
||||
ORDER_CREATED,
|
||||
ORDER_STATUS_CHANGED,
|
||||
ORDER_MESSAGE_SENT,
|
||||
ORDER_MESSAGE_ADMIN_REPLY,
|
||||
PAYMENT_STATUS_CHANGED,
|
||||
DELIVERY_FEE_ADJUSTED,
|
||||
} = NOTIFICATION_EVENTS
|
||||
|
||||
export function isAdminUser(user) {
|
||||
return !!(process.env.ADMIN_EMAIL && user?.email === process.env.ADMIN_EMAIL)
|
||||
}
|
||||
|
||||
export function formatSSE(event, data) {
|
||||
const lines = [`event: ${event}`]
|
||||
if (data !== undefined) {
|
||||
lines.push(`data: ${JSON.stringify(data)}`)
|
||||
}
|
||||
return lines.join('\n') + '\n\n'
|
||||
}
|
||||
|
||||
export function formatHeartbit() {
|
||||
return ':heartbit\n\n'
|
||||
}
|
||||
|
||||
export function buildSseListeners(userId, admin, eventBus, write) {
|
||||
const listeners = []
|
||||
|
||||
function on(eventName, filterFn, sseEvent, dataFn) {
|
||||
function handler(payload) {
|
||||
if (!filterFn(payload)) return
|
||||
write(formatSSE(sseEvent, dataFn(payload)))
|
||||
}
|
||||
listeners.push({ eventName, handler })
|
||||
eventBus.on(eventName, handler)
|
||||
}
|
||||
|
||||
on(
|
||||
ORDER_MESSAGE_ADMIN_REPLY,
|
||||
(p) => p.userId === userId,
|
||||
'message:new',
|
||||
(p) => ({ orderId: p.orderId, messageId: p.messageId, preview: p.preview }),
|
||||
)
|
||||
|
||||
on(
|
||||
ORDER_MESSAGE_SENT,
|
||||
() => admin,
|
||||
'message:new',
|
||||
(p) => ({ orderId: p.orderId, messageId: p.messageId, preview: p.preview }),
|
||||
)
|
||||
|
||||
on(
|
||||
ORDER_STATUS_CHANGED,
|
||||
(p) => p.userId === userId,
|
||||
'order:statusChanged',
|
||||
(p) => ({ orderId: p.orderId, newStatus: p.newStatus }),
|
||||
)
|
||||
|
||||
on(
|
||||
PAYMENT_STATUS_CHANGED,
|
||||
(p) => p.userId === userId,
|
||||
'order:statusChanged',
|
||||
(p) => ({ orderId: p.orderId }),
|
||||
)
|
||||
|
||||
on(
|
||||
DELIVERY_FEE_ADJUSTED,
|
||||
(p) => p.userId === userId,
|
||||
'order:updated',
|
||||
(p) => ({ orderId: p.orderId }),
|
||||
)
|
||||
|
||||
on(
|
||||
ORDER_CREATED,
|
||||
() => admin,
|
||||
'order:new',
|
||||
(p) => ({ orderId: p.orderId }),
|
||||
)
|
||||
|
||||
on(
|
||||
'order:created:admin',
|
||||
() => admin,
|
||||
'order:new',
|
||||
(p) => ({ orderId: p.orderId }),
|
||||
)
|
||||
|
||||
return function cleanup() {
|
||||
for (const { eventName, handler } of listeners) {
|
||||
eventBus.off(eventName, handler)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export async function registerSseRoutes(fastify) {
|
||||
fastify.get('/api/sse/stream', { preHandler: [fastify.authenticate] }, async (request, reply) => {
|
||||
reply.hijack()
|
||||
|
||||
reply.raw.writeHead(200, {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache',
|
||||
Connection: 'keep-alive',
|
||||
'X-Accel-Buffering': 'no',
|
||||
})
|
||||
|
||||
const userId = request.user.sub
|
||||
const admin = isAdminUser(request.user)
|
||||
|
||||
reply.raw.write(formatHeartbit())
|
||||
|
||||
const heartbitTimer = setInterval(() => {
|
||||
reply.raw.write(formatHeartbit())
|
||||
}, 30_000)
|
||||
|
||||
const cleanup = buildSseListeners(userId, admin, fastify.eventBus, (chunk) => {
|
||||
reply.raw.write(chunk)
|
||||
})
|
||||
|
||||
request.raw.on('close', () => {
|
||||
clearInterval(heartbitTimer)
|
||||
cleanup()
|
||||
})
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user