/**
 * OpusBike WebSocket Relay Server
 * Receives telemetry from phone (BLE bridge via 5G) and broadcasts to all viewers
 *
 * Wire protocol (JSON text frames):
 *   client -> server  { type: 'register', role: 'source' | 'viewer' }
 *   client -> server  { type: 'telemetry', data }            (accepted from sources only)
 *   server -> client  { type: 'state', data, ts }            (cached state, sent on viewer registration)
 *   server -> client  { type: 'telemetry', data, ts }
 *   server -> client  { type: 'source_connected' } / { type: 'source_disconnected' }
 */

const { WebSocketServer, WebSocket } = require('ws');
const http = require('http');

const PORT = process.env.PORT || 3030;
const HEARTBEAT_INTERVAL = 30000; // 30 seconds

// Latest telemetry as { data, ts }, kept so newly registered viewers get immediate data.
let latestState = null;

// Connected clients: ws -> { role, connectedAt, alive }
const clients = new Map();

/**
 * Count connected clients per role in a single pass over the client map.
 *
 * @returns {{ sources: number, viewers: number }}
 */
function countRoles() {
  let sources = 0;
  let viewers = 0;
  for (const { role } of clients.values()) {
    if (role === 'source') {
      sources += 1;
    } else if (role === 'viewer') {
      viewers += 1;
    }
  }
  return { sources, viewers };
}

// HTTP server: answers GET-style health checks on /health, 404 for everything else.
const httpServer = http.createServer((req, res) => {
  // Compare the path only, so "/health?probe=1" is still recognised.
  const path = req.url.split('?')[0];
  if (path !== '/health') {
    res.writeHead(404);
    res.end();
    return;
  }

  const { sources, viewers } = countRoles();
  res.writeHead(200, { 'Content-Type': 'application/json' });
  res.end(JSON.stringify({
    status: 'ok',
    clients: clients.size,
    sources,
    viewers,
    hasState: !!latestState,
    uptime: process.uptime()
  }));
});

// WebSocket server shares the HTTP server's port.
const wss = new WebSocketServer({ server: httpServer });

// Heartbeat — every HEARTBEAT_INTERVAL ms, terminate clients that did not answer
// the previous ping, then ping the rest.
const heartbeat = setInterval(() => {
  for (const [ws, info] of clients) {
    if (!info.alive) {
      console.log(`Terminating unresponsive client (${info.role})`);
      ws.terminate();
      continue;
    }
    // Cleared here and set again by the 'pong' handler; still false at the next
    // sweep means the client never answered.
    info.alive = false;
    // Guard against pinging a socket that is no longer open. The flag above is
    // still cleared, so a socket stuck in a non-open state is terminated on the
    // next sweep.
    if (ws.readyState === WebSocket.OPEN) {
      ws.ping();
    }
  }
}, HEARTBEAT_INTERVAL);

wss.on('close', () => {
  clearInterval(heartbeat);
});

/**
 * Parse an incoming frame as a JSON object.
 *
 * @param {*} raw - Frame payload as delivered by the 'message' event.
 * @returns {?object} The parsed value, or null when the frame is not valid JSON
 *   or does not decode to a non-null object.
 */
function parseMessage(raw) {
  let parsed;
  try {
    parsed = JSON.parse(String(raw));
  } catch (err) {
    // Malformed frames are deliberately ignored.
    return null;
  }
  return parsed !== null && typeof parsed === 'object' ? parsed : null;
}

/**
 * Apply a 'register' message: record the client's role, announce a new source
 * to the other clients, or send the cached state to a new viewer.
 *
 * @param {WebSocket} ws - Socket of the registering client.
 * @param {{ role: string }} clientInfo - Mutable record stored in `clients` for this socket.
 * @param {object} msg - Parsed 'register' message.
 */
function handleRegister(ws, clientInfo, msg) {
  // Only 'source' is special; any other (or missing) role is treated as a viewer
  // so untrusted role strings are never stored or logged.
  clientInfo.role = msg.role === 'source' ? 'source' : 'viewer';
  console.log(`Client registered as: ${clientInfo.role}`);

  if (clientInfo.role === 'source') {
    // Notify the other clients that a source has connected.
    broadcast({ type: 'source_connected' }, ws);
    return;
  }

  // Send latest state to new viewers so they get immediate data.
  if (latestState) {
    ws.send(JSON.stringify({ type: 'state', data: latestState.data, ts: latestState.ts }));
  }
}

/**
 * Apply a 'telemetry' message: timestamp it, cache it as the latest state and
 * relay it to every other open client.
 *
 * @param {WebSocket} ws - Socket of the sending source (excluded from the relay).
 * @param {object} msg - Parsed 'telemetry' message; `msg.data` is relayed as-is.
 */
function handleTelemetry(ws, msg) {
  const ts = new Date().toISOString();
  latestState = { data: msg.data, ts };
  broadcast({ type: 'telemetry', data: msg.data, ts }, ws);
}

wss.on('connection', (ws) => {
  // Every client starts as a viewer until it registers otherwise.
  const clientInfo = { role: 'viewer', connectedAt: Date.now(), alive: true };
  clients.set(ws, clientInfo);
  console.log(`Client connected (total: ${clients.size})`);

  ws.on('pong', () => {
    clientInfo.alive = true;
  });

  ws.on('message', (raw) => {
    const msg = parseMessage(raw);
    if (msg === null) {
      return;
    }

    try {
      if (msg.type === 'register') {
        handleRegister(ws, clientInfo, msg);
      } else if (msg.type === 'telemetry' && clientInfo.role === 'source') {
        handleTelemetry(ws, msg);
      }
    } catch (err) {
      // A failure while handling one message must not take the relay down,
      // but it should be visible in the logs.
      console.error('Message handling error:', err.message);
    }
  });

  ws.on('close', () => {
    const wasSource = clientInfo.role === 'source';
    clients.delete(ws);
    console.log(`Client disconnected (total: ${clients.size})`);

    // Notify the remaining clients if a source disconnected.
    if (wasSource) {
      broadcast({ type: 'source_disconnected' });
    }
  });

  ws.on('error', (err) => {
    console.error('WS error:', err.message);
  });
});

/**
 * Broadcast a message to all connected clients whose socket is open.
 *
 * @param {object} msg - Message to send; serialised once with JSON.stringify.
 * @param {?WebSocket} [exclude=null] - Socket to skip (typically the sender).
 */
function broadcast(msg, exclude = null) {
  const payload = JSON.stringify(msg);
  for (const client of clients.keys()) {
    if (client !== exclude && client.readyState === WebSocket.OPEN) {
      client.send(payload);
    }
  }
}

httpServer.listen(PORT, () => {
  console.log(`OpusBike relay server running on port ${PORT}`);
});