Edit on GitHub

benlink.command

Overview

This module defines a low-level interface for communicating with a radio over BLE or Rfcomm.

Unless you know what you are doing, you probably want to use the benlink.controller module instead.

Messages

The RadioMessage type is a union of all the possible messages that can sent or received from the radio.

It RadioMessages are divided into three categories:

  1. CommandMessage: Messages that are sent to the radio to request information or to change settings.

  2. ReplyMessage: Messages that are received in response to a CommandMessage.

  3. EventMessage: Messages that are received from the radio to indicate that an event has occurred. (e.g. a channel has changed, a packet has been received)

Data

The data objects (e.g. DeviceInfo, Settings) are used to represent the data that is sent or received in the messages. Some of these data objects have accompanying Args types that are used in the API to allow for functions that take keyword arguments to set these parameters.

   1"""
   2# Overview
   3
   4This module defines a low-level interface for communicating
   5with a radio over BLE or Rfcomm.
   6
   7Unless you know what you are doing, you probably want to use the
   8`benlink.controller` module instead.
   9
  10# Messages
  11
  12The `RadioMessage` type is a union of all the possible messages that can
  13sent or received from the radio.
  14
  15It `RadioMessage`s are divided into three categories:
  16
  171. `CommandMessage`: Messages that are sent to the radio to request
  18    information or to change settings.
  19
  202. `ReplyMessage`: Messages that are received in response to a
  21    `CommandMessage`.
  22
  233. `EventMessage`: Messages that are received from the radio to indicate
  24    that an event has occurred. (e.g. a channel has changed, a packet has
  25    been received)
  26
  27# Data
  28
  29The data objects (e.g. `DeviceInfo`, `Settings`) are used to represent
  30the data that is sent or received in the messages. Some of these data
  31objects have accompanying `Args` types that are used in the API to allow
  32for functions that take keyword arguments to set these parameters.
  33"""
  34
  35from __future__ import annotations
  36import typing as t
  37import asyncio
  38from pydantic import BaseModel, ConfigDict
  39from . import protocol as p
  40from .link import CommandLink, BleCommandLink, RfcommCommandLink
  41from datetime import datetime
  42
  43RADIO_SERVICE_UUID = "00001100-d102-11e1-9b23-00025b00a5a5"
  44"""@private"""
  45
  46RADIO_WRITE_UUID = "00001101-d102-11e1-9b23-00025b00a5a5"
  47"""@private"""
  48
  49RADIO_INDICATE_UUID = "00001102-d102-11e1-9b23-00025b00a5a5"
  50"""@private"""
  51
  52
  53class CommandConnection:
  54    _link: CommandLink
  55    _handlers: t.List[RadioMessageHandler] = []
  56
  57    def __init__(self, link: CommandLink):
  58        self._link = link
  59        self._handlers = []
  60
  61    @classmethod
  62    def new_ble(cls, device_uuid: str) -> CommandConnection:
  63        return cls(BleCommandLink(device_uuid))
  64
  65    @classmethod
  66    def new_rfcomm(cls, device_uuid: str, channel: int | t.Literal["auto"] = "auto") -> CommandConnection:
  67        return cls(RfcommCommandLink(device_uuid, channel))
  68
  69    def is_connected(self) -> bool:
  70        return self._link.is_connected()
  71
  72    async def connect(self) -> None:
  73        await self._link.connect(self._on_recv)
  74
  75    async def disconnect(self) -> None:
  76        self._handlers.clear()
  77        await self._link.disconnect()
  78
  79    async def send_bytes(self, data: bytes) -> None:
  80        await self._link.send_bytes(data)
  81
  82    async def send_message(self, command: CommandMessage) -> None:
  83        await self._link.send(command_message_to_protocol(command))
  84
  85    async def send_message_expect_reply(self, command: CommandMessage, expect: t.Type[RadioMessageT]) -> RadioMessageT | MessageReplyError:
  86        queue: asyncio.Queue[RadioMessageT |
  87                             MessageReplyError] = asyncio.Queue()
  88
  89        def reply_handler(reply: RadioMessage):
  90            if (
  91                isinstance(reply, expect) or
  92                (
  93                    isinstance(reply, MessageReplyError) and
  94                    reply.message_type is expect
  95                )
  96            ):
  97                queue.put_nowait(reply)
  98
  99        remove_handler = self._add_message_handler(reply_handler)
 100
 101        await self.send_message(command)
 102
 103        out = await queue.get()
 104
 105        remove_handler()
 106
 107        return out
 108
 109    def add_event_handler(self, handler: EventHandler) -> t.Callable[[], None]:
 110        def event_handler(msg: RadioMessage):
 111            if isinstance(msg, EventMessage):
 112                handler(msg)
 113        return self._add_message_handler(event_handler)
 114
 115    def _add_message_handler(self, handler: RadioMessageHandler) -> t.Callable[[], None]:
 116        self._handlers.append(handler)
 117
 118        def remove_handler():
 119            self._handlers.remove(handler)
 120
 121        return remove_handler
 122
 123    def _on_recv(self, msg: p.Message) -> None:
 124        radio_message = radio_message_from_protocol(msg)
 125        for handler in self._handlers:
 126            handler(radio_message)
 127
 128    # Command API
 129
 130    async def enable_event(self, event_type: EventType) -> None:
 131        """Enable an event"""
 132        # This event doesn't get a reply version of EnableEvent. It does, however
 133        # does trigger a StatusChangedEvent... but we don't wait for it here.
 134        # Instead, we just fire and forget
 135        await self.send_message(EnableEvent(event_type))
 136
 137    async def send_tnc_data_fragment(self, tnc_data_fragment: TncDataFragment) -> None:
 138        """Send Tnc data"""
 139        reply = await self.send_message_expect_reply(SendTncDataFragment(tnc_data_fragment), SendTncDataFragmentReply)
 140        if isinstance(reply, MessageReplyError):
 141            raise reply.as_exception()
 142
 143    async def get_beacon_settings(self) -> BeaconSettings:
 144        """Get the current packet settings"""
 145        reply = await self.send_message_expect_reply(GetBeaconSettings(), GetBeaconSettingsReply)
 146        if isinstance(reply, MessageReplyError):
 147            raise reply.as_exception()
 148        return reply.tnc_settings
 149
 150    async def set_beacon_settings(self, packet_settings: BeaconSettings):
 151        """Set the packet settings"""
 152        reply = await self.send_message_expect_reply(SetBeaconSettings(packet_settings), SetBeaconSettingsReply)
 153        if isinstance(reply, MessageReplyError):
 154            raise reply.as_exception()
 155
 156    async def get_battery_level(self) -> int:
 157        """Get the battery level"""
 158        reply = await self.send_message_expect_reply(GetBatteryLevel(), GetBatteryLevelReply)
 159        if isinstance(reply, MessageReplyError):
 160            raise reply.as_exception()
 161        return reply.battery_level
 162
 163    async def get_battery_level_as_percentage(self) -> int:
 164        """Get the battery level as a percentage"""
 165        reply = await self.send_message_expect_reply(GetBatteryLevelAsPercentage(), GetBatteryLevelAsPercentageReply)
 166        if isinstance(reply, MessageReplyError):
 167            raise reply.as_exception()
 168        return reply.battery_level_as_percentage
 169
 170    async def get_rc_battery_level(self) -> int:
 171        """Get the RC battery level"""
 172        reply = await self.send_message_expect_reply(GetRCBatteryLevel(), GetRCBatteryLevelReply)
 173        if isinstance(reply, MessageReplyError):
 174            raise reply.as_exception()
 175        return reply.rc_battery_level
 176
 177    async def get_battery_voltage(self) -> float:
 178        """Get the battery voltage"""
 179        reply = await self.send_message_expect_reply(GetBatteryVoltage(), GetBatteryVoltageReply)
 180        if isinstance(reply, MessageReplyError):
 181            raise reply.as_exception()
 182        return reply.battery_voltage
 183
 184    async def get_device_info(self) -> DeviceInfo:
 185        """Get the device info"""
 186        reply = await self.send_message_expect_reply(GetDeviceInfo(), GetDeviceInfoReply)
 187        if isinstance(reply, MessageReplyError):
 188            raise reply.as_exception()
 189        return reply.device_info
 190
 191    async def get_settings(self) -> Settings:
 192        """Get the settings"""
 193        reply = await self.send_message_expect_reply(GetSettings(), GetSettingsReply)
 194        if isinstance(reply, MessageReplyError):
 195            raise reply.as_exception()
 196        return reply.settings
 197
 198    async def set_settings(self, settings: Settings) -> None:
 199        """Set the settings"""
 200        reply = await self.send_message_expect_reply(SetSettings(settings), SetSettingsReply)
 201        if isinstance(reply, MessageReplyError):
 202            raise reply.as_exception()
 203
 204    async def get_channel(self, channel_id: int) -> Channel:
 205        """Get a channel"""
 206        reply = await self.send_message_expect_reply(GetChannel(channel_id), GetChannelReply)
 207        if isinstance(reply, MessageReplyError):
 208            raise reply.as_exception()
 209        return reply.channel
 210
 211    async def set_channel(self, channel: Channel):
 212        """Set a channel"""
 213        reply = await self.send_message_expect_reply(SetChannel(channel), SetChannelReply)
 214        if isinstance(reply, MessageReplyError):
 215            raise reply.as_exception()
 216
 217    async def get_status(self) -> Status:
 218        """Get the radio status"""
 219        reply = await self.send_message_expect_reply(GetStatus(), GetStatusReply)
 220        if isinstance(reply, MessageReplyError):
 221            raise reply.as_exception()
 222        return reply.status
 223
 224    async def get_position(self) -> Position:
 225        """Get the radio position"""
 226        reply = await self.send_message_expect_reply(GetPosition(), GetPositionReply)
 227        if isinstance(reply, MessageReplyError):
 228            raise reply.as_exception()
 229        return reply.position
 230
 231    async def __aenter__(self):
 232        await self.connect()
 233        return self
 234
 235    async def __aexit__(
 236        self,
 237        exc_type: t.Any,
 238        exc_value: t.Any,
 239        traceback: t.Any,
 240    ) -> None:
 241        await self.disconnect()
 242
 243
 244class ImmutableBaseModel(BaseModel):
 245    """@private (A base class for immutable data objects)"""
 246
 247    model_config = ConfigDict(frozen=True)
 248    """@private"""
 249
 250
 251def command_message_to_protocol(m: CommandMessage) -> p.Message:
 252    """@private (Protocol helper)"""
 253    match m:
 254        case EnableEvent(event_type):
 255            return p.Message(
 256                command_group=p.CommandGroup.BASIC,
 257                is_reply=False,
 258                command=p.BasicCommand.REGISTER_NOTIFICATION,
 259                body=p.RegisterNotificationBody(
 260                    event_type=p.EventType[event_type],
 261                )
 262            )
 263        case SendTncDataFragment(tnc_data_fragment):
 264            return p.Message(
 265                command_group=p.CommandGroup.BASIC,
 266                is_reply=False,
 267                command=p.BasicCommand.HT_SEND_DATA,
 268                body=p.HTSendDataBody(
 269                    tnc_data_fragment=tnc_data_fragment.to_protocol()
 270                )
 271            )
 272        case GetBeaconSettings():
 273            return p.Message(
 274                command_group=p.CommandGroup.BASIC,
 275                is_reply=False,
 276                command=p.BasicCommand.READ_BSS_SETTINGS,
 277                body=p.ReadBSSSettingsBody()
 278            )
 279        case SetBeaconSettings(packet_settings):
 280            return p.Message(
 281                command_group=p.CommandGroup.BASIC,
 282                is_reply=False,
 283                command=p.BasicCommand.WRITE_BSS_SETTINGS,
 284                body=p.WriteBSSSettingsBody(
 285                    bss_settings=packet_settings.to_protocol()
 286                )
 287            )
 288        case GetSettings():
 289            return p.Message(
 290                command_group=p.CommandGroup.BASIC,
 291                is_reply=False,
 292                command=p.BasicCommand.READ_SETTINGS,
 293                body=p.ReadSettingsBody()
 294            )
 295        case SetSettings(settings):
 296            return p.Message(
 297                command_group=p.CommandGroup.BASIC,
 298                is_reply=False,
 299                command=p.BasicCommand.WRITE_SETTINGS,
 300                body=p.WriteSettingsBody(
 301                    settings=settings.to_protocol()
 302                )
 303            )
 304        case GetDeviceInfo():
 305            return p.Message(
 306                command_group=p.CommandGroup.BASIC,
 307                is_reply=False,
 308                command=p.BasicCommand.GET_DEV_INFO,
 309                body=p.GetDevInfoBody()
 310            )
 311        case GetChannel(channel_id):
 312            return p.Message(
 313                command_group=p.CommandGroup.BASIC,
 314                is_reply=False,
 315                command=p.BasicCommand.READ_RF_CH,
 316                body=p.ReadRFChBody(channel_id=channel_id)
 317            )
 318        case SetChannel(channel):
 319            return p.Message(
 320                command_group=p.CommandGroup.BASIC,
 321                is_reply=False,
 322                command=p.BasicCommand.WRITE_RF_CH,
 323                body=p.WriteRFChBody(
 324                    rf_ch=channel.to_protocol()
 325                )
 326            )
 327        case GetBatteryVoltage():
 328            return p.Message(
 329                command_group=p.CommandGroup.BASIC,
 330                is_reply=False,
 331                command=p.BasicCommand.READ_STATUS,
 332                body=p.ReadPowerStatusBody(
 333                    status_type=p.PowerStatusType.BATTERY_VOLTAGE
 334                )
 335            )
 336        case GetBatteryLevel():
 337            return p.Message(
 338                command_group=p.CommandGroup.BASIC,
 339                is_reply=False,
 340                command=p.BasicCommand.READ_STATUS,
 341                body=p.ReadPowerStatusBody(
 342                    status_type=p.PowerStatusType.BATTERY_LEVEL
 343                )
 344            )
 345        case GetBatteryLevelAsPercentage():
 346            return p.Message(
 347                command_group=p.CommandGroup.BASIC,
 348                is_reply=False,
 349                command=p.BasicCommand.READ_STATUS,
 350                body=p.ReadPowerStatusBody(
 351                    status_type=p.PowerStatusType.BATTERY_LEVEL_AS_PERCENTAGE
 352                )
 353            )
 354        case GetRCBatteryLevel():
 355            return p.Message(
 356                command_group=p.CommandGroup.BASIC,
 357                is_reply=False,
 358                command=p.BasicCommand.READ_STATUS,
 359                body=p.ReadPowerStatusBody(
 360                    status_type=p.PowerStatusType.RC_BATTERY_LEVEL
 361                )
 362            )
 363        case GetStatus():
 364            return p.Message(
 365                command_group=p.CommandGroup.BASIC,
 366                is_reply=False,
 367                command=p.BasicCommand.GET_HT_STATUS,
 368                body=p.GetHtStatusBody()
 369            )
 370        case GetPosition():
 371            return p.Message(
 372                command_group=p.CommandGroup.BASIC,
 373                is_reply=False,
 374                command=p.BasicCommand.GET_POSITION,
 375                body=p.GetPositionBody()
 376            )
 377
 378
 379def radio_message_from_protocol(mf: p.Message) -> RadioMessage:
 380    """@private (Protocol helper)"""
 381    match mf.body:
 382        case p.GetPositionReplyBody(reply_status=reply_status, position=position):
 383            if position is None:
 384                return MessageReplyError(
 385                    message_type=GetPositionReply,
 386                    reason=reply_status.name
 387                )
 388            return GetPositionReply(Position.from_protocol(position))
 389        case p.GetHtStatusReplyBody(reply_status=reply_status, status=status):
 390            if status is None:
 391                return MessageReplyError(
 392                    message_type=GetStatusReply,
 393                    reason=reply_status.name,
 394                )
 395            return GetStatusReply(Status.from_protocol(status))
 396        case p.HTSendDataReplyBody(
 397            reply_status=reply_status
 398        ):
 399            if reply_status != p.ReplyStatus.SUCCESS:
 400                return MessageReplyError(
 401                    message_type=SendTncDataFragmentReply,
 402                    reason=reply_status.name,
 403                )
 404            return SendTncDataFragmentReply()
 405        case p.ReadBSSSettingsReplyBody(
 406            reply_status=reply_status,
 407            bss_settings=bss_settings,
 408        ):
 409            if bss_settings is None:
 410                return MessageReplyError(
 411                    message_type=GetBeaconSettingsReply,
 412                    reason=reply_status.name,
 413                )
 414
 415            return GetBeaconSettingsReply(BeaconSettings.from_protocol(bss_settings))
 416        case p.WriteBSSSettingsReplyBody(
 417            reply_status=reply_status,
 418        ):
 419            if reply_status != p.ReplyStatus.SUCCESS:
 420                return MessageReplyError(
 421                    message_type=SetBeaconSettingsReply,
 422                    reason=reply_status.name,
 423                )
 424            return SetBeaconSettingsReply()
 425        case p.ReadPowerStatusReplyBody(
 426            reply_status=reply_status,
 427            status=power_status
 428        ):
 429            if power_status is None:
 430                return MessageReplyError(
 431                    message_type=GetBatteryVoltageReply,
 432                    reason=reply_status.name,
 433                )
 434
 435            match power_status.value:
 436                case p.BatteryVoltageStatus(
 437                    battery_voltage=battery_voltage
 438                ):
 439                    return GetBatteryVoltageReply(
 440                        battery_voltage=battery_voltage
 441                    )
 442                case p.BatteryLevelPercentageStatus(
 443                    battery_level_as_percentage=battery_level_as_percentage
 444                ):
 445                    return GetBatteryLevelAsPercentageReply(
 446                        battery_level_as_percentage=battery_level_as_percentage
 447                    )
 448                case p.BatteryLevelStatus(
 449                    battery_level=battery_level
 450                ):
 451                    return GetBatteryLevelReply(
 452                        battery_level=battery_level
 453                    )
 454                case p.RCBatteryLevelStatus(
 455                    rc_battery_level=rc_battery_level
 456                ):
 457                    return GetRCBatteryLevelReply(
 458                        rc_battery_level=rc_battery_level
 459                    )
 460        case p.EventNotificationBody(
 461            event=event
 462        ):
 463            match event:
 464                case p.HTSettingsChangedEvent(
 465                    settings=settings
 466                ):
 467                    return SettingsChangedEvent(Settings.from_protocol(settings))
 468                case p.DataRxdEvent(
 469                    tnc_data_fragment=tnc_data_fragment
 470                ):
 471                    return TncDataFragmentReceivedEvent(
 472                        tnc_data_fragment=TncDataFragment.from_protocol(
 473                            tnc_data_fragment
 474                        )
 475                    )
 476                case p.HTChChangedEvent(
 477                    rf_ch=rf_ch
 478                ):
 479                    return ChannelChangedEvent(Channel.from_protocol(rf_ch))
 480                case p.HTStatusChangedEvent(
 481                    status=status
 482                ):
 483                    return StatusChangedEvent(Status.from_protocol(status))
 484                case _:
 485                    return UnknownProtocolMessage(mf)
 486        case p.ReadSettingsReplyBody(
 487            reply_status=reply_status,
 488            settings=settings
 489        ):
 490            if settings is None:
 491                return MessageReplyError(
 492                    message_type=GetSettingsReply,
 493                    reason=reply_status.name,
 494                )
 495            return GetSettingsReply(Settings.from_protocol(settings))
 496        case p.WriteSettingsReplyBody(
 497            reply_status=reply_status,
 498        ):
 499            if reply_status != p.ReplyStatus.SUCCESS:
 500                return MessageReplyError(
 501                    message_type=SetSettingsReply,
 502                    reason=reply_status.name,
 503                )
 504            return SetSettingsReply()
 505        case p.GetDevInfoReplyBody(
 506            reply_status=reply_status,
 507            dev_info=dev_info
 508        ):
 509            if dev_info is None:
 510                return MessageReplyError(
 511                    message_type=GetDeviceInfoReply,
 512                    reason=reply_status.name,
 513                )
 514            return GetDeviceInfoReply(DeviceInfo.from_protocol(dev_info))
 515        case p.ReadRFChReplyBody(
 516            reply_status=reply_status,
 517            rf_ch=rf_ch
 518        ):
 519            if rf_ch is None:
 520                return MessageReplyError(
 521                    message_type=GetChannelReply,
 522                    reason=reply_status.name,
 523                )
 524            return GetChannelReply(Channel.from_protocol(rf_ch))
 525        case p.WriteRFChReplyBody(
 526            reply_status=reply_status,
 527        ):
 528            if reply_status != p.ReplyStatus.SUCCESS:
 529                return MessageReplyError(
 530                    message_type=SetChannelReply,
 531                    reason=reply_status.name,
 532                )
 533            return SetChannelReply()
 534
 535        case _:
 536            return UnknownProtocolMessage(mf)
 537
 538
 539#####################
 540# CommandMessage
 541
 542class EnableEvent(t.NamedTuple):
 543    event_type: EventType
 544
 545
 546class GetBeaconSettings(t.NamedTuple):
 547    pass
 548
 549
 550class SetBeaconSettings(t.NamedTuple):
 551    tnc_settings: BeaconSettings
 552
 553
 554class SetSettings(t.NamedTuple):
 555    settings: Settings
 556
 557
 558class GetBatteryLevelAsPercentage(t.NamedTuple):
 559    pass
 560
 561
 562class GetRCBatteryLevel(t.NamedTuple):
 563    pass
 564
 565
 566class GetBatteryLevel(t.NamedTuple):
 567    pass
 568
 569
 570class GetBatteryVoltage(t.NamedTuple):
 571    pass
 572
 573
 574class GetDeviceInfo(t.NamedTuple):
 575    pass
 576
 577
 578class GetChannel(t.NamedTuple):
 579    channel_id: int
 580
 581
 582class SetChannel(t.NamedTuple):
 583    channel: Channel
 584
 585
 586class GetSettings(t.NamedTuple):
 587    pass
 588
 589
 590class GetStatus(t.NamedTuple):
 591    pass
 592
 593
 594class GetPosition(t.NamedTuple):
 595    pass
 596
 597
 598class SendTncDataFragment(t.NamedTuple):
 599    tnc_data_fragment: TncDataFragment
 600
 601
 602CommandMessage = t.Union[
 603    GetBeaconSettings,
 604    SetBeaconSettings,
 605    GetRCBatteryLevel,
 606    GetBatteryLevelAsPercentage,
 607    GetBatteryLevel,
 608    GetBatteryVoltage,
 609    GetDeviceInfo,
 610    GetChannel,
 611    SetChannel,
 612    GetSettings,
 613    SetSettings,
 614    SendTncDataFragment,
 615    EnableEvent,
 616    GetStatus,
 617    GetPosition,
 618]
 619
 620#####################
 621# ReplyMessage
 622
 623
 624class SendTncDataFragmentReply(t.NamedTuple):
 625    pass
 626
 627
 628class GetBeaconSettingsReply(t.NamedTuple):
 629    tnc_settings: BeaconSettings
 630
 631
 632class SetBeaconSettingsReply(t.NamedTuple):
 633    pass
 634
 635
 636class SetSettingsReply(t.NamedTuple):
 637    pass
 638
 639
 640class GetBatteryLevelAsPercentageReply(t.NamedTuple):
 641    battery_level_as_percentage: int
 642
 643
 644class GetRCBatteryLevelReply(t.NamedTuple):
 645    rc_battery_level: int
 646
 647
 648class GetBatteryLevelReply(t.NamedTuple):
 649    battery_level: int
 650
 651
 652class GetBatteryVoltageReply(t.NamedTuple):
 653    battery_voltage: float
 654
 655
 656class GetDeviceInfoReply(t.NamedTuple):
 657    device_info: DeviceInfo
 658
 659
 660class GetChannelReply(t.NamedTuple):
 661    channel: Channel
 662
 663
 664class SetChannelReply(t.NamedTuple):
 665    pass
 666
 667
 668class GetStatusReply(t.NamedTuple):
 669    status: Status
 670
 671
 672class GetSettingsReply(t.NamedTuple):
 673    settings: Settings
 674
 675
 676class GetPositionReply(t.NamedTuple):
 677    position: Position
 678
 679
 680ReplyStatus = t.Literal[
 681    "SUCCESS",
 682    "NOT_SUPPORTED",
 683    "NOT_AUTHENTICATED",
 684    "INSUFFICIENT_RESOURCES",
 685    "AUTHENTICATING",
 686    "INVALID_PARAMETER",
 687    "INCORRECT_STATE",
 688    "IN_PROGRESS",
 689]
 690
 691
 692class MessageReplyError(t.NamedTuple):
 693    message_type: t.Type[t.Any]
 694    reason: ReplyStatus
 695
 696    def as_exception(self):
 697        return ValueError(f"{self.message_type.__name__} failed: {self.reason}")
 698
 699
 700ReplyMessage = t.Union[
 701    GetBeaconSettingsReply,
 702    SetBeaconSettingsReply,
 703    GetBatteryLevelAsPercentageReply,
 704    GetRCBatteryLevelReply,
 705    GetBatteryLevelReply,
 706    GetBatteryVoltageReply,
 707    GetDeviceInfoReply,
 708    GetChannelReply,
 709    SetChannelReply,
 710    GetSettingsReply,
 711    SetSettingsReply,
 712    SendTncDataFragmentReply,
 713    GetStatusReply,
 714    GetPositionReply,
 715    MessageReplyError,
 716]
 717
 718#####################
 719# EventMessage
 720
 721
 722class StatusChangedEvent(t.NamedTuple):
 723    status: Status
 724
 725
 726class ChannelChangedEvent(t.NamedTuple):
 727    channel: Channel
 728
 729
 730class TncDataFragmentReceivedEvent(t.NamedTuple):
 731    tnc_data_fragment: TncDataFragment
 732
 733
 734class SettingsChangedEvent(t.NamedTuple):
 735    settings: Settings
 736
 737
 738class UnknownProtocolMessage(t.NamedTuple):
 739    message: p.Message
 740
 741
 742EventMessage = t.Union[
 743    TncDataFragmentReceivedEvent,
 744    SettingsChangedEvent,
 745    ChannelChangedEvent,
 746    StatusChangedEvent,
 747    UnknownProtocolMessage,
 748]
 749
 750EventType = t.Literal[
 751    "HT_STATUS_CHANGED",
 752    "DATA_RXD",
 753    "NEW_INQUIRY_DATA",
 754    "RESTORE_FACTORY_SETTINGS",
 755    "HT_CH_CHANGED",
 756    "HT_SETTINGS_CHANGED",
 757    "RINGING_STOPPED"
 758    "RADIO_STATUS_CHANGED",
 759    "USER_ACTION",
 760    "SYSTEM_EVENT",
 761    "BSS_SETTINGS_CHANGED",
 762    "DATA_TXD",
 763    "POSITION_CHANGED"
 764]
 765
 766RadioMessage = ReplyMessage | EventMessage
 767
 768RadioMessageT = t.TypeVar("RadioMessageT", bound=RadioMessage)
 769
 770#####################
 771# Handlers
 772
 773RadioMessageHandler = t.Callable[[RadioMessage], None]
 774"""@private"""
 775
 776EventHandler = t.Callable[[EventMessage], None]
 777"""@private"""
 778
 779#####################
 780# Protocol to data object conversions
 781
 782
 783class IntSplit(t.NamedTuple):
 784    """@private (A helper for working with integers split into upper and lower parts)"""
 785
 786    n_upper: int
 787    n_lower: int
 788
 789    def from_parts(self, upper: int, lower: int) -> int:
 790        if upper >= 1 << self.n_upper:
 791            raise ValueError(
 792                f"Upper part {upper} is too large for {self.n_upper} bits")
 793        if lower >= 1 << self.n_lower:
 794            raise ValueError(
 795                f"Lower part {lower} is too large for {self.n_lower} bits")
 796
 797        return (upper << self.n_lower) | lower
 798
 799    def get_upper(self, n: int) -> int:
 800        if n >= 1 << (self.n_upper + self.n_lower):
 801            raise ValueError(
 802                f"Value {n} is too large for {self.n_upper + self.n_lower} bits"
 803            )
 804        return n >> self.n_lower
 805
 806    def get_lower(self, n: int) -> int:
 807        if n >= 1 << (self.n_upper + self.n_lower):
 808            raise ValueError(
 809                f"Value {n} is too large for {self.n_upper + self.n_lower} bits"
 810            )
 811        return n & ((1 << self.n_lower) - 1)
 812
 813
 814class TncDataFragment(ImmutableBaseModel):
 815    """A data object representing a message packet"""
 816    is_final_fragment: bool
 817    fragment_id: int
 818    data: bytes
 819    channel_id: int | None = None
 820
 821    @classmethod
 822    def from_protocol(cls, mp: p.TncDataFragment) -> TncDataFragment:
 823        """@private (Protocol helper)"""
 824        return TncDataFragment(
 825            is_final_fragment=mp.is_final_fragment,
 826            fragment_id=mp.fragment_id,
 827            data=mp.data,
 828            channel_id=mp.channel_id
 829        )
 830
 831    def to_protocol(self) -> p.TncDataFragment:
 832        """@private (Protocol helper)"""
 833        return p.TncDataFragment(
 834            is_final_fragment=self.is_final_fragment,
 835            with_channel_id=self.channel_id is not None,
 836            fragment_id=self.fragment_id,
 837            data=self.data,
 838            channel_id=self.channel_id
 839        )
 840
 841
 842ModulationType = t.Literal["AM", "FM", "DMR"]
 843
 844BandwidthType = t.Literal["NARROW", "WIDE"]
 845
 846
 847class DCS(t.NamedTuple):
 848    """A type for setting Digital Coded Squelch (DCS) on channels"""
 849
 850    n: int
 851    """The DCS Normal (N) code"""
 852
 853
 854def sub_audio_from_protocol(x: float | p.DCS | None) -> float | DCS | None:
 855    """@private (Protocol helper)"""
 856    match x:
 857        case p.DCS(n):
 858            return DCS(n=n)
 859        case _:
 860            return x
 861
 862
 863def sub_audio_to_protocol(x: float | DCS | None) -> float | p.DCS | None:
 864    """@private (Protocol helper)"""
 865    match x:
 866        case DCS(n):
 867            return p.DCS(n=n)
 868        case _:
 869            return x
 870
 871
 872class ChannelArgs(t.TypedDict, total=False):
 873    """A dictionary of the parameters that can be set on a channel"""
 874    tx_mod: ModulationType
 875    tx_freq: float
 876    rx_mod: ModulationType
 877    rx_freq: float
 878    tx_sub_audio: float | DCS | None
 879    rx_sub_audio: float | DCS | None
 880    scan: bool
 881    tx_at_max_power: bool
 882    talk_around: bool
 883    bandwidth: BandwidthType
 884    pre_de_emph_bypass: bool
 885    sign: bool
 886    tx_at_med_power: bool
 887    tx_disable: bool
 888    fixed_freq: bool
 889    fixed_bandwidth: bool
 890    fixed_tx_power: bool
 891    mute: bool
 892    name: str
 893
 894
 895class Channel(ImmutableBaseModel):
 896    """A data object representing a radio channel"""
 897    channel_id: int
 898    tx_mod: ModulationType
 899    tx_freq: float
 900    rx_mod: ModulationType
 901    rx_freq: float
 902    tx_sub_audio: float | DCS | None
 903    rx_sub_audio: float | DCS | None
 904    scan: bool
 905    tx_at_max_power: bool
 906    talk_around: bool
 907    bandwidth: BandwidthType
 908    pre_de_emph_bypass: bool
 909    sign: bool
 910    tx_at_med_power: bool
 911    tx_disable: bool
 912    fixed_freq: bool
 913    fixed_bandwidth: bool
 914    fixed_tx_power: bool
 915    mute: bool
 916    name: str
 917
 918    @classmethod
 919    def from_protocol(cls, cs: p.RfCh) -> Channel:
 920        """@private (Protocol helper)"""
 921        return Channel(
 922            channel_id=cs.channel_id,
 923            tx_mod=cs.tx_mod.name,
 924            tx_freq=cs.tx_freq,
 925            rx_mod=cs.rx_mod.name,
 926            rx_freq=cs.rx_freq,
 927            tx_sub_audio=sub_audio_from_protocol(cs.tx_sub_audio),
 928            rx_sub_audio=sub_audio_from_protocol(cs.rx_sub_audio),
 929            scan=cs.scan,
 930            tx_at_max_power=cs.tx_at_max_power,
 931            talk_around=cs.talk_around,
 932            bandwidth=cs.bandwidth.name,
 933            pre_de_emph_bypass=cs.pre_de_emph_bypass,
 934            sign=cs.sign,
 935            tx_at_med_power=cs.tx_at_med_power,
 936            tx_disable=cs.tx_disable,
 937            fixed_freq=cs.fixed_freq,
 938            fixed_bandwidth=cs.fixed_bandwidth,
 939            fixed_tx_power=cs.fixed_tx_power,
 940            mute=cs.mute,
 941            name=cs.name_str
 942        )
 943
 944    def to_protocol(self) -> p.RfCh:
 945        """@private (Protocol helper)"""
 946        return p.RfCh(
 947            channel_id=self.channel_id,
 948            tx_mod=p.ModulationType[self.tx_mod],
 949            tx_freq=self.tx_freq,
 950            rx_mod=p.ModulationType[self.rx_mod],
 951            rx_freq=self.rx_freq,
 952            tx_sub_audio=sub_audio_to_protocol(self.tx_sub_audio),
 953            rx_sub_audio=sub_audio_to_protocol(self.rx_sub_audio),
 954            scan=self.scan,
 955            tx_at_max_power=self.tx_at_max_power,
 956            talk_around=self.talk_around,
 957            bandwidth=p.BandwidthType[self.bandwidth],
 958            pre_de_emph_bypass=self.pre_de_emph_bypass,
 959            sign=self.sign,
 960            tx_at_med_power=self.tx_at_med_power,
 961            tx_disable=self.tx_disable,
 962            fixed_freq=self.fixed_freq,
 963            fixed_bandwidth=self.fixed_bandwidth,
 964            fixed_tx_power=self.fixed_tx_power,
 965            mute=self.mute,
 966            name_str=self.name
 967        )
 968
 969
 970class SettingsArgs(t.TypedDict, total=False):
 971    """A dictionary of the parameters that can be set in the radio settings"""
 972    channel_a: int
 973    channel_b: int
 974    scan: bool
 975    aghfp_call_mode: int
 976    double_channel: int
 977    squelch_level: int
 978    tail_elim: bool
 979    auto_relay_en: bool
 980    auto_power_on: bool
 981    keep_aghfp_link: bool
 982    mic_gain: int
 983    tx_hold_time: int
 984    tx_time_limit: int
 985    local_speaker: int
 986    bt_mic_gain: int
 987    adaptive_response: bool
 988    dis_tone: bool
 989    power_saving_mode: bool
 990    auto_power_off: int
 991    auto_share_loc_ch: int | t.Literal["current"]
 992    hm_speaker: int
 993    positioning_system: int
 994    time_offset: int
 995    use_freq_range_2: bool
 996    ptt_lock: bool
 997    leading_sync_bit_en: bool
 998    pairing_at_power_on: bool
 999    screen_timeout: int
