#!/usr/bin/env python3
"""
OpusBike — Bosch Smart System BLE Test Script (bleak)
Connects to a pre-paired Bosch eBike via BLE, decodes telemetry,
prints to terminal, serves dashboard at localhost:3000, logs to CSV.
Usage:
python3 cli/test_bleak.py # Real BLE mode
python3 cli/test_bleak.py --demo # Simulated data
python3 cli/test_bleak.py --uuid XX # Specific device UUID
"""
import argparse
import asyncio
import csv
import json
import math
import os
import signal
import sys
import time
from datetime import datetime, timedelta
# ---------------------------------------------------------------------------
# Bosch BLE constants
# ---------------------------------------------------------------------------
# UUID of the Bosch service; run_ble() only uses it to pick which discovered
# service to list for debugging after connecting.
SERVICE_UUID = "00000010-eaa2-11e9-81b4-2a2ae2dbcce4"
# UUID of the characteristic run_ble() subscribes to for notifications.
CHAR_UUID = "00000011-eaa2-11e9-81b4-2a2ae2dbcce4"
# Substrings matched case-insensitively against advertised device names when
# scanning without an explicit --uuid.
DEVICE_NAME_FILTERS = ["smart system", "Bosch", "BRC"]
# Raw assist-mode value -> label shown in the telemetry output.
ASSIST_MODES = {0: "OFF", 1: "ECO", 2: "TOUR", 3: "eMTB", 4: "TURBO"}
# Confirmed data IDs from nRF Connect Phase 2 testing
# Maps the 2-byte data ID of a message to (telemetry field name, divisor);
# parse_notification() divides the raw varint by the divisor when it is > 1.
KNOWN_DATA_IDS = {
    0x9809: ("assist_mode", 1), # 0=OFF 1=ECO 2=TOUR 3=eMTB 4=TURBO
    0x8088: ("battery", 1), # %
    0x982D: ("speed", 10), # km/h (raw / 10)
    0x985A: ("cadence", 2), # rpm (raw / 2)
    0x985B: ("human_power", 1), # W
    0x985D: ("motor_power", 1), # W
}
# ---------------------------------------------------------------------------
# Shared telemetry state
# ---------------------------------------------------------------------------
# Latest decoded values; mutated in place by apply_parsed() / run_demo() and
# returned as JSON by the dashboard's /api/telemetry route.
telemetry = {
    "speed": 0.0,
    "cadence": 0,
    "human_power": 0,
    "motor_power": 0,
    "battery": 0,
    "assist_mode": "OFF",  # label from ASSIST_MODES, not the raw number
    "ride_time": "00:00:00",  # HH:MM:SS since connection_start
    "total_km": 0.0,  # in the code visible here, only run_demo() updates this
    "connected": False,
    "demo": False,  # set to True by run_demo()
    "last_update": None,  # ISO timestamp string written by apply_parsed()
}
# time.time() value captured when the current connection (or demo) started;
# while it is 0.0, apply_parsed() leaves "ride_time" untouched.
connection_start: float = 0.0
# csv.writer and its underlying file, installed by setup_csv(); while
# csv_writer is None, log_raw() is a no-op.
csv_writer = None
csv_file = None
# ---------------------------------------------------------------------------
# Varint decoder
# ---------------------------------------------------------------------------
def decode_varint(data: bytes, offset: int) -> tuple:
    """
    Decode a protobuf-style varint starting at ``data[offset]``.

    Each byte contributes its low 7 bits, least-significant group first; a
    byte with the high bit clear ends the number. Decoding also stops at the
    end of ``data``, in which case the bits gathered so far are returned.

    Returns (value, bytes_read); (0, 0) when ``offset`` is at or past the end.
    """
    value = 0
    consumed = 0
    for position in range(offset, len(data)):
        byte = data[position]
        # The n-th byte read supplies bits 7n .. 7n+6 of the result.
        value |= (byte & 0x7F) << (7 * consumed)
        consumed += 1
        if not byte & 0x80:
            break
    return value, consumed
