OpusBike/server/relay.js

139 lines
4.2 KiB
JavaScript
Raw Normal View History

/**
* OpusBike WebSocket Relay Server
* Receives telemetry from phone (BLE bridge via 5G) and broadcasts to all viewers
*/
const { WebSocketServer, WebSocket } = require('ws');
const http = require('http');
// Listening port: the PORT environment variable when set, otherwise 3030.
const DEFAULT_PORT = 3030;
const PORT = process.env.PORT || DEFAULT_PORT;
// Time between heartbeat sweeps, in milliseconds (30 seconds).
const HEARTBEAT_INTERVAL = 30 * 1000;
// Most recent telemetry snapshot ({ data, ts }), or null until a source has
// sent one; replayed to viewers when they register.
let latestState = null;
// HTTP server used for health checks.
//
// Requests to /health (any query string is ignored) get a 200 response with a
// JSON body:
//   status   - always 'ok'
//   clients  - number of tracked WebSocket connections
//   sources  - connections whose role is 'source'
//   viewers  - connections whose role is 'viewer'
//   hasState - whether a telemetry snapshot has been stored
//   uptime   - value of process.uptime()
// Every other path gets a 404 with an empty body.
const httpServer = http.createServer((req, res) => {
  // Compare the path only, so probes such as `/health?ts=123` still succeed.
  const [path] = req.url.split('?', 1);
  if (path !== '/health') {
    res.writeHead(404);
    res.end();
    return;
  }

  // Tally roles in a single pass instead of copying the map once per role.
  let sources = 0;
  let viewers = 0;
  for (const info of clients.values()) {
    if (info.role === 'source') {
      sources += 1;
    } else if (info.role === 'viewer') {
      viewers += 1;
    }
  }

  res.writeHead(200, { 'Content-Type': 'application/json' });
  res.end(JSON.stringify({
    status: 'ok',
    clients: clients.size,
    sources,
    viewers,
    hasState: !!latestState,
    uptime: process.uptime(),
  }));
});
// WebSocket server attached to the HTTP server above.
const wss = new WebSocketServer({ server: httpServer });

// Registry of tracked connections: ws -> { role, connectedAt, alive }
const clients = new Map();

// Heartbeat sweep, run every HEARTBEAT_INTERVAL ms. A client whose `alive`
// flag is still false from the previous sweep is terminated; every other
// client has the flag cleared and is pinged again.
const heartbeat = setInterval(() => {
  clients.forEach((info, ws) => {
    if (info.alive) {
      info.alive = false;
      ws.ping();
      return;
    }
    console.log(`Terminating unresponsive client (${info.role})`);
    ws.terminate();
  });
}, HEARTBEAT_INTERVAL);

// Stop the sweep once the WebSocket server closes.
wss.on('close', () => clearInterval(heartbeat));
/**
 * Per-connection setup.
 *
 * Every new socket is tracked in `clients` with the default role 'viewer'.
 * Two JSON message types are understood:
 *   { type: 'register', role }  - sets the client's role ('source' or 'viewer')
 *   { type: 'telemetry', data } - accepted only from a client registered as
 *                                 'source'; stored as the latest state and
 *                                 relayed to every other open client
 * Anything else is ignored.
 */
wss.on('connection', (ws) => {
  const clientInfo = { role: 'viewer', connectedAt: Date.now(), alive: true };
  clients.set(ws, clientInfo);
  console.log(`Client connected (total: ${clients.size})`);

  // Handle a `register` message: record the role, then either announce the
  // new source or replay the latest snapshot to the new viewer.
  const handleRegister = (msg) => {
    // The role comes from the network, so only 'source' is honoured; missing
    // or unrecognised values fall back to 'viewer' instead of being stored
    // verbatim.
    clientInfo.role = msg.role === 'source' ? 'source' : 'viewer';
    console.log(`Client registered as: ${clientInfo.role}`);

    if (clientInfo.role === 'source') {
      // Tell everyone else that a source has connected.
      broadcast({ type: 'source_connected' }, ws);
      return;
    }

    // Give a new viewer immediate data if a snapshot exists.
    if (latestState) {
      ws.send(JSON.stringify({
        type: 'state',
        data: latestState.data,
        ts: latestState.ts,
      }));
    }
  };

  // Handle a `telemetry` message: timestamp it, remember it for viewers that
  // register later, and relay it to every other open client.
  const handleTelemetry = (msg) => {
    const ts = new Date().toISOString();
    latestState = { data: msg.data, ts };
    broadcast({ type: 'telemetry', data: msg.data, ts }, ws);
  };

  // A pong marks the client as alive for the next heartbeat sweep.
  ws.on('pong', () => {
    clientInfo.alive = true;
  });

  ws.on('message', (raw) => {
    let msg;
    try {
      msg = JSON.parse(raw);
    } catch (e) {
      // Ignore malformed messages
      return;
    }
    // Valid JSON that is not an object (null, numbers, strings) has no `type`.
    if (msg === null || typeof msg !== 'object') {
      return;
    }

    try {
      if (msg.type === 'register') {
        handleRegister(msg);
      } else if (msg.type === 'telemetry' && clientInfo.role === 'source') {
        handleTelemetry(msg);
      }
    } catch (err) {
      // Keep the connection and the process alive, but do not hide the failure.
      console.error('Failed to handle message:', err.message);
    }
  });

  ws.on('close', () => {
    // Capture the role before the client is removed from the registry.
    const wasSource = clientInfo.role === 'source';
    clients.delete(ws);
    console.log(`Client disconnected (total: ${clients.size})`);
    // Notify remaining clients if a source disconnected
    if (wasSource) {
      broadcast({ type: 'source_disconnected' });
    }
  });

  ws.on('error', (err) => {
    console.error('WS error:', err.message);
  });
});
/**
 * Send one message to every tracked client whose socket is open.
 *
 * @param {object} msg - Message to send; serialised to JSON once and reused
 *   for each recipient.
 * @param {object|null} [exclude=null] - Socket to skip, if any.
 */
function broadcast(msg, exclude = null) {
  const serialized = JSON.stringify(msg);
  clients.forEach((_info, socket) => {
    if (socket === exclude || socket.readyState !== WebSocket.OPEN) {
      return;
    }
    socket.send(serialized);
  });
}
// Start listening on PORT and log a line once the server is up.
function announceListening() {
  console.log(`OpusBike relay server running on port ${PORT}`);
}
httpServer.listen(PORT, announceListening);