1000    vfo_x: int
1001    imperial_unit: bool
1002    wx_mode: int
1003    noaa_ch: int
1004    vfol_tx_power_x: int
1005    vfo2_tx_power_x: int
1006    dis_digital_mute: bool
1007    signaling_ecc_en: bool
1008    ch_data_lock: bool
1009    vfo1_mod_freq_x: int
1010    vfo2_mod_freq_x: int
1011
1012
1013class Settings(ImmutableBaseModel):
1014    """A data object representing the radio settings"""
1015    _channel_split: t.ClassVar[IntSplit] = IntSplit(4, 4)
1016    channel_a: int
1017    channel_b: int
1018    scan: bool
1019    aghfp_call_mode: int
1020    double_channel: int
1021    squelch_level: int
1022    tail_elim: bool
1023    auto_relay_en: bool
1024    auto_power_on: bool
1025    keep_aghfp_link: bool
1026    mic_gain: int
1027    tx_hold_time: int
1028    tx_time_limit: int
1029    local_speaker: int
1030    bt_mic_gain: int
1031    adaptive_response: bool
1032    dis_tone: bool
1033    power_saving_mode: bool
1034    auto_power_off: int
1035    auto_share_loc_ch: int | t.Literal["current"]
1036    hm_speaker: int
1037    positioning_system: int
1038    time_offset: int
1039    use_freq_range_2: bool
1040    ptt_lock: bool
1041    leading_sync_bit_en: bool
1042    pairing_at_power_on: bool
1043    screen_timeout: int
1044    vfo_x: int
1045    imperial_unit: bool
1046    wx_mode: int
1047    noaa_ch: int
1048    vfol_tx_power_x: int
1049    vfo2_tx_power_x: int
1050    dis_digital_mute: bool
1051    signaling_ecc_en: bool
1052    ch_data_lock: bool
1053    vfo1_mod_freq_x: int
1054    vfo2_mod_freq_x: int
1055
1056    @classmethod
1057    def from_protocol(cls, rs: p.Settings) -> Settings:
1058        """@private (Protocol helper)"""
1059        return Settings(
1060            channel_a=cls._channel_split.from_parts(
1061                rs.channel_a_upper, rs.channel_a_lower
1062            ),
1063            channel_b=cls._channel_split.from_parts(
1064                rs.channel_b_upper, rs.channel_b_lower
1065            ),
1066            scan=rs.scan,
1067            aghfp_call_mode=rs.aghfp_call_mode,
1068            double_channel=rs.double_channel,
1069            squelch_level=rs.squelch_level,
1070            tail_elim=rs.tail_elim,
1071            auto_relay_en=rs.auto_relay_en,
1072            auto_power_on=rs.auto_power_on,
1073            keep_aghfp_link=rs.keep_aghfp_link,
1074            mic_gain=rs.mic_gain,
1075            tx_hold_time=rs.tx_hold_time,
1076            tx_time_limit=rs.tx_time_limit,
1077            local_speaker=rs.local_speaker,
1078            bt_mic_gain=rs.bt_mic_gain,
1079            adaptive_response=rs.adaptive_response,
1080            dis_tone=rs.dis_tone,
1081            power_saving_mode=rs.power_saving_mode,
1082            auto_power_off=rs.auto_power_off,
1083            auto_share_loc_ch=rs.auto_share_loc_ch,
1084            hm_speaker=rs.hm_speaker,
1085            positioning_system=rs.positioning_system,
1086            time_offset=rs.time_offset,
1087            use_freq_range_2=rs.use_freq_range_2,
1088            ptt_lock=rs.ptt_lock,
1089            leading_sync_bit_en=rs.leading_sync_bit_en,
1090            pairing_at_power_on=rs.pairing_at_power_on,
1091            screen_timeout=rs.screen_timeout,
1092            vfo_x=rs.vfo_x,
1093            imperial_unit=rs.imperial_unit,
1094            wx_mode=rs.wx_mode,
1095            noaa_ch=rs.noaa_ch,
1096            vfol_tx_power_x=rs.vfol_tx_power_x,
1097            vfo2_tx_power_x=rs.vfo2_tx_power_x,
1098            dis_digital_mute=rs.dis_digital_mute,
1099            signaling_ecc_en=rs.signaling_ecc_en,
1100            ch_data_lock=rs.ch_data_lock,
1101            vfo1_mod_freq_x=rs.vfo1_mod_freq_x,
1102            vfo2_mod_freq_x=rs.vfo2_mod_freq_x
1103        )
1104
1105    def to_protocol(self):
1106        """@private (Protocol helper)"""
1107        return p.Settings(
1108            channel_a_lower=self._channel_split.get_lower(self.channel_a),
1109            channel_b_lower=self._channel_split.get_lower(self.channel_b),
1110            scan=self.scan,
1111            aghfp_call_mode=self.aghfp_call_mode,
1112            double_channel=self.double_channel,
1113            squelch_level=self.squelch_level,
1114            tail_elim=self.tail_elim,
1115            auto_relay_en=self.auto_relay_en,
1116            auto_power_on=self.auto_power_on,
1117            keep_aghfp_link=self.keep_aghfp_link,
1118            mic_gain=self.mic_gain,
1119            tx_hold_time=self.tx_hold_time,
1120            tx_time_limit=self.tx_time_limit,
1121            local_speaker=self.local_speaker,
1122            bt_mic_gain=self.bt_mic_gain,
1123            adaptive_response=self.adaptive_response,
1124            dis_tone=self.dis_tone,
1125            power_saving_mode=self.power_saving_mode,
1126            auto_power_off=self.auto_power_off,
1127            auto_share_loc_ch=self.auto_share_loc_ch,
1128            hm_speaker=self.hm_speaker,
1129            positioning_system=self.positioning_system,
1130            time_offset=self.time_offset,
1131            use_freq_range_2=self.use_freq_range_2,
1132            ptt_lock=self.ptt_lock,
1133            leading_sync_bit_en=self.leading_sync_bit_en,
1134            pairing_at_power_on=self.pairing_at_power_on,
1135            screen_timeout=self.screen_timeout,
1136            vfo_x=self.vfo_x,
1137            imperial_unit=self.imperial_unit,
1138            channel_a_upper=self._channel_split.get_upper(self.channel_a),
1139            channel_b_upper=self._channel_split.get_upper(self.channel_b),
1140            wx_mode=self.wx_mode,
1141            noaa_ch=self.noaa_ch,
1142            vfol_tx_power_x=self.vfol_tx_power_x,
1143            vfo2_tx_power_x=self.vfo2_tx_power_x,
1144            dis_digital_mute=self.dis_digital_mute,
1145            signaling_ecc_en=self.signaling_ecc_en,
1146            ch_data_lock=self.ch_data_lock,
1147            vfo1_mod_freq_x=self.vfo1_mod_freq_x,
1148            vfo2_mod_freq_x=self.vfo2_mod_freq_x
1149        )
1150
1151
1152class DeviceInfo(ImmutableBaseModel):
1153    """A data object representing the device information"""
1154    vendor_id: int
1155    product_id: int
1156    hardware_version: int
1157    firmware_version: int
1158    supports_radio: bool
1159    supports_medium_power: bool
1160    fixed_location_speaker_volume: bool
1161    has_speaker: bool
1162    has_hand_microphone_speaker: bool
1163    region_count: int
1164    supports_noaa: bool
1165    supports_gmrs: bool
1166    supports_vfo: bool
1167    supports_dmr: bool
1168    supports_software_power_control: bool
1169    channel_count: int
1170    frequency_range_count: int
1171
1172    @classmethod
1173    def from_protocol(cls, info: p.DevInfo) -> DeviceInfo:
1174        """@private (Protocol helper)"""
1175        return DeviceInfo(
1176            vendor_id=info.vendor_id,
1177            product_id=info.product_id,
1178            hardware_version=info.hw_ver,
1179            firmware_version=info.soft_ver,
1180            supports_radio=info.support_radio,
1181            supports_medium_power=info.support_medium_power,
1182            fixed_location_speaker_volume=info.fixed_loc_speaker_vol,
1183            supports_software_power_control=not info.not_support_soft_power_ctrl,
1184            has_speaker=not info.have_no_speaker,
1185            has_hand_microphone_speaker=info.have_hm_speaker,
1186            region_count=info.region_count,
1187            supports_noaa=info.support_noaa,
1188            supports_gmrs=info.gmrs,
1189            supports_vfo=info.support_vfo,
1190            supports_dmr=info.support_dmr,
1191            channel_count=info.channel_count,
1192            frequency_range_count=info.freq_range_count
1193        )
1194
1195    def to_protocol(self) -> p.DevInfo:
1196        """@private (Protocol helper)"""
1197        return p.DevInfo(
1198            vendor_id=self.vendor_id,
1199            product_id=self.product_id,
1200            hw_ver=self.hardware_version,
1201            soft_ver=self.firmware_version,
1202            support_radio=self.supports_radio,
1203            support_medium_power=self.supports_medium_power,
1204            fixed_loc_speaker_vol=self.fixed_location_speaker_volume,
1205            not_support_soft_power_ctrl=not self.supports_software_power_control,
1206            have_no_speaker=not self.has_speaker,
1207            have_hm_speaker=self.has_hand_microphone_speaker,
1208            region_count=self.region_count,
1209            support_noaa=self.supports_noaa,
1210            gmrs=self.supports_gmrs,
1211            support_vfo=self.supports_vfo,
1212            support_dmr=self.supports_dmr,
1213            channel_count=self.channel_count,
1214            freq_range_count=self.frequency_range_count
1215        )
1216
1217
1218class BeaconSettingsArgs(t.TypedDict, total=False):
1219    """A dictionary of the parameters that can be set in the beacon settings"""
1220    max_fwd_times: int
1221    time_to_live: int
1222    ptt_release_send_location: bool
1223    ptt_release_send_id_info: bool
1224    ptt_release_send_bss_user_id: bool
1225    should_share_location: bool
1226    send_pwr_voltage: bool
1227    packet_format: t.Literal["BSS", "APRS"]
1228    allow_position_check: bool
1229    aprs_ssid: int
1230    location_share_interval: int
1231    bss_user_id: int
1232    ptt_release_id_info: str
1233    beacon_message: str
1234    aprs_symbol: str
1235    aprs_callsign: str
1236
1237
1238class BeaconSettings(ImmutableBaseModel):
1239    """A data object representing the beacon settings"""
1240    _bss_user_id_split: t.ClassVar[IntSplit] = IntSplit(32, 32)
1241    max_fwd_times: int
1242    time_to_live: int
1243    ptt_release_send_location: bool
1244    ptt_release_send_id_info: bool
1245    ptt_release_send_bss_user_id: bool
1246    should_share_location: bool
1247    send_pwr_voltage: bool
1248    packet_format: t.Literal["BSS", "APRS"]
1249    allow_position_check: bool
1250    aprs_ssid: int
1251    location_share_interval: int
1252    bss_user_id: int
1253    ptt_release_id_info: str
1254    beacon_message: str
1255    aprs_symbol: str
1256    aprs_callsign: str
1257
1258    @classmethod
1259    def from_protocol(cls, bs: p.BSSSettingsExt | p.BSSSettings) -> BeaconSettings:
1260        """@private (Protocol helper)"""
1261
1262        if not isinstance(bs, p.BSSSettingsExt):
1263            raise ValueError(
1264                "Radio replied with old BSSSettings message version. Upgrade your firmware!"
1265            )
1266
1267        return BeaconSettings(
1268            max_fwd_times=bs.max_fwd_times,
1269            time_to_live=bs.time_to_live,
1270            ptt_release_send_location=bs.ptt_release_send_location,
1271            ptt_release_send_id_info=bs.ptt_release_send_id_info,
1272            ptt_release_send_bss_user_id=bs.ptt_release_send_bss_user_id,
1273            should_share_location=bs.should_share_location,
1274            send_pwr_voltage=bs.send_pwr_voltage,
1275            packet_format=bs.packet_format.name,
1276            allow_position_check=bs.allow_position_check,
1277            aprs_ssid=bs.aprs_ssid,
1278            location_share_interval=bs.location_share_interval,
1279            bss_user_id=cls._bss_user_id_split.from_parts(
1280                bs.bss_user_id_upper, bs.bss_user_id_lower
1281            ),
1282            ptt_release_id_info=bs.ptt_release_id_info,
1283            beacon_message=bs.beacon_message,
1284            aprs_symbol=bs.aprs_symbol,
1285            aprs_callsign=bs.aprs_callsign
1286        )
1287
1288    def to_protocol(self) -> p.BSSSettingsExt:
1289        """@private (Protocol helper)"""
1290        return p.BSSSettingsExt(
1291            max_fwd_times=self.max_fwd_times,
1292            time_to_live=self.time_to_live,
1293            ptt_release_send_location=self.ptt_release_send_location,
1294            ptt_release_send_id_info=self.ptt_release_send_id_info,
1295            ptt_release_send_bss_user_id=self.ptt_release_send_bss_user_id,
1296            should_share_location=self.should_share_location,
1297            send_pwr_voltage=self.send_pwr_voltage,
1298            packet_format=p.PacketFormat[self.packet_format],
1299            allow_position_check=self.allow_position_check,
1300            aprs_ssid=self.aprs_ssid,
1301            location_share_interval=self.location_share_interval,
1302            bss_user_id_lower=self._bss_user_id_split.get_lower(
1303                self.bss_user_id
1304            ),
1305            ptt_release_id_info=self.ptt_release_id_info,
1306            beacon_message=self.beacon_message,
1307            aprs_symbol=self.aprs_symbol,
1308            aprs_callsign=self.aprs_callsign,
1309            bss_user_id_upper=self._bss_user_id_split.get_upper(
1310                self.bss_user_id
1311            ),
1312        )
1313
1314
1315ChannelType = t.Literal["OFF", "A", "B"]
1316
1317
1318class Status(ImmutableBaseModel):
1319    """A data object representing the radio status"""
1320    _channel_split: t.ClassVar[IntSplit] = IntSplit(4, 4)
1321    is_power_on: bool
1322    is_in_tx: bool
1323    is_sq: bool
1324    is_in_rx: bool
1325    double_channel: ChannelType
1326    is_scan: bool
1327    is_radio: bool
1328    curr_ch_id: int
1329    is_gps_locked: bool
1330    is_hfp_connected: bool
1331    is_aoc_connected: bool
1332    rssi: float
1333    curr_region: int
1334
1335    @classmethod
1336    def from_protocol(cls, s: p.Status | p.StatusExt) -> Status:
1337        """@private (Protocol helper)"""
1338        if not isinstance(s, p.StatusExt):
1339            raise ValueError(
1340                "Radio replied with old Status message version. Upgrade your firmware!"
1341            )
1342
1343        return Status(
1344            is_power_on=s.is_power_on,
1345            is_in_tx=s.is_in_tx,
1346            is_sq=s.is_sq,
1347            is_in_rx=s.is_in_rx,
1348            double_channel=s.double_channel.name,
1349            is_scan=s.is_scan,
1350            is_radio=s.is_radio,
1351            curr_ch_id=cls._channel_split.from_parts(
1352                s.curr_channel_id_upper, s.curr_ch_id_lower
1353            ),
1354            is_gps_locked=s.is_gps_locked,
1355            is_hfp_connected=s.is_hfp_connected,
1356            is_aoc_connected=s.is_aoc_connected,
1357            rssi=s.rssi,
1358            curr_region=s.curr_region
1359        )
1360
1361    def to_protocol(self) -> p.StatusExt:
1362        """@private (Protocol helper)"""
1363        return p.StatusExt(
1364            is_power_on=self.is_power_on,
1365            is_in_tx=self.is_in_tx,
1366            is_sq=self.is_sq,
1367            is_in_rx=self.is_in_rx,
1368            double_channel=p.ChannelType[self.double_channel],
1369            is_scan=self.is_scan,
1370            is_radio=self.is_radio,
1371            curr_ch_id_lower=self._channel_split.get_lower(self.curr_ch_id),
1372            is_gps_locked=self.is_gps_locked,
1373            is_hfp_connected=self.is_hfp_connected,
1374            is_aoc_connected=self.is_aoc_connected,
1375            rssi=self.rssi,
1376            curr_region=self.curr_region,
1377            curr_channel_id_upper=self._channel_split.get_upper(
1378                self.curr_ch_id)
1379        )
1380
1381
1382class Position(ImmutableBaseModel):
1383    """A data object for representing GPS positions"""
1384    latitude: float
1385    longitude: float
1386    altitude: int | None
1387    speed: int | None
1388    heading: int | None
1389    time: datetime
1390    accuracy: int
1391
1392    @classmethod
1393    def from_protocol(cls, x: p.Position) -> Position:
1394        """@private (Protocol helper)"""
1395        return Position(
1396            latitude=x.latitude,
1397            longitude=x.longitude,
1398            altitude=x.altitude,
1399            speed=x.speed,
1400            heading=x.heading,
1401            time=x.time,
1402            accuracy=x.accuracy,
1403        )
1404
1405    def to_protocol(self) -> p.Position:
1406        """@private (Protocol helper)"""
1407        return p.Position(
1408            latitude=self.latitude,
1409            longitude=self.longitude,
1410            altitude=self.altitude,
1411            speed=self.speed,
1412            heading=self.heading,
1413            time=self.time,
1414            accuracy=self.accuracy,
1415        )
class CommandConnection:
 54class CommandConnection:
 55    _link: CommandLink
 56    _handlers: t.List[RadioMessageHandler] = []
 57
 58    def __init__(self, link: CommandLink):
 59        self._link = link
 60        self._handlers = []
 61
 62    @classmethod
 63    def new_ble(cls, device_uuid: str) -> CommandConnection:
 64        return cls(BleCommandLink(device_uuid))
 65
 66    @classmethod
 67    def new_rfcomm(cls, device_uuid: str, channel: int | t.Literal["auto"] = "auto") -> CommandConnection:
 68        return cls(RfcommCommandLink(device_uuid, channel))
 69
 70    def is_connected(self) -> bool:
 71        return self._link.is_connected()
 72
 73    async def connect(self) -> None:
 74        await self._link.connect(self._on_recv)
 75
 76    async def disconnect(self) -> None:
 77        self._handlers.clear()
 78        await self._link.disconnect()
 79
 80    async def send_bytes(self, data: bytes) -> None:
 81        await self._link.send_bytes(data)
 82
 83    async def send_message(self, command: CommandMessage) -> None:
 84        await self._link.send(command_message_to_protocol(command))
 85
 86    async def send_message_expect_reply(self, command: CommandMessage, expect: t.Type[RadioMessageT]) -> RadioMessageT | MessageReplyError:
 87        queue: asyncio.Queue[RadioMessageT |
 88                             MessageReplyError] = asyncio.Queue()
 89
 90        def reply_handler(reply: RadioMessage):
 91            if (
 92                isinstance(reply, expect) or
 93                (
 94                    isinstance(reply, MessageReplyError) and
 95                    reply.message_type is expect
 96                )
 97            ):
 98                queue.put_nowait(reply)
 99
