
benlink.controller

Overview

This module provides a high-level async interface for communicating with and controlling Benshi radios over Bluetooth (BLE or RFCOMM).

Examples

To run the examples below, you will need to pair your radio with your computer, locate the radio's device UUID (e.g. XX:XX:XX:XX:XX:XX), and substitute it into the example code.

Connecting To The Device

The following will connect to a radio and print its device info:

import asyncio
from benlink.controller import RadioController

async def main():
    async with RadioController.new_ble("XX:XX:XX:XX:XX:XX") as radio:
        print(radio.device_info)

asyncio.run(main())

You can also connect to the radio over RFCOMM:

import asyncio
from benlink.controller import RadioController

async def main():
    async with RadioController.new_rfcomm("XX:XX:XX:XX:XX:XX", channel=1) as radio:
        print(radio.device_info)

asyncio.run(main())

At present, you have to determine the correct channel number yourself (on Linux, you can list the available channels by running sdptool records XX:XX:XX:XX:XX:XX). Planned support for channel autodetection is tracked in this issue (https://github.com/khusmann/benlink/issues/9).
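Until autodetection is available, the channel numbers can be pulled out of the sdptool output with a few lines of Python. The sketch below is not part of benlink: `rfcomm_channels` is a hypothetical helper, and the sample text is illustrative only. It assumes that each RFCOMM protocol entry is followed by a `Channel: N` line, which is how BlueZ's sdptool usually prints its records.

```python
import re


def rfcomm_channels(sdptool_output: str) -> list[int]:
    """Return the RFCOMM channel numbers found in `sdptool records` output.

    Hypothetical helper, not part of benlink. Assumes every RFCOMM
    protocol entry is followed by a line of the form "Channel: N".
    """
    channels: list[int] = []
    pattern = r'"RFCOMM"[^\n]*\n\s*Channel:\s*(\d+)'
    for match in re.finditer(pattern, sdptool_output):
        channel = int(match.group(1))
        if channel not in channels:  # keep first-seen order, drop repeats
            channels.append(channel)
    return channels


# Illustrative output only; real records contain many more fields.
SAMPLE = '''\
Service Name: Serial Port
Protocol Descriptor List:
  "L2CAP" (0x0100)
  "RFCOMM" (0x0003)
    Channel: 1

Service Name: Headset
Protocol Descriptor List:
  "L2CAP" (0x0100)
  "RFCOMM" (0x0003)
    Channel: 4
'''

print(rfcomm_channels(SAMPLE))
```

Which of the listed channels carries the radio's command protocol still has to be found by trial; the sketch only narrows down the candidates to pass as `channel=`.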

Changing Settings

The following will connect to a radio and change the name of the first channel:

import asyncio
from benlink.controller import RadioController


async def main():
    async with RadioController.new_ble("XX:XX:XX:XX:XX:XX") as radio:
        print(f"Channel 0 name: {radio.channels[0].name}")
        print("Setting 0 name to Foo...")
        await radio.set_channel(0, name="Foo")
        print("Done")

asyncio.run(main())
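Judging from the source of `set_channel` in the listing on this page, a setter copies the cached record with the requested fields changed, sends the full record to the radio, and only then updates the local cache. The sketch below imitates that order without a radio. `FakeRadio` and its two-field `Channel` are hypothetical stand-ins, and `dataclasses.replace` is swapped in for the `model_copy(update=...)` call used in the source.

```python
import asyncio
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Channel:
    """Stand-in for benlink's Channel model (two illustrative fields only)."""
    channel_id: int
    name: str


class FakeRadio:
    """Hypothetical stand-in that mimics the copy, send, then cache order."""

    def __init__(self) -> None:
        self.channels = [Channel(0, "Old"), Channel(1, "Other")]
        self.sent: list[Channel] = []  # records that would have been transmitted

    async def _send(self, channel: Channel) -> None:
        # A real connection would await the radio's reply here.
        self.sent.append(channel)

    async def set_channel(self, channel_id: int, **changes) -> None:
        # 1. Copy the cached channel with the requested fields replaced.
        new_channel = replace(self.channels[channel_id], **changes)
        # 2. Send the full record; if this raises, the cache stays untouched.
        await self._send(new_channel)
        # 3. Only now update the local cache.
        self.channels[channel_id] = new_channel


async def demo() -> FakeRadio:
    radio = FakeRadio()
    await radio.set_channel(0, name="Foo")
    return radio


radio = asyncio.run(demo())
print(radio.channels[0])
```

Updating the cache last means a failed write leaves `radio.channels` reflecting what the radio presumably still holds.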

Handling Events

The RadioController class provides an add_event_handler method for registering a callback function to handle events. The callback function will be called with an EventMessage object whenever an event is received from the radio.

Note that add_event_handler returns a function that can be called to unregister the event handler.

import asyncio
from benlink.controller import RadioController

async def main():
    async with RadioController.new_ble("XX:XX:XX:XX:XX:XX") as radio:
        def handle_event(event):
            print(f"Received event: {event}")

        unregister = radio.add_event_handler(handle_event)

        print("Try changing the channel or updating a radio setting")
        print()
        print("Press enter to quit...")
        await asyncio.to_thread(input)

asyncio.run(main())
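Returning an unregister function from a registration call is a common closure pattern. The sketch below shows one way it can work. `EventRegistry` and `emit` are hypothetical names; benlink's actual registry lives in `CommandConnection` and may be implemented differently.

```python
from typing import Callable

Handler = Callable[[object], None]


class EventRegistry:
    """Hypothetical registry illustrating the unregister-closure pattern."""

    def __init__(self) -> None:
        self._handlers: list[Handler] = []

    def add_event_handler(self, handler: Handler) -> Callable[[], None]:
        self._handlers.append(handler)

        def unregister() -> None:
            # Safe to call more than once: a second call is a no-op.
            if handler in self._handlers:
                self._handlers.remove(handler)

        return unregister

    def emit(self, event: object) -> None:
        # Iterate over a copy so a handler may unregister itself mid-dispatch.
        for handler in list(self._handlers):
            handler(event)


received: list[object] = []


def on_event(event: object) -> None:
    received.append(event)


registry = EventRegistry()
unregister = registry.add_event_handler(on_event)

registry.emit("first")   # delivered to on_event
unregister()
registry.emit("second")  # not delivered: the handler was removed
print(received)
```

The caller never needs a handle to the registry's internals; holding on to the returned function is enough to undo the registration.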

Interactive Usage

Python's async REPL is a great tool for interactively exploring the radio's capabilities. To run Python's REPL in async mode, run:

python -m asyncio

Instead of using the async context manager (async with RadioController(...) as radio:), you can use await radio.connect() and await radio.disconnect() to manage the connection manually:

from benlink.controller import RadioController

radio = RadioController.new_ble("XX:XX:XX:XX:XX:XX")

await radio.connect()

print(radio.device_info) # Prints device info

print(await radio.battery_voltage()) # Prints battery voltage

await radio.disconnect() # When you're done with your session disconnect nicely

Events registered with add_event_handler will run in the background:

import asyncio
from benlink.controller import RadioController

radio = RadioController.new_ble("XX:XX:XX:XX:XX:XX")

await radio.connect()

unsubscribe = radio.add_event_handler(lambda x: print(f"Received event: {x}\n"))

# Change the channel on the radio a few times to generate some events

unsubscribe() # Unsubscribe the event handler

# Change the channel on the radio a few times to generate some events and
# observe that the event handler is no longer called

await radio.disconnect() # When you're done with your session disconnect nicely

(Note for IPython users: The IPython async REPL blocks the async event loop while waiting for a prompt, so events will queue up until you defer execution to the event loop by running something like await asyncio.sleep(0).)
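The effect of `await asyncio.sleep(0)` can be seen without a radio. The sketch below illustrates general asyncio behaviour rather than anything specific to IPython or benlink: a callback queued on the loop stands in for an incoming radio event, and it runs only once the current coroutine yields.

```python
import asyncio


async def demo() -> list[str]:
    log: list[str] = []
    loop = asyncio.get_running_loop()

    # Stand-in for an incoming radio event: a callback queued on the loop.
    loop.call_soon(log.append, "event handled")

    log.append("before yield")  # the queued callback has not run yet
    await asyncio.sleep(0)      # hand control to the loop for one iteration
    log.append("after yield")
    return log


log = asyncio.run(demo())
print(log)
```

The queued callback lands between the two markers, which is why a single `await asyncio.sleep(0)` at the prompt is enough to flush pending events.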

from __future__ import annotations
from typing_extensions import Unpack
from dataclasses import dataclass
import typing as t
import sys

from .command import (
    CommandConnection,
    EventHandler,
    DeviceInfo,
    Channel,
    ChannelArgs,
    Settings,
    SettingsArgs,
    BeaconSettings,
    BeaconSettingsArgs,
    TncDataFragment,
    EventMessage,
    EventType,
    SettingsChangedEvent,
    TncDataFragmentReceivedEvent,
    ChannelChangedEvent,
    StatusChangedEvent,
    UnknownProtocolMessage,
    Status,
    Position,
)


@dataclass
class _RadioState:
    device_info: DeviceInfo
    beacon_settings: BeaconSettings
    status: Status
    settings: Settings
    channels: t.List[Channel]


class RadioController:
    _conn: CommandConnection
    _state: _RadioState | None

    def __init__(self, connection: CommandConnection):
        self._conn = connection
        self._state = None

    @classmethod
    def new_ble(cls, device_uuid: str) -> RadioController:
        return RadioController(CommandConnection.new_ble(device_uuid))

    @classmethod
    def new_rfcomm(cls, device_uuid: str, channel: int | t.Literal["auto"] = "auto") -> RadioController:
        return RadioController(CommandConnection.new_rfcomm(device_uuid, channel))

    def __repr__(self):
        if not self.is_connected():
            return f"<{self.__class__.__name__} (disconnected)>"
        return f"<{self.__class__.__name__} (connected)>"

    @property
    def beacon_settings(self) -> BeaconSettings:
        if self._state is None:
            raise StateNotInitializedError()
        return self._state.beacon_settings

    async def set_beacon_settings(self, **packet_settings_args: Unpack[BeaconSettingsArgs]):
        if self._state is None:
            raise StateNotInitializedError()

        new_beacon_settings = self._state.beacon_settings.model_copy(
            update=dict(packet_settings_args)
        )

        await self._conn.set_beacon_settings(new_beacon_settings)

        self._state.beacon_settings = new_beacon_settings

    @property
    def status(self) -> Status:
        if self._state is None:
            raise StateNotInitializedError()
        return self._state.status

    @property
    def settings(self) -> Settings:
        if self._state is None:
            raise StateNotInitializedError()
        return self._state.settings

    async def set_settings(self, **settings_args: Unpack[SettingsArgs]):
        if self._state is None:
            raise StateNotInitializedError()

        new_settings = self._state.settings.model_copy(
            update=dict(settings_args)
        )

        await self._conn.set_settings(new_settings)

        self._state.settings = new_settings

    @property
    def device_info(self) -> DeviceInfo:
        if self._state is None:
            raise StateNotInitializedError()
        return self._state.device_info

    @property
    def channels(self) -> t.List[Channel]:
        if self._state is None:
            raise StateNotInitializedError()
        return self._state.channels

    async def set_channel(
        self, channel_id: int, **channel_args: Unpack[ChannelArgs]
    ):
        if self._state is None:
            raise StateNotInitializedError()

        new_channel = self._state.channels[channel_id].model_copy(
            update=dict(channel_args)
        )

        await self._conn.set_channel(new_channel)

        self._state.channels[channel_id] = new_channel

    def is_connected(self) -> bool:
        return self._state is not None and self._conn.is_connected()

    async def send_bytes(self, command: bytes) -> None:
        """For debugging - Use at your own risk!"""
        await self._conn.send_bytes(command)

    async def battery_voltage(self) -> float:
        return await self._conn.get_battery_voltage()

    async def battery_level(self) -> int:
        return await self._conn.get_battery_level()

    async def battery_level_as_percentage(self) -> int:
        return await self._conn.get_battery_level_as_percentage()

    async def rc_battery_level(self) -> int:
        return await self._conn.get_rc_battery_level()

    async def position(self) -> Position:
        return await self._conn.get_position()

    async def send_tnc_data(self, data: bytes) -> None:
        if len(data) > 50:
            raise ValueError("Data too long -- TODO: implement fragmentation")

        await self._conn.send_tnc_data_fragment(TncDataFragment(
            is_final_fragment=True,
            fragment_id=0,
            data=data
        ))

    def add_event_handler(self, handler: EventHandler) -> t.Callable[[], None]:
        return self._conn.add_event_handler(handler)

    async def enable_event(self, event_type: EventType):
        await self._conn.enable_event(event_type)

    async def _hydrate(self) -> None:
        device_info = await self._conn.get_device_info()

        channels: t.List[Channel] = []

        for i in range(device_info.channel_count):
            channel_settings = await self._conn.get_channel(i)
            channels.append(channel_settings)

        settings = await self._conn.get_settings()

        beacon_settings = await self._conn.get_beacon_settings()

        status = await self._conn.get_status()

        # No need to save the remove event handler function, since we don't
        # need to unregister it when we disconnect (the connection will take care of that)
        self._conn.add_event_handler(
            self._on_event_message
        )

        # For some reason, enabling the HT_STATUS_CHANGED event
        # also enables the DATA_RXD event, and maybe others...
        # need to investigate further.
        await self.enable_event("HT_STATUS_CHANGED")

        # TODO: should these events be enabled by default? perhaps I should have
        # users enable events manually, while simultaneously registering handlers
        # of the proper type?

        self._state = _RadioState(
            device_info=device_info,
            beacon_settings=beacon_settings,
            status=status,
            settings=settings,
            channels=channels,
        )

    def _on_event_message(self, event_message: EventMessage) -> None:
        if self._state is None:
            raise ValueError(
                "Radio state not initialized. Try calling connect() first."
            )

        match event_message:
            case ChannelChangedEvent(channel):
                self._state.channels[channel.channel_id] = channel
            case SettingsChangedEvent(settings):
                self._state.settings = settings
            case TncDataFragmentReceivedEvent():
                pass
            case StatusChangedEvent(status):
                self._state.status = status
            case UnknownProtocolMessage(message):
                print(
                    f"[DEBUG] Unknown protocol message: {message}",
                    file=sys.stderr
                )

    # Async Context Manager
    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(
        self,
        exc_type: t.Any,
        exc_value: t.Any,
        traceback: t.Any,
    ) -> None:
        await self.disconnect()

    async def connect(self) -> None:
        if self._state is not None:
            raise RuntimeError("Already connected")

        await self._conn.connect()
        await self._hydrate()

    async def disconnect(self) -> None:
        if self._state is None:
            raise StateNotInitializedError()

        await self._conn.disconnect()
        self._state = None


class StateNotInitializedError(RuntimeError):
    """Raised when trying to access radio state before it has been initialized."""

    def __init__(self):
        super().__init__(
            "Radio state not initialized. Try calling connect() first."
        )
class RadioController:
RadioController(connection: benlink.command.CommandConnection)
@classmethod
def new_ble(cls, device_uuid: str) -> RadioController:
@classmethod
def new_rfcomm(cls, device_uuid: str, channel: Union[int, Literal['auto']] = 'auto') -> RadioController:
beacon_settings: benlink.command.BeaconSettings
async def set_beacon_settings(self, **packet_settings_args: Unpack[benlink.command.BeaconSettingsArgs]):
status: benlink.command.Status
settings: benlink.command.Settings
async def set_settings(self, **settings_args: Unpack[benlink.command.SettingsArgs]):
device_info: benlink.command.DeviceInfo
channels: List[benlink.command.Channel]
async def set_channel(self, channel_id: int, **channel_args: Unpack[benlink.command.ChannelArgs]):
def is_connected(self) -> bool:
async def send_bytes(self, command: bytes) -> None:

For debugging - Use at your own risk!

async def battery_voltage(self) -> float:
async def battery_level(self) -> int:
async def battery_level_as_percentage(self) -> int:
async def rc_battery_level(self) -> int:
async def position(self) -> benlink.command.Position:
async def send_tnc_data(self, data: bytes) -> None:
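According to the source, `send_tnc_data` rejects payloads longer than 50 bytes because fragmentation is not implemented yet. The sketch below shows how a payload could be split into chunks of that size. It is only an illustration: `split_into_fragments` and `Fragment` are hypothetical, and the numbering scheme (IDs counting up from 0, only the last chunk marked final) is an assumption rather than the radio's documented protocol.

```python
from dataclasses import dataclass

MAX_FRAGMENT_SIZE = 50  # the limit enforced by send_tnc_data in the source


@dataclass
class Fragment:
    """Stand-in for benlink's TncDataFragment, with the same three fields."""
    is_final_fragment: bool
    fragment_id: int
    data: bytes


def split_into_fragments(data: bytes, size: int = MAX_FRAGMENT_SIZE) -> list[Fragment]:
    """Split data into chunks of at most `size` bytes.

    Assumption: fragment IDs count up from 0 and only the last chunk is
    marked final. The radio's real fragmentation rules may differ.
    """
    if size <= 0:
        raise ValueError("size must be positive")
    # An empty payload still yields one (empty) final fragment.
    chunks = [data[i:i + size] for i in range(0, len(data), size)] or [b""]
    last = len(chunks) - 1
    return [
        Fragment(is_final_fragment=(i == last), fragment_id=i, data=chunk)
        for i, chunk in enumerate(chunks)
    ]


fragments = split_into_fragments(bytes(120))
print([(f.fragment_id, len(f.data), f.is_final_fragment) for f in fragments])
```

Whether the radio reassembles fragments sent this way is untested; treat the output as a starting point for experiments, not as a supported feature.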
def add_event_handler(self, handler: benlink.command.EventHandler) -> Callable[[], None]:
async def enable_event(self, event_type: Literal['HT_STATUS_CHANGED', 'DATA_RXD', 'NEW_INQUIRY_DATA', 'RESTORE_FACTORY_SETTINGS', 'HT_CH_CHANGED', 'HT_SETTINGS_CHANGED', 'RINGING_STOPPEDRADIO_STATUS_CHANGED', 'USER_ACTION', 'SYSTEM_EVENT', 'BSS_SETTINGS_CHANGED', 'DATA_TXD', 'POSITION_CHANGED']):
async def connect(self) -> None:
async def disconnect(self) -> None:
class StateNotInitializedError(builtins.RuntimeError):

Raised when trying to access radio state before it has been initialized.
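The guard behind this exception can be tried out without a radio. In the sketch below, `FakeController` is a hypothetical stand-in that copies the `if self._state is None` check from the source, and the exception class is a local copy so the example runs without benlink installed.

```python
import asyncio


class StateNotInitializedError(RuntimeError):
    """Local copy of the exception so the sketch runs without benlink."""

    def __init__(self) -> None:
        super().__init__("Radio state not initialized. Try calling connect() first.")


class FakeController:
    """Hypothetical stand-in for RadioController with the same guard pattern."""

    def __init__(self) -> None:
        self._state = None

    @property
    def device_info(self) -> dict:
        if self._state is None:
            raise StateNotInitializedError()
        return self._state

    async def connect(self) -> None:
        # The real class queries the radio here; this just fills in a value.
        self._state = {"vendor": "example"}

    async def disconnect(self) -> None:
        self._state = None


async def demo() -> list[str]:
    results: list[str] = []
    radio = FakeController()
    try:
        radio.device_info  # accessed before connect(): the guard raises
    except StateNotInitializedError as exc:
        results.append(f"before connect: {exc}")
    await radio.connect()
    try:
        results.append(f"connected: {radio.device_info}")
    finally:
        await radio.disconnect()  # always release the connection
    return results


results = asyncio.run(demo())
print(results)
```

Because the exception subclasses `RuntimeError`, existing `except RuntimeError` handlers will also catch it.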