# SSE Realtime Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Replace HTTP polling with Server-Sent Events (SSE) for real-time updates of chat messages, unread counters, order status changes, and admin notifications.

**Architecture:** New SSE route on server bridges existing EventBus events to SSE streams. Client-side SseProvider manages EventSource lifecycle (connect on login, close on logout) and invalidates React Query caches on incoming events.

**Tech Stack:** Fastify (raw SSE via `reply.raw`), EventSource API, Effector (`$token` store), React Query (`invalidateQueries`), vitest.

---

## File Map

| File | Responsibility |
|---|---|
| `server/src/routes/sse.js` | SSE endpoint, EventBus listener bridge |
| `server/src/routes/__tests__/sse.test.js` | Server tests |
| `server/src/index.js` | Import and register SSE routes |
| `client/src/shared/lib/sse.ts` | EventSource factory |
| `client/src/app/providers/SseProvider.tsx` | SSE→ReactQuery invalidation bridge |
| `client/src/app/providers/__tests__/SseProvider.test.tsx` | Client tests |
| `client/src/app/providers/AppProviders.tsx` | Mount SseProvider |
| `client/src/pages/me/ui/MeLayoutPage.tsx` | Remove refetchInterval |
| `client/src/pages/admin-layout/ui/AdminLayoutPage.tsx` | Remove refetchInterval |

---

### Task 1: Server - write SSE route tests (TDD red)

**Files:**
- Create: `server/src/routes/__tests__/sse.test.js`

- [ ] **Step 1: Write the test file**

```js
import Fastify from 'fastify'
import { EventEmitter } from 'node:events'
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'

import { buildSseListeners, formatHeartbit, formatSSE, isAdminUser, registerSseRoutes } from '../sse.js'

describe('formatSSE', () => {
  it('formats event with data', () => {
    const result =
      formatSSE('message:new', { orderId: 'o1' })
    expect(result).toBe('event: message:new\ndata: {"orderId":"o1"}\n\n')
  })

  it('formats event without data', () => {
    const result = formatSSE('heartbit')
    expect(result).toBe('event: heartbit\n\n')
  })
})

describe('formatHeartbit', () => {
  it('returns SSE comment', () => {
    expect(formatHeartbit()).toBe(':heartbit\n\n')
  })
})

describe('isAdminUser', () => {
  it('returns false for non-matching email', () => {
    expect(isAdminUser({ email: 'user@test.com' })).toBe(false)
  })

  it('returns true when email matches ADMIN_EMAIL', () => {
    const adminEmail = process.env.ADMIN_EMAIL
    if (adminEmail) {
      expect(isAdminUser({ email: adminEmail })).toBe(true)
    }
  })

  it('returns false for null/undefined user', () => {
    expect(isAdminUser(null)).toBe(false)
    expect(isAdminUser(undefined)).toBe(false)
  })
})

describe('buildSseListeners', () => {
  let eventBus
  let write

  beforeEach(() => {
    eventBus = new EventEmitter()
    eventBus.setMaxListeners(50)
    write = vi.fn()
  })

  it('forwards orderMessage:adminReply to matching userId', () => {
    const cleanup = buildSseListeners('user-1', false, eventBus, write)
    eventBus.emit('orderMessage:adminReply', {
      orderId: 'o1',
      userId: 'user-1',
      messageId: 'm1',
      preview: 'Hi',
    })
    expect(write).toHaveBeenCalledTimes(1)
    expect(write.mock.calls[0][0]).toContain('event: message:new')
    expect(write.mock.calls[0][0]).toContain('"orderId":"o1"')
    cleanup()
  })

  it('ignores orderMessage:adminReply for non-matching userId', () => {
    const cleanup = buildSseListeners('user-2', false, eventBus, write)
    eventBus.emit('orderMessage:adminReply', {
      orderId: 'o1',
      userId: 'user-1',
      messageId: 'm1',
      preview: 'Hi',
    })
    expect(write).not.toHaveBeenCalled()
    cleanup()
  })

  it('forwards orderMessage:sent to admin', () => {
    const cleanup = buildSseListeners('admin-1', true, eventBus, write)
    eventBus.emit('orderMessage:sent', {
      orderId: 'o1',
      authorType: 'user',
      messageId: 'm1',
      preview: 'Hello',
    })
    expect(write).toHaveBeenCalledTimes(1)
    expect(write.mock.calls[0][0]).toContain('event: message:new')
    cleanup()
  })

  it('ignores orderMessage:sent for non-admin', () => {
    const cleanup = buildSseListeners('user-1', false, eventBus, write)
    eventBus.emit('orderMessage:sent', {
      orderId: 'o1',
      authorType: 'user',
      messageId: 'm1',
      preview: 'Hello',
    })
    expect(write).not.toHaveBeenCalled()
    cleanup()
  })

  it('forwards order:statusChanged to matching userId', () => {
    const cleanup = buildSseListeners('user-1', false, eventBus, write)
    eventBus.emit('order:statusChanged', {
      orderId: 'o1',
      userId: 'user-1',
      oldStatus: 'PENDING_PAYMENT',
      newStatus: 'PAID',
    })
    expect(write).toHaveBeenCalledTimes(1)
    expect(write.mock.calls[0][0]).toContain('event: order:statusChanged')
    expect(write.mock.calls[0][0]).toContain('"newStatus":"PAID"')
    cleanup()
  })

  it('forwards payment:statusChanged to matching userId', () => {
    const cleanup = buildSseListeners('user-1', false, eventBus, write)
    eventBus.emit('payment:statusChanged', { orderId: 'o1', userId: 'user-1', paymentStatus: 'paid' })
    expect(write).toHaveBeenCalledTimes(1)
    expect(write.mock.calls[0][0]).toContain('event: order:statusChanged')
    cleanup()
  })

  it('forwards order:deliveryFeeAdjusted to matching userId', () => {
    const cleanup = buildSseListeners('user-1', false, eventBus, write)
    eventBus.emit('order:deliveryFeeAdjusted', { orderId: 'o1', userId: 'user-1', totalCents: 50000 })
    expect(write).toHaveBeenCalledTimes(1)
    expect(write.mock.calls[0][0]).toContain('event: order:updated')
    cleanup()
  })

  it('forwards order:created to admin', () => {
    const cleanup = buildSseListeners('admin-1', true, eventBus, write)
    eventBus.emit('order:created', { orderId: 'o1', userId: 'user-1', totalCents: 50000 })
    expect(write).toHaveBeenCalledTimes(1)
    expect(write.mock.calls[0][0]).toContain('event: order:new')
    cleanup()
  })

  it('forwards order:created:admin to admin', () => {
    const cleanup = buildSseListeners('admin-1', true, eventBus, write)
    eventBus.emit('order:created:admin', { orderId: 'o1', userId:
      'user-1', userEmail: 'user@test.com' })
    expect(write).toHaveBeenCalledTimes(1)
    expect(write.mock.calls[0][0]).toContain('event: order:new')
    cleanup()
  })

  it('ignores order:created for non-admin', () => {
    const cleanup = buildSseListeners('user-1', false, eventBus, write)
    eventBus.emit('order:created', { orderId: 'o1', userId: 'user-1', totalCents: 50000 })
    expect(write).not.toHaveBeenCalled()
    cleanup()
  })

  it('cleanup removes all listeners', () => {
    const cleanup = buildSseListeners('user-1', false, eventBus, write)
    cleanup()
    eventBus.emit('orderMessage:adminReply', {
      orderId: 'o1',
      userId: 'user-1',
      messageId: 'm1',
      preview: 'Hi',
    })
    expect(write).not.toHaveBeenCalled()
  })
})

describe('GET /api/sse/stream (integration)', () => {
  let app

  beforeAll(async () => {
    app = Fastify({ logger: false })
    // Stub auth: map a query-string token to a fake user, otherwise reply 401.
    app.decorate('authenticate', async function (request, reply) {
      try {
        const token = request.query?.token
        if (!token) throw new Error('no token')
        if (token === 'user-token') {
          request.user = { sub: 'user-1', email: 'user@test.com' }
        } else if (token === 'admin-token') {
          request.user = { sub: 'admin-1', email: process.env.ADMIN_EMAIL || 'admin@test.com' }
        } else {
          throw new Error('bad token')
        }
      } catch {
        return reply.code(401).send({ error: 'Unauthorized' })
      }
    })
    app.decorate('eventBus', new EventEmitter())
    await registerSseRoutes(app)
    await app.ready()
  })

  afterAll(async () => {
    await app.close()
  })

  it('returns 401 without token', async () => {
    const res = await app.inject({ method: 'GET', url: '/api/sse/stream' })
    expect(res.statusCode).toBe(401)
  })

  it('returns 200 and event-stream headers for authenticated user', async () => {
    const res = await app.inject({ method: 'GET', url: '/api/sse/stream?token=user-token' })
    expect(res.statusCode).toBe(200)
    expect(res.headers['content-type']).toBe('text/event-stream')
    expect(res.headers['cache-control']).toBe('no-cache')
    expect(res.headers['connection']).toBe('keep-alive')
  })

  it('sends initial heartbit', async () => {
    const res = await
      app.inject({ method: 'GET', url: '/api/sse/stream?token=user-token' })
    expect(res.body).toContain(':heartbit')
  })
})
```

- [ ] **Step 2: Run tests, expect FAIL (sse.js does not exist)**

Run: `cd server && npx vitest run src/routes/__tests__/sse.test.js`
Expected: FAIL (module `../sse.js` not found)

---

### Task 2: Server - implement SSE route (TDD green)

**Files:**
- Create: `server/src/routes/sse.js`

- [ ] **Step 1: Write sse.js with exported helpers**

```js
import { NOTIFICATION_EVENTS } from '../../../shared/constants/notification-events.js'

const {
  ORDER_CREATED,
  ORDER_STATUS_CHANGED,
  ORDER_MESSAGE_SENT,
  ORDER_MESSAGE_ADMIN_REPLY,
  PAYMENT_STATUS_CHANGED,
  DELIVERY_FEE_ADJUSTED,
} = NOTIFICATION_EVENTS

export function isAdminUser(user) {
  // Require ADMIN_EMAIL to be set: otherwise a missing user and an unset
  // variable would compare as undefined === undefined and return true.
  const adminEmail = process.env.ADMIN_EMAIL
  return Boolean(adminEmail) && user?.email === adminEmail
}

// Serialize one SSE frame: an `event:` line, an optional `data:` line,
// and the blank line that terminates the frame.
export function formatSSE(event, data) {
  const lines = [`event: ${event}`]
  if (data !== undefined) {
    lines.push(`data: ${JSON.stringify(data)}`)
  }
  return lines.join('\n') + '\n\n'
}

// A line starting with ':' is an SSE comment; it keeps the connection alive.
export function formatHeartbit() {
  return ':heartbit\n\n'
}

export function buildSseListeners(userId, admin, eventBus, write) {
  const listeners = []

  // Subscribe to one EventBus event; forward it as `sseEvent` when filterFn passes.
  function on(eventName, filterFn, sseEvent, dataFn) {
    function handler(payload) {
      if (!filterFn(payload)) return
      write(formatSSE(sseEvent, dataFn(payload)))
    }
    listeners.push({ eventName, handler })
    eventBus.on(eventName, handler)
  }

  on(
    ORDER_MESSAGE_ADMIN_REPLY,
    (p) => p.userId === userId,
    'message:new',
    (p) => ({ orderId: p.orderId, messageId: p.messageId, preview: p.preview }),
  )

  on(
    ORDER_MESSAGE_SENT,
    () => admin,
    'message:new',
    (p) => ({ orderId: p.orderId, messageId: p.messageId, preview: p.preview }),
  )

  on(
    ORDER_STATUS_CHANGED,
    (p) => p.userId === userId,
    'order:statusChanged',
    (p) => ({ orderId: p.orderId, newStatus: p.newStatus }),
  )

  on(
    PAYMENT_STATUS_CHANGED,
    (p) => p.userId === userId,
    'order:statusChanged',
    (p) => ({ orderId: p.orderId }),
  )

  on(
    DELIVERY_FEE_ADJUSTED,
    (p) => p.userId === userId,
    'order:updated',
    (p) => ({ orderId: p.orderId
    }),
  )

  on(
    ORDER_CREATED,
    () => admin,
    'order:new',
    (p) => ({ orderId: p.orderId }),
  )

  on(
    'order:created:admin',
    () => admin,
    'order:new',
    (p) => ({ orderId: p.orderId }),
  )

  return function cleanup() {
    for (const { eventName, handler } of listeners) {
      eventBus.off(eventName, handler)
    }
  }
}

export async function registerSseRoutes(fastify) {
  fastify.get('/api/sse/stream', { preHandler: [fastify.authenticate] }, async (request, reply) => {
    // Write the headers on the raw response so the connection stays open.
    reply.raw.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',
      'X-Accel-Buffering': 'no',
    })

    const userId = request.user.sub
    const admin = isAdminUser(request.user)

    reply.raw.write(formatHeartbit())

    const heartbitTimer = setInterval(() => {
      reply.raw.write(formatHeartbit())
    }, 30_000)

    const cleanup = buildSseListeners(userId, admin, fastify.eventBus, (chunk) => {
      reply.raw.write(chunk)
    })

    // Stop the timer and detach EventBus listeners when the client disconnects.
    request.raw.on('close', () => {
      clearInterval(heartbitTimer)
      cleanup()
    })
  })
}
```

- [ ] **Step 2: Run tests, expect PASS**

Run: `cd server && npx vitest run src/routes/__tests__/sse.test.js`
Expected: all PASS

- [ ] **Step 3: Commit both files**

```bash
git add server/src/routes/sse.js server/src/routes/__tests__/sse.test.js
git commit -m "feat: add SSE route with EventBus bridge and tests"
```

---

### Task 3: Server - register SSE routes in index.js

**Files:**
- Modify: `server/src/index.js`

- [ ] **Step 1: Add import**

After line 27 (`import { registerUserMessageRoutes } from './routes/user-messages.js'`), add:

```js
import { registerSseRoutes } from './routes/sse.js'
```

- [ ] **Step 2: Add registration**

After line 94 (`await registerUserMessageRoutes(fastify)`), add:

```js
await registerSseRoutes(fastify)
```

- [ ] **Step 3: Verify server starts**

Run: `cd server && timeout 5 npm run dev 2>&1 || true`
Expected: no crash, server starts listening

- [ ] **Step 4: Commit**

```bash
git add server/src/index.js
git commit -m "feat: register SSE routes in server"
```

---

### Task 4: Client - EventSource factory

**Files:**
- Create: `client/src/shared/lib/sse.ts`

- [ ] **Step 1: Write the factory**

```ts
export function createEventStream(token: string): EventSource {
  return new EventSource(`/api/sse/stream?token=${encodeURIComponent(token)}`)
}
```

- [ ] **Step 2: Commit**

```bash
git add client/src/shared/lib/sse.ts
git commit -m "feat: add EventSource factory for SSE"
```

---

### Task 5: Client - write SseProvider tests (TDD red)

**Files:**
- Create: `client/src/app/providers/__tests__/SseProvider.test.tsx`

- [ ] **Step 1: Write the test file**

```tsx
import { QueryClient, QueryClientProvider } from '@tanstack/react-query'
import { render } from '@testing-library/react'
import { afterEach, describe, expect, it, vi } from 'vitest'

import { SseProvider } from '../SseProvider'

const mockInvalidateQueries = vi.fn()

vi.mock('@tanstack/react-query', async () => {
  const actual = await vi.importActual('@tanstack/react-query')
  return { ...actual, useQueryClient: () => ({ invalidateQueries: mockInvalidateQueries }) }
})

vi.mock('@/shared/model/auth', () => ({
  $token: {
    defaultState: null,
    subscribe: () => () => {},
    getState: () => null,
    watch: () => () => {},
    on: () => {},
    reset: () => {},
  },
}))

let mockToken: string | null = null
let mockEventHandlers: Record<string, (event: MessageEvent) => void> = {}
let mockCloseCalls = 0

class MockEventSource {
  url: string

  constructor(url: string) {
    this.url = url
    mockCloseCalls = 0
    mockEventHandlers = {}
  }

  addEventListener(type: string, handler: (event: MessageEvent) => void) {
    mockEventHandlers[type] = handler
  }

  removeEventListener(type: string, _handler: (event: MessageEvent) => void) {
    delete mockEventHandlers[type]
  }

  close() {
    mockCloseCalls++
  }
}

vi.mock('@/shared/lib/sse', () => ({
  createEventStream: (token: string) => {
    mockToken = token
    return new MockEventSource(`/api/sse/stream?token=${token}`) as unknown as EventSource
  },
}))

vi.mock('effector-react', async () => {
  const actual = await vi.importActual('effector-react')
  return { ...actual,
    useUnit: () => mockToken }
})

function renderSse() {
  const qc = new QueryClient({ defaultOptions: { queries: { retry: false } } })
  return render(
    <QueryClientProvider client={qc}>
      <SseProvider />
    </QueryClientProvider>,
  )
}

describe('SseProvider', () => {
  afterEach(() => {
    mockToken = null
    mockInvalidateQueries.mockReset()
    mockCloseCalls = 0
    mockEventHandlers = {}
  })

  it('renders nothing (returns null)', () => {
    mockToken = null
    const { container } = renderSse()
    expect(container.innerHTML).toBe('')
  })

  it('does not create EventSource when token is null', () => {
    mockToken = null
    renderSse()
    expect(mockToken).toBeNull()
  })

  it('creates EventSource when token is set', () => {
    mockToken = 'test-jwt'
    renderSse()
    expect(mockToken).toBe('test-jwt')
  })

  it('closes EventSource on unmount', () => {
    mockToken = 'test-jwt'
    const { unmount } = renderSse()
    expect(mockCloseCalls).toBe(0)
    unmount()
    expect(mockCloseCalls).toBe(1)
  })

  it('invalidates unread-count and conversations on message:new', () => {
    mockToken = 'test-jwt'
    renderSse()
    const handler = mockEventHandlers['message:new']
    expect(handler).toBeDefined()
    handler(new MessageEvent('message:new', { data: JSON.stringify({ orderId: 'o1' }) }))
    expect(mockInvalidateQueries).toHaveBeenCalledWith({ queryKey: ['me', 'messages', 'unread-count'] })
    expect(mockInvalidateQueries).toHaveBeenCalledWith({ queryKey: ['me', 'conversations'] })
    expect(mockInvalidateQueries).toHaveBeenCalledWith({ queryKey: ['me', 'orders', 'o1'] })
    expect(mockInvalidateQueries).toHaveBeenCalledWith({ queryKey: ['admin', 'orders', 'o1'] })
  })

  it('invalidates order queries on order:statusChanged', () => {
    mockToken = 'test-jwt'
    renderSse()
    const handler = mockEventHandlers['order:statusChanged']
    handler(new MessageEvent('order:statusChanged', { data: JSON.stringify({ orderId: 'o2' }) }))
    expect(mockInvalidateQueries).toHaveBeenCalledWith({ queryKey: ['me', 'orders', 'o2'] })
    expect(mockInvalidateQueries).toHaveBeenCalledWith({ queryKey: ['admin', 'orders', 'o2'] })
  })

  it('invalidates order queries on order:updated', () => {
    mockToken
      = 'test-jwt'
    renderSse()
    const handler = mockEventHandlers['order:updated']
    handler(new MessageEvent('order:updated', { data: JSON.stringify({ orderId: 'o3' }) }))
    expect(mockInvalidateQueries).toHaveBeenCalledWith({ queryKey: ['me', 'orders', 'o3'] })
    expect(mockInvalidateQueries).toHaveBeenCalledWith({ queryKey: ['admin', 'orders', 'o3'] })
  })

  it('invalidates admin queries on order:new', () => {
    mockToken = 'test-jwt'
    renderSse()
    const handler = mockEventHandlers['order:new']
    handler(new MessageEvent('order:new', { data: JSON.stringify({ orderId: 'o4' }) }))
    expect(mockInvalidateQueries).toHaveBeenCalledWith({ queryKey: ['admin', 'orders', 'summary'] })
    expect(mockInvalidateQueries).toHaveBeenCalledWith({ queryKey: ['admin', 'orders'] })
  })

  it('handles invalid JSON gracefully', () => {
    mockToken = 'test-jwt'
    renderSse()
    const handler = mockEventHandlers['message:new']
    expect(() => {
      handler(new MessageEvent('message:new', { data: ':heartbit' }))
    }).not.toThrow()
    expect(mockInvalidateQueries).not.toHaveBeenCalled()
  })
})
```

- [ ] **Step 2: Run tests, expect FAIL (SseProvider does not exist)**

Run: `cd client && npx vitest run src/app/providers/__tests__/SseProvider.test.tsx`
Expected: FAIL (module `../SseProvider` not found)

---

### Task 6: Client - implement SseProvider (TDD green)

**Files:**
- Create: `client/src/app/providers/SseProvider.tsx`

- [ ] **Step 1: Write the SseProvider component**

```tsx
import { useEffect, useRef } from 'react'
import { useQueryClient } from '@tanstack/react-query'
import { useUnit } from 'effector-react'

import { createEventStream } from '@/shared/lib/sse'
import { $token } from '@/shared/model/auth'

export function SseProvider() {
  const token = useUnit($token)
  const queryClient = useQueryClient()
  const sourceRef = useRef<EventSource | null>(null)

  useEffect(() => {
    // No token means logged out: make sure no stream stays open.
    if (!token) {
      if (sourceRef.current) {
        sourceRef.current.close()
        sourceRef.current = null
      }
      return
    }

    const es = createEventStream(token)
    sourceRef.current = es

    function
    handleEvent(eventName: string) {
      return function (event: MessageEvent) {
        try {
          const data = JSON.parse(event.data)
          const orderId = data.orderId
          switch (eventName) {
            case 'message:new':
              queryClient.invalidateQueries({ queryKey: ['me', 'messages', 'unread-count'] })
              queryClient.invalidateQueries({ queryKey: ['me', 'conversations'] })
              if (orderId) {
                queryClient.invalidateQueries({ queryKey: ['me', 'orders', orderId] })
                queryClient.invalidateQueries({ queryKey: ['admin', 'orders', orderId] })
              }
              break
            case 'order:statusChanged':
              if (orderId) {
                queryClient.invalidateQueries({ queryKey: ['me', 'orders', orderId] })
                queryClient.invalidateQueries({ queryKey: ['admin', 'orders', orderId] })
              }
              break
            case 'order:updated':
              if (orderId) {
                queryClient.invalidateQueries({ queryKey: ['me', 'orders', orderId] })
                queryClient.invalidateQueries({ queryKey: ['admin', 'orders', orderId] })
              }
              break
            case 'order:new':
              queryClient.invalidateQueries({ queryKey: ['admin', 'orders', 'summary'] })
              queryClient.invalidateQueries({ queryKey: ['admin', 'orders'] })
              break
          }
        } catch {
          // ignore parse errors (e.g.
          // heartbit comments)
        }
      }
    }

    const messageNewHandler = handleEvent('message:new')
    const orderStatusHandler = handleEvent('order:statusChanged')
    const orderUpdatedHandler = handleEvent('order:updated')
    const orderNewHandler = handleEvent('order:new')

    es.addEventListener('message:new', messageNewHandler)
    es.addEventListener('order:statusChanged', orderStatusHandler)
    es.addEventListener('order:updated', orderUpdatedHandler)
    es.addEventListener('order:new', orderNewHandler)

    // Runs on unmount and whenever the token changes.
    return () => {
      es.removeEventListener('message:new', messageNewHandler)
      es.removeEventListener('order:statusChanged', orderStatusHandler)
      es.removeEventListener('order:updated', orderUpdatedHandler)
      es.removeEventListener('order:new', orderNewHandler)
      es.close()
      sourceRef.current = null
    }
  }, [token, queryClient])

  return null
}
```

- [ ] **Step 2: Run tests, expect PASS**

Run: `cd client && npx vitest run src/app/providers/__tests__/SseProvider.test.tsx`
Expected: all PASS

- [ ] **Step 3: Commit both files**

```bash
git add client/src/shared/lib/sse.ts client/src/app/providers/SseProvider.tsx client/src/app/providers/__tests__/SseProvider.test.tsx
git commit -m "feat: add SseProvider - SSE to ReactQuery bridge with tests"
```

---

### Task 7: Client - mount SseProvider in AppProviders

**Files:**
- Modify: `client/src/app/providers/AppProviders.tsx`

- [ ] **Step 1: Add import and mount**

```tsx
import { SseProvider } from './SseProvider'
```

Inside the return, add `<SseProvider />` as first child of `QueryClientProvider`. Leave the provider's existing props untouched; `client={queryClient}` below only stands in for whatever the file already passes:

```tsx
<QueryClientProvider client={queryClient}>
  <SseProvider />
  {children}
</QueryClientProvider>
```

- [ ] **Step 2: Verify build**

Run: `cd client && npx tsc --noEmit`
Expected: no errors

- [ ] **Step 3: Commit**

```bash
git add client/src/app/providers/AppProviders.tsx
git commit -m "feat: mount SseProvider in AppProviders"
```

---

### Task 8: Client - remove polling from MeLayoutPage

**Files:**
- Modify: `client/src/pages/me/ui/MeLayoutPage.tsx`

- [ ] **Step 1: Remove refetchInterval and refetchOnWindowFocus**

Find the unread query.
Remove `refetchInterval: 45_000` and `refetchOnWindowFocus: true`:

```ts
const unreadQuery = useQuery({
  queryKey: ['me', 'messages', 'unread-count'],
  queryFn: fetchUnreadMessageCount,
  enabled: Boolean(user),
})
```

- [ ] **Step 2: Verify TypeScript**

Run: `cd client && npx tsc --noEmit`
Expected: no errors

- [ ] **Step 3: Commit**

```bash
git add client/src/pages/me/ui/MeLayoutPage.tsx
git commit -m "feat: remove polling from MeLayoutPage - replaced by SSE"
```

---

### Task 9: Client - remove polling from AdminLayoutPage

**Files:**
- Modify: `client/src/pages/admin-layout/ui/AdminLayoutPage.tsx`

- [ ] **Step 1: Remove refetchInterval and refetchOnWindowFocus**

Find the orders summary query. Remove `refetchInterval: 45_000` and `refetchOnWindowFocus: true`:

```ts
const ordersSummaryQuery = useQuery({
  queryKey: ['admin', 'orders', 'summary'],
  queryFn: fetchAdminOrdersSummary,
  enabled: isAdmin,
})
```

- [ ] **Step 2: Verify TypeScript**

Run: `cd client && npx tsc --noEmit`
Expected: no errors

- [ ] **Step 3: Commit**

```bash
git add client/src/pages/admin-layout/ui/AdminLayoutPage.tsx
git commit -m "feat: remove admin polling from AdminLayoutPage - replaced by SSE"
```

---

### Task 10: Final verification

**Files:** none (verification only)

- [ ] **Step 1: Run server tests**

Run: `cd server && npm test -- --run`
Expected: all tests pass

- [ ] **Step 2: Run client tests**

Run: `cd client && npm test -- --run`
Expected: all tests pass

- [ ] **Step 3: Run client lint**

Run: `cd client && npm run lint`
Expected: no errors

- [ ] **Step 4: Run server lint**

Run: `cd server && npm run lint`
Expected: no errors

- [ ] **Step 5: Run client format check**

Run: `cd client && npm run format:check`
Expected: no formatting issues

- [ ] **Step 6: Run client build**

Run: `cd client && npm run build`
Expected: successful build
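
As an optional debugging aid during final verification, the frames written by the Task 2 route can be decoded by hand, for example when inspecting raw output captured from `/api/sse/stream`. The sketch below is not one of the plan's files: `parseSseFrames` is a hypothetical helper introduced only for illustration, and `formatSSE` / `formatHeartbit` are copied from Task 2 so the block runs on its own. It handles only the `event:`, `data:`, and comment lines this plan emits, not the full SSE grammar.

```javascript
// Copied from Task 2 so that this sketch is self-contained.
function formatSSE(event, data) {
  const lines = [`event: ${event}`]
  if (data !== undefined) {
    lines.push(`data: ${JSON.stringify(data)}`)
  }
  return lines.join('\n') + '\n\n'
}

function formatHeartbit() {
  return ':heartbit\n\n'
}

// Hypothetical helper (not part of the plan): split raw stream text into
// frames on blank lines and decode the `event:` and `data:` fields.
// Comment lines (starting with ':') and frames without an event are skipped.
function parseSseFrames(text) {
  const events = []
  for (const frame of text.split('\n\n')) {
    let event = null
    const dataLines = []
    for (const line of frame.split('\n')) {
      if (line.startsWith(':')) continue // SSE comment, e.g. the heartbeat
      if (line.startsWith('event:')) event = line.slice(6).trim()
      else if (line.startsWith('data:')) dataLines.push(line.slice(5).trim())
    }
    if (event === null) continue
    const raw = dataLines.join('\n')
    events.push({ event, data: raw === '' ? undefined : JSON.parse(raw) })
  }
  return events
}

// Simulate what one client would receive: a heartbeat, then two events.
const stream =
  formatHeartbit() +
  formatSSE('message:new', { orderId: 'o1', messageId: 'm1', preview: 'Hi' }) +
  formatSSE('order:new', { orderId: 'o2' })

const decoded = parseSseFrames(stream)
console.log(decoded)
```

In the browser none of this is needed: `EventSource` parses frames itself and never delivers comment lines to event listeners, so the heartbeat only keeps the connection open.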