100        remove_handler = self._add_message_handler(reply_handler)
101
102        await self.send_message(command)
103
104        out = await queue.get()
105
106        remove_handler()
107
108        return out
109
110    def add_event_handler(self, handler: EventHandler) -> t.Callable[[], None]:
111        def event_handler(msg: RadioMessage):
112            if isinstance(msg, EventMessage):
113                handler(msg)
114        return self._add_message_handler(event_handler)
115
116    def _add_message_handler(self, handler: RadioMessageHandler) -> t.Callable[[], None]:
117        self._handlers.append(handler)
118
119        def remove_handler():
120            self._handlers.remove(handler)
121
122        return remove_handler
123
124    def _on_recv(self, msg: p.Message) -> None:
125        radio_message = radio_message_from_protocol(msg)
126        for handler in self._handlers:
127            handler(radio_message)
128
129    # Command API
130
131    async def enable_event(self, event_type: EventType) -> None:
132        """Enable an event"""
133        # This event doesn't get a reply version of EnableEvent. It does, however
134        # does trigger a StatusChangedEvent... but we don't wait for it here.
135        # Instead, we just fire and forget
136        await self.send_message(EnableEvent(event_type))
137
138    async def send_tnc_data_fragment(self, tnc_data_fragment: TncDataFragment) -> None:
139        """Send Tnc data"""
140        reply = await self.send_message_expect_reply(SendTncDataFragment(tnc_data_fragment), SendTncDataFragmentReply)
141        if isinstance(reply, MessageReplyError):
142            raise reply.as_exception()
143
144    async def get_beacon_settings(self) -> BeaconSettings:
145        """Get the current packet settings"""
146        reply = await self.send_message_expect_reply(GetBeaconSettings(), GetBeaconSettingsReply)
147        if isinstance(reply, MessageReplyError):
148            raise reply.as_exception()
149        return reply.tnc_settings
150
151    async def set_beacon_settings(self, packet_settings: BeaconSettings):
152        """Set the packet settings"""
153        reply = await self.send_message_expect_reply(SetBeaconSettings(packet_settings), SetBeaconSettingsReply)
154        if isinstance(reply, MessageReplyError):
155            raise reply.as_exception()
156
157    async def get_battery_level(self) -> int:
158        """Get the battery level"""
159        reply = await self.send_message_expect_reply(GetBatteryLevel(), GetBatteryLevelReply)
160        if isinstance(reply, MessageReplyError):
161            raise reply.as_exception()
162        return reply.battery_level
163
164    async def get_battery_level_as_percentage(self) -> int:
165        """Get the battery level as a percentage"""
166        reply = await self.send_message_expect_reply(GetBatteryLevelAsPercentage(), GetBatteryLevelAsPercentageReply)
167        if isinstance(reply, MessageReplyError):
168            raise reply.as_exception()
169        return reply.battery_level_as_percentage
170
171    async def get_rc_battery_level(self) -> int:
172        """Get the RC battery level"""
173        reply = await self.send_message_expect_reply(GetRCBatteryLevel(), GetRCBatteryLevelReply)
174        if isinstance(reply, MessageReplyError):
175            raise reply.as_exception()
176        return reply.rc_battery_level
177
178    async def get_battery_voltage(self) -> float:
179        """Get the battery voltage"""
180        reply = await self.send_message_expect_reply(GetBatteryVoltage(), GetBatteryVoltageReply)
181        if isinstance(reply, MessageReplyError):
182            raise reply.as_exception()
183        return reply.battery_voltage
184
185    async def get_device_info(self) -> DeviceInfo:
186        """Get the device info"""
187        reply = await self.send_message_expect_reply(GetDeviceInfo(), GetDeviceInfoReply)
188        if isinstance(reply, MessageReplyError):
189            raise reply.as_exception()
190        return reply.device_info
191
192    async def get_settings(self) -> Settings:
193        """Get the settings"""
194        reply = await self.send_message_expect_reply(GetSettings(), GetSettingsReply)
195        if isinstance(reply, MessageReplyError):
196            raise reply.as_exception()
197        return reply.settings
198
199    async def set_settings(self, settings: Settings) -> None:
200        """Set the settings"""
201        reply = await self.send_message_expect_reply(SetSettings(settings), SetSettingsReply)
202        if isinstance(reply, MessageReplyError):
203            raise reply.as_exception()
204
205    async def get_channel(self, channel_id: int) -> Channel:
206        """Get a channel"""
207        reply = await self.send_message_expect_reply(GetChannel(channel_id), GetChannelReply)
208        if isinstance(reply, MessageReplyError):
209            raise reply.as_exception()
210        return reply.channel
211
212    async def set_channel(self, channel: Channel):
213        """Set a channel"""
214        reply = await self.send_message_expect_reply(SetChannel(channel), SetChannelReply)
215        if isinstance(reply, MessageReplyError):
216            raise reply.as_exception()
217
218    async def get_status(self) -> Status:
219        """Get the radio status"""
220        reply = await self.send_message_expect_reply(GetStatus(), GetStatusReply)
221        if isinstance(reply, MessageReplyError):
222            raise reply.as_exception()
223        return reply.status
224
225    async def get_position(self) -> Position:
226        """Get the radio position"""
227        reply = await self.send_message_expect_reply(GetPosition(), GetPositionReply)
228        if isinstance(reply, MessageReplyError):
229            raise reply.as_exception()
230        return reply.position
231
232    async def __aenter__(self):
233        await self.connect()
234        return self
235
236    async def __aexit__(
237        self,
238        exc_type: t.Any,
239        exc_value: t.Any,
240        traceback: t.Any,
241    ) -> None:
242        await self.disconnect()
CommandConnection(link: benlink.link.CommandLink)
58    def __init__(self, link: CommandLink):
59        self._link = link
60        self._handlers = []
@classmethod
def new_ble(cls, device_uuid: str) -> CommandConnection:
62    @classmethod
63    def new_ble(cls, device_uuid: str) -> CommandConnection:
64        return cls(BleCommandLink(device_uuid))
@classmethod
def new_rfcomm( cls, device_uuid: str, channel: Union[int, Literal['auto']] = 'auto') -> CommandConnection:
66    @classmethod
67    def new_rfcomm(cls, device_uuid: str, channel: int | t.Literal["auto"] = "auto") -> CommandConnection:
68        return cls(RfcommCommandLink(device_uuid, channel))
def is_connected(self) -> bool:
70    def is_connected(self) -> bool:
71        return self._link.is_connected()
async def connect(self) -> None:
73    async def connect(self) -> None:
74        await self._link.connect(self._on_recv)
async def disconnect(self) -> None:
76    async def disconnect(self) -> None:
77        self._handlers.clear()
78        await self._link.disconnect()
async def send_bytes(self, data: bytes) -> None:
80    async def send_bytes(self, data: bytes) -> None:
81        await self._link.send_bytes(data)
83    async def send_message(self, command: CommandMessage) -> None:
84        await self._link.send(command_message_to_protocol(command))
async def send_message_expect_reply( self, command: Union[GetBeaconSettings, SetBeaconSettings, GetRCBatteryLevel, GetBatteryLevelAsPercentage, GetBatteryLevel, GetBatteryVoltage, GetDeviceInfo, GetChannel, SetChannel, GetSettings, SetSettings, SendTncDataFragment, EnableEvent, GetStatus, GetPosition], expect: Type[~RadioMessageT]) -> Union[~RadioMessageT, MessageReplyError]:
 86    async def send_message_expect_reply(self, command: CommandMessage, expect: t.Type[RadioMessageT]) -> RadioMessageT | MessageReplyError:
 87        queue: asyncio.Queue[RadioMessageT |
 88                             MessageReplyError] = asyncio.Queue()
 89
 90        def reply_handler(reply: RadioMessage):
 91            if (
 92                isinstance(reply, expect) or
 93                (
 94                    isinstance(reply, MessageReplyError) and
 95                    reply.message_type is expect
 96                )
 97            ):
 98                queue.put_nowait(reply)
 99
