import { describe, it, expect, beforeAll, afterAll } from 'vitest'; import { io as ioClient, type Socket as ClientSocket } from 'socket.io-client'; import { buildApp } from '../src/app.js'; import { loadConfig } from '../src/config.js'; import { pool } from '../src/db/pool.js'; import { drizzle } from 'drizzle-orm/node-postgres'; import { agents, apiTokens, rooms, roomMembers } from '../src/db/schema.js'; import { generateApiToken, hashApiToken } from '../src/lib/crypto.js'; import type { FastifyInstance } from 'fastify'; describe('socket.io /agents namespace', () => { let app: FastifyInstance; let serverPort: number; let agent1Id: string; let agent2Id: string; let roomId: string; let jwt1: string; let jwt2: string; beforeAll(async () => { const config = loadConfig({ NODE_ENV: 'test', LOG_LEVEL: 'fatal', HOST: '127.0.0.1', PORT: '3001', POSTGRES_HOST: process.env.POSTGRES_HOST || 'localhost', POSTGRES_PORT: process.env.POSTGRES_PORT || '5432', POSTGRES_USER: process.env.POSTGRES_USER || 'agenthub', POSTGRES_PASSWORD: process.env.POSTGRES_PASSWORD || 'agenthub', POSTGRES_DB: process.env.POSTGRES_DB || 'agenthub', JWT_SECRET: 'test-secret-with-exactly-32chars', }); app = await buildApp({ config }); await app.listen({ host: '127.0.0.1', port: 0 }); const address = app.server.address(); if (!address || typeof address === 'string') { throw new Error('Failed to get server port'); } serverPort = address.port; // Setup test data const db = drizzle(pool); // Create agents const [a1] = await db .insert(agents) .values({ name: 'test-agent-1', displayName: 'Test Agent 1', role: 'agent', }) .returning(); if (!a1) throw new Error('Failed to create agent 1'); agent1Id = a1.id; const [a2] = await db .insert(agents) .values({ name: 'test-agent-2', displayName: 'Test Agent 2', role: 'agent', }) .returning(); if (!a2) throw new Error('Failed to create agent 2'); agent2Id = a2.id; // Create room const [r] = await db .insert(rooms) .values({ slug: 'test-room', name: 'Test Room', createdBy: agent1Id, }) .returning(); if (!r) throw new Error('Failed to create room'); roomId = r.id; // Add both agents to room await db.insert(roomMembers).values([ { roomId, agentId: agent1Id }, { roomId, agentId: agent2Id }, ]); // Create API tokens const token1 = generateApiToken(); const hash1 = await hashApiToken(token1.fullToken); await db.insert(apiTokens).values({ agentId: agent1Id, hashArgon2id: hash1, prefix: token1.prefix, scopes: {}, status: 'active', }); const token2 = generateApiToken(); const hash2 = await hashApiToken(token2.fullToken); await db.insert(apiTokens).values({ agentId: agent2Id, hashArgon2id: hash2, prefix: token2.prefix, scopes: {}, status: 'active', }); // Exchange for JWTs const res1 = await app.inject({ method: 'POST', url: '/api/v1/sessions', payload: { apiToken: token1.fullToken }, }); jwt1 = JSON.parse(res1.body).jwt; const res2 = await app.inject({ method: 'POST', url: '/api/v1/sessions', payload: { apiToken: token2.fullToken }, }); jwt2 = JSON.parse(res2.body).jwt; }); afterAll(async () => { await app.close(); }); it('should connect with valid JWT and receive agent:hello-ack', async () => { return new Promise((resolve, reject) => { const client = ioClient(`http://127.0.0.1:${serverPort}/agents`, { auth: { jwt: jwt1 }, }); client.on('agent:hello-ack', (payload) => { try { expect(payload.agentId).toBe(agent1Id); expect(payload.rooms).toContain(roomId); client.disconnect(); resolve(); } catch (err) { reject(err); } }); client.on('connect_error', (err) => { reject(err); }); setTimeout(() => reject(new Error('Timeout waiting for hello-ack')), 5000); }); }); it('should reject connection with missing JWT', async () => { return new Promise((resolve, reject) => { const client = ioClient(`http://127.0.0.1:${serverPort}/agents`, { auth: {}, }); client.on('connect', () => { client.disconnect(); reject(new Error('Should not connect without JWT')); }); client.on('connect_error', (err) => { expect(err.message).toContain('Missing JWT'); client.disconnect(); resolve(); }); setTimeout(() => reject(new Error('Timeout waiting for error')), 5000); }); }); it('should reject connection with invalid JWT', async () => { return new Promise((resolve, reject) => { const client = ioClient(`http://127.0.0.1:${serverPort}/agents`, { auth: { jwt: 'invalid-jwt' }, }); client.on('connect', () => { client.disconnect(); reject(new Error('Should not connect with invalid JWT')); }); client.on('connect_error', (err) => { expect(err.message).toContain('Invalid or expired JWT'); client.disconnect(); resolve(); }); setTimeout(() => reject(new Error('Timeout waiting for error')), 5000); }); }); it('should emit presence:update when two agents join the same room', async () => { return new Promise((resolve, reject) => { let client1: ClientSocket | null = null; let client2: ClientSocket | null = null; // Connect client 1 client1 = ioClient(`http://127.0.0.1:${serverPort}/agents`, { auth: { jwt: jwt1 }, }); client1.on('agent:hello-ack', () => { // Connect client 2 client2 = ioClient(`http://127.0.0.1:${serverPort}/agents`, { auth: { jwt: jwt2 }, }); }); // Client 1 should receive presence update from client 2 client1.on('presence:update', (payload) => { try { expect(payload.agentId).toBe(agent2Id); expect(payload.status).toBe('online'); client1?.disconnect(); client2?.disconnect(); resolve(); } catch (err) { reject(err); } }); client1.on('connect_error', (err) => reject(err)); setTimeout(() => { client1?.disconnect(); client2?.disconnect(); reject(new Error('Timeout waiting for presence update')); }, 5000); }); }); it('should emit error when trying to join non-member room', async () => { return new Promise((resolve, reject) => { const client = ioClient(`http://127.0.0.1:${serverPort}/agents`, { auth: { jwt: jwt1 }, }); client.on('agent:hello-ack', () => { // Try to join a non-existent room client.emit('room:join', { roomId: '00000000-0000-0000-0000-000000000000', requestId: 'test-req-1', }); }); client.on('error', (payload) => { try { expect(payload.code).toBe('forbidden'); expect(payload.requestId).toBe('test-req-1'); client.disconnect(); resolve(); } catch (err) { reject(err); } }); setTimeout(() => { client.disconnect(); reject(new Error('Timeout waiting for error')); }, 5000); }); }); it('should send and receive messages in real-time', async () => { return new Promise((resolve, reject) => { let client1: ClientSocket | null = null; let client2: ClientSocket | null = null; let receivedByClient2 = false; let receivedEchoByClient1 = false; const messageBody = 'Hello from agent 1!'; let _messageId: string | null = null; // Connect client 1 client1 = ioClient(`http://127.0.0.1:${serverPort}/agents`, { auth: { jwt: jwt1 }, }); // Connect client 2 client2 = ioClient(`http://127.0.0.1:${serverPort}/agents`, { auth: { jwt: jwt2 }, }); // Client 2 listens for new messages client2.on('message:new', (payload) => { try { expect(payload.authorAgentId).toBe(agent1Id); expect(payload.roomId).toBe(roomId); expect(payload.body).toBe(messageBody); expect(payload.id).toBeTruthy(); _messageId = payload.id; receivedByClient2 = true; // Both clients received the message? if (receivedEchoByClient1 && receivedByClient2) { client1?.disconnect(); client2?.disconnect(); resolve(); } } catch (err) { reject(err); } }); // Client 1 also receives the echo client1.on('message:new', (payload) => { try { expect(payload.authorAgentId).toBe(agent1Id); expect(payload.roomId).toBe(roomId); expect(payload.body).toBe(messageBody); receivedEchoByClient1 = true; if (receivedByClient2 && receivedEchoByClient1) { client1?.disconnect(); client2?.disconnect(); resolve(); } } catch (err) { reject(err); } }); // Once both connected, send message let connectedCount = 0; const onBothConnected = () => { connectedCount++; if (connectedCount === 2) { // Wait a bit to ensure both are subscribed setTimeout(() => { client1?.emit( 'message:send', { roomId, body: messageBody, }, (ack: any) => { try { expect(ack.messageId).toBeTruthy(); expect(ack.error).toBeUndefined(); } catch (err) { reject(err); } }, ); }, 100); } }; client1.on('agent:hello-ack', onBothConnected); client2.on('agent:hello-ack', onBothConnected); setTimeout(() => { client1?.disconnect(); client2?.disconnect(); reject(new Error('Timeout waiting for message delivery')); }, 5000); }); }); it('should retrieve message history via REST after reconnection', async () => { return new Promise(async (resolve, reject) => { try { let client1: ClientSocket | null = null; let messageId: string | null = null; // Connect and send a message client1 = ioClient(`http://127.0.0.1:${serverPort}/agents`, { auth: { jwt: jwt1 }, }); await new Promise((res, rej) => { client1!.on('agent:hello-ack', () => { client1!.emit( 'message:send', { roomId, body: 'Test message for history', }, (ack: any) => { if (ack.error) { rej(new Error(ack.error)); } else { messageId = ack.messageId; res(); } }, ); }); setTimeout(() => rej(new Error('Timeout sending message')), 2000); }); // Disconnect client1.disconnect(); // Wait a bit await new Promise((res) => setTimeout(res, 200)); // Now fetch history via REST const res = await app.inject({ method: 'GET', url: `/rooms/${roomId}/messages`, headers: { 'x-agent-id': agent1Id, }, }); expect(res.statusCode).toBe(200); const body = JSON.parse(res.body); expect(body.messages).toBeDefined(); expect(Array.isArray(body.messages)).toBe(true); expect(body.messages.length).toBeGreaterThan(0); // Find our message const ourMessage = body.messages.find((m: any) => m.id === messageId); expect(ourMessage).toBeDefined(); expect(ourMessage.body).toBe('Test message for history'); expect(ourMessage.authorAgentId).toBe(agent1Id); resolve(); } catch (err) { reject(err); } }); }); it('should list rooms via WebSocket room:list event', async () => { return new Promise((resolve, reject) => { const client = ioClient(`http://127.0.0.1:${serverPort}/agents`, { auth: { jwt: jwt1 }, }); client.on('agent:hello-ack', () => { client.emit('room:list', { requestId: 'test-room-list' }, (ack: any) => { try { expect(ack.error).toBeUndefined(); expect(ack.rooms).toBeDefined(); expect(Array.isArray(ack.rooms)).toBe(true); expect(ack.rooms.length).toBeGreaterThan(0); // Find our test room const testRoom = ack.rooms.find((r: any) => r.id === roomId); expect(testRoom).toBeDefined(); expect(testRoom.slug).toBe('test-room'); expect(testRoom.name).toBe('Test Room'); client.disconnect(); resolve(); } catch (err) { client.disconnect(); reject(err); } }); }); client.on('connect_error', (err) => reject(err)); setTimeout(() => { client.disconnect(); reject(new Error('Timeout waiting for room:list')); }, 5000); }); }); it('should retrieve message history via WebSocket message:history event', async () => { return new Promise(async (resolve, reject) => { try { let client1: ClientSocket | null = null; let messageId: string | null = null; // Connect and send a message client1 = ioClient(`http://127.0.0.1:${serverPort}/agents`, { auth: { jwt: jwt1 }, }); await new Promise((res, rej) => { client1!.on('agent:hello-ack', () => { client1!.emit( 'message:send', { roomId, body: 'Test WebSocket history retrieval', }, (ack: any) => { if (ack.error) { rej(new Error(ack.error)); } else { messageId = ack.messageId; res(); } }, ); }); setTimeout(() => rej(new Error('Timeout sending message')), 2000); }); // Wait a bit for the message to be persisted await new Promise((res) => setTimeout(res, 100)); // Now fetch history via WebSocket await new Promise((res, rej) => { client1!.emit( 'message:history', { roomId, limit: 50, requestId: 'test-history-req', }, (ack: any) => { try { expect(ack.error).toBeUndefined(); expect(ack.messages).toBeDefined(); expect(Array.isArray(ack.messages)).toBe(true); expect(ack.messages.length).toBeGreaterThan(0); expect(ack.hasMore).toBeDefined(); expect(typeof ack.hasMore).toBe('boolean'); // Find our message const ourMessage = ack.messages.find((m: any) => m.id === messageId); expect(ourMessage).toBeDefined(); expect(ourMessage.body).toBe('Test WebSocket history retrieval'); expect(ourMessage.authorAgentId).toBe(agent1Id); expect(ourMessage.roomId).toBe(roomId); res(); } catch (err) { rej(err); } }, ); setTimeout(() => rej(new Error('Timeout fetching history')), 2000); }); client1.disconnect(); resolve(); } catch (err) { reject(err); } }); }); it('should return error when requesting history for non-member room', async () => { return new Promise((resolve, reject) => { const client = ioClient(`http://127.0.0.1:${serverPort}/agents`, { auth: { jwt: jwt1 }, }); client.on('agent:hello-ack', () => { client.emit( 'message:history', { roomId: '00000000-0000-0000-0000-000000000000', requestId: 'test-history-forbidden', }, (ack: any) => { try { expect(ack.error).toBeDefined(); expect(ack.error).toBe('Not a member of this room'); client.disconnect(); resolve(); } catch (err) { client.disconnect(); reject(err); } }, ); }); setTimeout(() => { client.disconnect(); reject(new Error('Timeout waiting for error')); }, 5000); }); }); });