# File: OpusBike/cli/test_bleak.py
# (Repository viewer metadata: 596 lines, 20 KiB, Python.)

#!/usr/bin/env python3
"""
OpusBike Bosch Smart System BLE Test Script (bleak)
Connects to a pre-paired Bosch eBike via BLE, decodes telemetry,
prints to terminal, serves dashboard at localhost:3000, logs to CSV.
Usage:
    python3 cli/test_bleak.py              # Real BLE mode
    python3 cli/test_bleak.py --demo       # Simulated data
    python3 cli/test_bleak.py --uuid XX    # Specific device UUID
    python3 cli/test_bleak.py --port 8080  # Serve the dashboard on another port
"""
import argparse
import asyncio
import csv
import json
import math
import os
import signal
import sys
import time
from datetime import datetime, timedelta
# ---------------------------------------------------------------------------
# Bosch BLE constants
# ---------------------------------------------------------------------------
# BLE service inspected after connecting, and the characteristic whose
# notifications carry the telemetry stream.
SERVICE_UUID = "00000010-eaa2-11e9-81b4-2a2ae2dbcce4"
CHAR_UUID = "00000011-eaa2-11e9-81b4-2a2ae2dbcce4"

# Substrings matched case-insensitively against advertised device names.
DEVICE_NAME_FILTERS = ["smart system", "Bosch", "BRC"]

# Raw assist-mode value -> label shown in the terminal and dashboard.
ASSIST_MODES = {
    0: "OFF",
    1: "ECO",
    2: "TOUR",
    3: "eMTB",
    4: "TURBO",
}

# Confirmed data IDs from nRF Connect Phase 2 testing.
# Maps data ID -> (telemetry field name, divisor applied to the raw value).
KNOWN_DATA_IDS = {
    0x9809: ("assist_mode", 1),   # 0=OFF 1=ECO 2=TOUR 3=eMTB 4=TURBO
    0x8088: ("battery", 1),       # %
    0x982D: ("speed", 10),        # km/h (raw / 10)
    0x985A: ("cadence", 2),       # rpm (raw / 2)
    0x985B: ("human_power", 1),   # W
    0x985D: ("motor_power", 1),   # W
}
# ---------------------------------------------------------------------------
# Shared telemetry state
# ---------------------------------------------------------------------------
# Latest decoded values. Written by the BLE notification path and the demo
# generator; read by the terminal printer and served as JSON by the dashboard.
telemetry = {
    "speed": 0.0,             # km/h
    "cadence": 0,             # rpm
    "human_power": 0,         # W
    "motor_power": 0,         # W
    "battery": 0,             # %
    "assist_mode": "OFF",     # label from ASSIST_MODES
    "ride_time": "00:00:00",  # HH:MM:SS since the connection started
    "total_km": 0.0,
    "connected": False,
    "demo": False,
    "last_update": None,      # ISO timestamp of the last applied notification
}

# time.time() at which the current connection (or demo run) began; 0 = never.
connection_start: float = 0.0

# CSV logging handles; both stay None until setup_csv() installs them.
csv_writer = None
csv_file = None
# ---------------------------------------------------------------------------
# Varint decoder
# ---------------------------------------------------------------------------
def decode_varint(data: bytes, offset: int) -> tuple:
    """Decode a protobuf-style varint beginning at ``data[offset]``.

    Each byte contributes its low 7 bits, least-significant group first; a
    clear high bit marks the final byte.

    Returns:
        ``(value, bytes_read)``.  If ``offset`` is at or past the end of
        ``data`` the result is ``(0, 0)``.  If the data ends before a
        terminating byte is seen, the bits gathered so far are returned.
    """
    value = 0
    position = offset
    while position < len(data):
        byte = data[position]
        # Group k (0-based) lands k * 7 bits up.
        value |= (byte & 0x7F) << (7 * (position - offset))
        position += 1
        if not byte & 0x80:
            break
    return value, position - offset
