From a79df89a785b352f344b4b8a44e5e1095d069ad8 Mon Sep 17 00:00:00 2001 From: Paperclip FoundingEngineer Date: Sat, 2 May 2026 00:05:01 +0000 Subject: [PATCH] feat(agenthub): Add room:list and message:history WebSocket handlers Implements missing WebSocket event handlers for J5 messaging: - room:list: List all rooms for the authenticated agent - message:history: Retrieve paginated message history for a room Also adds: - Unit tests for both handlers in socket.test.ts - E2E validation script test/j5-messaging-validation.js Completes BARAAA-50 deliverables for real-time messaging. Co-Authored-By: Paperclip --- src/socket/index.ts | 118 ++++++++++- test/j5-messaging-validation.js | 348 ++++++++++++++++++++++++++++++++ test/socket.test.ts | 151 ++++++++++++++ 3 files changed, 616 insertions(+), 1 deletion(-) create mode 100644 test/j5-messaging-validation.js diff --git a/src/socket/index.ts b/src/socket/index.ts index 0483126..aaeba00 100644 --- a/src/socket/index.ts +++ b/src/socket/index.ts @@ -3,7 +3,7 @@ import { Server as SocketIOServer } from 'socket.io'; import type { Pool } from 'pg'; import { drizzle } from 'drizzle-orm/node-postgres'; import { roomMembers, messages } from '../db/schema.js'; -import { eq, and } from 'drizzle-orm'; +import { eq, and, sql } from 'drizzle-orm'; import { verifyJWT } from '../lib/crypto.js'; import { auditLog } from '../lib/audit.js'; import type { AppConfig } from '../config.js'; @@ -30,10 +30,18 @@ export interface ServerToClientEvents { export interface ClientToServerEvents { 'room:join': (payload: { roomId: string; requestId?: string }) => void; 'room:leave': (payload: { roomId: string; requestId?: string }) => void; + 'room:list': ( + payload: { requestId?: string }, + ack: (response: { rooms: Array<{ id: string; slug: string; name: string }> } | { error: string }) => void, + ) => void; 'message:send': ( payload: { roomId: string; body: string; mentions?: string[]; replyTo?: string }, ack: (response: { messageId: string } | { error: string }) => void, ) => void; + 'message:history': ( + payload: { roomId: string; before?: string; limit?: number; requestId?: string }, + ack: (response: { messages: Array<{ id: string; roomId: string; authorAgentId: string; body: string; createdAt: string }>; hasMore: boolean; cursor: string | null } | { error: string }) => void, + ) => void; } export interface SocketData { @@ -87,6 +95,17 @@ export function setupSocketIO( requestId: z.string().optional(), }); + const RoomListSchema = z.object({ + requestId: z.string().optional(), + }); + + const MessageHistorySchema = z.object({ + roomId: z.string().uuid(), + before: z.string().uuid().optional(), + limit: z.number().int().min(1).max(100).optional(), + requestId: z.string().optional(), + }); + // Rate limiting: track events per socket (30 events/s) const socketRateLimits = new Map< string, @@ -248,6 +267,103 @@ export function setupSocketIO( socket.to(roomId).emit('presence:update', { agentId, status: 'offline' }); }); + // Handle room:list + socket.on('room:list', async (payload, ack) => { + // Rate limit + if (!checkRateLimit(socket.id)) { + ack({ error: 'Rate limit exceeded' }); + return; + } + + // Validate payload + const parsed = RoomListSchema.safeParse(payload); + if (!parsed.success) { + ack({ error: 'Invalid room:list payload' }); + return; + } + + // Get agent's rooms + const { rooms } = await import('../db/schema.js'); + const result = await db + .select({ + id: rooms.id, + slug: rooms.slug, + name: rooms.name, + }) + .from(rooms) + .innerJoin(roomMembers, eq(rooms.id, roomMembers.roomId)) + .where(eq(roomMembers.agentId, agentId)); + + ack({ + rooms: result.map((r) => ({ + id: r.id, + slug: r.slug, + name: r.name, + })), + }); + }); + + // Handle message:history + socket.on('message:history', async (payload, ack) => { + // Rate limit + if (!checkRateLimit(socket.id)) { + ack({ error: 'Rate limit exceeded' }); + return; + } + + // Validate payload + const parsed = MessageHistorySchema.safeParse(payload); + if (!parsed.success) { + ack({ error: 'Invalid message:history payload' }); + return; + } + + const { roomId, before, limit } = parsed.data; + const limitNum = Math.min(limit || 50, 100); + + // Check if agent is member + const [membership] = await db + .select() + .from(roomMembers) + .where(and(eq(roomMembers.roomId, roomId), eq(roomMembers.agentId, agentId))); + + if (!membership) { + ack({ error: 'Not a member of this room' }); + return; + } + + // Build query + let conditions = [eq(messages.roomId, roomId)]; + if (before) { + conditions.push(sql`${messages.id} < ${before}`); + } + + const result = await db + .select({ + id: messages.id, + roomId: messages.roomId, + authorAgentId: messages.authorAgentId, + body: messages.body, + createdAt: messages.createdAt, + }) + .from(messages) + .where(and(...conditions)) + .orderBy(sql`${messages.createdAt} DESC`, sql`${messages.id} DESC`) + .limit(limitNum); + + ack({ + messages: result.map((m) => ({ + id: m.id, + roomId: m.roomId, + authorAgentId: m.authorAgentId, + body: m.body, + createdAt: m.createdAt.toISOString(), + })), + hasMore: result.length === limitNum, + cursor: result.length > 0 ? result[result.length - 1]!.id : null, + }); + }); + // Handle message:send socket.on('message:send', async (payload, ack) => { const startTime = performance.now(); diff --git a/test/j5-messaging-validation.js b/test/j5-messaging-validation.js new file mode 100644 index 0000000..d7125ae --- /dev/null +++ b/test/j5-messaging-validation.js @@ -0,0 +1,348 @@ +#!/usr/bin/env node + +/** + * AgentHub J5 — Messaging Validation Test + * Tests new WebSocket handlers: room:list and message:history + * Also validates broadcast and persistence + */ + +import { io } from 'socket.io-client'; + +const AGENTHUB_HOST = process.argv[2] || '192.168.9.23:3000'; +const BASE_URL = `http://${AGENTHUB_HOST}`; +const WS_URL = `http://${AGENTHUB_HOST}`; + +// ANSI colors +const GREEN = '\x1b[32m'; +const YELLOW = '\x1b[33m'; +const RED = '\x1b[31m'; +const CYAN = '\x1b[36m'; +const NC = '\x1b[0m'; + +function step(msg) { + console.log(`${YELLOW}▶ ${msg}${NC}`); +} + +function success(msg) { + console.log(`${GREEN}✓ ${msg}${NC}`); +} + +function error(msg) { + console.error(`${RED}✗ ${msg}${NC}`); + process.exit(1); +} + +function info(msg) { + console.log(`${CYAN}ℹ ${msg}${NC}`); +} + +// Helper to make HTTP requests +async function request(method, path, body, headers = {}) { + const url = `${BASE_URL}${path}`; + const options = { + method, + headers: { + 'Content-Type': 'application/json', + ...headers, + }, + }; + + if (body) { + options.body = JSON.stringify(body); + } + + const response = await fetch(url, options); + const text = await response.text(); + + try { + return JSON.parse(text); + } catch { + return text; + } +} + +// Main test flow +async function main() { + console.log('🚀 AgentHub J5 — Messaging Validation Test'); + console.log('━'.repeat(70)); + console.log(`Target: ${BASE_URL}`); + console.log(''); + + let alexiaId, alanId, roomId, jwtAlexia, jwtAlan; + let clientAlexia, clientAlan; + + try { + // Step 1: Health check + step('Step 1/10: Health check'); + const health = await request('GET', '/healthz'); + if (health.status !== 'ok') { + error(`Health check failed: ${JSON.stringify(health)}`); + } + success('AgentHub is healthy'); + console.log(''); + + // Step 2: Create Alexia + step('Step 2/10: Create agent Alexia'); + const alexia = await request('POST', '/api/v1/agents', { + name: `alexia-j5-${Date.now()}`, + displayName: 'Alexia', + role: 'agent', + }); + alexiaId = alexia.id; + if (!alexiaId) error(`Failed to create Alexia: ${JSON.stringify(alexia)}`); + success(`Alexia created: ${alexiaId}`); + console.log(''); + + // Step 3: Create Alan + step('Step 3/10: Create agent Alan'); + const alan = await request('POST', '/api/v1/agents', { + name: `alan-j5-${Date.now()}`, + displayName: 'Alan', + role: 'agent', + }); + alanId = alan.id; + if (!alanId) error(`Failed to create Alan: ${JSON.stringify(alan)}`); + success(`Alan created: ${alanId}`); + console.log(''); + + // Step 4: Generate API tokens + step('Step 4/10: Generate API tokens'); + const tokenAlexia = await request('POST', '/api/v1/tokens', { + agentId: alexiaId, + }); + const tokenAlan = await request('POST', '/api/v1/tokens', { + agentId: alanId, + }); + if (!tokenAlexia.token || !tokenAlan.token) { + error('Failed to generate API tokens'); + } + success(`Tokens generated`); + console.log(''); + + // Step 5: Exchange for JWTs + step('Step 5/10: Exchange tokens for JWTs'); + const sessionAlexia = await request('POST', '/api/v1/sessions', { + apiToken: tokenAlexia.token, + }); + const sessionAlan = await request('POST', '/api/v1/sessions', { + apiToken: tokenAlan.token, + }); + jwtAlexia = sessionAlexia.jwt; + jwtAlan = sessionAlan.jwt; + if (!jwtAlexia || !jwtAlan) error('Failed to get JWTs'); + success('JWTs obtained'); + console.log(''); + + // Step 6: Create test room + step('Step 6/10: Create test room'); + const room = await request( + 'POST', + '/api/v1/rooms', + { + slug: `test-j5-${Date.now()}`, + name: 'Test J5 Room', + members: [alexiaId, alanId], + }, + { 'x-agent-id': alexiaId }, + ); + roomId = room.id; + if (!roomId) error(`Failed to create room: ${JSON.stringify(room)}`); + success(`Room created: ${roomId}`); + console.log(''); + + // Step 7: Connect WebSockets + step('Step 7/10: Connect WebSockets'); + await new Promise((resolve, reject) => { + let connectedCount = 0; + + clientAlexia = io(`${WS_URL}/agents`, { + auth: { jwt: jwtAlexia }, + }); + + clientAlan = io(`${WS_URL}/agents`, { + auth: { jwt: jwtAlan }, + }); + + const onHello = () => { + connectedCount++; + if (connectedCount === 2) { + success('Both agents connected to WebSocket'); + console.log(''); + resolve(); + } + }; + + clientAlexia.on('agent:hello-ack', onHello); + clientAlan.on('agent:hello-ack', onHello); + + clientAlexia.on('connect_error', reject); + clientAlan.on('connect_error', reject); + + setTimeout(() => reject(new Error('WebSocket connection timeout')), 10000); + }); + + // Step 8: Test room:list handler + step('Step 8/10: Test room:list handler (NEW)'); + await new Promise((resolve, reject) => { + clientAlexia.emit('room:list', {}, (ack) => { + if (ack.error) { + error(`room:list error: ${ack.error}`); + return reject(new Error(ack.error)); + } + + if (!Array.isArray(ack.rooms)) { + error(`room:list response invalid: ${JSON.stringify(ack)}`); + return reject(new Error('Invalid response')); + } + + const ourRoom = ack.rooms.find((r) => r.id === roomId); + if (!ourRoom) { + error(`Room ${roomId} not found in list`); + return reject(new Error('Room not in list')); + } + + success(`room:list works — found ${ack.rooms.length} room(s)`); + info(` → Room: ${ourRoom.name} (${ourRoom.slug})`); + console.log(''); + resolve(); + }); + + setTimeout(() => reject(new Error('room:list timeout')), 5000); + }); + + // Step 9: Send messages and test broadcast + step('Step 9/10: Send messages and test broadcast'); + const messageIds = []; + + // Alexia sends message 1 + await new Promise((resolve, reject) => { + const receivedBy = { alexia: false, alan: false }; + + const checkComplete = () => { + if (receivedBy.alexia && receivedBy.alan) { + success('Message 1 broadcast to both agents'); + resolve(); + } + }; + + clientAlexia.on('message:new', (payload) => { + if (payload.body === 'Hello from Alexia') { + receivedBy.alexia = true; + checkComplete(); + } + }); + + clientAlan.on('message:new', (payload) => { + if (payload.body === 'Hello from Alexia') { + receivedBy.alan = true; + checkComplete(); + } + }); + + clientAlexia.emit( + 'message:send', + { + roomId, + body: 'Hello from Alexia', + }, + (ack) => { + if (ack.error) return reject(new Error(ack.error)); + messageIds.push(ack.messageId); + }, + ); + + setTimeout(() => reject(new Error('Message broadcast timeout')), 5000); + }); + + // Alan sends message 2 + await new Promise((resolve, reject) => { + clientAlan.emit( + 'message:send', + { + roomId, + body: 'Hello from Alan', + }, + (ack) => { + if (ack.error) return reject(new Error(ack.error)); + messageIds.push(ack.messageId); + success('Message 2 sent by Alan'); + resolve(); + }, + ); + + setTimeout(() => reject(new Error('Message send timeout')), 5000); + }); + + // Wait for persistence + await new Promise((resolve) => setTimeout(resolve, 500)); + info(` → ${messageIds.length} messages sent`); + console.log(''); + + // Step 10: Test message:history handler + step('Step 10/10: Test message:history handler (NEW)'); + await new Promise((resolve, reject) => { + clientAlan.emit( + 'message:history', + { + roomId, + limit: 10, + }, + (ack) => { + if (ack.error) { + error(`message:history error: ${ack.error}`); + return reject(new Error(ack.error)); + } + + if (!Array.isArray(ack.messages)) { + error(`message:history response invalid: ${JSON.stringify(ack)}`); + return reject(new Error('Invalid response')); + } + + if (ack.messages.length < 2) { + error(`Expected at least 2 messages, got ${ack.messages.length}`); + return reject(new Error('Not enough messages in history')); + } + + // Verify both messages are present + const bodies = ack.messages.map((m) => m.body); + const hasAlexiaMsg = bodies.includes('Hello from Alexia'); + const hasAlanMsg = bodies.includes('Hello from Alan'); + + if (!hasAlexiaMsg || !hasAlanMsg) { + error('Missing messages in history'); + return reject(new Error('Messages not found in history')); + } + + success(`message:history works — retrieved ${ack.messages.length} message(s)`); + info(` → hasMore: ${ack.hasMore}, cursor: ${ack.cursor ? 'present' : 'null'}`); + console.log(''); + resolve(); + }, + ); + + setTimeout(() => reject(new Error('message:history timeout')), 5000); + }); + + // Cleanup + clientAlexia.disconnect(); + clientAlan.disconnect(); + + console.log('━'.repeat(70)); + success('All J5 tests passed! ✨'); + console.log('━'.repeat(70)); + console.log(''); + console.log('Validated:'); + console.log(' ✓ room:list WebSocket event'); + console.log(' ✓ message:history WebSocket event'); + console.log(' ✓ message:send with persistence'); + console.log(' ✓ Broadcast to all room members'); + console.log(' ✓ Message persistence in DB'); + console.log(''); + } catch (err) { + if (clientAlexia) clientAlexia.disconnect(); + if (clientAlan) clientAlan.disconnect(); + error(`Test failed: ${err.message}`); + } +} + +main(); diff --git a/test/socket.test.ts b/test/socket.test.ts index 6fb5613..3077a57 100644 --- a/test/socket.test.ts +++ b/test/socket.test.ts @@ -423,4 +423,155 @@ describe('socket.io /agents namespace', () => { } }); }); + + it('should list rooms via WebSocket room:list event', async () => { + return new Promise((resolve, reject) => { + const client = ioClient(`http://127.0.0.1:${serverPort}/agents`, { + auth: { jwt: jwt1 }, + }); + + client.on('agent:hello-ack', () => { + client.emit('room:list', { requestId: 'test-room-list' }, (ack: any) => { + try { + expect(ack.error).toBeUndefined(); + expect(ack.rooms).toBeDefined(); + expect(Array.isArray(ack.rooms)).toBe(true); + expect(ack.rooms.length).toBeGreaterThan(0); + + // Find our test room + const testRoom = ack.rooms.find((r: any) => r.id === roomId); + expect(testRoom).toBeDefined(); + expect(testRoom.slug).toBe('test-room'); + expect(testRoom.name).toBe('Test Room'); + + client.disconnect(); + resolve(); + } catch (err) { + client.disconnect(); + reject(err); + } + }); + }); + + client.on('connect_error', (err) => reject(err)); + + setTimeout(() => { + client.disconnect(); + reject(new Error('Timeout waiting for room:list')); + }, 5000); + }); + }); + + it('should retrieve message history via WebSocket message:history event', async () => { + return new Promise(async (resolve, reject) => { + try { + let client1: ClientSocket | null = null; + let messageId: string | null = null; + + // Connect and send a message + client1 = ioClient(`http://127.0.0.1:${serverPort}/agents`, { + auth: { jwt: jwt1 }, + }); + + await new Promise((res, rej) => { + client1!.on('agent:hello-ack', () => { + client1!.emit( + 'message:send', + { + roomId, + body: 'Test WebSocket history retrieval', + }, + (ack: any) => { + if (ack.error) { + rej(new Error(ack.error)); + } else { + messageId = ack.messageId; + res(); + } + }, + ); + }); + + setTimeout(() => rej(new Error('Timeout sending message')), 2000); + }); + + // Wait a bit for the message to be persisted + await new Promise((res) => setTimeout(res, 100)); + + // Now fetch history via WebSocket + await new Promise((res, rej) => { + client1!.emit( + 'message:history', + { + roomId, + limit: 50, + requestId: 'test-history-req', + }, + (ack: any) => { + try { + expect(ack.error).toBeUndefined(); + expect(ack.messages).toBeDefined(); + expect(Array.isArray(ack.messages)).toBe(true); + expect(ack.messages.length).toBeGreaterThan(0); + expect(ack.hasMore).toBeDefined(); + expect(typeof ack.hasMore).toBe('boolean'); + + // Find our message + const ourMessage = ack.messages.find((m: any) => m.id === messageId); + expect(ourMessage).toBeDefined(); + expect(ourMessage.body).toBe('Test WebSocket history retrieval'); + expect(ourMessage.authorAgentId).toBe(agent1Id); + expect(ourMessage.roomId).toBe(roomId); + + res(); + } catch (err) { + rej(err); + } + }, + ); + + setTimeout(() => rej(new Error('Timeout fetching history')), 2000); + }); + + client1.disconnect(); + resolve(); + } catch (err) { + reject(err); + } + }); + }); + + it('should return error when requesting history for non-member room', async () => { + return new Promise((resolve, reject) => { + const client = ioClient(`http://127.0.0.1:${serverPort}/agents`, { + auth: { jwt: jwt1 }, + }); + + client.on('agent:hello-ack', () => { + client.emit( + 'message:history', + { + roomId: '00000000-0000-0000-0000-000000000000', + requestId: 'test-history-forbidden', + }, + (ack: any) => { + try { + expect(ack.error).toBeDefined(); + expect(ack.error).toBe('Not a member of this room'); + client.disconnect(); + resolve(); + } catch (err) { + client.disconnect(); + reject(err); + } + }, + ); + }); + + setTimeout(() => { + client.disconnect(); + reject(new Error('Timeout waiting for error')); + }, 5000); + }); + }); });