feat(agenthub): Add room:list and message:history WebSocket handlers

Implements missing WebSocket event handlers for J5 messaging:
- room:list: List all rooms for the authenticated agent
- message:history: Retrieve paginated message history for a room

Also adds:
- Unit tests for both handlers in socket.test.ts
- E2E validation script test/j5-messaging-validation.js

Completes BARAAA-50 deliverables for real-time messaging.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
Paperclip FoundingEngineer 2026-05-02 00:05:01 +00:00
parent bdd5d92ba7
commit a79df89a78
3 changed files with 616 additions and 1 deletions

View file

@ -3,7 +3,7 @@ import { Server as SocketIOServer } from 'socket.io';
import type { Pool } from 'pg';
import { drizzle } from 'drizzle-orm/node-postgres';
import { roomMembers, messages } from '../db/schema.js';
import { eq, and } from 'drizzle-orm';
import { eq, and, sql } from 'drizzle-orm';
import { verifyJWT } from '../lib/crypto.js';
import { auditLog } from '../lib/audit.js';
import type { AppConfig } from '../config.js';
@ -30,10 +30,18 @@ export interface ServerToClientEvents {
export interface ClientToServerEvents {
'room:join': (payload: { roomId: string; requestId?: string }) => void;
'room:leave': (payload: { roomId: string; requestId?: string }) => void;
'room:list': (
payload: { requestId?: string },
ack: (response: { rooms: Array<{ id: string; slug: string; name: string }> } | { error: string }) => void,
) => void;
'message:send': (
payload: { roomId: string; body: string; mentions?: string[]; replyTo?: string },
ack: (response: { messageId: string } | { error: string }) => void,
) => void;
'message:history': (
payload: { roomId: string; before?: string; limit?: number; requestId?: string },
ack: (response: { messages: Array<{ id: string; roomId: string; authorAgentId: string; body: string; createdAt: string }>; hasMore: boolean; cursor: string | null } | { error: string }) => void,
) => void;
}
export interface SocketData {
@ -87,6 +95,17 @@ export function setupSocketIO(
requestId: z.string().optional(),
});
const RoomListSchema = z.object({
requestId: z.string().optional(),
});
const MessageHistorySchema = z.object({
roomId: z.string().uuid(),
before: z.string().uuid().optional(),
limit: z.number().int().min(1).max(100).optional(),
requestId: z.string().optional(),
});
// Rate limiting: track events per socket (30 events/s)
const socketRateLimits = new Map<
string,
@ -248,6 +267,103 @@ export function setupSocketIO(
socket.to(roomId).emit('presence:update', { agentId, status: 'offline' });
});
// Handle room:list
socket.on('room:list', async (payload, ack) => {
// Rate limit
if (!checkRateLimit(socket.id)) {
ack({ error: 'Rate limit exceeded' });
return;
}
// Validate payload
const parsed = RoomListSchema.safeParse(payload);
if (!parsed.success) {
ack({ error: 'Invalid room:list payload' });
return;
}
// Get agent's rooms
const { rooms } = await import('../db/schema.js');
const result = await db
.select({
id: rooms.id,
slug: rooms.slug,
name: rooms.name,
})
.from(rooms)
.innerJoin(roomMembers, eq(rooms.id, roomMembers.roomId))
.where(eq(roomMembers.agentId, agentId));
ack({
rooms: result.map((r) => ({
id: r.id,
slug: r.slug,
name: r.name,
})),
});
});
// Handle message:history
socket.on('message:history', async (payload, ack) => {
// Rate limit
if (!checkRateLimit(socket.id)) {
ack({ error: 'Rate limit exceeded' });
return;
}
// Validate payload
const parsed = MessageHistorySchema.safeParse(payload);
if (!parsed.success) {
ack({ error: 'Invalid message:history payload' });
return;
}
const { roomId, before, limit } = parsed.data;
const limitNum = Math.min(limit || 50, 100);
// Check if agent is member
const [membership] = await db
.select()
.from(roomMembers)
.where(and(eq(roomMembers.roomId, roomId), eq(roomMembers.agentId, agentId)));
if (!membership) {
ack({ error: 'Not a member of this room' });
return;
}
// Build query
let conditions = [eq(messages.roomId, roomId)];
if (before) {
conditions.push(sql`${messages.id} < ${before}`);
}
const result = await db
.select({
id: messages.id,
roomId: messages.roomId,
authorAgentId: messages.authorAgentId,
body: messages.body,
createdAt: messages.createdAt,
})
.from(messages)
.where(and(...conditions))
.orderBy(sql`${messages.createdAt} DESC`, sql`${messages.id} DESC`)
.limit(limitNum);
ack({
messages: result.map((m) => ({
id: m.id,
roomId: m.roomId,
authorAgentId: m.authorAgentId,
body: m.body,
createdAt: m.createdAt.toISOString(),
})),
hasMore: result.length === limitNum,
cursor: result.length > 0 ? result[result.length - 1]!.id : null,
});
});
// Handle message:send
socket.on('message:send', async (payload, ack) => {
const startTime = performance.now();

View file

@ -0,0 +1,348 @@
#!/usr/bin/env node
/**
* AgentHub J5 Messaging Validation Test
* Tests new WebSocket handlers: room:list and message:history
* Also validates broadcast and persistence
*/
import { io } from 'socket.io-client';
const AGENTHUB_HOST = process.argv[2] || '192.168.9.23:3000';
const BASE_URL = `http://${AGENTHUB_HOST}`;
const WS_URL = `http://${AGENTHUB_HOST}`;
// ANSI colors
const GREEN = '\x1b[32m';
const YELLOW = '\x1b[33m';
const RED = '\x1b[31m';
const CYAN = '\x1b[36m';
const NC = '\x1b[0m';
function step(msg) {
console.log(`${YELLOW}${msg}${NC}`);
}
function success(msg) {
console.log(`${GREEN}${msg}${NC}`);
}
function error(msg) {
console.error(`${RED}${msg}${NC}`);
process.exit(1);
}
function info(msg) {
console.log(`${CYAN} ${msg}${NC}`);
}
// Helper to make HTTP requests
async function request(method, path, body, headers = {}) {
const url = `${BASE_URL}${path}`;
const options = {
method,
headers: {
'Content-Type': 'application/json',
...headers,
},
};
if (body) {
options.body = JSON.stringify(body);
}
const response = await fetch(url, options);
const text = await response.text();
try {
return JSON.parse(text);
} catch {
return text;
}
}
// Main test flow
async function main() {
console.log('🚀 AgentHub J5 — Messaging Validation Test');
console.log('━'.repeat(70));
console.log(`Target: ${BASE_URL}`);
console.log('');
let alexiaId, alanId, roomId, jwtAlexia, jwtAlan;
let clientAlexia, clientAlan;
try {
// Step 1: Health check
step('Step 1/10: Health check');
const health = await request('GET', '/healthz');
if (health.status !== 'ok') {
error(`Health check failed: ${JSON.stringify(health)}`);
}
success('AgentHub is healthy');
console.log('');
// Step 2: Create Alexia
step('Step 2/10: Create agent Alexia');
const alexia = await request('POST', '/api/v1/agents', {
name: `alexia-j5-${Date.now()}`,
displayName: 'Alexia',
role: 'agent',
});
alexiaId = alexia.id;
if (!alexiaId) error(`Failed to create Alexia: ${JSON.stringify(alexia)}`);
success(`Alexia created: ${alexiaId}`);
console.log('');
// Step 3: Create Alan
step('Step 3/10: Create agent Alan');
const alan = await request('POST', '/api/v1/agents', {
name: `alan-j5-${Date.now()}`,
displayName: 'Alan',
role: 'agent',
});
alanId = alan.id;
if (!alanId) error(`Failed to create Alan: ${JSON.stringify(alan)}`);
success(`Alan created: ${alanId}`);
console.log('');
// Step 4: Generate API tokens
step('Step 4/10: Generate API tokens');
const tokenAlexia = await request('POST', '/api/v1/tokens', {
agentId: alexiaId,
});
const tokenAlan = await request('POST', '/api/v1/tokens', {
agentId: alanId,
});
if (!tokenAlexia.token || !tokenAlan.token) {
error('Failed to generate API tokens');
}
success(`Tokens generated`);
console.log('');
// Step 5: Exchange for JWTs
step('Step 5/10: Exchange tokens for JWTs');
const sessionAlexia = await request('POST', '/api/v1/sessions', {
apiToken: tokenAlexia.token,
});
const sessionAlan = await request('POST', '/api/v1/sessions', {
apiToken: tokenAlan.token,
});
jwtAlexia = sessionAlexia.jwt;
jwtAlan = sessionAlan.jwt;
if (!jwtAlexia || !jwtAlan) error('Failed to get JWTs');
success('JWTs obtained');
console.log('');
// Step 6: Create test room
step('Step 6/10: Create test room');
const room = await request(
'POST',
'/api/v1/rooms',
{
slug: `test-j5-${Date.now()}`,
name: 'Test J5 Room',
members: [alexiaId, alanId],
},
{ 'x-agent-id': alexiaId },
);
roomId = room.id;
if (!roomId) error(`Failed to create room: ${JSON.stringify(room)}`);
success(`Room created: ${roomId}`);
console.log('');
// Step 7: Connect WebSockets
step('Step 7/10: Connect WebSockets');
await new Promise((resolve, reject) => {
let connectedCount = 0;
clientAlexia = io(`${WS_URL}/agents`, {
auth: { jwt: jwtAlexia },
});
clientAlan = io(`${WS_URL}/agents`, {
auth: { jwt: jwtAlan },
});
const onHello = () => {
connectedCount++;
if (connectedCount === 2) {
success('Both agents connected to WebSocket');
console.log('');
resolve();
}
};
clientAlexia.on('agent:hello-ack', onHello);
clientAlan.on('agent:hello-ack', onHello);
clientAlexia.on('connect_error', reject);
clientAlan.on('connect_error', reject);
setTimeout(() => reject(new Error('WebSocket connection timeout')), 10000);
});
// Step 8: Test room:list handler
step('Step 8/10: Test room:list handler (NEW)');
await new Promise((resolve, reject) => {
clientAlexia.emit('room:list', {}, (ack) => {
if (ack.error) {
error(`room:list error: ${ack.error}`);
return reject(new Error(ack.error));
}
if (!Array.isArray(ack.rooms)) {
error(`room:list response invalid: ${JSON.stringify(ack)}`);
return reject(new Error('Invalid response'));
}
const ourRoom = ack.rooms.find((r) => r.id === roomId);
if (!ourRoom) {
error(`Room ${roomId} not found in list`);
return reject(new Error('Room not in list'));
}
success(`room:list works — found ${ack.rooms.length} room(s)`);
info(` → Room: ${ourRoom.name} (${ourRoom.slug})`);
console.log('');
resolve();
});
setTimeout(() => reject(new Error('room:list timeout')), 5000);
});
// Step 9: Send messages and test broadcast
step('Step 9/10: Send messages and test broadcast');
const messageIds = [];
// Alexia sends message 1
await new Promise((resolve, reject) => {
const receivedBy = { alexia: false, alan: false };
const checkComplete = () => {
if (receivedBy.alexia && receivedBy.alan) {
success('Message 1 broadcast to both agents');
resolve();
}
};
clientAlexia.on('message:new', (payload) => {
if (payload.body === 'Hello from Alexia') {
receivedBy.alexia = true;
checkComplete();
}
});
clientAlan.on('message:new', (payload) => {
if (payload.body === 'Hello from Alexia') {
receivedBy.alan = true;
checkComplete();
}
});
clientAlexia.emit(
'message:send',
{
roomId,
body: 'Hello from Alexia',
},
(ack) => {
if (ack.error) return reject(new Error(ack.error));
messageIds.push(ack.messageId);
},
);
setTimeout(() => reject(new Error('Message broadcast timeout')), 5000);
});
// Alan sends message 2
await new Promise((resolve, reject) => {
clientAlan.emit(
'message:send',
{
roomId,
body: 'Hello from Alan',
},
(ack) => {
if (ack.error) return reject(new Error(ack.error));
messageIds.push(ack.messageId);
success('Message 2 sent by Alan');
resolve();
},
);
setTimeout(() => reject(new Error('Message send timeout')), 5000);
});
// Wait for persistence
await new Promise((resolve) => setTimeout(resolve, 500));
info(`${messageIds.length} messages sent`);
console.log('');
// Step 10: Test message:history handler
step('Step 10/10: Test message:history handler (NEW)');
await new Promise((resolve, reject) => {
clientAlan.emit(
'message:history',
{
roomId,
limit: 10,
},
(ack) => {
if (ack.error) {
error(`message:history error: ${ack.error}`);
return reject(new Error(ack.error));
}
if (!Array.isArray(ack.messages)) {
error(`message:history response invalid: ${JSON.stringify(ack)}`);
return reject(new Error('Invalid response'));
}
if (ack.messages.length < 2) {
error(`Expected at least 2 messages, got ${ack.messages.length}`);
return reject(new Error('Not enough messages in history'));
}
// Verify both messages are present
const bodies = ack.messages.map((m) => m.body);
const hasAlexiaMsg = bodies.includes('Hello from Alexia');
const hasAlanMsg = bodies.includes('Hello from Alan');
if (!hasAlexiaMsg || !hasAlanMsg) {
error('Missing messages in history');
return reject(new Error('Messages not found in history'));
}
success(`message:history works — retrieved ${ack.messages.length} message(s)`);
info(` → hasMore: ${ack.hasMore}, cursor: ${ack.cursor ? 'present' : 'null'}`);
console.log('');
resolve();
},
);
setTimeout(() => reject(new Error('message:history timeout')), 5000);
});
// Cleanup
clientAlexia.disconnect();
clientAlan.disconnect();
console.log('━'.repeat(70));
success('All J5 tests passed! ✨');
console.log('━'.repeat(70));
console.log('');
console.log('Validated:');
console.log(' ✓ room:list WebSocket event');
console.log(' ✓ message:history WebSocket event');
console.log(' ✓ message:send with persistence');
console.log(' ✓ Broadcast to all room members');
console.log(' ✓ Message persistence in DB');
console.log('');
} catch (err) {
if (clientAlexia) clientAlexia.disconnect();
if (clientAlan) clientAlan.disconnect();
error(`Test failed: ${err.message}`);
}
}
main();

View file

@ -423,4 +423,155 @@ describe('socket.io /agents namespace', () => {
}
});
});
it('should list rooms via WebSocket room:list event', async () => {
return new Promise<void>((resolve, reject) => {
const client = ioClient(`http://127.0.0.1:${serverPort}/agents`, {
auth: { jwt: jwt1 },
});
client.on('agent:hello-ack', () => {
client.emit('room:list', { requestId: 'test-room-list' }, (ack: any) => {
try {
expect(ack.error).toBeUndefined();
expect(ack.rooms).toBeDefined();
expect(Array.isArray(ack.rooms)).toBe(true);
expect(ack.rooms.length).toBeGreaterThan(0);
// Find our test room
const testRoom = ack.rooms.find((r: any) => r.id === roomId);
expect(testRoom).toBeDefined();
expect(testRoom.slug).toBe('test-room');
expect(testRoom.name).toBe('Test Room');
client.disconnect();
resolve();
} catch (err) {
client.disconnect();
reject(err);
}
});
});
client.on('connect_error', (err) => reject(err));
setTimeout(() => {
client.disconnect();
reject(new Error('Timeout waiting for room:list'));
}, 5000);
});
});
it('should retrieve message history via WebSocket message:history event', async () => {
return new Promise<void>(async (resolve, reject) => {
try {
let client1: ClientSocket | null = null;
let messageId: string | null = null;
// Connect and send a message
client1 = ioClient(`http://127.0.0.1:${serverPort}/agents`, {
auth: { jwt: jwt1 },
});
await new Promise<void>((res, rej) => {
client1!.on('agent:hello-ack', () => {
client1!.emit(
'message:send',
{
roomId,
body: 'Test WebSocket history retrieval',
},
(ack: any) => {
if (ack.error) {
rej(new Error(ack.error));
} else {
messageId = ack.messageId;
res();
}
},
);
});
setTimeout(() => rej(new Error('Timeout sending message')), 2000);
});
// Wait a bit for the message to be persisted
await new Promise((res) => setTimeout(res, 100));
// Now fetch history via WebSocket
await new Promise<void>((res, rej) => {
client1!.emit(
'message:history',
{
roomId,
limit: 50,
requestId: 'test-history-req',
},
(ack: any) => {
try {
expect(ack.error).toBeUndefined();
expect(ack.messages).toBeDefined();
expect(Array.isArray(ack.messages)).toBe(true);
expect(ack.messages.length).toBeGreaterThan(0);
expect(ack.hasMore).toBeDefined();
expect(typeof ack.hasMore).toBe('boolean');
// Find our message
const ourMessage = ack.messages.find((m: any) => m.id === messageId);
expect(ourMessage).toBeDefined();
expect(ourMessage.body).toBe('Test WebSocket history retrieval');
expect(ourMessage.authorAgentId).toBe(agent1Id);
expect(ourMessage.roomId).toBe(roomId);
res();
} catch (err) {
rej(err);
}
},
);
setTimeout(() => rej(new Error('Timeout fetching history')), 2000);
});
client1.disconnect();
resolve();
} catch (err) {
reject(err);
}
});
});
it('should return error when requesting history for non-member room', async () => {
return new Promise<void>((resolve, reject) => {
const client = ioClient(`http://127.0.0.1:${serverPort}/agents`, {
auth: { jwt: jwt1 },
});
client.on('agent:hello-ack', () => {
client.emit(
'message:history',
{
roomId: '00000000-0000-0000-0000-000000000000',
requestId: 'test-history-forbidden',
},
(ack: any) => {
try {
expect(ack.error).toBeDefined();
expect(ack.error).toBe('Not a member of this room');
client.disconnect();
resolve();
} catch (err) {
client.disconnect();
reject(err);
}
},
);
});
setTimeout(() => {
client.disconnect();
reject(new Error('Timeout waiting for error'));
}, 5000);
});
});
});