100        remove_handler = self._add_message_handler(reply_handler)
101
102        await self.send_message(command)
103
104        out = await queue.get()
105
106        remove_handler()
107
108        return out
def add_event_handler( self, handler: Callable[[Union[TncDataFragmentReceivedEvent, SettingsChangedEvent, ChannelChangedEvent, StatusChangedEvent, UnknownProtocolMessage]], NoneType]) -> Callable[[], NoneType]:
110    def add_event_handler(self, handler: EventHandler) -> t.Callable[[], None]:
111        def event_handler(msg: RadioMessage):
112            if isinstance(msg, EventMessage):
113                handler(msg)
114        return self._add_message_handler(event_handler)
async def enable_event( self, event_type: Literal['HT_STATUS_CHANGED', 'DATA_RXD', 'NEW_INQUIRY_DATA', 'RESTORE_FACTORY_SETTINGS', 'HT_CH_CHANGED', 'HT_SETTINGS_CHANGED', 'RINGING_STOPPEDRADIO_STATUS_CHANGED', 'USER_ACTION', 'SYSTEM_EVENT', 'BSS_SETTINGS_CHANGED', 'DATA_TXD', 'POSITION_CHANGED']) -> None:
131    async def enable_event(self, event_type: EventType) -> None:
132        """Enable an event"""
133        # This event doesn't get a reply version of EnableEvent. It does, however
134        # does trigger a StatusChangedEvent... but we don't wait for it here.
135        # Instead, we just fire and forget
136        await self.send_message(EnableEvent(event_type))