# ---------------------------------------------------------------------------
# Bosch protocol parser
# ---------------------------------------------------------------------------
def parse_notification(data: bytes) -> dict:
    """
    Parse a Bosch BLE notification into a dict of decoded fields.

    Format: [0x30] [length] [id_hi] [id_lo] [0x08] [varint data...]
    Multiple messages can be concatenated. ``length`` counts the bytes that
    follow it (ID + marker + varint); a length of 2 means "ID only, value 0".

    Every read is confined to the current message, so a malformed or
    truncated value can never consume bytes of the next message.

    Args:
        data: Raw bytes of one notification.

    Returns:
        Mapping of field name (see KNOWN_DATA_IDS) to decoded value. Unknown
        IDs are written to the CSV log via log_raw() but are not returned.
    """
    parsed = {}
    offset = 0
    total = len(data)
    while offset < total:
        if data[offset] != 0x30:
            # Not a message header: skip one byte and try to resynchronise.
            offset += 1
            continue
        if offset + 1 >= total:
            break  # header without a length byte
        length = data[offset + 1]
        offset += 2
        if length == 0:
            continue
        msg_end = offset + length
        if msg_end > total:
            break  # message is cut off; nothing trustworthy follows
        if length < 2:
            # Too short to hold the 2-byte ID; skip it rather than borrowing
            # a byte from the following message.
            offset = msg_end
            continue
        data_id = (data[offset] << 8) | data[offset + 1]
        payload = data[offset + 2:msg_end]
        # Whatever happens below, the next message starts exactly at msg_end.
        offset = msg_end
        known = KNOWN_DATA_IDS.get(data_id)
        if not payload:
            # Length 0x02 means just the ID, value = 0
            if known is not None:
                parsed[known[0]] = 0
                log_raw(data, data_id, 0, 0)
            continue
        # Expect 0x08 marker before varint (tolerated when absent)
        if payload[0] == 0x08:
            payload = payload[1:]
        if not payload:
            continue
        value, vbytes = decode_varint(payload, 0)
        if payload[vbytes - 1] & 0x80:
            # Varint still had its continuation bit set at the end of the
            # message: record the raw bytes but do not report a value.
            log_raw(data, data_id, value, None)
            continue
        if known is None:
            log_raw(data, data_id, value, None)
            continue
        field_name, divisor = known
        decoded = value / divisor if divisor > 1 else value
        parsed[field_name] = decoded
        log_raw(data, data_id, value, decoded)
    return parsed
def log_raw(raw_data: bytes, data_id: int, raw_value: int, decoded_value):
    """
    Append one decoded field as a row of the CSV log.

    Does nothing while no CSV writer is installed. Logging is best-effort:
    any error raised while building or writing the row is swallowed so it
    cannot interrupt notification handling.
    """
    writer = csv_writer
    if writer is None:
        return
    entry = KNOWN_DATA_IDS.get(data_id)
    field_name = entry[0] if entry is not None else "unknown"
    try:
        row = [
            datetime.now().isoformat(),
            raw_data.hex(),
            f"0x{data_id:04X}",
            field_name,
            raw_value,
            "" if decoded_value is None else decoded_value,
        ]
        writer.writerow(row)
    except Exception:
        pass
# ---------------------------------------------------------------------------
# Apply parsed data to global telemetry
# ---------------------------------------------------------------------------
def apply_parsed(parsed: dict):
    """
    Merge fields produced by parse_notification() into the global telemetry dict.

    Only keys present in ``parsed`` are touched. The ride time is refreshed
    when a connection start time has been recorded, and ``last_update`` is
    always stamped with the current local time.
    """
    if "speed" in parsed:
        telemetry["speed"] = round(parsed["speed"], 1)
    # These fields are stored as whole numbers (fractions are truncated).
    for key in ("cadence", "human_power", "motor_power", "battery"):
        if key in parsed:
            telemetry[key] = int(parsed[key])
    if "assist_mode" in parsed:
        raw_mode = int(parsed["assist_mode"])
        label = ASSIST_MODES.get(raw_mode, f"UNK({raw_mode})")
        telemetry["assist_mode"] = label
    # Update ride time
    if connection_start > 0:
        whole_seconds = int(time.time() - connection_start)
        minutes, seconds = divmod(whole_seconds, 60)
        hours, minutes = divmod(minutes, 60)
        telemetry["ride_time"] = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
    telemetry["last_update"] = datetime.now().isoformat()
