#!/usr/bin/env python3 """ OpusBike — Bosch Smart System BLE Test Script (bleak) Connects to a pre-paired Bosch eBike via BLE, decodes telemetry, prints to terminal, serves dashboard at localhost:3000, logs to CSV. Usage: python3 cli/test_bleak.py # Real BLE mode python3 cli/test_bleak.py --demo # Simulated data python3 cli/test_bleak.py --uuid XX # Specific device UUID """ import argparse import asyncio import csv import json import math import os import signal import sys import time from datetime import datetime, timedelta # --------------------------------------------------------------------------- # Bosch BLE constants # --------------------------------------------------------------------------- SERVICE_UUID = "00000010-eaa2-11e9-81b4-2a2ae2dbcce4" CHAR_UUID = "00000011-eaa2-11e9-81b4-2a2ae2dbcce4" DEVICE_NAME_FILTERS = ["smart system", "Bosch", "BRC"] ASSIST_MODES = {0: "OFF", 1: "ECO", 2: "TOUR", 3: "eMTB", 4: "TURBO"} # Confirmed data IDs from nRF Connect Phase 2 testing KNOWN_DATA_IDS = { 0x9809: ("assist_mode", 1), # 0=OFF 1=ECO 2=TOUR 3=eMTB 4=TURBO 0x8088: ("battery", 1), # % 0x982D: ("speed", 10), # km/h (raw / 10) 0x985A: ("cadence", 2), # rpm (raw / 2) 0x985B: ("human_power", 1), # W 0x985D: ("motor_power", 1), # W } # --------------------------------------------------------------------------- # Shared telemetry state # --------------------------------------------------------------------------- telemetry = { "speed": 0.0, "cadence": 0, "human_power": 0, "motor_power": 0, "battery": 0, "assist_mode": "OFF", "ride_time": "00:00:00", "total_km": 0.0, "connected": False, "demo": False, "last_update": None, } connection_start: float = 0.0 csv_writer = None csv_file = None # --------------------------------------------------------------------------- # Varint decoder # --------------------------------------------------------------------------- def decode_varint(data: bytes, offset: int) -> tuple: """Decode a protobuf-style varint. Returns (value, bytes_read).""" result = 0 shift = 0 bytes_read = 0 while offset + bytes_read < len(data): b = data[offset + bytes_read] result |= (b & 0x7F) << shift bytes_read += 1 if (b & 0x80) == 0: break shift += 7 return result, bytes_read # --------------------------------------------------------------------------- # Bosch protocol parser # --------------------------------------------------------------------------- def parse_notification(data: bytes) -> dict: """ Parse a Bosch BLE notification. Format: [0x30] [length] [id_hi] [id_lo] [0x08] [varint data...] Multiple messages can be concatenated. """ parsed = {} offset = 0 while offset < len(data): # Expect 0x30 header if offset >= len(data): break header = data[offset] if header != 0x30: # Try to skip unknown bytes offset += 1 continue offset += 1 # skip 0x30 if offset >= len(data): break # Read length length = data[offset] offset += 1 if length == 0: continue msg_end = offset + length if msg_end > len(data): break # Read 2-byte data ID if offset + 1 >= len(data): break id_hi = data[offset] id_lo = data[offset + 1] data_id = (id_hi << 8) | id_lo offset += 2 # Length 0x02 means just the ID, value = 0 if length == 2: if data_id in KNOWN_DATA_IDS: field_name, divisor = KNOWN_DATA_IDS[data_id] parsed[field_name] = 0 log_raw(data, data_id, 0, 0) continue # Expect 0x08 marker before varint if offset < len(data) and data[offset] == 0x08: offset += 1 # Decode varint value if offset < msg_end: value, vbytes = decode_varint(data, offset) offset += vbytes if data_id in KNOWN_DATA_IDS: field_name, divisor = KNOWN_DATA_IDS[data_id] decoded = value / divisor if divisor > 1 else value parsed[field_name] = decoded log_raw(data, data_id, value, decoded) else: log_raw(data, data_id, value, None) else: offset = msg_end # Advance to end of this message if we haven't reached it if offset < msg_end: offset = msg_end return parsed def log_raw(raw_data: bytes, data_id: int, raw_value: int, decoded_value): """Log a decoded field to CSV.""" global csv_writer if csv_writer is None: return field_name = "unknown" if data_id in KNOWN_DATA_IDS: field_name = KNOWN_DATA_IDS[data_id][0] try: csv_writer.writerow([ datetime.now().isoformat(), raw_data.hex(), f"0x{data_id:04X}", field_name, raw_value, decoded_value if decoded_value is not None else "", ]) except Exception: pass # --------------------------------------------------------------------------- # Apply parsed data to global telemetry # --------------------------------------------------------------------------- def apply_parsed(parsed: dict): """Merge parsed BLE fields into the global telemetry dict.""" if "speed" in parsed: telemetry["speed"] = round(parsed["speed"], 1) if "cadence" in parsed: telemetry["cadence"] = int(parsed["cadence"]) if "human_power" in parsed: telemetry["human_power"] = int(parsed["human_power"]) if "motor_power" in parsed: telemetry["motor_power"] = int(parsed["motor_power"]) if "battery" in parsed: telemetry["battery"] = int(parsed["battery"]) if "assist_mode" in parsed: mode_val = int(parsed["assist_mode"]) telemetry["assist_mode"] = ASSIST_MODES.get(mode_val, f"UNK({mode_val})") # Update ride time if connection_start > 0: elapsed = time.time() - connection_start h, rem = divmod(int(elapsed), 3600) m, s = divmod(rem, 60) telemetry["ride_time"] = f"{h:02d}:{m:02d}:{s:02d}" telemetry["last_update"] = datetime.now().isoformat() # --------------------------------------------------------------------------- # Terminal output # --------------------------------------------------------------------------- def print_telemetry(): """Print one-line telemetry to terminal.""" ts = datetime.now().strftime("%H:%M:%S") mode = telemetry["assist_mode"] prefix = "[DEMO] " if telemetry["demo"] else "" print( f"\r{prefix}[{ts}] " f"Speed: {telemetry['speed']:5.1f} km/h | " f"Cadence: {telemetry['cadence']:3d} rpm | " f"Power: {telemetry['human_power']:3d}W | " f"Motor: {telemetry['motor_power']:3d}W | " f"Battery: {telemetry['battery']:3d}% | " f"Mode: {mode:<5s} | " f"Time: {telemetry['ride_time']}", end="", flush=True, ) # --------------------------------------------------------------------------- # Web dashboard (aiohttp) # --------------------------------------------------------------------------- DASHBOARD_HTML = """ OpusBike — BLE Test Dashboard