Enable an event

async def send_tnc_data_fragment(self, tnc_data_fragment: TncDataFragment) -> None:
138    async def send_tnc_data_fragment(self, tnc_data_fragment: TncDataFragment) -> None:
139        """Send Tnc data"""
140        reply = await self.send_message_expect_reply(SendTncDataFragment(tnc_data_fragment), SendTncDataFragmentReply)
141        if isinstance(reply, MessageReplyError):
142            raise reply.as_exception()

Send Tnc data

async def get_beacon_settings(self) -> BeaconSettings:
144    async def get_beacon_settings(self) -> BeaconSettings:
145        """Get the current packet settings"""
146        reply = await self.send_message_expect_reply(GetBeaconSettings(), GetBeaconSettingsReply)
147        if isinstance(reply, MessageReplyError):
148            raise reply.as_exception()
149        return reply.tnc_settings

Get the current packet settings

async def set_beacon_settings(self, packet_settings: BeaconSettings):
151    async def set_beacon_settings(self, packet_settings: BeaconSettings):
152        """Set the packet settings"""
153        reply = await self.send_message_expect_reply(SetBeaconSettings(packet_settings), SetBeaconSettingsReply)
154        if isinstance(reply, MessageReplyError):
155            raise reply.as_exception()

Set the packet settings

async def get_battery_level(self) -> int:
157    async def get_battery_level(self) -> int:
158        """Get the battery level"""
159        reply = await self.send_message_expect_reply(GetBatteryLevel(), GetBatteryLevelReply)
160        if isinstance(reply, MessageReplyError):
161            raise reply.as_exception()
162        return reply.battery_level

Get the battery level

async def get_battery_level_as_percentage(self) -> int:
164    async def get_battery_level_as_percentage(self) -> int:
165        """Get the battery level as a percentage"""
166        reply = await self.send_message_expect_reply(GetBatteryLevelAsPercentage(), GetBatteryLevelAsPercentageReply)
167        if isinstance(reply, MessageReplyError):
168            raise reply.as_exception()
169        return reply.battery_level_as_percentage

Get the battery level as a percentage

async def get_rc_battery_level(self) -> int:
171    async def get_rc_battery_level(self) -> int:
172        """Get the RC battery level"""
173        reply = await self.send_message_expect_reply(GetRCBatteryLevel(), GetRCBatteryLevelReply)
174        if isinstance(reply, MessageReplyError):
175            raise reply.as_exception()
176        return reply.rc_battery_level

Get the RC battery level

async def get_battery_voltage(self) -> float:
178    async def get_battery_voltage(self) -> float:
179        """Get the battery voltage"""
180        reply = await self.send_message_expect_reply(GetBatteryVoltage(), GetBatteryVoltageReply)
181        if isinstance(reply, MessageReplyError):
182            raise reply.as_exception()
183        return reply.battery_voltage

Get the battery voltage

async def get_device_info(self) -> DeviceInfo:
185    async def get_device_info(self) -> DeviceInfo:
186        """Get the device info"""
187        reply = await self.send_message_expect_reply(GetDeviceInfo(), GetDeviceInfoReply)
188        if isinstance(reply, MessageReplyError):
189            raise reply.as_exception()
190        return reply.device_info

Get the device info

async def get_settings(self) -> Settings:
192    async def get_settings(self) -> Settings:
193        """Get the settings"""
194        reply = await self.send_message_expect_reply(GetSettings(), GetSettingsReply)
195        if isinstance(reply, MessageReplyError):
196            raise reply.as_exception()
197        return reply.settings

Get the settings

async def set_settings(self, settings: Settings) -> None:
199    async def set_settings(self, settings: Settings) -> None:
200        """Set the settings"""
201        reply = await self.send_message_expect_reply(SetSettings(settings), SetSettingsReply)
202        if isinstance(reply, MessageReplyError):
203            raise reply.as_exception()

Set the settings

async def get_channel(self, channel_id: int) -> Channel:
205    async def get_channel(self, channel_id: int) -> Channel:
206        """Get a channel"""
207        reply = await self.send_message_expect_reply(GetChannel(channel_id), GetChannelReply)
208        if isinstance(reply, MessageReplyError):
209            raise reply.as_exception()
210        return reply.channel

Get a channel

async def set_channel(self, channel: Channel):
212    async def set_channel(self, channel: Channel):
213        """Set a channel"""
214        reply = await self.send_message_expect_reply(SetChannel(channel), SetChannelReply)
215        if isinstance(reply, MessageReplyError):
216            raise reply.as_exception()

Set a channel

async def get_status(self) -> Status:
218    async def get_status(self) -> Status:
219        """Get the radio status"""
220        reply = await self.send_message_expect_reply(GetStatus(), GetStatusReply)
221        if isinstance(reply, MessageReplyError):
222            raise reply.as_exception()
223        return reply.status

Get the radio status

async def get_position(self) -> Position:
225    async def get_position(self) -> Position:
226        """Get the radio position"""
227        reply = await self.send_message_expect_reply(GetPosition(), GetPositionReply)
228        if isinstance(reply, MessageReplyError):
229            raise reply.as_exception()
230        return reply.position

Get the radio position

class EnableEvent(typing.NamedTuple):
543class EnableEvent(t.NamedTuple):
544    event_type: EventType