# ---------------------------------------------------------------------------
# Terminal output
# ---------------------------------------------------------------------------
def print_telemetry():
    """
    Print the current telemetry as a single status line.

    The line starts with a carriage return and has no trailing newline, so
    successive calls overwrite each other in the terminal.
    """
    clock = datetime.now().strftime("%H:%M:%S")
    tag = "[DEMO] " if telemetry["demo"] else ""
    columns = [
        f"Speed: {telemetry['speed']:5.1f} km/h",
        f"Cadence: {telemetry['cadence']:3d} rpm",
        f"Power: {telemetry['human_power']:3d}W",
        f"Motor: {telemetry['motor_power']:3d}W",
        f"Battery: {telemetry['battery']:3d}%",
        f"Mode: {telemetry['assist_mode']:<5s}",
        f"Time: {telemetry['ride_time']}",
    ]
    print(f"\r{tag}[{clock}] " + " | ".join(columns), end="", flush=True)
# ---------------------------------------------------------------------------
# Web dashboard (aiohttp)
# ---------------------------------------------------------------------------
# Body returned by the dashboard's "/" route, served with content type text/html.
# NOTE(review): the text below contains no HTML tags or script — presumably the
# original markup was lost; confirm against the intended dashboard page.
DASHBOARD_HTML = """
OpusBike — BLE Test Dashboard
OpusBike BLE TestDEMO
Python bleak — Bosch Smart System
Disconnected
"""
async def start_web_server(port: int = 3000, host: str = "0.0.0.0"):
    """
    Start the aiohttp web server for the dashboard.

    Routes:
        GET /               -> DASHBOARD_HTML
        GET /api/telemetry  -> the global telemetry dict as JSON

    The dashboard is optional: if aiohttp is missing, or the socket cannot be
    bound (e.g. the port is already in use), a warning is printed and the
    function returns so the caller can carry on without it.

    Args:
        port: TCP port to listen on.
        host: Interface to bind. The default "0.0.0.0" listens on all
            interfaces, which makes the dashboard reachable from other
            machines on the network; pass "127.0.0.1" to keep it local.
    """
    try:
        from aiohttp import web
    except ImportError:
        print("\n[WARN] aiohttp not installed — dashboard disabled. Run: pip3 install aiohttp")
        return

    async def handle_index(request):
        return web.Response(text=DASHBOARD_HTML, content_type="text/html")

    async def handle_api(request):
        return web.json_response(telemetry)

    app = web.Application()
    app.router.add_get("/", handle_index)
    app.router.add_get("/api/telemetry", handle_api)

    runner = web.AppRunner(app)
    await runner.setup()
    try:
        await web.TCPSite(runner, host, port).start()
    except OSError as exc:
        # Binding failed; release the runner and continue without a dashboard
        # instead of aborting the whole script.
        await runner.cleanup()
        print(f"\n[WARN] Could not start dashboard on port {port}: {exc} — dashboard disabled.")
        return
    print(f"[INFO] Dashboard: http://localhost:{port}")