# ---------------------------------------------------------------------------
# Bosch protocol parser
# ---------------------------------------------------------------------------
def parse_notification(data: bytes) -> dict:
    """
    Parse a Bosch BLE notification into a ``{field_name: value}`` dict.

    Format: [0x30] [length] [id_hi] [id_lo] [0x08] [varint data...]
    Multiple messages can be concatenated.  ``length`` counts the bytes that
    follow the length byte.  Bytes that are not a 0x30 header are skipped one
    at a time, and parsing stops at a message whose declared length runs past
    the end of ``data``.

    Every message is decoded strictly inside its own declared bounds and the
    cursor always resumes at the message end, so a malformed varint cannot
    swallow bytes belonging to the following message.

    Args:
        data: Raw notification payload.

    Returns:
        Values for the IDs listed in KNOWN_DATA_IDS, divided by the field's
        divisor when that divisor is greater than 1.  Decoded fields are also
        handed to log_raw(); messages too short or with a truncated varint
        are dropped.
    """
    parsed = {}
    total = len(data)
    offset = 0
    while offset < total:
        # Resynchronise: skip anything that is not a message header.
        if data[offset] != 0x30:
            offset += 1
            continue
        offset += 1  # skip 0x30
        if offset >= total:
            break

        length = data[offset]
        offset += 1
        if length == 0:
            continue
        msg_end = offset + length
        if msg_end > total:
            break  # truncated message: nothing reliable left to parse

        cursor = offset
        # Whatever happens below, the next iteration starts at the boundary.
        offset = msg_end

        if length < 2:
            continue  # too short to carry the 2-byte data ID

        data_id = (data[cursor] << 8) | data[cursor + 1]
        cursor += 2
        known = KNOWN_DATA_IDS.get(data_id)

        # Length 0x02 means just the ID, value = 0
        if length == 2:
            if known is not None:
                parsed[known[0]] = 0
                log_raw(data, data_id, 0, 0)
            continue

        # Expect 0x08 marker before the varint; tolerate its absence.
        if data[cursor] == 0x08:
            cursor += 1
        if cursor >= msg_end:
            continue  # marker only, no value bytes

        # Slice so the decoder cannot read past this message.
        value, vbytes = decode_varint(data[:msg_end], cursor)
        if data[cursor + vbytes - 1] & 0x80:
            continue  # continuation bit on the last byte: varint is truncated

        if known is None:
            log_raw(data, data_id, value, None)
            continue
        field_name, divisor = known
        decoded = value / divisor if divisor > 1 else value
        parsed[field_name] = decoded
        log_raw(data, data_id, value, decoded)
    return parsed
def log_raw(raw_data: bytes, data_id: int, raw_value: int, decoded_value):
    """Append one decoded field to the CSV log, if logging is enabled.

    Does nothing while ``csv_writer`` is None.  Logging is best-effort: a
    failed write never propagates to the caller.  Instead the failure is
    reported once on stderr and logging is switched off, so a broken log file
    is neither silent nor retried on every notification.

    Args:
        raw_data: Complete notification payload, logged as hex.
        data_id: 16-bit data ID of the field.
        raw_value: Value as decoded from the varint, before scaling.
        decoded_value: Scaled value, or None when the ID is not in
            KNOWN_DATA_IDS (written as an empty cell).
    """
    global csv_writer
    if csv_writer is None:
        return

    if data_id in KNOWN_DATA_IDS:
        field_name = KNOWN_DATA_IDS[data_id][0]
    else:
        field_name = "unknown"
    row = [
        datetime.now().isoformat(),
        raw_data.hex(),
        f"0x{data_id:04X}",
        field_name,
        raw_value,
        decoded_value if decoded_value is not None else "",
    ]
    try:
        csv_writer.writerow(row)
    except (OSError, ValueError, csv.Error) as exc:
        # ValueError covers writes to an already-closed file.
        print(f"\n[WARN] CSV logging disabled: {exc}", file=sys.stderr)
        csv_writer = None
# ---------------------------------------------------------------------------
# Apply parsed data to global telemetry
# ---------------------------------------------------------------------------
def apply_parsed(parsed: dict):
    """Copy decoded BLE fields from ``parsed`` into the global telemetry dict.

    Only keys present in ``parsed`` are touched.  Speed is rounded to one
    decimal, the remaining numeric fields are truncated to int, and the
    assist mode is translated through ASSIST_MODES (``UNK(n)`` for values it
    does not list).  The ride time is refreshed when a connection start time
    is set, and ``last_update`` is always stamped.
    """
    if "speed" in parsed:
        telemetry["speed"] = round(parsed["speed"], 1)

    for key in ("cadence", "human_power", "motor_power", "battery"):
        if key in parsed:
            telemetry[key] = int(parsed[key])

    if "assist_mode" in parsed:
        mode_val = int(parsed["assist_mode"])
        telemetry["assist_mode"] = ASSIST_MODES.get(mode_val, f"UNK({mode_val})")

    # Ride time counts from the moment the current connection was made.
    if connection_start > 0:
        whole_seconds = int(time.time() - connection_start)
        hours, remainder = divmod(whole_seconds, 3600)
        minutes, seconds = divmod(remainder, 60)
        telemetry["ride_time"] = f"{hours:02d}:{minutes:02d}:{seconds:02d}"

    telemetry["last_update"] = datetime.now().isoformat()