EnableEvent(event_type,)

EnableEvent(event_type: ForwardRef('EventType'))

Create new instance of EnableEvent(event_type,)

event_type: Literal['HT_STATUS_CHANGED', 'DATA_RXD', 'NEW_INQUIRY_DATA', 'RESTORE_FACTORY_SETTINGS', 'HT_CH_CHANGED', 'HT_SETTINGS_CHANGED', 'RINGING_STOPPEDRADIO_STATUS_CHANGED', 'USER_ACTION', 'SYSTEM_EVENT', 'BSS_SETTINGS_CHANGED', 'DATA_TXD', 'POSITION_CHANGED']

Alias for field number 0

class GetBeaconSettings(typing.NamedTuple):
547class GetBeaconSettings(t.NamedTuple):
548    pass

GetBeaconSettings()

GetBeaconSettings()

Create new instance of GetBeaconSettings()

class SetBeaconSettings(typing.NamedTuple):
551class SetBeaconSettings(t.NamedTuple):
552    tnc_settings: BeaconSettings

SetBeaconSettings(tnc_settings,)

SetBeaconSettings(tnc_settings: ForwardRef('BeaconSettings'))

Create new instance of SetBeaconSettings(tnc_settings,)

tnc_settings: BeaconSettings

Alias for field number 0

class SetSettings(typing.NamedTuple):
555class SetSettings(t.NamedTuple):
556    settings: Settings

SetSettings(settings,)

SetSettings(settings: ForwardRef('Settings'))

Create new instance of SetSettings(settings,)

settings: Settings

Alias for field number 0

class GetBatteryLevelAsPercentage(typing.NamedTuple):
559class GetBatteryLevelAsPercentage(t.NamedTuple):
560    pass

GetBatteryLevelAsPercentage()

GetBatteryLevelAsPercentage()

Create new instance of GetBatteryLevelAsPercentage()

class GetRCBatteryLevel(typing.NamedTuple):
563class GetRCBatteryLevel(t.NamedTuple):
564    pass

GetRCBatteryLevel()

GetRCBatteryLevel()

Create new instance of GetRCBatteryLevel()

class GetBatteryLevel(typing.NamedTuple):
567class GetBatteryLevel(t.NamedTuple):
568    pass

GetBatteryLevel()

GetBatteryLevel()

Create new instance of GetBatteryLevel()

class GetBatteryVoltage(typing.NamedTuple):
571class GetBatteryVoltage(t.NamedTuple):
572    pass

GetBatteryVoltage()

GetBatteryVoltage()

Create new instance of GetBatteryVoltage()

class GetDeviceInfo(typing.NamedTuple):
575class GetDeviceInfo(t.NamedTuple):
576    pass

GetDeviceInfo()

GetDeviceInfo()

Create new instance of GetDeviceInfo()

class GetChannel(typing.NamedTuple):
579class GetChannel(t.NamedTuple):
580    channel_id: int

GetChannel(channel_id,)

GetChannel(channel_id: int)

Create new instance of GetChannel(channel_id,)

channel_id: int

Alias for field number 0

class SetChannel(typing.NamedTuple):
583class SetChannel(t.NamedTuple):
584    channel: Channel

SetChannel(channel,)

SetChannel(channel: ForwardRef('Channel'))

Create new instance of SetChannel(channel,)

channel: Channel

Alias for field number 0

class GetSettings(typing.NamedTuple):
587class GetSettings(t.NamedTuple):
588    pass

GetSettings()

GetSettings()

Create new instance of GetSettings()

class GetStatus(typing.NamedTuple):
591class GetStatus(t.NamedTuple):
592    pass

GetStatus()

GetStatus()

Create new instance of GetStatus()

class GetPosition(typing.NamedTuple):
595class GetPosition(t.NamedTuple):
596    pass

GetPosition()

GetPosition()

Create new instance of GetPosition()

class SendTncDataFragment(typing.NamedTuple):
599class SendTncDataFragment(t.NamedTuple):
600    tnc_data_fragment: TncDataFragment

SendTncDataFragment(tnc_data_fragment,)

SendTncDataFragment(tnc_data_fragment: ForwardRef('TncDataFragment'))

Create new instance of SendTncDataFragment(tnc_data_fragment,)

tnc_data_fragment: TncDataFragment

Alias for field number 0

class SendTncDataFragmentReply(typing.NamedTuple):
625class SendTncDataFragmentReply(t.NamedTuple):
626    pass

SendTncDataFragmentReply()

SendTncDataFragmentReply()

Create new instance of SendTncDataFragmentReply()

class GetBeaconSettingsReply(typing.NamedTuple):
629class GetBeaconSettingsReply(t.NamedTuple):
630    tnc_settings: BeaconSettings

GetBeaconSettingsReply(tnc_settings,)

GetBeaconSettingsReply(tnc_settings: ForwardRef('BeaconSettings'))

Create new instance of GetBeaconSettingsReply(tnc_settings,)

tnc_settings: BeaconSettings

Alias for field number 0

class SetBeaconSettingsReply(typing.NamedTuple):
633class SetBeaconSettingsReply(t.NamedTuple):
634    pass

SetBeaconSettingsReply()

SetBeaconSettingsReply()

Create new instance of SetBeaconSettingsReply()

class SetSettingsReply(typing.NamedTuple):
637class SetSettingsReply(t.NamedTuple):
638    pass

SetSettingsReply()

SetSettingsReply()

Create new instance of SetSettingsReply()

class GetBatteryLevelAsPercentageReply(typing.NamedTuple):
641class GetBatteryLevelAsPercentageReply(t.NamedTuple):
642    battery_level_as_percentage: int

GetBatteryLevelAsPercentageReply(battery_level_as_percentage,)

GetBatteryLevelAsPercentageReply(battery_level_as_percentage: int)

Create new instance of GetBatteryLevelAsPercentageReply(battery_level_as_percentage,)

battery_level_as_percentage: int

Alias for field number 0

class GetRCBatteryLevelReply(typing.NamedTuple):
645class GetRCBatteryLevelReply(t.NamedTuple):
646    rc_battery_level: int

GetRCBatteryLevelReply(rc_battery_level,)

GetRCBatteryLevelReply(rc_battery_level: int)

Create new instance of GetRCBatteryLevelReply(rc_battery_level,)

rc_battery_level: int

Alias for field number 0

class GetBatteryLevelReply(typing.NamedTuple):
649class GetBatteryLevelReply(t.NamedTuple):
650    battery_level: int

GetBatteryLevelReply(battery_level,)

GetBatteryLevelReply(battery_level: int)

Create new instance of GetBatteryLevelReply(battery_level,)

battery_level: int

Alias for field number 0

class GetBatteryVoltageReply(typing.NamedTuple):
653class GetBatteryVoltageReply(t.NamedTuple):
654    battery_voltage: float

GetBatteryVoltageReply(battery_voltage,)

GetBatteryVoltageReply(battery_voltage: float)

Create new instance of GetBatteryVoltageReply(battery_voltage,)

battery_voltage: float

Alias for field number 0

class GetDeviceInfoReply(typing.NamedTuple):
657class GetDeviceInfoReply(t.NamedTuple):
658    device_info: DeviceInfo

GetDeviceInfoReply(device_info,)

GetDeviceInfoReply(device_info: ForwardRef('DeviceInfo'))

Create new instance of GetDeviceInfoReply(device_info,)

device_info: DeviceInfo

Alias for field number 0

class GetChannelReply(typing.NamedTuple):
661class GetChannelReply(t.NamedTuple):
662    channel: Channel

GetChannelReply(channel,)

GetChannelReply(channel: ForwardRef('Channel'))

Create new instance of GetChannelReply(channel,)

channel: Channel

Alias for field number 0

class SetChannelReply(typing.NamedTuple):
665class SetChannelReply(t.NamedTuple):
666    pass

SetChannelReply()

SetChannelReply()

Create new instance of SetChannelReply()

class GetStatusReply(typing.NamedTuple):
669class GetStatusReply(t.NamedTuple):
670    status: Status

GetStatusReply(status,)

GetStatusReply(status: ForwardRef('Status'))

Create new instance of GetStatusReply(status,)

status: Status

Alias for field number 0

class GetSettingsReply(typing.NamedTuple):
673class GetSettingsReply(t.NamedTuple):
674    settings: Settings

GetSettingsReply(settings,)

GetSettingsReply(settings: ForwardRef('Settings'))

Create new instance of GetSettingsReply(settings,)

settings: Settings

Alias for field number 0

class GetPositionReply(typing.NamedTuple):
677class GetPositionReply(t.NamedTuple):
678    position: Position

GetPositionReply(position,)

GetPositionReply(position: ForwardRef('Position'))

Create new instance of GetPositionReply(position,)

position: Position

Alias for field number 0

ReplyStatus = typing.Literal['SUCCESS', 'NOT_SUPPORTED', 'NOT_AUTHENTICATED', 'INSUFFICIENT_RESOURCES', 'AUTHENTICATING', 'INVALID_PARAMETER', 'INCORRECT_STATE', 'IN_PROGRESS']
class MessageReplyError(typing.NamedTuple):
693class MessageReplyError(t.NamedTuple):
694    message_type: t.Type[t.Any]
695    reason: ReplyStatus
696
697    def as_exception(self):
698        return ValueError(f"{self.message_type.__name__} failed: {self.reason}")

MessageReplyError(message_type, reason)

MessageReplyError( message_type: ForwardRef('t.Type[t.Any]'), reason: ForwardRef('ReplyStatus'))

Create new instance of MessageReplyError(message_type, reason)

message_type: Type[Any]

Alias for field number 0

reason: Literal['SUCCESS', 'NOT_SUPPORTED', 'NOT_AUTHENTICATED', 'INSUFFICIENT_RESOURCES', 'AUTHENTICATING', 'INVALID_PARAMETER', 'INCORRECT_STATE', 'IN_PROGRESS']

Alias for field number 1

def as_exception(self):
697    def as_exception(self):
698        return ValueError(f"{self.message_type.__name__} failed: {self.reason}")
class StatusChangedEvent(typing.NamedTuple):
723class StatusChangedEvent(t.NamedTuple):
724    status: Status

StatusChangedEvent(status,)

StatusChangedEvent(status: ForwardRef('Status'))

Create new instance of StatusChangedEvent(status,)

status: Status

Alias for field number 0

class ChannelChangedEvent(typing.NamedTuple):
727class ChannelChangedEvent(t.NamedTuple):
728    channel: Channel

ChannelChangedEvent(channel,)

ChannelChangedEvent(channel: ForwardRef('Channel'))

Create new instance of ChannelChangedEvent(channel,)

channel: Channel

Alias for field number 0

class TncDataFragmentReceivedEvent(typing.NamedTuple):
731class TncDataFragmentReceivedEvent(t.NamedTuple):
732    tnc_data_fragment: TncDataFragment

TncDataFragmentReceivedEvent(tnc_data_fragment,)

TncDataFragmentReceivedEvent(tnc_data_fragment: ForwardRef('TncDataFragment'))

Create new instance of TncDataFragmentReceivedEvent(tnc_data_fragment,)

tnc_data_fragment: TncDataFragment

Alias for field number 0

class SettingsChangedEvent(typing.NamedTuple):
735class SettingsChangedEvent(t.NamedTuple):
736    settings: Settings

SettingsChangedEvent(settings,)

SettingsChangedEvent(settings: ForwardRef('Settings'))

Create new instance of SettingsChangedEvent(settings,)

settings: Settings

Alias for field number 0

class UnknownProtocolMessage(typing.NamedTuple):
739class UnknownProtocolMessage(t.NamedTuple):
740    message: p.Message

UnknownProtocolMessage(message,)

UnknownProtocolMessage(message: ForwardRef('p.Message'))

Create new instance of UnknownProtocolMessage(message,)

message: benlink.protocol.command.message.Message

Alias for field number 0

EventType = typing.Literal['HT_STATUS_CHANGED', 'DATA_RXD', 'NEW_INQUIRY_DATA', 'RESTORE_FACTORY_SETTINGS', 'HT_CH_CHANGED', 'HT_SETTINGS_CHANGED', 'RINGING_STOPPEDRADIO_STATUS_CHANGED', 'USER_ACTION', 'SYSTEM_EVENT', 'BSS_SETTINGS_CHANGED', 'DATA_TXD', 'POSITION_CHANGED']
class TncDataFragment(ImmutableBaseModel):
815class TncDataFragment(ImmutableBaseModel):
816    """A data object representing a message packet"""
817    is_final_fragment: bool
818    fragment_id: int
819    data: bytes
820    channel_id: int | None = None
821
822    @classmethod
823    def from_protocol(cls, mp: p.TncDataFragment) -> TncDataFragment:
824        """@private (Protocol helper)"""
825        return TncDataFragment(
826            is_final_fragment=mp.is_final_fragment,
827            fragment_id=mp.fragment_id,
828            data=mp.data,
829            channel_id=mp.channel_id
830        )
831
832    def to_protocol(self) -> p.TncDataFragment:
833        """@private (Protocol helper)"""
834        return p.TncDataFragment(
835            is_final_fragment=self.is_final_fragment,
836            with_channel_id=self.channel_id is not None,
837            fragment_id=self.fragment_id,
838            data=self.data,
839            channel_id=self.channel_id
840        )

A data object representing a message packet

is_final_fragment: bool
fragment_id: int
data: bytes
channel_id: int | None
ModulationType = typing.Literal['AM', 'FM', 'DMR']
BandwidthType = typing.Literal['NARROW', 'WIDE']
class DCS(typing.NamedTuple):
848class DCS(t.NamedTuple):
849    """A type for setting Digital Coded Squelch (DCS) on channels"""
850
851    n: int
852    """The DCS Normal (N) code"""

A type for setting Digital Coded Squelch (DCS) on channels

DCS(n: int)

Create new instance of DCS(n,)