# ---------------------------------------------------------------------------
# Demo mode
# ---------------------------------------------------------------------------
async def run_demo():
    """
    Feed the telemetry dict with synthetic ride data, forever, at 2 Hz.

    Marks the session as a connected demo, then on every 0.5 s tick derives
    speed, cadence, power, battery and distance from sine waves, prints the
    status line and sleeps. The assist mode advances every 30 ticks (15 s).
    """
    global connection_start
    telemetry["demo"] = True
    telemetry["connected"] = True
    telemetry["battery"] = 87
    connection_start = time.time()

    mode_cycle = ["ECO", "TOUR", "eMTB", "TURBO", "TOUR", "ECO"]
    # Motor output as a multiple of rider power, per assist mode.
    assist_factor = {"OFF": 0, "ECO": 0.5, "TOUR": 1.0, "eMTB": 1.5, "TURBO": 2.2}
    odometer = 1847.2
    tick = 0

    print("[DEMO] Simulating Bosch Smart System ride data...")
    print("[DEMO] Press Ctrl+C to stop\n")

    while True:
        t = tick * 0.5  # 2 Hz
        # Speed: two superimposed sine waves around 22 km/h
        speed = max(0, 22 + 13 * math.sin(t / 8) + 2 * math.sin(t / 3))
        # Cadence correlates with speed
        cadence = int(max(0, speed * 2.8 + 5 * math.sin(t / 5)))
        # Human power correlates with cadence
        human_power = int(max(0, cadence * 1.4 + 10 * math.sin(t / 4)))
        # Mode changes every 30 ticks; motor power depends on mode
        mode_name = mode_cycle[(tick // 30) % len(mode_cycle)]
        motor_power = int(human_power * assist_factor.get(mode_name, 1.0))
        # Total km increases with speed
        odometer += speed / 3600 * 0.5  # 0.5s tick

        # Ride time since the demo started
        elapsed = time.time() - connection_start
        h, rem = divmod(int(elapsed), 3600)
        m, s = divmod(rem, 60)

        telemetry.update({
            "speed": round(speed, 1),
            "cadence": cadence,
            "human_power": human_power,
            "motor_power": motor_power,
            # Battery decreases slowly
            "battery": max(0, 87 - int(tick * 0.02)),
            "assist_mode": mode_name,
            "total_km": round(odometer, 1),
            "ride_time": f"{h:02d}:{m:02d}:{s:02d}",
        })
        print_telemetry()

        tick += 1
        await asyncio.sleep(0.5)
# ---------------------------------------------------------------------------
# Real BLE mode
# ---------------------------------------------------------------------------
async def run_ble(device_uuid: str = None):
    """
    Connect to a real Bosch eBike via BLE and stream data until interrupted.

    Finds the bike (by address when ``device_uuid`` is given, otherwise by
    matching advertised names against DEVICE_NAME_FILTERS), subscribes to
    CHAR_UUID and feeds every notification through parse_notification() /
    apply_parsed() / print_telemetry(). When the link drops or an error
    occurs, it waits 3 seconds and connects again, indefinitely.

    Args:
        device_uuid: BLE address/UUID of the bike, or None to scan by name.

    Exits the process with status 1 when bleak is not installed or no
    matching device is found.
    """
    global connection_start
    try:
        from bleak import BleakClient, BleakScanner
    except ImportError:
        print("[ERROR] bleak not installed. Run: pip3 install bleak")
        sys.exit(1)

    # Scan for device
    print("[INFO] Scanning for Bosch Smart System eBike...")
    device = None
    if device_uuid:
        print(f"[INFO] Looking for device UUID: {device_uuid}")
        device = await BleakScanner.find_device_by_address(device_uuid, timeout=15.0)
    else:
        name_filters = [f.lower() for f in DEVICE_NAME_FILTERS]
        for candidate in await BleakScanner.discover(timeout=10.0):
            advertised = (candidate.name or "").lower()
            if any(f in advertised for f in name_filters):
                device = candidate
                print(f"[INFO] Found: {candidate.name} ({candidate.address})")
                break
    if device is None:
        print("[ERROR] Bike not found. Make sure:")
        print(" 1. Bike is ON (battery connected)")
        print(" 2. LED Remote is active")
        print(" 3. Bike is paired in macOS System Settings > Bluetooth")
        print(" 4. No other device (Bosch Flow app) is connected to the bike")
        print("\nTip: Run with --demo to test without the bike")
        sys.exit(1)
    print(f"[INFO] Connecting to {device.name} ({device.address})...")

    def on_disconnect(client):
        elapsed = time.time() - connection_start if connection_start > 0 else 0
        telemetry["connected"] = False
        print(f"\n[WARN] Disconnected after {elapsed:.1f}s")
        if 35 < elapsed < 40:
            # A drop in this window is treated as a sign of failed bonding.
            print("[WARN] Disconnected at ~37s — BLE bonding likely failed!")
            print("[WARN] Pair the bike in macOS System Settings > Bluetooth first (R-000138)")
        else:
            print("[INFO] Will attempt reconnection in 3 seconds...")

    def notification_handler(sender, data: bytearray):
        """Handle incoming BLE notifications."""
        parsed = parse_notification(bytes(data))
        if parsed:
            apply_parsed(parsed)
            print_telemetry()

    # Connection loop with auto-reconnect
    while True:
        try:
            async with BleakClient(
                device.address,
                disconnected_callback=on_disconnect,
                timeout=20.0,
            ) as client:
                if client.is_connected:
                    connection_start = time.time()
                    telemetry["connected"] = True
                    print(f"[INFO] Connected! Subscribing to {CHAR_UUID}...")
                    # List services for debugging
                    for service in client.services:
                        if SERVICE_UUID.lower() in service.uuid.lower():
                            print(f"[INFO] Found Bosch service: {service.uuid}")
                            for char in service.characteristics:
                                props = ", ".join(char.properties)
                                print(f" Char: {char.uuid} [{props}]")
                    # Subscribe to notifications
                    await client.start_notify(CHAR_UUID, notification_handler)
                    print("[INFO] Subscribed — receiving data. Press Ctrl+C to stop.\n")
                    # Keep alive — check connection every second
                    while client.is_connected:
                        await asyncio.sleep(1.0)
                else:
                    print("[ERROR] Failed to connect")
        except Exception as e:
            # Top-level boundary of the session: report and fall through to
            # the common reconnect delay below.
            print(f"\n[ERROR] {e}")
        # Every way out of a session (clean disconnect, failed connect, error)
        # waits before retrying, so the loop never reconnects back-to-back.
        telemetry["connected"] = False
        print("[INFO] Reconnecting in 3 seconds...")
        await asyncio.sleep(3)
# ---------------------------------------------------------------------------
# CSV setup
# ---------------------------------------------------------------------------
def setup_csv(path: str):
    """
    Open the CSV log at ``path`` for appending and install the global writer.

    The file is created if needed and is left open for the caller to close.
    The column header is written only when the file is empty, so repeated
    runs keep appending to one log with a single header row.
    """
    global csv_writer, csv_file
    columns = ["timestamp", "raw_hex", "id_hex", "field_name", "raw_value", "decoded_value"]
    csv_file = open(path, "a", newline="")
    csv_writer = writer = csv.writer(csv_file)
    # Size is checked after opening, so a freshly created file counts as empty.
    if not os.path.getsize(path):
        writer.writerow(columns)
    print(f"[INFO] CSV log: {path}")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
async def main():
    """
    Parse the command line, then run the dashboard plus either the demo or BLE.

    In real BLE mode, raw fields are logged to ``ble_log.csv`` next to this
    script; demo mode does no CSV logging.
    """
    cli = argparse.ArgumentParser(description="OpusBike Bosch BLE Test (bleak)")
    cli.add_argument("--demo", action="store_true", help="Run with simulated data")
    cli.add_argument("--uuid", type=str, default=None, help="BLE device UUID/address")
    cli.add_argument("--port", type=int, default=3000, help="Dashboard port (default: 3000)")
    options = cli.parse_args()
    demo_mode = options.demo

    # CSV logging (real BLE mode only)
    if not demo_mode:
        script_dir = os.path.dirname(os.path.abspath(__file__))
        setup_csv(os.path.join(script_dir, "ble_log.csv"))

    # Start web dashboard
    await start_web_server(options.port)

    # Run BLE or demo
    if demo_mode:
        await run_demo()
        return
    await run_ble(options.uuid)
if __name__ == "__main__":
    # Ctrl+C is the normal way to stop the script and exits with status 0.
    # The CSV log is closed on every exit path (Ctrl+C, sys.exit() raised
    # inside main(), or an unexpected error) so buffered rows are flushed.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n\n[INFO] Stopped.")
        sys.exit(0)
    finally:
        if csv_file is not None:
            csv_file.close()