# ---------------------------------------------------------------------------
# Terminal output
# ---------------------------------------------------------------------------
def print_telemetry():
    """Redraw the single-line telemetry readout on the terminal.

    The line starts with a carriage return and is printed without a trailing
    newline, so each call overwrites the previous readout.  A ``[DEMO]`` tag
    is prepended when the demo flag is set.
    """
    clock = datetime.now().strftime("%H:%M:%S")
    mode = telemetry["assist_mode"]
    tag = "[DEMO] " if telemetry["demo"] else ""
    line = (
        f"\r{tag}[{clock}] "
        f"Speed: {telemetry['speed']:5.1f} km/h | "
        f"Cadence: {telemetry['cadence']:3d} rpm | "
        f"Power: {telemetry['human_power']:3d}W | "
        f"Motor: {telemetry['motor_power']:3d}W | "
        f"Battery: {telemetry['battery']:3d}% | "
        f"Mode: {mode:<5s} | "
        f"Time: {telemetry['ride_time']}"
    )
    print(line, end="", flush=True)
# ---------------------------------------------------------------------------
# Web dashboard (aiohttp)
# ---------------------------------------------------------------------------
# Single-page dashboard returned for "/".  Its inline script fetches
# /api/telemetry every 500 ms and writes the returned fields into the cards.
# The whole literal is sent to the browser as-is, so every character in it is
# runtime behaviour.
DASHBOARD_HTML = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>OpusBike BLE Test Dashboard</title>
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body {
background: #0a0a0a; color: #fff;
font-family: -apple-system, 'Segoe UI', Helvetica, Arial, sans-serif;
display: flex; flex-direction: column; align-items: center;
min-height: 100vh; padding: 20px;
}
h1 { font-size: 1.4rem; color: #4285F4; margin-bottom: 6px; }
.subtitle { font-size: 0.8rem; color: #666; margin-bottom: 20px; }
.demo-badge {
display: inline-block; background: #FF9800; color: #000;
font-weight: 700; font-size: 0.7rem; padding: 2px 8px;
border-radius: 4px; margin-left: 8px;
}
.status {
font-size: 0.75rem; padding: 4px 12px; border-radius: 12px;
margin-bottom: 16px;
}
.status.connected { background: #1b5e20; color: #81c784; }
.status.disconnected { background: #4a1010; color: #e57373; }
.grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(160px, 1fr));
gap: 12px; width: 100%; max-width: 700px;
}
.card {
background: #1a1a1a; border-radius: 12px; padding: 16px;
text-align: center; border: 1px solid #222;
}
.card.hero {
grid-column: 1 / -1; padding: 24px;
}
.card .label { font-size: 0.7rem; color: #888; text-transform: uppercase; letter-spacing: 1px; }
.card .value { font-size: 2rem; font-weight: 700; margin: 4px 0; font-variant-numeric: tabular-nums; }
.card.hero .value { font-size: 4rem; color: #4285F4; }
.card .unit { font-size: 0.8rem; color: #666; }
.mode { font-size: 1.2rem; font-weight: 700; }
.mode.OFF { color: #666; } .mode.ECO { color: #4caf50; }
.mode.TOUR { color: #4285F4; } .mode.eMTB { color: #ff9800; }
.mode.TURBO { color: #f44336; }
.battery-bar {
width: 100%; height: 8px; background: #333; border-radius: 4px;
margin-top: 6px; overflow: hidden;
}
.battery-fill { height: 100%; border-radius: 4px; transition: width 0.3s; }
.battery-fill.green { background: #4caf50; }
.battery-fill.yellow { background: #ff9800; }
.battery-fill.red { background: #f44336; }
.timer { font-variant-numeric: tabular-nums; color: #aaa; }
</style>
</head>
<body>
<h1>OpusBike BLE Test<span id="demoBadge" class="demo-badge" style="display:none">DEMO</span></h1>
<div class="subtitle">Python bleak Bosch Smart System</div>
<div id="status" class="status disconnected">Disconnected</div>
<div class="grid">
<div class="card hero">
<div class="label">Speed</div>
<div class="value" id="speed">0.0</div>
<div class="unit">km/h</div>
</div>
<div class="card">
<div class="label">Cadence</div>
<div class="value" id="cadence">0</div>
<div class="unit">rpm</div>
</div>
<div class="card">
<div class="label">Human Power</div>
<div class="value" id="human_power">0</div>
<div class="unit">W</div>
</div>
<div class="card">
<div class="label">Motor Power</div>
<div class="value" id="motor_power">0</div>
<div class="unit">W</div>
</div>
<div class="card">
<div class="label">Battery</div>
<div class="value" id="battery">0<span class="unit">%</span></div>
<div class="battery-bar"><div class="battery-fill green" id="batteryBar" style="width:0%"></div></div>
</div>
<div class="card">
<div class="label">Assist Mode</div>
<div class="mode OFF" id="mode">OFF</div>
</div>
<div class="card">
<div class="label">Ride Time</div>
<div class="value timer" id="ride_time">00:00:00</div>
</div>
</div>
<script>
async function poll() {
try {
const r = await fetch('/api/telemetry');
const d = await r.json();
document.getElementById('speed').textContent = d.speed.toFixed(1);
document.getElementById('cadence').textContent = d.cadence;
document.getElementById('human_power').textContent = d.human_power;
document.getElementById('motor_power').textContent = d.motor_power;
document.getElementById('battery').innerHTML = d.battery + '<span class="unit">%</span>';
const bar = document.getElementById('batteryBar');
bar.style.width = d.battery + '%';
bar.className = 'battery-fill ' + (d.battery > 50 ? 'green' : d.battery > 20 ? 'yellow' : 'red');
const modeEl = document.getElementById('mode');
modeEl.textContent = d.assist_mode;
modeEl.className = 'mode ' + d.assist_mode;
document.getElementById('ride_time').textContent = d.ride_time;
const st = document.getElementById('status');
if (d.connected || d.demo) {
st.textContent = d.demo ? 'Demo Mode' : 'BLE Connected';
st.className = 'status connected';
} else {
st.textContent = 'Disconnected';
st.className = 'status disconnected';
}
if (d.demo) document.getElementById('demoBadge').style.display = 'inline-block';
} catch (e) {}
}
setInterval(poll, 500);
poll();
</script>
</body>
</html>"""
async def start_web_server(port: int = 3000):
    """Serve the dashboard page and the telemetry JSON endpoint.

    Registers ``GET /`` (the DASHBOARD_HTML page) and ``GET /api/telemetry``
    (the telemetry dict as JSON), then starts listening on all interfaces at
    ``port``.  If aiohttp cannot be imported a warning is printed and the
    function returns without starting anything.
    """
    try:
        from aiohttp import web
    except ImportError:
        print("\n[WARN] aiohttp not installed — dashboard disabled. Run: pip3 install aiohttp")
        return

    async def serve_page(_request):
        return web.Response(text=DASHBOARD_HTML, content_type="text/html")

    async def serve_telemetry(_request):
        return web.json_response(telemetry)

    app = web.Application()
    for route, handler in (("/", serve_page), ("/api/telemetry", serve_telemetry)):
        app.router.add_get(route, handler)

    runner = web.AppRunner(app)
    await runner.setup()
    await web.TCPSite(runner, "0.0.0.0", port).start()
    print(f"[INFO] Dashboard: http://localhost:{port}")
# ---------------------------------------------------------------------------
# Demo mode
# ---------------------------------------------------------------------------
async def run_demo():
    """Feed the telemetry dict with synthetic ride data, forever.

    Marks the session as demo and connected, then every 0.5 s derives speed,
    cadence, human power, motor power, battery and distance from the tick
    counter, stores them in ``telemetry`` and reprints the terminal line.
    The assist mode advances through a fixed cycle every 30 ticks (15 s).
    """
    global connection_start
    telemetry["demo"] = True
    telemetry["connected"] = True
    telemetry["battery"] = 87
    connection_start = time.time()

    mode_cycle = ["ECO", "TOUR", "eMTB", "TURBO", "TOUR", "ECO"]
    # Motor output as a multiple of the rider's power, per assist mode.
    motor_gain = {"OFF": 0, "ECO": 0.5, "TOUR": 1.0, "eMTB": 1.5, "TURBO": 2.2}
    odometer_km = 1847.2
    tick = 0

    print("[DEMO] Simulating Bosch Smart System ride data...")
    print("[DEMO] Press Ctrl+C to stop\n")

    while True:
        t = tick * 0.5  # simulated seconds; the loop runs at 2 Hz

        # Two superimposed sine waves around 22 km/h, clamped at zero.
        speed = max(0, 22 + 13 * math.sin(t / 8) + 2 * math.sin(t / 3))
        # Cadence follows speed, human power follows cadence.
        cadence = int(max(0, speed * 2.8 + 5 * math.sin(t / 5)))
        human_power = int(max(0, cadence * 1.4 + 10 * math.sin(t / 4)))

        # One step through the cycle per 30 completed ticks.
        mode_name = mode_cycle[(tick // 30) % len(mode_cycle)]
        motor_power = int(human_power * motor_gain.get(mode_name, 1.0))

        # Battery drains one percent every 50 ticks.
        battery = max(0, 87 - int(tick * 0.02))
        # Distance covered during this 0.5 s tick.
        odometer_km += speed / 3600 * 0.5

        telemetry["speed"] = round(speed, 1)
        telemetry["cadence"] = cadence
        telemetry["human_power"] = human_power
        telemetry["motor_power"] = motor_power
        telemetry["battery"] = battery
        telemetry["assist_mode"] = mode_name
        telemetry["total_km"] = round(odometer_km, 1)

        whole_seconds = int(time.time() - connection_start)
        hours, remainder = divmod(whole_seconds, 3600)
        minutes, seconds = divmod(remainder, 60)
        telemetry["ride_time"] = f"{hours:02d}:{minutes:02d}:{seconds:02d}"

        print_telemetry()
        tick += 1
        await asyncio.sleep(0.5)
# ---------------------------------------------------------------------------
# Real BLE mode
# ---------------------------------------------------------------------------
async def run_ble(device_uuid: str = None):
    """Connect to a real Bosch eBike over BLE and stream telemetry forever.

    Finds the bike (by address when ``device_uuid`` is given, otherwise by
    matching advertised names against DEVICE_NAME_FILTERS), subscribes to
    CHAR_UUID notifications and feeds each one through parse_notification()
    and apply_parsed().  Whenever the link drops, the connection attempt
    fails, or an error is raised, it waits 3 seconds and connects again.
    The wait is applied on every path, so a bike that keeps dropping the
    link cannot cause a tight reconnect loop.

    Args:
        device_uuid: BLE address/UUID of the bike, or None to scan by name.

    Exits the process with status 1 if bleak is not installed or no matching
    device is found.
    """
    global connection_start
    try:
        from bleak import BleakClient, BleakScanner
    except ImportError:
        print("[ERROR] bleak not installed. Run: pip3 install bleak")
        sys.exit(1)

    reconnect_delay = 3  # seconds; the value announced in the log messages

    # Scan for device
    print("[INFO] Scanning for Bosch Smart System eBike...")
    device = None
    if device_uuid:
        print(f"[INFO] Looking for device UUID: {device_uuid}")
        device = await BleakScanner.find_device_by_address(device_uuid, timeout=15.0)
    else:
        for candidate in await BleakScanner.discover(timeout=10.0):
            name = (candidate.name or "").lower()
            if any(f.lower() in name for f in DEVICE_NAME_FILTERS):
                device = candidate
                print(f"[INFO] Found: {candidate.name} ({candidate.address})")
                break
    if device is None:
        print("[ERROR] Bike not found. Make sure:")
        print(" 1. Bike is ON (battery connected)")
        print(" 2. LED Remote is active")
        print(" 3. Bike is paired in macOS System Settings > Bluetooth")
        print(" 4. No other device (Bosch Flow app) is connected to the bike")
        print(f"\nTip: Run with --demo to test without the bike")
        sys.exit(1)
    print(f"[INFO] Connecting to {device.name} ({device.address})...")

    def on_disconnect(client):
        elapsed = time.time() - connection_start if connection_start > 0 else 0
        telemetry["connected"] = False
        print(f"\n[WARN] Disconnected after {elapsed:.1f}s")
        # A drop in the 35-40 s window is reported as a likely bonding failure.
        if 35 < elapsed < 40:
            print("[WARN] Disconnected at ~37s — BLE bonding likely failed!")
            print("[WARN] Pair the bike in macOS System Settings > Bluetooth first (R-000138)")
        else:
            print("[INFO] Will attempt reconnection in 3 seconds...")

    def notification_handler(sender, data: bytearray):
        """Handle incoming BLE notifications."""
        parsed = parse_notification(bytes(data))
        if parsed:
            apply_parsed(parsed)
            print_telemetry()

    # Connection loop with auto-reconnect
    while True:
        try:
            async with BleakClient(
                device.address,
                disconnected_callback=on_disconnect,
                timeout=20.0,
            ) as client:
                if client.is_connected:
                    connection_start = time.time()
                    telemetry["connected"] = True
                    print(f"[INFO] Connected! Subscribing to {CHAR_UUID}...")
                    # List services for debugging
                    for service in client.services:
                        if SERVICE_UUID.lower() in service.uuid.lower():
                            print(f"[INFO] Found Bosch service: {service.uuid}")
                            for char in service.characteristics:
                                props = ", ".join(char.properties)
                                print(f" Char: {char.uuid} [{props}]")
                    # Subscribe to notifications
                    await client.start_notify(CHAR_UUID, notification_handler)
                    print("[INFO] Subscribed — receiving data. Press Ctrl+C to stop.\n")
                    # Keep alive — check connection every second
                    while client.is_connected:
                        await asyncio.sleep(1.0)
                else:
                    print("[ERROR] Failed to connect")
        except Exception as e:
            print(f"\n[ERROR] {e}")
            print("[INFO] Reconnecting in 3 seconds...")
        # Reached after a clean disconnect, a failed connect and an error alike.
        telemetry["connected"] = False
        await asyncio.sleep(reconnect_delay)
# ---------------------------------------------------------------------------
# CSV setup
# ---------------------------------------------------------------------------
def setup_csv(path: str):
    """Open ``path`` for appending and install the module-level CSV writer.

    The column header row is written only when the file is empty, so
    repeated runs keep appending to one log under a single header.  The file
    handle stays open in ``csv_file`` for the caller to close.
    """
    global csv_writer, csv_file
    handle = open(path, "a", newline="")
    writer = csv.writer(handle)
    csv_file, csv_writer = handle, writer

    is_new_log = os.path.getsize(path) == 0
    if is_new_log:
        writer.writerow(["timestamp", "raw_hex", "id_hex", "field_name", "raw_value", "decoded_value"])
    print(f"[INFO] CSV log: {path}")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
async def main():
    """Parse the command line, then run the dashboard plus one data source.

    In real BLE mode a CSV log named ``ble_log.csv`` is opened next to this
    script before anything else starts; demo mode does no CSV logging.
    """
    cli = argparse.ArgumentParser(description="OpusBike Bosch BLE Test (bleak)")
    cli.add_argument("--demo", action="store_true", help="Run with simulated data")
    cli.add_argument("--uuid", type=str, default=None, help="BLE device UUID/address")
    cli.add_argument("--port", type=int, default=3000, help="Dashboard port (default: 3000)")
    opts = cli.parse_args()

    # CSV logging (real BLE mode only)
    if not opts.demo:
        script_dir = os.path.dirname(os.path.abspath(__file__))
        setup_csv(os.path.join(script_dir, "ble_log.csv"))

    # Start web dashboard
    await start_web_server(opts.port)

    # Run BLE or demo; neither returns under normal operation
    data_source = run_demo() if opts.demo else run_ble(opts.uuid)
    await data_source
if __name__ == "__main__":
    # Script entry point.  Ctrl+C is the normal way to stop and exits with
    # status 0.  The CSV log is closed in ``finally`` so it is also flushed
    # when the run ends through sys.exit() or an unexpected error.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n\n[INFO] Stopped.")
        sys.exit(0)
    finally:
        if csv_file:
            csv_file.close()