agenthub/test/socket.test.ts
Paperclip FoundingEngineer a79df89a78 feat(agenthub): Add room:list and message:history WebSocket handlers
Implements missing WebSocket event handlers for J5 messaging:
- room:list: List all rooms for the authenticated agent
- message:history: Retrieve paginated message history for a room

Also adds:
- Unit tests for both handlers in socket.test.ts
- E2E validation script test/j5-messaging-validation.js

Completes BARAAA-50 deliverables for real-time messaging.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-05-02 00:05:01 +00:00

577 lines
17 KiB
TypeScript

import { describe, it, expect, beforeAll, afterAll } from 'vitest';
import { io as ioClient, type Socket as ClientSocket } from 'socket.io-client';
import { buildApp } from '../src/app.js';
import { loadConfig } from '../src/config.js';
import { pool } from '../src/db/pool.js';
import { drizzle } from 'drizzle-orm/node-postgres';
import { agents, apiTokens, rooms, roomMembers } from '../src/db/schema.js';
import { generateApiToken, hashApiToken } from '../src/lib/crypto.js';
import type { FastifyInstance } from 'fastify';
describe('socket.io /agents namespace', () => {
let app: FastifyInstance;
let serverPort: number;
let agent1Id: string;
let agent2Id: string;
let roomId: string;
let jwt1: string;
let jwt2: string;
beforeAll(async () => {
const config = loadConfig({
NODE_ENV: 'test',
LOG_LEVEL: 'fatal',
HOST: '127.0.0.1',
PORT: '3001',
POSTGRES_HOST: process.env.POSTGRES_HOST || 'localhost',
POSTGRES_PORT: process.env.POSTGRES_PORT || '5432',
POSTGRES_USER: process.env.POSTGRES_USER || 'agenthub',
POSTGRES_PASSWORD: process.env.POSTGRES_PASSWORD || 'agenthub',
POSTGRES_DB: process.env.POSTGRES_DB || 'agenthub',
JWT_SECRET: 'test-secret-with-exactly-32chars',
});
app = await buildApp({ config });
await app.listen({ host: '127.0.0.1', port: 0 });
const address = app.server.address();
if (!address || typeof address === 'string') {
throw new Error('Failed to get server port');
}
serverPort = address.port;
// Setup test data
const db = drizzle(pool);
// Create agents
const [a1] = await db
.insert(agents)
.values({
name: 'test-agent-1',
displayName: 'Test Agent 1',
role: 'agent',
})
.returning();
if (!a1) throw new Error('Failed to create agent 1');
agent1Id = a1.id;
const [a2] = await db
.insert(agents)
.values({
name: 'test-agent-2',
displayName: 'Test Agent 2',
role: 'agent',
})
.returning();
if (!a2) throw new Error('Failed to create agent 2');
agent2Id = a2.id;
// Create room
const [r] = await db
.insert(rooms)
.values({
slug: 'test-room',
name: 'Test Room',
createdBy: agent1Id,
})
.returning();
if (!r) throw new Error('Failed to create room');
roomId = r.id;
// Add both agents to room
await db.insert(roomMembers).values([
{ roomId, agentId: agent1Id },
{ roomId, agentId: agent2Id },
]);
// Create API tokens
const token1 = generateApiToken();
const hash1 = await hashApiToken(token1.fullToken);
await db.insert(apiTokens).values({
agentId: agent1Id,
hashArgon2id: hash1,
prefix: token1.prefix,
scopes: {},
status: 'active',
});
const token2 = generateApiToken();
const hash2 = await hashApiToken(token2.fullToken);
await db.insert(apiTokens).values({
agentId: agent2Id,
hashArgon2id: hash2,
prefix: token2.prefix,
scopes: {},
status: 'active',
});
// Exchange for JWTs
const res1 = await app.inject({
method: 'POST',
url: '/api/v1/sessions',
payload: { apiToken: token1.fullToken },
});
jwt1 = JSON.parse(res1.body).jwt;
const res2 = await app.inject({
method: 'POST',
url: '/api/v1/sessions',
payload: { apiToken: token2.fullToken },
});
jwt2 = JSON.parse(res2.body).jwt;
});
afterAll(async () => {
await app.close();
});
it('should connect with valid JWT and receive agent:hello-ack', async () => {
return new Promise<void>((resolve, reject) => {
const client = ioClient(`http://127.0.0.1:${serverPort}/agents`, {
auth: { jwt: jwt1 },
});
client.on('agent:hello-ack', (payload) => {
try {
expect(payload.agentId).toBe(agent1Id);
expect(payload.rooms).toContain(roomId);
client.disconnect();
resolve();
} catch (err) {
reject(err);
}
});
client.on('connect_error', (err) => {
reject(err);
});
setTimeout(() => reject(new Error('Timeout waiting for hello-ack')), 5000);
});
});
it('should reject connection with missing JWT', async () => {
return new Promise<void>((resolve, reject) => {
const client = ioClient(`http://127.0.0.1:${serverPort}/agents`, {
auth: {},
});
client.on('connect', () => {
client.disconnect();
reject(new Error('Should not connect without JWT'));
});
client.on('connect_error', (err) => {
expect(err.message).toContain('Missing JWT');
client.disconnect();
resolve();
});
setTimeout(() => reject(new Error('Timeout waiting for error')), 5000);
});
});
it('should reject connection with invalid JWT', async () => {
return new Promise<void>((resolve, reject) => {
const client = ioClient(`http://127.0.0.1:${serverPort}/agents`, {
auth: { jwt: 'invalid-jwt' },
});
client.on('connect', () => {
client.disconnect();
reject(new Error('Should not connect with invalid JWT'));
});
client.on('connect_error', (err) => {
expect(err.message).toContain('Invalid or expired JWT');
client.disconnect();
resolve();
});
setTimeout(() => reject(new Error('Timeout waiting for error')), 5000);
});
});
it('should emit presence:update when two agents join the same room', async () => {
return new Promise<void>((resolve, reject) => {
let client1: ClientSocket | null = null;
let client2: ClientSocket | null = null;
// Connect client 1
client1 = ioClient(`http://127.0.0.1:${serverPort}/agents`, {
auth: { jwt: jwt1 },
});
client1.on('agent:hello-ack', () => {
// Connect client 2
client2 = ioClient(`http://127.0.0.1:${serverPort}/agents`, {
auth: { jwt: jwt2 },
});
});
// Client 1 should receive presence update from client 2
client1.on('presence:update', (payload) => {
try {
expect(payload.agentId).toBe(agent2Id);
expect(payload.status).toBe('online');
client1?.disconnect();
client2?.disconnect();
resolve();
} catch (err) {
reject(err);
}
});
client1.on('connect_error', (err) => reject(err));
setTimeout(() => {
client1?.disconnect();
client2?.disconnect();
reject(new Error('Timeout waiting for presence update'));
}, 5000);
});
});
it('should emit error when trying to join non-member room', async () => {
return new Promise<void>((resolve, reject) => {
const client = ioClient(`http://127.0.0.1:${serverPort}/agents`, {
auth: { jwt: jwt1 },
});
client.on('agent:hello-ack', () => {
// Try to join a non-existent room
client.emit('room:join', {
roomId: '00000000-0000-0000-0000-000000000000',
requestId: 'test-req-1',
});
});
client.on('error', (payload) => {
try {
expect(payload.code).toBe('forbidden');
expect(payload.requestId).toBe('test-req-1');
client.disconnect();
resolve();
} catch (err) {
reject(err);
}
});
setTimeout(() => {
client.disconnect();
reject(new Error('Timeout waiting for error'));
}, 5000);
});
});
it('should send and receive messages in real-time', async () => {
return new Promise<void>((resolve, reject) => {
let client1: ClientSocket | null = null;
let client2: ClientSocket | null = null;
let receivedByClient2 = false;
let receivedEchoByClient1 = false;
const messageBody = 'Hello from agent 1!';
let _messageId: string | null = null;
// Connect client 1
client1 = ioClient(`http://127.0.0.1:${serverPort}/agents`, {
auth: { jwt: jwt1 },
});
// Connect client 2
client2 = ioClient(`http://127.0.0.1:${serverPort}/agents`, {
auth: { jwt: jwt2 },
});
// Client 2 listens for new messages
client2.on('message:new', (payload) => {
try {
expect(payload.authorAgentId).toBe(agent1Id);
expect(payload.roomId).toBe(roomId);
expect(payload.body).toBe(messageBody);
expect(payload.id).toBeTruthy();
_messageId = payload.id;
receivedByClient2 = true;
// Both clients received the message?
if (receivedEchoByClient1 && receivedByClient2) {
client1?.disconnect();
client2?.disconnect();
resolve();
}
} catch (err) {
reject(err);
}
});
// Client 1 also receives the echo
client1.on('message:new', (payload) => {
try {
expect(payload.authorAgentId).toBe(agent1Id);
expect(payload.roomId).toBe(roomId);
expect(payload.body).toBe(messageBody);
receivedEchoByClient1 = true;
if (receivedByClient2 && receivedEchoByClient1) {
client1?.disconnect();
client2?.disconnect();
resolve();
}
} catch (err) {
reject(err);
}
});
// Once both connected, send message
let connectedCount = 0;
const onBothConnected = () => {
connectedCount++;
if (connectedCount === 2) {
// Wait a bit to ensure both are subscribed
setTimeout(() => {
client1?.emit(
'message:send',
{
roomId,
body: messageBody,
},
(ack: any) => {
try {
expect(ack.messageId).toBeTruthy();
expect(ack.error).toBeUndefined();
} catch (err) {
reject(err);
}
},
);
}, 100);
}
};
client1.on('agent:hello-ack', onBothConnected);
client2.on('agent:hello-ack', onBothConnected);
setTimeout(() => {
client1?.disconnect();
client2?.disconnect();
reject(new Error('Timeout waiting for message delivery'));
}, 5000);
});
});
it('should retrieve message history via REST after reconnection', async () => {
return new Promise<void>(async (resolve, reject) => {
try {
let client1: ClientSocket | null = null;
let messageId: string | null = null;
// Connect and send a message
client1 = ioClient(`http://127.0.0.1:${serverPort}/agents`, {
auth: { jwt: jwt1 },
});
await new Promise<void>((res, rej) => {
client1!.on('agent:hello-ack', () => {
client1!.emit(
'message:send',
{
roomId,
body: 'Test message for history',
},
(ack: any) => {
if (ack.error) {
rej(new Error(ack.error));
} else {
messageId = ack.messageId;
res();
}
},
);
});
setTimeout(() => rej(new Error('Timeout sending message')), 2000);
});
// Disconnect
client1.disconnect();
// Wait a bit
await new Promise((res) => setTimeout(res, 200));
// Now fetch history via REST
const res = await app.inject({
method: 'GET',
url: `/rooms/${roomId}/messages`,
headers: {
'x-agent-id': agent1Id,
},
});
expect(res.statusCode).toBe(200);
const body = JSON.parse(res.body);
expect(body.messages).toBeDefined();
expect(Array.isArray(body.messages)).toBe(true);
expect(body.messages.length).toBeGreaterThan(0);
// Find our message
const ourMessage = body.messages.find((m: any) => m.id === messageId);
expect(ourMessage).toBeDefined();
expect(ourMessage.body).toBe('Test message for history');
expect(ourMessage.authorAgentId).toBe(agent1Id);
resolve();
} catch (err) {
reject(err);
}
});
});
it('should list rooms via WebSocket room:list event', async () => {
return new Promise<void>((resolve, reject) => {
const client = ioClient(`http://127.0.0.1:${serverPort}/agents`, {
auth: { jwt: jwt1 },
});
client.on('agent:hello-ack', () => {
client.emit('room:list', { requestId: 'test-room-list' }, (ack: any) => {
try {
expect(ack.error).toBeUndefined();
expect(ack.rooms).toBeDefined();
expect(Array.isArray(ack.rooms)).toBe(true);
expect(ack.rooms.length).toBeGreaterThan(0);
// Find our test room
const testRoom = ack.rooms.find((r: any) => r.id === roomId);
expect(testRoom).toBeDefined();
expect(testRoom.slug).toBe('test-room');
expect(testRoom.name).toBe('Test Room');
client.disconnect();
resolve();
} catch (err) {
client.disconnect();
reject(err);
}
});
});
client.on('connect_error', (err) => reject(err));
setTimeout(() => {
client.disconnect();
reject(new Error('Timeout waiting for room:list'));
}, 5000);
});
});
it('should retrieve message history via WebSocket message:history event', async () => {
return new Promise<void>(async (resolve, reject) => {
try {
let client1: ClientSocket | null = null;
let messageId: string | null = null;
// Connect and send a message
client1 = ioClient(`http://127.0.0.1:${serverPort}/agents`, {
auth: { jwt: jwt1 },
});
await new Promise<void>((res, rej) => {
client1!.on('agent:hello-ack', () => {
client1!.emit(
'message:send',
{
roomId,
body: 'Test WebSocket history retrieval',
},
(ack: any) => {
if (ack.error) {
rej(new Error(ack.error));
} else {
messageId = ack.messageId;
res();
}
},
);
});
setTimeout(() => rej(new Error('Timeout sending message')), 2000);
});
// Wait a bit for the message to be persisted
await new Promise((res) => setTimeout(res, 100));
// Now fetch history via WebSocket
await new Promise<void>((res, rej) => {
client1!.emit(
'message:history',
{
roomId,
limit: 50,
requestId: 'test-history-req',
},
(ack: any) => {
try {
expect(ack.error).toBeUndefined();
expect(ack.messages).toBeDefined();
expect(Array.isArray(ack.messages)).toBe(true);
expect(ack.messages.length).toBeGreaterThan(0);
expect(ack.hasMore).toBeDefined();
expect(typeof ack.hasMore).toBe('boolean');
// Find our message
const ourMessage = ack.messages.find((m: any) => m.id === messageId);
expect(ourMessage).toBeDefined();
expect(ourMessage.body).toBe('Test WebSocket history retrieval');
expect(ourMessage.authorAgentId).toBe(agent1Id);
expect(ourMessage.roomId).toBe(roomId);
res();
} catch (err) {
rej(err);
}
},
);
setTimeout(() => rej(new Error('Timeout fetching history')), 2000);
});
client1.disconnect();
resolve();
} catch (err) {
reject(err);
}
});
});
it('should return error when requesting history for non-member room', async () => {
return new Promise<void>((resolve, reject) => {
const client = ioClient(`http://127.0.0.1:${serverPort}/agents`, {
auth: { jwt: jwt1 },
});
client.on('agent:hello-ack', () => {
client.emit(
'message:history',
{
roomId: '00000000-0000-0000-0000-000000000000',
requestId: 'test-history-forbidden',
},
(ack: any) => {
try {
expect(ack.error).toBeDefined();
expect(ack.error).toBe('Not a member of this room');
client.disconnect();
resolve();
} catch (err) {
client.disconnect();
reject(err);
}
},
);
});
setTimeout(() => {
client.disconnect();
reject(new Error('Timeout waiting for error'));
}, 5000);
});
});
});