benlink.controller
Overview
This module provides a high-level async interface for communicating with and controlling Benshi radios over BLE.
Examples
To run the examples below, you will need to pair your radio with your computer,
locate the radio's device UUID (e.g. XX:XX:XX:XX:XX:XX
), and substitute it
into the example code.
Connecting To The Device
The following will connect to a radio and print its device info:
import asyncio
from benlink.controller import RadioController
async def main():
async with RadioController.new_ble("XX:XX:XX:XX:XX:XX") as radio:
print(radio.device_info)
asyncio.run(main())
You can also connect to the radio over RFCOMM:
import asyncio
from benlink.controller import RadioController
async def main():
async with RadioController.new_rfcomm("XX:XX:XX:XX:XX:XX", channel=1) as radio:
print(radio.device_info)
asyncio.run(main())
At the present, you have to figure out the correct channel number yourself (in Linux,
you can get a list of the available channels by running sdptool records XX:XX:XX:XX:XX:XX
).
Planned support of channel autodetection can be tracked in this issue.
Changing Settings
The following will connect to a radio and change the name of the first channel:
import asyncio
from benlink.controller import RadioController
async def main():
async with RadioController.new_ble("XX:XX:XX:XX:XX:XX") as radio:
print(f"Channel 0 name: {radio.channels[0].name}")
print("Setting 0 name to Foo...")
await radio.set_channel(0, name="Foo")
print("Done")
asyncio.run(main())
Handling Events
The RadioController
class provides a add_event_handler
method for
registering a callback function to handle events. The callback function
will be called with an EventMessage
object whenever an event is
received from the radio.
Note that add_event_handler
returns a function that can be called
to unregister the event handler.
import asyncio
from benlink.controller import RadioController
async def main():
async with RadioController.new_ble("XX:XX:XX:XX:XX:XX") as radio:
def handle_event(event):
print(f"Received event: {event}")
unregister = radio.add_event_handler(handle_event)
print("Try changing the channel or updating a radio setting")
print()
print("Press enter to quit...")
await asyncio.to_thread(input)
asyncio.run(main())
Interactive Usage
Python's async REPL is a great tool for interactively exploring the radio's capabilities. To run Python's REPL in async mode, run:
python -m asyncio
Instead of using the async context manager (async with RadioController(...) as radio:
),
you can use await radio.connect()
and await radio.disconnect()
to manage the
connection manually:
from benlink.controller import RadioController
radio = RadioController.new_ble("XX:XX:XX:XX:XX:XX")
await radio.connect()
print(radio.device_info) # Prints device info
print(await radio.battery_voltage()) # Prints battery voltage
await radio.disconnect() # When you're done with your session disconnect nicely
Events registered with add_event_handler
will run in the background:
import asyncio
from benlink.controller import RadioController
radio = RadioController.new_ble("XX:XX:XX:XX:XX:XX")
await radio.connect()
unsubscribe = radio.add_event_handler(lambda x: print(f"Received event: {x}
"))
# Change the channel on the radio a few times to generate some events
unsubscribe() # Unsubscribe the event handler
# Change the channel on the radio a few times to generate some events and
# observe that the event handler is no longer called
await radio.disconnect() # When you're done with your session disconnect nicely
(Note for IPython users: The IPython async REPL blocks the async event
loop while waiting for a prompt, so events will queue up until you defer
execution to the event loop by running something like await asyncio.sleep(0)
.)
1""" 2# Overview 3 4This module provides a high-level async interface for communicating 5with and controlling Benshi radios over BLE. 6 7# Examples 8 9To run the examples below, you will need to pair your radio with your computer, 10locate the radio's device UUID (e.g. `XX:XX:XX:XX:XX:XX`), and substitute it 11into the example code. 12 13## Connecting To The Device 14 15The following will connect to a radio and print its device info: 16 17```python 18import asyncio 19from benlink.controller import RadioController 20 21async def main(): 22 async with RadioController.new_ble("XX:XX:XX:XX:XX:XX") as radio: 23 print(radio.device_info) 24 25asyncio.run(main()) 26``` 27 28You can also connect to the radio over RFCOMM: 29 30```python 31import asyncio 32from benlink.controller import RadioController 33 34async def main(): 35 async with RadioController.new_rfcomm("XX:XX:XX:XX:XX:XX", channel=1) as radio: 36 print(radio.device_info) 37 38asyncio.run(main()) 39``` 40 41At the present, you have to figure out the correct channel number yourself (in Linux, 42you can get a list of the available channels by running `sdptool records XX:XX:XX:XX:XX:XX`). 43Planned support of channel autodetection can be tracked in [this issue](https://github.com/khusmann/benlink/issues/9). 44 45## Changing Settings 46 47The following will connect to a radio and change the name of the first channel: 48 49```python 50import asyncio 51from benlink.controller import RadioController 52 53 54async def main(): 55 async with RadioController.new_ble("XX:XX:XX:XX:XX:XX") as radio: 56 print(f"Channel 0 name: {radio.channels[0].name}") 57 print("Setting 0 name to Foo...") 58 await radio.set_channel(0, name="Foo") 59 print("Done") 60 61asyncio.run(main()) 62``` 63 64## Handling Events 65 66The `RadioController` class provides a `add_event_handler` method for 67registering a callback function to handle events. The callback function 68will be called with an `EventMessage` object whenever an event is 69received from the radio. 70 71Note that `add_event_handler` returns a function that can be called 72to unregister the event handler. 73 74```python 75import asyncio 76from benlink.controller import RadioController 77 78async def main(): 79 async with RadioController.new_ble("XX:XX:XX:XX:XX:XX") as radio: 80 def handle_event(event): 81 print(f"Received event: {event}") 82 83 unregister = radio.add_event_handler(handle_event) 84 85 print("Try changing the channel or updating a radio setting") 86 print() 87 print("Press enter to quit...") 88 await asyncio.to_thread(input) 89 90asyncio.run(main()) 91``` 92 93# Interactive Usage 94 95Python's async REPL is a great tool for interactively exploring the radio's 96capabilities. To run Python's REPL in async mode, run: 97 98```bash 99python -m asyncio 100``` 101 102Instead of using the async context manager (`async with RadioController(...) as radio:`), 103you can use `await radio.connect()` and `await radio.disconnect()` to manage the 104connection manually: 105 106```python 107from benlink.controller import RadioController 108 109radio = RadioController.new_ble("XX:XX:XX:XX:XX:XX") 110 111await radio.connect() 112 113print(radio.device_info) # Prints device info 114 115print(await radio.battery_voltage()) # Prints battery voltage 116 117await radio.disconnect() # When you're done with your session disconnect nicely 118``` 119 120Events registered with `add_event_handler` will run in the background: 121 122```python 123import asyncio 124from benlink.controller import RadioController 125 126radio = RadioController.new_ble("XX:XX:XX:XX:XX:XX") 127 128await radio.connect() 129 130unsubscribe = radio.add_event_handler(lambda x: print(f"Received event: {x}\n")) 131 132# Change the channel on the radio a few times to generate some events 133 134unsubscribe() # Unsubscribe the event handler 135 136# Change the channel on the radio a few times to generate some events and 137# observe that the event handler is no longer called 138 139await radio.disconnect() # When you're done with your session disconnect nicely 140``` 141 142(Note for IPython users: The IPython async REPL blocks the async event 143loop while waiting for a prompt, so events will queue up until you defer 144execution to the event loop by running something like `await asyncio.sleep(0)`.) 145""" 146 147 148from __future__ import annotations 149from typing_extensions import Unpack 150from dataclasses import dataclass 151import typing as t 152import sys 153 154from .command import ( 155 CommandConnection, 156 EventHandler, 157 DeviceInfo, 158 Channel, 159 ChannelArgs, 160 Settings, 161 SettingsArgs, 162 BeaconSettings, 163 BeaconSettingsArgs, 164 TncDataFragment, 165 EventMessage, 166 EventType, 167 SettingsChangedEvent, 168 TncDataFragmentReceivedEvent, 169 ChannelChangedEvent, 170 StatusChangedEvent, 171 UnknownProtocolMessage, 172 Status, 173 Position, 174) 175 176 177@dataclass 178class _RadioState: 179 device_info: DeviceInfo 180 beacon_settings: BeaconSettings 181 status: Status 182 settings: Settings 183 channels: t.List[Channel] 184 185 186class RadioController: 187 _conn: CommandConnection 188 _state: _RadioState | None 189 190 def __init__(self, connection: CommandConnection): 191 self._conn = connection 192 self._state = None 193 194 @classmethod 195 def new_ble(cls, device_uuid: str) -> RadioController: 196 return RadioController(CommandConnection.new_ble(device_uuid)) 197 198 @classmethod 199 def new_rfcomm(cls, device_uuid: str, channel: int | t.Literal["auto"] = "auto") -> RadioController: 200 return RadioController(CommandConnection.new_rfcomm(device_uuid, channel)) 201 202 def __repr__(self): 203 if not self.is_connected(): 204 return f"<{self.__class__.__name__} (disconnected)>" 205 return f"<{self.__class__.__name__} (connected)>" 206 207 @property 208 def beacon_settings(self) -> BeaconSettings: 209 if self._state is None: 210 raise StateNotInitializedError() 211 return self._state.beacon_settings 212 213 async def set_beacon_settings(self, **packet_settings_args: Unpack[BeaconSettingsArgs]): 214 if self._state is None: 215 raise StateNotInitializedError() 216 217 new_beacon_settings = self._state.beacon_settings.model_copy( 218 update=dict(packet_settings_args) 219 ) 220 221 await self._conn.set_beacon_settings(new_beacon_settings) 222 223 self._state.beacon_settings = new_beacon_settings 224 225 @property 226 def status(self) -> Status: 227 if self._state is None: 228 raise StateNotInitializedError() 229 return self._state.status 230 231 @property 232 def settings(self) -> Settings: 233 if self._state is None: 234 raise StateNotInitializedError() 235 return self._state.settings 236 237 async def set_settings(self, **settings_args: Unpack[SettingsArgs]): 238 if self._state is None: 239 raise StateNotInitializedError() 240 241 new_settings = self._state.settings.model_copy( 242 update=dict(settings_args) 243 ) 244 245 await self._conn.set_settings(new_settings) 246 247 self._state.settings = new_settings 248 249 @property 250 def device_info(self) -> DeviceInfo: 251 if self._state is None: 252 raise StateNotInitializedError() 253 return self._state.device_info 254 255 @property 256 def channels(self) -> t.List[Channel]: 257 if self._state is None: 258 raise StateNotInitializedError() 259 return self._state.channels 260 261 async def set_channel( 262 self, channel_id: int, **channel_args: Unpack[ChannelArgs] 263 ): 264 if self._state is None: 265 raise StateNotInitializedError() 266 267 new_channel = self._state.channels[channel_id].model_copy( 268 update=dict(channel_args) 269 ) 270 271 await self._conn.set_channel(new_channel) 272 273 self._state.channels[channel_id] = new_channel 274 275 def is_connected(self) -> bool: 276 return self._state is not None and self._conn.is_connected() 277 278 async def send_bytes(self, command: bytes) -> None: 279 """For debugging - Use at your own risk!""" 280 await self._conn.send_bytes(command) 281 282 async def battery_voltage(self) -> float: 283 return await self._conn.get_battery_voltage() 284 285 async def battery_level(self) -> int: 286 return await self._conn.get_battery_level() 287 288 async def battery_level_as_percentage(self) -> int: 289 return await self._conn.get_battery_level_as_percentage() 290 291 async def rc_battery_level(self) -> int: 292 return await self._conn.get_rc_battery_level() 293 294 async def position(self) -> Position: 295 return await self._conn.get_position() 296 297 async def send_tnc_data(self, data: bytes) -> None: 298 if len(data) > 50: 299 raise ValueError("Data too long -- TODO: implement fragmentation") 300 301 await self._conn.send_tnc_data_fragment(TncDataFragment( 302 is_final_fragment=True, 303 fragment_id=0, 304 data=data 305 )) 306 307 def add_event_handler(self, handler: EventHandler) -> t.Callable[[], None]: 308 return self._conn.add_event_handler(handler) 309 310 async def enable_event(self, event_type: EventType): 311 await self._conn.enable_event(event_type) 312 313 async def _hydrate(self) -> None: 314 device_info = await self._conn.get_device_info() 315 316 channels: t.List[Channel] = [] 317 318 for i in range(device_info.channel_count): 319 channel_settings = await self._conn.get_channel(i) 320 channels.append(channel_settings) 321 322 settings = await self._conn.get_settings() 323 324 beacon_settings = await self._conn.get_beacon_settings() 325 326 status = await self._conn.get_status() 327 328 # No need to save the remove event handler function, since we don't 329 # need to unregister it when we disconnect (the connection will take care of that) 330 self._conn.add_event_handler( 331 self._on_event_message 332 ) 333 334 # For some reason, enabling the HT_STATUS_CHANGED event 335 # also enables the DATA_RXD event, and maybe others... 336 # need to investigate further. 337 await self.enable_event("HT_STATUS_CHANGED") 338 339 # TODO: should these events be enabled by default? perhaps I should have 340 # users enable events manually, while simultaneously registering handlers 341 # of the proper type? 342 343 self._state = _RadioState( 344 device_info=device_info, 345 beacon_settings=beacon_settings, 346 status=status, 347 settings=settings, 348 channels=channels, 349 ) 350 351 def _on_event_message(self, event_message: EventMessage) -> None: 352 if self._state is None: 353 raise ValueError( 354 "Radio state not initialized. Try calling connect() first." 355 ) 356 357 match event_message: 358 case ChannelChangedEvent(channel): 359 self._state.channels[channel.channel_id] = channel 360 case SettingsChangedEvent(settings): 361 self._state.settings = settings 362 case TncDataFragmentReceivedEvent(): 363 pass 364 case StatusChangedEvent(status): 365 self._state.status = status 366 case UnknownProtocolMessage(message): 367 print( 368 f"[DEBUG] Unknown protocol message: {message}", 369 file=sys.stderr 370 ) 371 372 # Async Context Manager 373 async def __aenter__(self): 374 await self.connect() 375 return self 376 377 async def __aexit__( 378 self, 379 exc_type: t.Any, 380 exc_value: t.Any, 381 traceback: t.Any, 382 ) -> None: 383 await self.disconnect() 384 385 async def connect(self) -> None: 386 if self._state is not None: 387 raise RuntimeError("Already connected") 388 389 await self._conn.connect() 390 await self._hydrate() 391 392 async def disconnect(self) -> None: 393 if self._state is None: 394 raise StateNotInitializedError() 395 396 await self._conn.disconnect() 397 self._state = None 398 399 400class StateNotInitializedError(RuntimeError): 401 """Raised when trying to access radio state before it has been initialized.""" 402 403 def __init__(self): 404 super().__init__( 405 "Radio state not initialized. Try calling connect() first." 406 )
187class RadioController: 188 _conn: CommandConnection 189 _state: _RadioState | None 190 191 def __init__(self, connection: CommandConnection): 192 self._conn = connection 193 self._state = None 194 195 @classmethod 196 def new_ble(cls, device_uuid: str) -> RadioController: 197 return RadioController(CommandConnection.new_ble(device_uuid)) 198 199 @classmethod 200 def new_rfcomm(cls, device_uuid: str, channel: int | t.Literal["auto"] = "auto") -> RadioController: 201 return RadioController(CommandConnection.new_rfcomm(device_uuid, channel)) 202 203 def __repr__(self): 204 if not self.is_connected(): 205 return f"<{self.__class__.__name__} (disconnected)>" 206 return f"<{self.__class__.__name__} (connected)>" 207 208 @property 209 def beacon_settings(self) -> BeaconSettings: 210 if self._state is None: 211 raise StateNotInitializedError() 212 return self._state.beacon_settings 213 214 async def set_beacon_settings(self, **packet_settings_args: Unpack[BeaconSettingsArgs]): 215 if self._state is None: 216 raise StateNotInitializedError() 217 218 new_beacon_settings = self._state.beacon_settings.model_copy( 219 update=dict(packet_settings_args) 220 ) 221 222 await self._conn.set_beacon_settings(new_beacon_settings) 223 224 self._state.beacon_settings = new_beacon_settings 225 226 @property 227 def status(self) -> Status: 228 if self._state is None: 229 raise StateNotInitializedError() 230 return self._state.status 231 232 @property 233 def settings(self) -> Settings: 234 if self._state is None: 235 raise StateNotInitializedError() 236 return self._state.settings 237 238 async def set_settings(self, **settings_args: Unpack[SettingsArgs]): 239 if self._state is None: 240 raise StateNotInitializedError() 241 242 new_settings = self._state.settings.model_copy( 243 update=dict(settings_args) 244 ) 245 246 await self._conn.set_settings(new_settings) 247 248 self._state.settings = new_settings 249 250 @property 251 def device_info(self) -> DeviceInfo: 252 if self._state is None: 253 raise StateNotInitializedError() 254 return self._state.device_info 255 256 @property 257 def channels(self) -> t.List[Channel]: 258 if self._state is None: 259 raise StateNotInitializedError() 260 return self._state.channels 261 262 async def set_channel( 263 self, channel_id: int, **channel_args: Unpack[ChannelArgs] 264 ): 265 if self._state is None: 266 raise StateNotInitializedError() 267 268 new_channel = self._state.channels[channel_id].model_copy( 269 update=dict(channel_args) 270 ) 271 272 await self._conn.set_channel(new_channel) 273 274 self._state.channels[channel_id] = new_channel 275 276 def is_connected(self) -> bool: 277 return self._state is not None and self._conn.is_connected() 278 279 async def send_bytes(self, command: bytes) -> None: 280 """For debugging - Use at your own risk!""" 281 await self._conn.send_bytes(command) 282 283 async def battery_voltage(self) -> float: 284 return await self._conn.get_battery_voltage() 285 286 async def battery_level(self) -> int: 287 return await self._conn.get_battery_level() 288 289 async def battery_level_as_percentage(self) -> int: 290 return await self._conn.get_battery_level_as_percentage() 291 292 async def rc_battery_level(self) -> int: 293 return await self._conn.get_rc_battery_level() 294 295 async def position(self) -> Position: 296 return await self._conn.get_position() 297 298 async def send_tnc_data(self, data: bytes) -> None: 299 if len(data) > 50: 300 raise ValueError("Data too long -- TODO: implement fragmentation") 301 302 await self._conn.send_tnc_data_fragment(TncDataFragment( 303 is_final_fragment=True, 304 fragment_id=0, 305 data=data 306 )) 307 308 def add_event_handler(self, handler: EventHandler) -> t.Callable[[], None]: 309 return self._conn.add_event_handler(handler) 310 311 async def enable_event(self, event_type: EventType): 312 await self._conn.enable_event(event_type) 313 314 async def _hydrate(self) -> None: 315 device_info = await self._conn.get_device_info() 316 317 channels: t.List[Channel] = [] 318 319 for i in range(device_info.channel_count): 320 channel_settings = await self._conn.get_channel(i) 321 channels.append(channel_settings) 322 323 settings = await self._conn.get_settings() 324 325 beacon_settings = await self._conn.get_beacon_settings() 326 327 status = await self._conn.get_status() 328 329 # No need to save the remove event handler function, since we don't 330 # need to unregister it when we disconnect (the connection will take care of that) 331 self._conn.add_event_handler( 332 self._on_event_message 333 ) 334 335 # For some reason, enabling the HT_STATUS_CHANGED event 336 # also enables the DATA_RXD event, and maybe others... 337 # need to investigate further. 338 await self.enable_event("HT_STATUS_CHANGED") 339 340 # TODO: should these events be enabled by default? perhaps I should have 341 # users enable events manually, while simultaneously registering handlers 342 # of the proper type? 343 344 self._state = _RadioState( 345 device_info=device_info, 346 beacon_settings=beacon_settings, 347 status=status, 348 settings=settings, 349 channels=channels, 350 ) 351 352 def _on_event_message(self, event_message: EventMessage) -> None: 353 if self._state is None: 354 raise ValueError( 355 "Radio state not initialized. Try calling connect() first." 356 ) 357 358 match event_message: 359 case ChannelChangedEvent(channel): 360 self._state.channels[channel.channel_id] = channel 361 case SettingsChangedEvent(settings): 362 self._state.settings = settings 363 case TncDataFragmentReceivedEvent(): 364 pass 365 case StatusChangedEvent(status): 366 self._state.status = status 367 case UnknownProtocolMessage(message): 368 print( 369 f"[DEBUG] Unknown protocol message: {message}", 370 file=sys.stderr 371 ) 372 373 # Async Context Manager 374 async def __aenter__(self): 375 await self.connect() 376 return self 377 378 async def __aexit__( 379 self, 380 exc_type: t.Any, 381 exc_value: t.Any, 382 traceback: t.Any, 383 ) -> None: 384 await self.disconnect() 385 386 async def connect(self) -> None: 387 if self._state is not None: 388 raise RuntimeError("Already connected") 389 390 await self._conn.connect() 391 await self._hydrate() 392 393 async def disconnect(self) -> None: 394 if self._state is None: 395 raise StateNotInitializedError() 396 397 await self._conn.disconnect() 398 self._state = None
214 async def set_beacon_settings(self, **packet_settings_args: Unpack[BeaconSettingsArgs]): 215 if self._state is None: 216 raise StateNotInitializedError() 217 218 new_beacon_settings = self._state.beacon_settings.model_copy( 219 update=dict(packet_settings_args) 220 ) 221 222 await self._conn.set_beacon_settings(new_beacon_settings) 223 224 self._state.beacon_settings = new_beacon_settings
238 async def set_settings(self, **settings_args: Unpack[SettingsArgs]): 239 if self._state is None: 240 raise StateNotInitializedError() 241 242 new_settings = self._state.settings.model_copy( 243 update=dict(settings_args) 244 ) 245 246 await self._conn.set_settings(new_settings) 247 248 self._state.settings = new_settings
262 async def set_channel( 263 self, channel_id: int, **channel_args: Unpack[ChannelArgs] 264 ): 265 if self._state is None: 266 raise StateNotInitializedError() 267 268 new_channel = self._state.channels[channel_id].model_copy( 269 update=dict(channel_args) 270 ) 271 272 await self._conn.set_channel(new_channel) 273 274 self._state.channels[channel_id] = new_channel
279 async def send_bytes(self, command: bytes) -> None: 280 """For debugging - Use at your own risk!""" 281 await self._conn.send_bytes(command)
For debugging - Use at your own risk!
401class StateNotInitializedError(RuntimeError): 402 """Raised when trying to access radio state before it has been initialized.""" 403 404 def __init__(self): 405 super().__init__( 406 "Radio state not initialized. Try calling connect() first." 407 )
Raised when trying to access radio state before it has been initialized.