n: int

The DCS Normal (N) code

class ChannelArgs(typing.TypedDict):
873class ChannelArgs(t.TypedDict, total=False):
874    """A dictionary of the parameters that can be set on a channel"""
875    tx_mod: ModulationType
876    tx_freq: float
877    rx_mod: ModulationType
878    rx_freq: float
879    tx_sub_audio: float | DCS | None
880    rx_sub_audio: float | DCS | None
881    scan: bool
882    tx_at_max_power: bool
883    talk_around: bool
884    bandwidth: BandwidthType
885    pre_de_emph_bypass: bool
886    sign: bool
887    tx_at_med_power: bool
888    tx_disable: bool
889    fixed_freq: bool
890    fixed_bandwidth: bool
891    fixed_tx_power: bool
892    mute: bool
893    name: str

A dictionary of the parameters that can be set on a channel

tx_mod: Literal['AM', 'FM', 'DMR']
tx_freq: float
rx_mod: Literal['AM', 'FM', 'DMR']
rx_freq: float
tx_sub_audio: float | DCS | None
rx_sub_audio: float | DCS | None
scan: bool
tx_at_max_power: bool
talk_around: bool
bandwidth: Literal['NARROW', 'WIDE']
pre_de_emph_bypass: bool
sign: bool
tx_at_med_power: bool
tx_disable: bool
fixed_freq: bool
fixed_bandwidth: bool
fixed_tx_power: bool
mute: bool
name: str
class Channel(ImmutableBaseModel):
896class Channel(ImmutableBaseModel):
897    """A data object representing a radio channel"""
898    channel_id: int
899    tx_mod: ModulationType
900    tx_freq: float
901    rx_mod: ModulationType
902    rx_freq: float
903    tx_sub_audio: float | DCS | None
904    rx_sub_audio: float | DCS | None
905    scan: bool
906    tx_at_max_power: bool
907    talk_around: bool
908    bandwidth: BandwidthType
909    pre_de_emph_bypass: bool
910    sign: bool
911    tx_at_med_power: bool
912    tx_disable: bool
913    fixed_freq: bool
914    fixed_bandwidth: bool
915    fixed_tx_power: bool
916    mute: bool
917    name: str
918
919    @classmethod
920    def from_protocol(cls, cs: p.RfCh) -> Channel:
921        """@private (Protocol helper)"""
922        return Channel(
923            channel_id=cs.channel_id,
924            tx_mod=cs.tx_mod.name,
925            tx_freq=cs.tx_freq,
926            rx_mod=cs.rx_mod.name,
927            rx_freq=cs.rx_freq,
928            tx_sub_audio=sub_audio_from_protocol(cs.tx_sub_audio),
929            rx_sub_audio=sub_audio_from_protocol(cs.rx_sub_audio),
930            scan=cs.scan,
931            tx_at_max_power=cs.tx_at_max_power,
932            talk_around=cs.talk_around,
933            bandwidth=cs.bandwidth.name,
934            pre_de_emph_bypass=cs.pre_de_emph_bypass,
935            sign=cs.sign,
936            tx_at_med_power=cs.tx_at_med_power,
937            tx_disable=cs.tx_disable,
938            fixed_freq=cs.fixed_freq,
939            fixed_bandwidth=cs.fixed_bandwidth,
940            fixed_tx_power=cs.fixed_tx_power,
941            mute=cs.mute,
942            name=cs.name_str
943        )
944
945    def to_protocol(self) -> p.RfCh:
946        """@private (Protocol helper)"""
947        return p.RfCh(
948            channel_id=self.channel_id,
949            tx_mod=p.ModulationType[self.tx_mod],
950            tx_freq=self.tx_freq,
951            rx_mod=p.ModulationType[self.rx_mod],
952            rx_freq=self.rx_freq,
953            tx_sub_audio=sub_audio_to_protocol(self.tx_sub_audio),
954            rx_sub_audio=sub_audio_to_protocol(self.rx_sub_audio),
955            scan=self.scan,
956            tx_at_max_power=self.tx_at_max_power,
957            talk_around=self.talk_around,
958            bandwidth=p.BandwidthType[self.bandwidth],
959            pre_de_emph_bypass=self.pre_de_emph_bypass,
960            sign=self.sign,
961            tx_at_med_power=self.tx_at_med_power,
962            tx_disable=self.tx_disable,
963            fixed_freq=self.fixed_freq,
964            fixed_bandwidth=self.fixed_bandwidth,
965            fixed_tx_power=self.fixed_tx_power,
966            mute=self.mute,
967            name_str=self.name
968        )

A data object representing a radio channel

channel_id: int
tx_mod: Literal['AM', 'FM', 'DMR']
tx_freq: float
rx_mod: Literal['AM', 'FM', 'DMR']
rx_freq: float
tx_sub_audio: float | DCS | None
rx_sub_audio: float | DCS | None
scan: bool
tx_at_max_power: bool
talk_around: bool
bandwidth: Literal['NARROW', 'WIDE']
pre_de_emph_bypass: bool
sign: bool
tx_at_med_power: bool
tx_disable: bool
fixed_freq: bool
fixed_bandwidth: bool
fixed_tx_power: bool
mute: bool
name: str
class SettingsArgs(typing.TypedDict):
 971class SettingsArgs(t.TypedDict, total=False):
 972    """A dictionary of the parameters that can be set in the radio settings"""
 973    channel_a: int
 974    channel_b: int
 975    scan: bool
 976    aghfp_call_mode: int
 977    double_channel: int
 978    squelch_level: int
 979    tail_elim: bool
 980    auto_relay_en: bool
 981    auto_power_on: bool
 982    keep_aghfp_link: bool
 983    mic_gain: int
 984    tx_hold_time: int
 985    tx_time_limit: int
 986    local_speaker: int
 987    bt_mic_gain: int
 988    adaptive_response: bool
 989    dis_tone: bool
 990    power_saving_mode: bool
 991    auto_power_off: int
 992    auto_share_loc_ch: int | t.Literal["current"]
 993    hm_speaker: int
 994    positioning_system: int
 995    time_offset: int
 996    use_freq_range_2: bool
 997    ptt_lock: bool
 998    leading_sync_bit_en: bool
 999    pairing_at_power_on: bool
1000    screen_timeout: int
1001    vfo_x: int
1002    imperial_unit: bool
1003    wx_mode: int
1004    noaa_ch: int
1005    vfol_tx_power_x: int
1006    vfo2_tx_power_x: int
1007    dis_digital_mute: bool
1008    signaling_ecc_en: bool
1009    ch_data_lock: bool
1010    vfo1_mod_freq_x: int
1011    vfo2_mod_freq_x: int

A dictionary of the parameters that can be set in the radio settings

channel_a: int
channel_b: int
scan: bool
aghfp_call_mode: int
double_channel: int
squelch_level: int
tail_elim: bool
auto_relay_en: bool
auto_power_on: bool
mic_gain: int
tx_hold_time: int
tx_time_limit: int
local_speaker: int
bt_mic_gain: int
adaptive_response: bool
dis_tone: bool
power_saving_mode: bool
auto_power_off: int
auto_share_loc_ch: Union[int, Literal['current']]
hm_speaker: int
positioning_system: int
time_offset: int
use_freq_range_2: bool
ptt_lock: bool
leading_sync_bit_en: bool
pairing_at_power_on: bool
screen_timeout: int
vfo_x: int
imperial_unit: bool
wx_mode: int
noaa_ch: int
vfol_tx_power_x: int
vfo2_tx_power_x: int
dis_digital_mute: bool
signaling_ecc_en: bool
ch_data_lock: bool
vfo1_mod_freq_x: int
vfo2_mod_freq_x: int
class Settings(ImmutableBaseModel):
1014class Settings(ImmutableBaseModel):
1015    """A data object representing the radio settings"""
1016    _channel_split: t.ClassVar[IntSplit] = IntSplit(4, 4)
1017    channel_a: int
1018    channel_b: int
1019    scan: bool
1020    aghfp_call_mode: int
1021    double_channel: int
1022    squelch_level: int
1023    tail_elim: bool
1024    auto_relay_en: bool
1025    auto_power_on: bool
1026    keep_aghfp_link: bool
1027    mic_gain: int
1028    tx_hold_time: int
1029    tx_time_limit: int
1030    local_speaker: int
1031    bt_mic_gain: int
1032    adaptive_response: bool
1033    dis_tone: bool
1034    power_saving_mode: bool
1035    auto_power_off: int
1036    auto_share_loc_ch: int | t.Literal["current"]
1037    hm_speaker: int
1038    positioning_system: int
1039    time_offset: int
1040    use_freq_range_2: bool
1041    ptt_lock: bool
1042    leading_sync_bit_en: bool
1043    pairing_at_power_on: bool
1044    screen_timeout: int
1045    vfo_x: int
1046    imperial_unit: bool
1047    wx_mode: int
1048    noaa_ch: int
1049    vfol_tx_power_x: int
1050    vfo2_tx_power_x: int
1051    dis_digital_mute: bool
1052    signaling_ecc_en: bool
1053    ch_data_lock: bool
1054    vfo1_mod_freq_x: int
1055    vfo2_mod_freq_x: int
1056
1057    @classmethod
1058    def from_protocol(cls, rs: p.Settings) -> Settings:
1059        """@private (Protocol helper)"""
1060        return Settings(
1061            channel_a=cls._channel_split.from_parts(
1062                rs.channel_a_upper, rs.channel_a_lower
1063            ),
1064            channel_b=cls._channel_split.from_parts(
1065                rs.channel_b_upper, rs.channel_b_lower
1066            ),
1067            scan=rs.scan,
1068            aghfp_call_mode=rs.aghfp_call_mode,
1069            double_channel=rs.double_channel,
1070            squelch_level=rs.squelch_level,
1071            tail_elim=rs.tail_elim,
1072            auto_relay_en=rs.auto_relay_en,
1073            auto_power_on=rs.auto_power_on,
1074            keep_aghfp_link=rs.keep_aghfp_link,
1075            mic_gain=rs.mic_gain,
1076            tx_hold_time=rs.tx_hold_time,
1077            tx_time_limit=rs.tx_time_limit,
1078            local_speaker=rs.local_speaker,
1079            bt_mic_gain=rs.bt_mic_gain,
1080            adaptive_response=rs.adaptive_response,
1081            dis_tone=rs.dis_tone,
1082            power_saving_mode=rs.power_saving_mode,
1083            auto_power_off=rs.auto_power_off,
1084            auto_share_loc_ch=rs.auto_share_loc_ch,
1085            hm_speaker=rs.hm_speaker,
1086            positioning_system=rs.positioning_system,
1087            time_offset=rs.time_offset,
1088            use_freq_range_2=rs.use_freq_range_2,
1089            ptt_lock=rs.ptt_lock,
1090            leading_sync_bit_en=rs.leading_sync_bit_en,
1091            pairing_at_power_on=rs.pairing_at_power_on,
1092            screen_timeout=rs.screen_timeout,
1093            vfo_x=rs.vfo_x,
1094            imperial_unit=rs.imperial_unit,
1095            wx_mode=rs.wx_mode,
1096            noaa_ch=rs.noaa_ch,
1097            vfol_tx_power_x=rs.vfol_tx_power_x,
1098            vfo2_tx_power_x=rs.vfo2_tx_power_x,
1099            dis_digital_mute=rs.dis_digital_mute,
1100            signaling_ecc_en=rs.signaling_ecc_en,
1101            ch_data_lock=rs.ch_data_lock,
1102            vfo1_mod_freq_x=rs.vfo1_mod_freq_x,
1103            vfo2_mod_freq_x=rs.vfo2_mod_freq_x
1104        )
1105
1106    def to_protocol(self):
1107        """@private (Protocol helper)"""
1108        return p.Settings(
1109            channel_a_lower=self._channel_split.get_lower(self.channel_a),
1110            channel_b_lower=self._channel_split.get_lower(self.channel_b),
1111            scan=self.scan,
1112            aghfp_call_mode=self.aghfp_call_mode,
1113            double_channel=self.double_channel,
1114            squelch_level=self.squelch_level,
1115            tail_elim=self.tail_elim,
1116            auto_relay_en=self.auto_relay_en,
1117            auto_power_on=self.auto_power_on,
1118            keep_aghfp_link=self.keep_aghfp_link,
1119            mic_gain=self.mic_gain,
1120            tx_hold_time=self.tx_hold_time,
1121            tx_time_limit=self.tx_time_limit,
1122            local_speaker=self.local_speaker,
1123            bt_mic_gain=self.bt_mic_gain,
1124            adaptive_response=self.adaptive_response,
1125            dis_tone=self.dis_tone,
1126            power_saving_mode=self.power_saving_mode,
1127            auto_power_off=self.auto_power_off,
1128            auto_share_loc_ch=self.auto_share_loc_ch,
1129            hm_speaker=self.hm_speaker,
1130            positioning_system=self.positioning_system,
1131            time_offset=self.time_offset,
1132            use_freq_range_2=self.use_freq_range_2,
1133            ptt_lock=self.ptt_lock,
1134            leading_sync_bit_en=self.leading_sync_bit_en,
1135            pairing_at_power_on=self.pairing_at_power_on,
1136            screen_timeout=self.screen_timeout,
1137            vfo_x=self.vfo_x,
1138            imperial_unit=self.imperial_unit,
1139            channel_a_upper=self._channel_split.get_upper(self.channel_a),
1140            channel_b_upper=self._channel_split.get_upper(self.channel_b),
1141            wx_mode=self.wx_mode,
1142            noaa_ch=self.noaa_ch,
1143            vfol_tx_power_x=self.vfol_tx_power_x,
1144            vfo2_tx_power_x=self.vfo2_tx_power_x,
1145            dis_digital_mute=self.dis_digital_mute,
1146            signaling_ecc_en=self.signaling_ecc_en,
1147            ch_data_lock=self.ch_data_lock,
1148            vfo1_mod_freq_x=self.vfo1_mod_freq_x,
1149            vfo2_mod_freq_x=self.vfo2_mod_freq_x
1150        )