OpusBike BLE Test

Python bleak — Bosch Smart System
Disconnected
Speed
0.0
km/h
Cadence
0
rpm
Human Power
0
W
Motor Power
0
W
Battery
0%
Assist Mode
OFF
Ride Time
00:00:00
""" async def start_web_server(port: int = 3000): """Start aiohttp web server for the dashboard.""" try: from aiohttp import web except ImportError: print("\n[WARN] aiohttp not installed — dashboard disabled. Run: pip3 install aiohttp") return app = web.Application() async def handle_index(request): return web.Response(text=DASHBOARD_HTML, content_type="text/html") async def handle_api(request): return web.json_response(telemetry) app.router.add_get("/", handle_index) app.router.add_get("/api/telemetry", handle_api) runner = web.AppRunner(app) await runner.setup() site = web.TCPSite(runner, "0.0.0.0", port) await site.start() print(f"[INFO] Dashboard: http://localhost:{port}") # --------------------------------------------------------------------------- # Demo mode # --------------------------------------------------------------------------- async def run_demo(): """Generate fake BLE data simulating a ride.""" global connection_start telemetry["demo"] = True telemetry["connected"] = True telemetry["battery"] = 87 connection_start = time.time() modes = ["ECO", "TOUR", "eMTB", "TURBO", "TOUR", "ECO"] mode_idx = 0 tick = 0 total_km = 1847.2 print("[DEMO] Simulating Bosch Smart System ride data...") print("[DEMO] Press Ctrl+C to stop\n") while True: t = tick * 0.5 # 2 Hz # Speed: sinusoidal 0-35 km/h with noise speed = max(0, 22 + 13 * math.sin(t / 8) + 2 * math.sin(t / 3)) # Cadence correlates with speed cadence = int(max(0, speed * 2.8 + 5 * math.sin(t / 5))) # Human power correlates with cadence human_power = int(max(0, cadence * 1.4 + 10 * math.sin(t / 4))) # Motor power depends on mode mode_multiplier = {"OFF": 0, "ECO": 0.5, "TOUR": 1.0, "eMTB": 1.5, "TURBO": 2.2} mode_name = modes[mode_idx % len(modes)] motor_power = int(human_power * mode_multiplier.get(mode_name, 1.0)) # Battery decreases slowly battery = max(0, 87 - int(tick * 0.02)) # Total km increases with speed total_km += speed / 3600 * 0.5 # 0.5s tick telemetry["speed"] = round(speed, 1) telemetry["cadence"] = cadence telemetry["human_power"] = human_power telemetry["motor_power"] = motor_power telemetry["battery"] = battery telemetry["assist_mode"] = mode_name telemetry["total_km"] = round(total_km, 1) # Update ride time elapsed = time.time() - connection_start h, rem = divmod(int(elapsed), 3600) m, s = divmod(rem, 60) telemetry["ride_time"] = f"{h:02d}:{m:02d}:{s:02d}" print_telemetry() tick += 1 if tick % 30 == 0: # Change mode every 15s mode_idx += 1 await asyncio.sleep(0.5) # --------------------------------------------------------------------------- # Real BLE mode # --------------------------------------------------------------------------- async def run_ble(device_uuid: str = None): """Connect to real Bosch eBike via BLE and stream data.""" global connection_start try: from bleak import BleakClient, BleakScanner except ImportError: print("[ERROR] bleak not installed. Run: pip3 install bleak") sys.exit(1) # Scan for device print("[INFO] Scanning for Bosch Smart System eBike...") device = None if device_uuid: print(f"[INFO] Looking for device UUID: {device_uuid}") device = await BleakScanner.find_device_by_address(device_uuid, timeout=15.0) else: devices = await BleakScanner.discover(timeout=10.0) for d in devices: name = d.name or "" if any(f.lower() in name.lower() for f in DEVICE_NAME_FILTERS): device = d print(f"[INFO] Found: {d.name} ({d.address})") break if device is None: print("[ERROR] Bike not found. Make sure:") print(" 1. Bike is ON (battery connected)") print(" 2. LED Remote is active") print(" 3. Bike is paired in macOS System Settings > Bluetooth") print(" 4. No other device (Bosch Flow app) is connected to the bike") print(f"\nTip: Run with --demo to test without the bike") sys.exit(1) print(f"[INFO] Connecting to {device.name} ({device.address})...") def on_disconnect(client): elapsed = time.time() - connection_start if connection_start > 0 else 0 telemetry["connected"] = False print(f"\n[WARN] Disconnected after {elapsed:.1f}s") if 35 < elapsed < 40: print("[WARN] Disconnected at ~37s — BLE bonding likely failed!") print("[WARN] Pair the bike in macOS System Settings > Bluetooth first (R-000138)") else: print("[INFO] Will attempt reconnection in 3 seconds...") def notification_handler(sender, data: bytearray): """Handle incoming BLE notifications.""" parsed = parse_notification(bytes(data)) if parsed: apply_parsed(parsed) print_telemetry() # Connection loop with auto-reconnect while True: try: async with BleakClient( device.address, disconnected_callback=on_disconnect, timeout=20.0, ) as client: if not client.is_connected: print("[ERROR] Failed to connect") await asyncio.sleep(3) continue connection_start = time.time() telemetry["connected"] = True print(f"[INFO] Connected! Subscribing to {CHAR_UUID}...") # List services for debugging for service in client.services: if SERVICE_UUID.lower() in service.uuid.lower(): print(f"[INFO] Found Bosch service: {service.uuid}") for char in service.characteristics: props = ", ".join(char.properties) print(f" Char: {char.uuid} [{props}]") # Subscribe to notifications await client.start_notify(CHAR_UUID, notification_handler) print("[INFO] Subscribed — receiving data. Press Ctrl+C to stop.\n") # Keep alive — check connection every second while client.is_connected: await asyncio.sleep(1.0) except Exception as e: print(f"\n[ERROR] {e}") telemetry["connected"] = False print("[INFO] Reconnecting in 3 seconds...") await asyncio.sleep(3) # --------------------------------------------------------------------------- # CSV setup # --------------------------------------------------------------------------- def setup_csv(path: str): global csv_writer, csv_file csv_file = open(path, "a", newline="") csv_writer = csv.writer(csv_file) if os.path.getsize(path) == 0: csv_writer.writerow(["timestamp", "raw_hex", "id_hex", "field_name", "raw_value", "decoded_value"]) print(f"[INFO] CSV log: {path}") # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- async def main(): parser = argparse.ArgumentParser(description="OpusBike Bosch BLE Test (bleak)") parser.add_argument("--demo", action="store_true", help="Run with simulated data") parser.add_argument("--uuid", type=str, default=None, help="BLE device UUID/address") parser.add_argument("--port", type=int, default=3000, help="Dashboard port (default: 3000)") args = parser.parse_args() # CSV logging csv_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ble_log.csv") if not args.demo: setup_csv(csv_path) # Start web dashboard await start_web_server(args.port) # Run BLE or demo if args.demo: await run_demo() else: await run_ble(args.uuid) if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: print("\n\n[INFO] Stopped.") if csv_file: csv_file.close() sys.exit(0)