Complete implementation ready for Coolify: - Node.js 22 + Fastify + socket.io backend - PostgreSQL 16 + Redis 7 services - Docker Compose configuration - Deployment scripts and documentation Co-Authored-By: Paperclip <noreply@paperclip.ing>
426 lines
12 KiB
TypeScript
426 lines
12 KiB
TypeScript
import { describe, it, expect, beforeAll, afterAll } from 'vitest';
|
|
import { io as ioClient, type Socket as ClientSocket } from 'socket.io-client';
|
|
import { buildApp } from '../src/app.js';
|
|
import { loadConfig } from '../src/config.js';
|
|
import { pool } from '../src/db/pool.js';
|
|
import { drizzle } from 'drizzle-orm/node-postgres';
|
|
import { agents, apiTokens, rooms, roomMembers } from '../src/db/schema.js';
|
|
import { generateApiToken, hashApiToken } from '../src/lib/crypto.js';
|
|
import type { FastifyInstance } from 'fastify';
|
|
|
|
describe('socket.io /agents namespace', () => {
|
|
// Fixtures shared by every test in this suite; each one is assigned in beforeAll.
let app: FastifyInstance;
let serverPort: number;
let agent1Id: string, agent2Id: string;
let roomId: string;
let jwt1: string, jwt2: string;
|
|
|
|
// Suite setup: starts the app on an ephemeral port, seeds two agents that share
// one room, creates an API token per agent and exchanges each token for a JWT.
// NOTE(review): the seeded names/slug are fixed and this hook never removes the
// rows — confirm the test database is reset between runs.
beforeAll(async () => {
  const config = loadConfig({
    NODE_ENV: 'test',
    LOG_LEVEL: 'fatal',
    HOST: '127.0.0.1',
    PORT: '3001',
    POSTGRES_HOST: process.env.POSTGRES_HOST || 'localhost',
    POSTGRES_PORT: process.env.POSTGRES_PORT || '5432',
    POSTGRES_USER: process.env.POSTGRES_USER || 'agenthub',
    POSTGRES_PASSWORD: process.env.POSTGRES_PASSWORD || 'agenthub',
    POSTGRES_DB: process.env.POSTGRES_DB || 'agenthub',
    JWT_SECRET: 'test-secret-with-exactly-32chars',
  });

  app = await buildApp({ config });

  // Listen on port 0 and read the bound port back from the server address,
  // rather than relying on the PORT value passed to loadConfig above.
  await app.listen({ host: '127.0.0.1', port: 0 });
  const address = app.server.address();
  if (!address || typeof address === 'string') {
    throw new Error('Failed to get server port');
  }
  serverPort = address.port;

  // Setup test data
  const db = drizzle(pool);

  // Inserts the agent `test-agent-<index>` and returns the id of the new row.
  const createAgent = async (index: number): Promise<string> => {
    const [row] = await db
      .insert(agents)
      .values({
        name: `test-agent-${index}`,
        displayName: `Test Agent ${index}`,
        role: 'agent',
      })
      .returning();
    if (!row) throw new Error(`Failed to create agent ${index}`);
    return row.id;
  };

  // Stores a hashed, active API token for the agent and returns the plaintext token.
  const createApiToken = async (agentId: string): Promise<string> => {
    const token = generateApiToken();
    const hash = await hashApiToken(token.fullToken);
    await db.insert(apiTokens).values({
      agentId,
      hashArgon2id: hash,
      prefix: token.prefix,
      scopes: {},
      status: 'active',
    });
    return token.fullToken;
  };

  // Exchanges an API token for a JWT and fails loudly when the exchange does not
  // yield one, so a broken session endpoint surfaces here instead of as an
  // unrelated socket error or timeout in a later test.
  const exchangeForJwt = async (apiToken: string): Promise<string> => {
    const res = await app.inject({
      method: 'POST',
      url: '/api/v1/sessions',
      payload: { apiToken },
    });
    if (res.statusCode >= 400) {
      throw new Error(`Session exchange failed: HTTP ${res.statusCode} ${res.body}`);
    }
    const body: unknown = JSON.parse(res.body);
    const jwt =
      typeof body === 'object' && body !== null ? (body as { jwt?: unknown }).jwt : undefined;
    if (typeof jwt !== 'string' || jwt.length === 0) {
      throw new Error(`Session exchange returned no JWT: HTTP ${res.statusCode} ${res.body}`);
    }
    return jwt;
  };

  // Create agents
  agent1Id = await createAgent(1);
  agent2Id = await createAgent(2);

  // Create room
  const [room] = await db
    .insert(rooms)
    .values({
      slug: 'test-room',
      name: 'Test Room',
      createdBy: agent1Id,
    })
    .returning();
  if (!room) throw new Error('Failed to create room');
  roomId = room.id;

  // Add both agents to room
  await db.insert(roomMembers).values([
    { roomId, agentId: agent1Id },
    { roomId, agentId: agent2Id },
  ]);

  // Create API tokens
  const apiToken1 = await createApiToken(agent1Id);
  const apiToken2 = await createApiToken(agent2Id);

  // Exchange for JWTs
  jwt1 = await exchangeForJwt(apiToken1);
  jwt2 = await exchangeForJwt(apiToken2);
});
|
|
|
|
// Suite teardown: closes the Fastify app that beforeAll started.
// NOTE(review): nothing here releases the shared pg pool or deletes the seeded
// rows — confirm app.close() or the test harness takes care of that.
afterAll(async function closeServer(): Promise<void> {
  await app.close();
});
|
|
|
|
// Connects as agent 1 and checks that the server's `agent:hello-ack` carries the
// agent's id and lists the seeded room.
it('should connect with valid JWT and receive agent:hello-ack', async () => {
  const client = ioClient(`http://127.0.0.1:${serverPort}/agents`, {
    auth: { jwt: jwt1 },
  });
  let timer: ReturnType<typeof setTimeout> | undefined;

  try {
    const payload = await new Promise<{ agentId?: unknown; rooms?: unknown }>(
      (resolve, reject) => {
        timer = setTimeout(() => reject(new Error('Timeout waiting for hello-ack')), 5000);
        client.on('agent:hello-ack', resolve);
        client.on('connect_error', reject);
      },
    );

    expect(payload.agentId).toBe(agent1Id);
    expect(payload.rooms).toContain(roomId);
  } finally {
    // Always release the timer and the socket, including when an assertion,
    // a connect_error or the timeout fails the test.
    clearTimeout(timer);
    client.disconnect();
  }
});
|
|
|
|
// Connects without a JWT and expects the handshake to fail with a
// "Missing JWT" connect_error.
it('should reject connection with missing JWT', async () => {
  const client = ioClient(`http://127.0.0.1:${serverPort}/agents`, {
    auth: {},
  });
  let timer: ReturnType<typeof setTimeout> | undefined;

  try {
    const err = await new Promise<Error>((resolve, reject) => {
      timer = setTimeout(() => reject(new Error('Timeout waiting for error')), 5000);
      client.on('connect', () => reject(new Error('Should not connect without JWT')));
      client.on('connect_error', resolve);
    });

    // Assert outside the socket event handler: an assertion that throws inside
    // the handler would not reject the test promise.
    expect(err.message).toContain('Missing JWT');
  } finally {
    clearTimeout(timer);
    client.disconnect();
  }
});
|
|
|
|
// Connects with a malformed JWT and expects the handshake to fail with an
// "Invalid or expired JWT" connect_error.
it('should reject connection with invalid JWT', async () => {
  const client = ioClient(`http://127.0.0.1:${serverPort}/agents`, {
    auth: { jwt: 'invalid-jwt' },
  });
  let timer: ReturnType<typeof setTimeout> | undefined;

  try {
    const err = await new Promise<Error>((resolve, reject) => {
      timer = setTimeout(() => reject(new Error('Timeout waiting for error')), 5000);
      client.on('connect', () => reject(new Error('Should not connect with invalid JWT')));
      client.on('connect_error', resolve);
    });

    // Assert outside the socket event handler: an assertion that throws inside
    // the handler would not reject the test promise.
    expect(err.message).toContain('Invalid or expired JWT');
  } finally {
    clearTimeout(timer);
    client.disconnect();
  }
});
|
|
|
|
// Connects agent 1, then agent 2 once agent 1 has been acknowledged, and expects
// agent 1 to receive a `presence:update` announcing agent 2 as online.
it('should emit presence:update when two agents join the same room', async () => {
  const url = `http://127.0.0.1:${serverPort}/agents`;
  const client1 = ioClient(url, { auth: { jwt: jwt1 } });
  // Every socket opened by this test, so the finally block can close them all.
  const openSockets: ClientSocket[] = [client1];
  let timer: ReturnType<typeof setTimeout> | undefined;

  try {
    const payload = await new Promise<{ agentId?: unknown; status?: unknown }>(
      (resolve, reject) => {
        timer = setTimeout(
          () => reject(new Error('Timeout waiting for presence update')),
          5000,
        );

        client1.on('connect_error', reject);

        // Client 1 should receive presence update from client 2
        client1.on('presence:update', resolve);

        // Connect client 2 only after client 1 has been acknowledged.
        client1.on('agent:hello-ack', () => {
          const client2 = ioClient(url, { auth: { jwt: jwt2 } });
          openSockets.push(client2);
          // Fail fast if the second client cannot connect instead of waiting
          // for the timeout.
          client2.on('connect_error', reject);
        });
      },
    );

    expect(payload.agentId).toBe(agent2Id);
    expect(payload.status).toBe('online');
  } finally {
    clearTimeout(timer);
    for (const socket of openSockets) socket.disconnect();
  }
});
|
|
|
|
// After the hello-ack, asks to join a room id that was never seeded and expects
// an `error` event with code 'forbidden' that echoes the request id.
it('should emit error when trying to join non-member room', async () => {
  const client = ioClient(`http://127.0.0.1:${serverPort}/agents`, {
    auth: { jwt: jwt1 },
  });
  let timer: ReturnType<typeof setTimeout> | undefined;

  try {
    const payload = await new Promise<{ code?: unknown; requestId?: unknown }>(
      (resolve, reject) => {
        timer = setTimeout(() => reject(new Error('Timeout waiting for error')), 5000);

        // Fail fast if the connection itself is refused.
        client.on('connect_error', reject);
        client.on('error', resolve);

        client.on('agent:hello-ack', () => {
          // Try to join a non-existent room
          client.emit('room:join', {
            roomId: '00000000-0000-0000-0000-000000000000',
            requestId: 'test-req-1',
          });
        });
      },
    );

    expect(payload.code).toBe('forbidden');
    expect(payload.requestId).toBe('test-req-1');
  } finally {
    clearTimeout(timer);
    client.disconnect();
  }
});
|
|
|
|
// Connects both agents, has agent 1 send a message to the shared room, and
// expects: a successful ack for the sender, a `message:new` echo on the sender,
// and a `message:new` delivery on the other agent.
it('should send and receive messages in real-time', async () => {
  type SendAck = { messageId?: unknown; error?: unknown };
  type NewMessage = {
    id?: unknown;
    authorAgentId?: unknown;
    roomId?: unknown;
    body?: unknown;
  };

  const url = `http://127.0.0.1:${serverPort}/agents`;
  const messageBody = 'Hello from agent 1!';
  const client1 = ioClient(url, { auth: { jwt: jwt1 } });
  const client2 = ioClient(url, { auth: { jwt: jwt2 } });
  let timer: ReturnType<typeof setTimeout> | undefined;

  // Resolves with the payload of the next `event` received on `socket`.
  const nextEvent = <T>(socket: ClientSocket, event: string): Promise<T> =>
    new Promise<T>((resolve) => {
      socket.once(event, resolve);
    });

  try {
    // Rejects on the overall timeout or when either client fails to connect.
    const failure = new Promise<never>((_resolve, reject) => {
      timer = setTimeout(
        () => reject(new Error('Timeout waiting for message delivery')),
        5000,
      );
      client1.on('connect_error', reject);
      client2.on('connect_error', reject);
    });

    // Register the message listeners before anything is sent.
    const echoed = nextEvent<NewMessage>(client1, 'message:new');
    const delivered = nextEvent<NewMessage>(client2, 'message:new');

    // Once both connected, send message
    await Promise.race([
      Promise.all([
        nextEvent<unknown>(client1, 'agent:hello-ack'),
        nextEvent<unknown>(client2, 'agent:hello-ack'),
      ]),
      failure,
    ]);

    // Wait a bit to ensure both are subscribed
    await new Promise<void>((resolve) => setTimeout(resolve, 100));

    const acked = new Promise<SendAck>((resolve) => {
      client1.emit('message:send', { roomId, body: messageBody }, resolve);
    });

    // Wait for the ack as well as both deliveries, so the ack assertions can
    // never be skipped by the test finishing early.
    const [ack, echo, received] = await Promise.race([
      Promise.all([acked, echoed, delivered]),
      failure,
    ]);

    expect(ack.messageId).toBeTruthy();
    expect(ack.error).toBeUndefined();

    for (const message of [echo, received]) {
      expect(message.authorAgentId).toBe(agent1Id);
      expect(message.roomId).toBe(roomId);
      expect(message.body).toBe(messageBody);
    }
    expect(received.id).toBeTruthy();
  } finally {
    clearTimeout(timer);
    client1.disconnect();
    client2.disconnect();
  }
});
|
|
|
|
// Sends a message over the socket, disconnects, then fetches the room history
// over REST and expects the message that was just sent to be listed.
it('should retrieve message history via REST after reconnection', async () => {
  type SendAck = { messageId?: string; error?: string };
  type HistoryMessage = { id?: unknown; body?: unknown; authorAgentId?: unknown };

  const sentBody = 'Test message for history';
  const client = ioClient(`http://127.0.0.1:${serverPort}/agents`, {
    auth: { jwt: jwt1 },
  });
  let timer: ReturnType<typeof setTimeout> | undefined;
  let messageId: string | undefined;

  // Connect and send a message
  try {
    messageId = await new Promise<string | undefined>((resolve, reject) => {
      timer = setTimeout(() => reject(new Error('Timeout sending message')), 2000);

      // Fail fast if the connection itself is refused.
      client.on('connect_error', reject);

      client.on('agent:hello-ack', () => {
        client.emit('message:send', { roomId, body: sentBody }, (ack: SendAck) => {
          if (ack.error) {
            reject(new Error(ack.error));
          } else {
            resolve(ack.messageId);
          }
        });
      });
    });
  } finally {
    // Disconnect, and release the timer, whether or not the send succeeded.
    clearTimeout(timer);
    client.disconnect();
  }

  // Wait a bit
  await new Promise<void>((resolve) => setTimeout(resolve, 200));

  // Now fetch history via REST
  // NOTE(review): unlike the session exchange in beforeAll, this route has no
  // /api/v1 prefix and identifies the caller with an x-agent-id header rather
  // than a JWT — confirm that matches the REST API.
  const res = await app.inject({
    method: 'GET',
    url: `/rooms/${roomId}/messages`,
    headers: {
      'x-agent-id': agent1Id,
    },
  });

  expect(res.statusCode).toBe(200);
  const body: { messages?: unknown } = JSON.parse(res.body);
  expect(body.messages).toBeDefined();
  expect(Array.isArray(body.messages)).toBe(true);

  // Safe to treat as an array: the assertion above has already verified it.
  const messages = body.messages as HistoryMessage[];
  expect(messages.length).toBeGreaterThan(0);

  // Find our message
  const ourMessage = messages.find((message) => message.id === messageId);
  expect(ourMessage).toBeDefined();
  expect(ourMessage?.body).toBe(sentBody);
  expect(ourMessage?.authorAgentId).toBe(agent1Id);
});
|
|
});
|