A data object representing the radio settings

channel_a: int
channel_b: int
scan: bool
aghfp_call_mode: int
double_channel: int
squelch_level: int
tail_elim: bool
auto_relay_en: bool
auto_power_on: bool
mic_gain: int
tx_hold_time: int
tx_time_limit: int
local_speaker: int
bt_mic_gain: int
adaptive_response: bool
dis_tone: bool
power_saving_mode: bool
auto_power_off: int
auto_share_loc_ch: Union[int, Literal['current']]
hm_speaker: int
positioning_system: int
time_offset: int
use_freq_range_2: bool
ptt_lock: bool
leading_sync_bit_en: bool
pairing_at_power_on: bool
screen_timeout: int
vfo_x: int
imperial_unit: bool
wx_mode: int
noaa_ch: int
vfol_tx_power_x: int
vfo2_tx_power_x: int
dis_digital_mute: bool
signaling_ecc_en: bool
ch_data_lock: bool
vfo1_mod_freq_x: int
vfo2_mod_freq_x: int
class DeviceInfo(ImmutableBaseModel):
1153class DeviceInfo(ImmutableBaseModel):
1154    """A data object representing the device information"""
1155    vendor_id: int
1156    product_id: int
1157    hardware_version: int
1158    firmware_version: int
1159    supports_radio: bool
1160    supports_medium_power: bool
1161    fixed_location_speaker_volume: bool
1162    has_speaker: bool
1163    has_hand_microphone_speaker: bool
1164    region_count: int
1165    supports_noaa: bool
1166    supports_gmrs: bool
1167    supports_vfo: bool
1168    supports_dmr: bool
1169    supports_software_power_control: bool
1170    channel_count: int
1171    frequency_range_count: int
1172
1173    @classmethod
1174    def from_protocol(cls, info: p.DevInfo) -> DeviceInfo:
1175        """@private (Protocol helper)"""
1176        return DeviceInfo(
1177            vendor_id=info.vendor_id,
1178            product_id=info.product_id,
1179            hardware_version=info.hw_ver,
1180            firmware_version=info.soft_ver,
1181            supports_radio=info.support_radio,
1182            supports_medium_power=info.support_medium_power,
1183            fixed_location_speaker_volume=info.fixed_loc_speaker_vol,
1184            supports_software_power_control=not info.not_support_soft_power_ctrl,
1185            has_speaker=not info.have_no_speaker,
1186            has_hand_microphone_speaker=info.have_hm_speaker,
1187            region_count=info.region_count,
1188            supports_noaa=info.support_noaa,
1189            supports_gmrs=info.gmrs,
1190            supports_vfo=info.support_vfo,
1191            supports_dmr=info.support_dmr,
1192            channel_count=info.channel_count,
1193            frequency_range_count=info.freq_range_count
1194        )
1195
1196    def to_protocol(self) -> p.DevInfo:
1197        """@private (Protocol helper)"""
1198        return p.DevInfo(
1199            vendor_id=self.vendor_id,
1200            product_id=self.product_id,
1201            hw_ver=self.hardware_version,
1202            soft_ver=self.firmware_version,
1203            support_radio=self.supports_radio,
1204            support_medium_power=self.supports_medium_power,
1205            fixed_loc_speaker_vol=self.fixed_location_speaker_volume,
1206            not_support_soft_power_ctrl=not self.supports_software_power_control,
1207            have_no_speaker=not self.has_speaker,
1208            have_hm_speaker=self.has_hand_microphone_speaker,
1209            region_count=self.region_count,
1210            support_noaa=self.supports_noaa,
1211            gmrs=self.supports_gmrs,
1212            support_vfo=self.supports_vfo,
1213            support_dmr=self.supports_dmr,
1214            channel_count=self.channel_count,
1215            freq_range_count=self.frequency_range_count
1216        )

A data object representing the device information

vendor_id: int
product_id: int
hardware_version: int
firmware_version: int
supports_radio: bool
supports_medium_power: bool
fixed_location_speaker_volume: bool
has_speaker: bool
has_hand_microphone_speaker: bool
region_count: int
supports_noaa: bool
supports_gmrs: bool
supports_vfo: bool
supports_dmr: bool
supports_software_power_control: bool
channel_count: int
frequency_range_count: int
class BeaconSettingsArgs(typing.TypedDict):
1219class BeaconSettingsArgs(t.TypedDict, total=False):
1220    """A dictionary of the parameters that can be set in the beacon settings"""
1221    max_fwd_times: int
1222    time_to_live: int
1223    ptt_release_send_location: bool
1224    ptt_release_send_id_info: bool
1225    ptt_release_send_bss_user_id: bool
1226    should_share_location: bool
1227    send_pwr_voltage: bool
1228    packet_format: t.Literal["BSS", "APRS"]
1229    allow_position_check: bool
1230    aprs_ssid: int
1231    location_share_interval: int
1232    bss_user_id: int
1233    ptt_release_id_info: str
1234    beacon_message: str
1235    aprs_symbol: str
1236    aprs_callsign: str

A dictionary of the parameters that can be set in the beacon settings

max_fwd_times: int
time_to_live: int
ptt_release_send_location: bool
ptt_release_send_id_info: bool
ptt_release_send_bss_user_id: bool
should_share_location: bool
send_pwr_voltage: bool
packet_format: Literal['BSS', 'APRS']
allow_position_check: bool
aprs_ssid: int
location_share_interval: int
bss_user_id: int
ptt_release_id_info: str
beacon_message: str
aprs_symbol: str
aprs_callsign: str
class BeaconSettings(ImmutableBaseModel):
1239class BeaconSettings(ImmutableBaseModel):
1240    """A data object representing the beacon settings"""
1241    _bss_user_id_split: t.ClassVar[IntSplit] = IntSplit(32, 32)
1242    max_fwd_times: int
1243    time_to_live: int
1244    ptt_release_send_location: bool
1245    ptt_release_send_id_info: bool
1246    ptt_release_send_bss_user_id: bool
1247    should_share_location: bool
1248    send_pwr_voltage: bool
1249    packet_format: t.Literal["BSS", "APRS"]
1250    allow_position_check: bool
1251    aprs_ssid: int
1252    location_share_interval: int
1253    bss_user_id: int
1254    ptt_release_id_info: str
1255    beacon_message: str
1256    aprs_symbol: str
1257    aprs_callsign: str
1258
1259    @classmethod
1260    def from_protocol(cls, bs: p.BSSSettingsExt | p.BSSSettings) -> BeaconSettings:
1261        """@private (Protocol helper)"""
1262
1263        if not isinstance(bs, p.BSSSettingsExt):
1264            raise ValueError(
1265                "Radio replied with old BSSSettings message version. Upgrade your firmware!"
1266            )
1267
1268        return BeaconSettings(
1269            max_fwd_times=bs.max_fwd_times,
1270            time_to_live=bs.time_to_live,
1271            ptt_release_send_location=bs.ptt_release_send_location,
1272            ptt_release_send_id_info=bs.ptt_release_send_id_info,
1273            ptt_release_send_bss_user_id=bs.ptt_release_send_bss_user_id,
1274            should_share_location=bs.should_share_location,
1275            send_pwr_voltage=bs.send_pwr_voltage,
1276            packet_format=bs.packet_format.name,
1277            allow_position_check=bs.allow_position_check,
1278            aprs_ssid=bs.aprs_ssid,
1279            location_share_interval=bs.location_share_interval,
1280            bss_user_id=cls._bss_user_id_split.from_parts(
1281                bs.bss_user_id_upper, bs.bss_user_id_lower
1282            ),
1283            ptt_release_id_info=bs.ptt_release_id_info,
1284            beacon_message=bs.beacon_message,
1285            aprs_symbol=bs.aprs_symbol,
1286            aprs_callsign=bs.aprs_callsign
1287        )
1288
1289    def to_protocol(self) -> p.BSSSettingsExt:
1290        """@private (Protocol helper)"""
1291        return p.BSSSettingsExt(
1292            max_fwd_times=self.max_fwd_times,
1293            time_to_live=self.time_to_live,
1294            ptt_release_send_location=self.ptt_release_send_location,
1295            ptt_release_send_id_info=self.ptt_release_send_id_info,
1296            ptt_release_send_bss_user_id=self.ptt_release_send_bss_user_id,
1297            should_share_location=self.should_share_location,
1298            send_pwr_voltage=self.send_pwr_voltage,
1299            packet_format=p.PacketFormat[self.packet_format],
1300            allow_position_check=self.allow_position_check,
1301            aprs_ssid=self.aprs_ssid,
1302            location_share_interval=self.location_share_interval,
1303            bss_user_id_lower=self._bss_user_id_split.get_lower(
1304                self.bss_user_id
1305            ),
1306            ptt_release_id_info=self.ptt_release_id_info,
1307            beacon_message=self.beacon_message,
1308            aprs_symbol=self.aprs_symbol,
1309            aprs_callsign=self.aprs_callsign,
1310            bss_user_id_upper=self._bss_user_id_split.get_upper(
1311                self.bss_user_id
1312            ),
1313        )

A data object representing the beacon settings

max_fwd_times: int
time_to_live: int
ptt_release_send_location: bool
ptt_release_send_id_info: bool
ptt_release_send_bss_user_id: bool
should_share_location: bool
send_pwr_voltage: bool
packet_format: Literal['BSS', 'APRS']
allow_position_check: bool
aprs_ssid: int
location_share_interval: int
bss_user_id: int
ptt_release_id_info: str
beacon_message: str
aprs_symbol: str
aprs_callsign: str
ChannelType = typing.Literal['OFF', 'A', 'B']
class Status(ImmutableBaseModel):
1319class Status(ImmutableBaseModel):
1320    """A data object representing the radio status"""
1321    _channel_split: t.ClassVar[IntSplit] = IntSplit(4, 4)
1322    is_power_on: bool
1323    is_in_tx: bool
1324    is_sq: bool
1325    is_in_rx: bool
1326    double_channel: ChannelType
1327    is_scan: bool
1328    is_radio: bool
1329    curr_ch_id: int
1330    is_gps_locked: bool
1331    is_hfp_connected: bool
1332    is_aoc_connected: bool
1333    rssi: float
1334    curr_region: int
1335
1336    @classmethod
1337    def from_protocol(cls, s: p.Status | p.StatusExt) -> Status:
1338        """@private (Protocol helper)"""
1339        if not isinstance(s, p.StatusExt):
1340            raise ValueError(
1341                "Radio replied with old Status message version. Upgrade your firmware!"
1342            )
1343
1344        return Status(
1345            is_power_on=s.is_power_on,
1346            is_in_tx=s.is_in_tx,
1347            is_sq=s.is_sq,
1348            is_in_rx=s.is_in_rx,
1349            double_channel=s.double_channel.name,
1350            is_scan=s.is_scan,
1351            is_radio=s.is_radio,
1352            curr_ch_id=cls._channel_split.from_parts(
1353                s.curr_channel_id_upper, s.curr_ch_id_lower
1354            ),
1355            is_gps_locked=s.is_gps_locked,
1356            is_hfp_connected=s.is_hfp_connected,
1357            is_aoc_connected=s.is_aoc_connected,
1358            rssi=s.rssi,
1359            curr_region=s.curr_region
1360        )
1361
1362    def to_protocol(self) -> p.StatusExt:
1363        """@private (Protocol helper)"""
1364        return p.StatusExt(
1365            is_power_on=self.is_power_on,
1366            is_in_tx=self.is_in_tx,
1367            is_sq=self.is_sq,
1368            is_in_rx=self.is_in_rx,
1369            double_channel=p.ChannelType[self.double_channel],
1370            is_scan=self.is_scan,
1371            is_radio=self.is_radio,
1372            curr_ch_id_lower=self._channel_split.get_lower(self.curr_ch_id),
1373            is_gps_locked=self.is_gps_locked,
1374            is_hfp_connected=self.is_hfp_connected,
1375            is_aoc_connected=self.is_aoc_connected,
1376            rssi=self.rssi,
1377            curr_region=self.curr_region,
1378            curr_channel_id_upper=self._channel_split.get_upper(
1379                self.curr_ch_id)
1380        )

A data object representing the radio status

is_power_on: bool
is_in_tx: bool
is_sq: bool
is_in_rx: bool
double_channel: Literal['OFF', 'A', 'B']
is_scan: bool
is_radio: bool
curr_ch_id: int
is_gps_locked: bool
is_hfp_connected: bool
is_aoc_connected: bool
rssi: float
curr_region: int
class Position(ImmutableBaseModel):
1383class Position(ImmutableBaseModel):
1384    """A data object for representing GPS positions"""
1385    latitude: float
1386    longitude: float
1387    altitude: int | None
1388    speed: int | None
1389    heading: int | None
1390    time: datetime
1391    accuracy: int
1392
1393    @classmethod
1394    def from_protocol(cls, x: p.Position) -> Position:
1395        """@private (Protocol helper)"""
1396        return Position(
1397            latitude=x.latitude,
1398            longitude=x.longitude,
1399            altitude=x.altitude,
1400            speed=x.speed,
1401            heading=x.heading,
1402            time=x.time,
1403            accuracy=x.accuracy,
1404        )
1405
1406    def to_protocol(self) -> p.Position:
1407        """@private (Protocol helper)"""
1408        return p.Position(
1409            latitude=self.latitude,
1410            longitude=self.longitude,
1411            altitude=self.altitude,
1412            speed=self.speed,
1413            heading=self.heading,
1414            time=self.time,
1415            accuracy=self.accuracy,
1416        )

A data object for representing GPS positions

latitude: float
longitude: float
altitude: int | None
speed: int | None
heading: int | None
time: datetime.datetime
accuracy: int