Edit on GitHub

benlink.command

Overview

This module defines a low-level interface for communicating with a radio over BLE or Rfcomm.

Unless you know what you are doing, you probably want to use the benlink.controller module instead.

Messages

The RadioMessage type is a union of all the possible messages that can be sent or received from the radio.

RadioMessages are divided into three categories:

  1. CommandMessage: Messages that are sent to the radio to request information or to change settings.

  2. ReplyMessage: Messages that are received in response to a CommandMessage.

  3. EventMessage: Messages that are received from the radio to indicate that an event has occurred. (e.g. a channel has changed, a packet has been received)

Data

The data objects (e.g. DeviceInfo, Settings) are used to represent the data that is sent or received in the messages. Some of these data objects have accompanying Args types that are used in the API to allow for functions that take keyword arguments to set these parameters.

   1"""
   2# Overview
   3
   4This module defines a low-level interface for communicating
   5with a radio over BLE or Rfcomm.
   6
   7Unless you know what you are doing, you probably want to use the
   8`benlink.controller` module instead.
   9
  10# Messages
  11
  12The `RadioMessage` type is a union of all the possible messages that can
  13be sent or received from the radio.
  14
  15`RadioMessage`s are divided into three categories:
  16
  171. `CommandMessage`: Messages that are sent to the radio to request
  18    information or to change settings.
  19
  202. `ReplyMessage`: Messages that are received in response to a
  21    `CommandMessage`.
  22
  233. `EventMessage`: Messages that are received from the radio to indicate
  24    that an event has occurred. (e.g. a channel has changed, a packet has
  25    been received)
  26
  27# Data
  28
  29The data objects (e.g. `DeviceInfo`, `Settings`) are used to represent
  30the data that is sent or received in the messages. Some of these data
  31objects have accompanying `Args` types that are used in the API to allow
  32for functions that take keyword arguments to set these parameters.
  33"""
  34
  35from __future__ import annotations
  36import typing as t
  37import asyncio
  38from pydantic import BaseModel, ConfigDict
  39from . import protocol as p
  40from .link import CommandLink, BleCommandLink, RfcommCommandLink
  41from datetime import datetime
  42
# UUID strings associated with the radio. Judging by the names these are
# presumably the BLE service and its write / indicate characteristics --
# TODO confirm against the link implementation. They are not referenced by
# the code visible in this part of the module.
RADIO_SERVICE_UUID = "00001100-d102-11e1-9b23-00025b00a5a5"
"""@private"""

RADIO_WRITE_UUID = "00001101-d102-11e1-9b23-00025b00a5a5"
"""@private"""

RADIO_INDICATE_UUID = "00001102-d102-11e1-9b23-00025b00a5a5"
"""@private"""
  51
  52
  53class CommandConnection:
  54    _link: CommandLink
  55    _handlers: t.List[RadioMessageHandler] = []
  56
  57    def __init__(self, link: CommandLink):
  58        self._link = link
  59        self._handlers = []
  60
  61    @classmethod
  62    def new_ble(cls, device_uuid: str) -> CommandConnection:
  63        return cls(BleCommandLink(device_uuid))
  64
  65    @classmethod
  66    def new_rfcomm(cls, device_uuid: str, channel: int | t.Literal["auto"] = "auto") -> CommandConnection:
  67        return cls(RfcommCommandLink(device_uuid, channel))
  68
  69    def is_connected(self) -> bool:
  70        return self._link.is_connected()
  71
  72    async def connect(self) -> None:
  73        await self._link.connect(self._on_recv)
  74
  75    async def disconnect(self) -> None:
  76        self._handlers.clear()
  77        await self._link.disconnect()
  78
  79    async def send_bytes(self, data: bytes) -> None:
  80        await self._link.send_bytes(data)
  81
  82    async def send_message(self, command: CommandMessage) -> None:
  83        await self._link.send(command_message_to_protocol(command))
  84
  85    async def send_message_expect_reply(self, command: CommandMessage, expect: t.Type[RadioMessageT]) -> RadioMessageT | MessageReplyError:
  86        queue: asyncio.Queue[RadioMessageT |
  87                             MessageReplyError] = asyncio.Queue()
  88
  89        def reply_handler(reply: RadioMessage):
  90            if (
  91                isinstance(reply, expect) or
  92                (
  93                    isinstance(reply, MessageReplyError) and
  94                    reply.message_type is expect
  95                )
  96            ):
  97                queue.put_nowait(reply)
  98
  99        remove_handler = self._add_message_handler(reply_handler)
 100
 101        await self.send_message(command)
 102
 103        out = await queue.get()
 104
 105        remove_handler()
 106
 107        return out
 108
 109    def add_event_handler(self, handler: EventHandler) -> t.Callable[[], None]:
 110        def event_handler(msg: RadioMessage):
 111            if isinstance(msg, EventMessage):
 112                handler(msg)
 113        return self._add_message_handler(event_handler)
 114
 115    def _add_message_handler(self, handler: RadioMessageHandler) -> t.Callable[[], None]:
 116        self._handlers.append(handler)
 117
 118        def remove_handler():
 119            self._handlers.remove(handler)
 120
 121        return remove_handler
 122
 123    def _on_recv(self, msg: p.Message) -> None:
 124        radio_message = radio_message_from_protocol(msg)
 125        for handler in self._handlers:
 126            handler(radio_message)
 127
 128    # Command API
 129
 130    async def enable_event(self, event_type: EventType) -> None:
 131        """Enable an event"""
 132        # This event doesn't get a reply version of EnableEvent. It does, however
 133        # does trigger a StatusChangedEvent... but we don't wait for it here.
 134        # Instead, we just fire and forget
 135        await self.send_message(EnableEvent(event_type))
 136
 137    async def send_tnc_data_fragment(self, tnc_data_fragment: TncDataFragment) -> None:
 138        """Send Tnc data"""
 139        reply = await self.send_message_expect_reply(SendTncDataFragment(tnc_data_fragment), SendTncDataFragmentReply)
 140        if isinstance(reply, MessageReplyError):
 141            raise reply.as_exception()
 142
 143    async def get_beacon_settings(self) -> BeaconSettings:
 144        """Get the current packet settings"""
 145        reply = await self.send_message_expect_reply(GetBeaconSettings(), GetBeaconSettingsReply)
 146        if isinstance(reply, MessageReplyError):
 147            raise reply.as_exception()
 148        return reply.tnc_settings
 149
 150    async def set_beacon_settings(self, packet_settings: BeaconSettings):
 151        """Set the packet settings"""
 152        reply = await self.send_message_expect_reply(SetBeaconSettings(packet_settings), SetBeaconSettingsReply)
 153        if isinstance(reply, MessageReplyError):
 154            raise reply.as_exception()
 155
 156    async def get_battery_level(self) -> int:
 157        """Get the battery level"""
 158        reply = await self.send_message_expect_reply(GetBatteryLevel(), GetBatteryLevelReply)
 159        if isinstance(reply, MessageReplyError):
 160            raise reply.as_exception()
 161        return reply.battery_level
 162
 163    async def get_battery_level_as_percentage(self) -> int:
 164        """Get the battery level as a percentage"""
 165        reply = await self.send_message_expect_reply(GetBatteryLevelAsPercentage(), GetBatteryLevelAsPercentageReply)
 166        if isinstance(reply, MessageReplyError):
 167            raise reply.as_exception()
 168        return reply.battery_level_as_percentage
 169
 170    async def get_rc_battery_level(self) -> int:
 171        """Get the RC battery level"""
 172        reply = await self.send_message_expect_reply(GetRCBatteryLevel(), GetRCBatteryLevelReply)
 173        if isinstance(reply, MessageReplyError):
 174            raise reply.as_exception()
 175        return reply.rc_battery_level
 176
 177    async def get_battery_voltage(self) -> float:
 178        """Get the battery voltage"""
 179        reply = await self.send_message_expect_reply(GetBatteryVoltage(), GetBatteryVoltageReply)
 180        if isinstance(reply, MessageReplyError):
 181            raise reply.as_exception()
 182        return reply.battery_voltage
 183
 184    async def get_device_info(self) -> DeviceInfo:
 185        """Get the device info"""
 186        reply = await self.send_message_expect_reply(GetDeviceInfo(), GetDeviceInfoReply)
 187        if isinstance(reply, MessageReplyError):
 188            raise reply.as_exception()
 189        return reply.device_info
 190
 191    async def get_settings(self) -> Settings:
 192        """Get the settings"""
 193        reply = await self.send_message_expect_reply(GetSettings(), GetSettingsReply)
 194        if isinstance(reply, MessageReplyError):
 195            raise reply.as_exception()
 196        return reply.settings
 197
 198    async def set_settings(self, settings: Settings) -> None:
 199        """Set the settings"""
 200        reply = await self.send_message_expect_reply(SetSettings(settings), SetSettingsReply)
 201        if isinstance(reply, MessageReplyError):
 202            raise reply.as_exception()
 203
 204    async def get_channel(self, channel_id: int) -> Channel:
 205        """Get a channel"""
 206        reply = await self.send_message_expect_reply(GetChannel(channel_id), GetChannelReply)
 207        if isinstance(reply, MessageReplyError):
 208            raise reply.as_exception()
 209        return reply.channel
 210
 211    async def set_channel(self, channel: Channel):
 212        """Set a channel"""
 213        reply = await self.send_message_expect_reply(SetChannel(channel), SetChannelReply)
 214        if isinstance(reply, MessageReplyError):
 215            raise reply.as_exception()
 216
 217    async def get_status(self) -> Status:
 218        """Get the radio status"""
 219        reply = await self.send_message_expect_reply(GetStatus(), GetStatusReply)
 220        if isinstance(reply, MessageReplyError):
 221            raise reply.as_exception()
 222        return reply.status
 223
 224    async def get_position(self) -> Position:
 225        """Get the radio position"""
 226        reply = await self.send_message_expect_reply(GetPosition(), GetPositionReply)
 227        if isinstance(reply, MessageReplyError):
 228            raise reply.as_exception()
 229        return reply.position
 230
 231    async def __aenter__(self):
 232        await self.connect()
 233        return self
 234
 235    async def __aexit__(
 236        self,
 237        exc_type: t.Any,
 238        exc_value: t.Any,
 239        traceback: t.Any,
 240    ) -> None:
 241        await self.disconnect()
 242
 243
class ImmutableBaseModel(BaseModel):
    """@private (A base class for immutable data objects)"""

    # frozen=True is the pydantic setting that makes model instances
    # immutable; the data-object classes in this module inherit it by
    # subclassing this base.
    model_config = ConfigDict(frozen=True)
    """@private"""
 249
 250
 251def command_message_to_protocol(m: CommandMessage) -> p.Message:
 252    """@private (Protocol helper)"""
 253    match m:
 254        case EnableEvent(event_type):
 255            return p.Message(
 256                command_group=p.CommandGroup.BASIC,
 257                is_reply=False,
 258                command=p.BasicCommand.REGISTER_NOTIFICATION,
 259                body=p.RegisterNotificationBody(
 260                    event_type=p.EventType[event_type],
 261                )
 262            )
 263        case SendTncDataFragment(tnc_data_fragment):
 264            return p.Message(
 265                command_group=p.CommandGroup.BASIC,
 266                is_reply=False,
 267                command=p.BasicCommand.HT_SEND_DATA,
 268                body=p.HTSendDataBody(
 269                    tnc_data_fragment=tnc_data_fragment.to_protocol()
 270                )
 271            )
 272        case GetBeaconSettings():
 273            return p.Message(
 274                command_group=p.CommandGroup.BASIC,
 275                is_reply=False,
 276                command=p.BasicCommand.READ_BSS_SETTINGS,
 277                body=p.ReadBSSSettingsBody()
 278            )
 279        case SetBeaconSettings(packet_settings):
 280            return p.Message(
 281                command_group=p.CommandGroup.BASIC,
 282                is_reply=False,
 283                command=p.BasicCommand.WRITE_BSS_SETTINGS,
 284                body=p.WriteBSSSettingsBody(
 285                    bss_settings=packet_settings.to_protocol()
 286                )
 287            )
 288        case GetSettings():
 289            return p.Message(
 290                command_group=p.CommandGroup.BASIC,
 291                is_reply=False,
 292                command=p.BasicCommand.READ_SETTINGS,
 293                body=p.ReadSettingsBody()
 294            )
 295        case SetSettings(settings):
 296            return p.Message(
 297                command_group=p.CommandGroup.BASIC,
 298                is_reply=False,
 299                command=p.BasicCommand.WRITE_SETTINGS,
 300                body=p.WriteSettingsBody(
 301                    settings=settings.to_protocol()
 302                )
 303            )
 304        case GetDeviceInfo():
 305            return p.Message(
 306                command_group=p.CommandGroup.BASIC,
 307                is_reply=False,
 308                command=p.BasicCommand.GET_DEV_INFO,
 309                body=p.GetDevInfoBody()
 310            )
 311        case GetChannel(channel_id):
 312            return p.Message(
 313                command_group=p.CommandGroup.BASIC,
 314                is_reply=False,
 315                command=p.BasicCommand.READ_RF_CH,
 316                body=p.ReadRFChBody(channel_id=channel_id)
 317            )
 318        case SetChannel(channel):
 319            return p.Message(
 320                command_group=p.CommandGroup.BASIC,
 321                is_reply=False,
 322                command=p.BasicCommand.WRITE_RF_CH,
 323                body=p.WriteRFChBody(
 324                    rf_ch=channel.to_protocol()
 325                )
 326            )
 327        case GetBatteryVoltage():
 328            return p.Message(
 329                command_group=p.CommandGroup.BASIC,
 330                is_reply=False,
 331                command=p.BasicCommand.READ_STATUS,
 332                body=p.ReadPowerStatusBody(
 333                    status_type=p.PowerStatusType.BATTERY_VOLTAGE
 334                )
 335            )
 336        case GetBatteryLevel():
 337            return p.Message(
 338                command_group=p.CommandGroup.BASIC,
 339                is_reply=False,
 340                command=p.BasicCommand.READ_STATUS,
 341                body=p.ReadPowerStatusBody(
 342                    status_type=p.PowerStatusType.BATTERY_LEVEL
 343                )
 344            )
 345        case GetBatteryLevelAsPercentage():
 346            return p.Message(
 347                command_group=p.CommandGroup.BASIC,
 348                is_reply=False,
 349                command=p.BasicCommand.READ_STATUS,
 350                body=p.ReadPowerStatusBody(
 351                    status_type=p.PowerStatusType.BATTERY_LEVEL_AS_PERCENTAGE
 352                )
 353            )
 354        case GetRCBatteryLevel():
 355            return p.Message(
 356                command_group=p.CommandGroup.BASIC,
 357                is_reply=False,
 358                command=p.BasicCommand.READ_STATUS,
 359                body=p.ReadPowerStatusBody(
 360                    status_type=p.PowerStatusType.RC_BATTERY_LEVEL
 361                )
 362            )
 363        case GetStatus():
 364            return p.Message(
 365                command_group=p.CommandGroup.BASIC,
 366                is_reply=False,
 367                command=p.BasicCommand.GET_HT_STATUS,
 368                body=p.GetHtStatusBody()
 369            )
 370        case GetPosition():
 371            return p.Message(
 372                command_group=p.CommandGroup.BASIC,
 373                is_reply=False,
 374                command=p.BasicCommand.GET_POSITION,
 375                body=p.GetPositionBody()
 376            )
 377
 378
 379def radio_message_from_protocol(mf: p.Message) -> RadioMessage:
 380    """@private (Protocol helper)"""
 381    match mf.body:
 382        case p.GetPositionReplyBody(reply_status=reply_status, position=position):
 383            if position is None:
 384                return MessageReplyError(
 385                    message_type=GetPositionReply,
 386                    reason=reply_status.name
 387                )
 388            return GetPositionReply(Position.from_protocol(position))
 389        case p.GetHtStatusReplyBody(reply_status=reply_status, status=status):
 390            if status is None:
 391                return MessageReplyError(
 392                    message_type=GetStatusReply,
 393                    reason=reply_status.name,
 394                )
 395            return GetStatusReply(Status.from_protocol(status))
 396        case p.HTSendDataReplyBody(
 397            reply_status=reply_status
 398        ):
 399            if reply_status != p.ReplyStatus.SUCCESS:
 400                return MessageReplyError(
 401                    message_type=SendTncDataFragmentReply,
 402                    reason=reply_status.name,
 403                )
 404            return SendTncDataFragmentReply()
 405        case p.ReadBSSSettingsReplyBody(
 406            reply_status=reply_status,
 407            bss_settings=bss_settings,
 408        ):
 409            if bss_settings is None:
 410                return MessageReplyError(
 411                    message_type=GetBeaconSettingsReply,
 412                    reason=reply_status.name,
 413                )
 414
 415            return GetBeaconSettingsReply(BeaconSettings.from_protocol(bss_settings))
 416        case p.WriteBSSSettingsReplyBody(
 417            reply_status=reply_status,
 418        ):
 419            if reply_status != p.ReplyStatus.SUCCESS:
 420                return MessageReplyError(
 421                    message_type=SetBeaconSettingsReply,
 422                    reason=reply_status.name,
 423                )
 424            return SetBeaconSettingsReply()
 425        case p.ReadPowerStatusReplyBody(
 426            reply_status=reply_status,
 427            status=power_status
 428        ):
 429            if power_status is None:
 430                return MessageReplyError(
 431                    message_type=GetBatteryVoltageReply,
 432                    reason=reply_status.name,
 433                )
 434
 435            match power_status.value:
 436                case p.BatteryVoltageStatus(
 437                    battery_voltage=battery_voltage
 438                ):
 439                    return GetBatteryVoltageReply(
 440                        battery_voltage=battery_voltage
 441                    )
 442                case p.BatteryLevelPercentageStatus(
 443                    battery_level_as_percentage=battery_level_as_percentage
 444                ):
 445                    return GetBatteryLevelAsPercentageReply(
 446                        battery_level_as_percentage=battery_level_as_percentage
 447                    )
 448                case p.BatteryLevelStatus(
 449                    battery_level=battery_level
 450                ):
 451                    return GetBatteryLevelReply(
 452                        battery_level=battery_level
 453                    )
 454                case p.RCBatteryLevelStatus(
 455                    rc_battery_level=rc_battery_level
 456                ):
 457                    return GetRCBatteryLevelReply(
 458                        rc_battery_level=rc_battery_level
 459                    )
 460        case p.EventNotificationBody(
 461            event=event
 462        ):
 463            match event:
 464                case p.HTSettingsChangedEvent(
 465                    settings=settings
 466                ):
 467                    return SettingsChangedEvent(Settings.from_protocol(settings))
 468                case p.DataRxdEvent(
 469                    tnc_data_fragment=tnc_data_fragment
 470                ):
 471                    return TncDataFragmentReceivedEvent(
 472                        tnc_data_fragment=TncDataFragment.from_protocol(
 473                            tnc_data_fragment
 474                        )
 475                    )
 476                case p.HTChChangedEvent(
 477                    rf_ch=rf_ch
 478                ):
 479                    return ChannelChangedEvent(Channel.from_protocol(rf_ch))
 480                case p.HTStatusChangedEvent(
 481                    status=status
 482                ):
 483                    return StatusChangedEvent(Status.from_protocol(status))
 484                case _:
 485                    return UnknownProtocolMessage(mf)
 486        case p.ReadSettingsReplyBody(
 487            reply_status=reply_status,
 488            settings=settings
 489        ):
 490            if settings is None:
 491                return MessageReplyError(
 492                    message_type=GetSettingsReply,
 493                    reason=reply_status.name,
 494                )
 495            return GetSettingsReply(Settings.from_protocol(settings))
 496        case p.WriteSettingsReplyBody(
 497            reply_status=reply_status,
 498        ):
 499            if reply_status != p.ReplyStatus.SUCCESS:
 500                return MessageReplyError(
 501                    message_type=SetSettingsReply,
 502                    reason=reply_status.name,
 503                )
 504            return SetSettingsReply()
 505        case p.GetDevInfoReplyBody(
 506            reply_status=reply_status,
 507            dev_info=dev_info
 508        ):
 509            if dev_info is None:
 510                return MessageReplyError(
 511                    message_type=GetDeviceInfoReply,
 512                    reason=reply_status.name,
 513                )
 514            return GetDeviceInfoReply(DeviceInfo.from_protocol(dev_info))
 515        case p.ReadRFChReplyBody(
 516            reply_status=reply_status,
 517            rf_ch=rf_ch
 518        ):
 519            if rf_ch is None:
 520                return MessageReplyError(
 521                    message_type=GetChannelReply,
 522                    reason=reply_status.name,
 523                )
 524            return GetChannelReply(Channel.from_protocol(rf_ch))
 525        case p.WriteRFChReplyBody(
 526            reply_status=reply_status,
 527        ):
 528            if reply_status != p.ReplyStatus.SUCCESS:
 529                return MessageReplyError(
 530                    message_type=SetChannelReply,
 531                    reason=reply_status.name,
 532                )
 533            return SetChannelReply()
 534
 535        case _:
 536            return UnknownProtocolMessage(mf)
 537
 538
 539#####################
 540# CommandMessage
 541
class EnableEvent(t.NamedTuple):
    """Command: register for notifications of the `event_type` event."""
    event_type: EventType


class GetBeaconSettings(t.NamedTuple):
    """Command: read the beacon settings."""
    pass


class SetBeaconSettings(t.NamedTuple):
    """Command: write the beacon settings held in `tnc_settings`."""
    tnc_settings: BeaconSettings


class SetSettings(t.NamedTuple):
    """Command: write the radio settings."""
    settings: Settings


class GetBatteryLevelAsPercentage(t.NamedTuple):
    """Command: read the battery level as a percentage."""
    pass


class GetRCBatteryLevel(t.NamedTuple):
    """Command: read the RC battery level."""
    pass


class GetBatteryLevel(t.NamedTuple):
    """Command: read the battery level."""
    pass


class GetBatteryVoltage(t.NamedTuple):
    """Command: read the battery voltage."""
    pass


class GetDeviceInfo(t.NamedTuple):
    """Command: read the device info."""
    pass


class GetChannel(t.NamedTuple):
    """Command: read the channel identified by `channel_id`."""
    channel_id: int


class SetChannel(t.NamedTuple):
    """Command: write `channel` to the radio."""
    channel: Channel


class GetSettings(t.NamedTuple):
    """Command: read the radio settings."""
    pass


class GetStatus(t.NamedTuple):
    """Command: read the radio status."""
    pass


class GetPosition(t.NamedTuple):
    """Command: read the radio position."""
    pass


class SendTncDataFragment(t.NamedTuple):
    """Command: send one TNC data fragment."""
    tnc_data_fragment: TncDataFragment


# Union of every command type that command_message_to_protocol() can encode.
CommandMessage = t.Union[
    GetBeaconSettings,
    SetBeaconSettings,
    GetRCBatteryLevel,
    GetBatteryLevelAsPercentage,
    GetBatteryLevel,
    GetBatteryVoltage,
    GetDeviceInfo,
    GetChannel,
    SetChannel,
    GetSettings,
    SetSettings,
    SendTncDataFragment,
    EnableEvent,
    GetStatus,
    GetPosition,
]
 619
 620#####################
 621# ReplyMessage
 622
 623
class SendTncDataFragmentReply(t.NamedTuple):
    """Reply: a `SendTncDataFragment` command succeeded."""
    pass


class GetBeaconSettingsReply(t.NamedTuple):
    """Reply to `GetBeaconSettings`, carrying the beacon settings."""
    tnc_settings: BeaconSettings


class SetBeaconSettingsReply(t.NamedTuple):
    """Reply: a `SetBeaconSettings` command succeeded."""
    pass


class SetSettingsReply(t.NamedTuple):
    """Reply: a `SetSettings` command succeeded."""
    pass


class GetBatteryLevelAsPercentageReply(t.NamedTuple):
    """Reply carrying the battery level as a percentage."""
    battery_level_as_percentage: int


class GetRCBatteryLevelReply(t.NamedTuple):
    """Reply carrying the RC battery level."""
    rc_battery_level: int


class GetBatteryLevelReply(t.NamedTuple):
    """Reply carrying the battery level."""
    battery_level: int


class GetBatteryVoltageReply(t.NamedTuple):
    """Reply carrying the battery voltage."""
    battery_voltage: float


class GetDeviceInfoReply(t.NamedTuple):
    """Reply to `GetDeviceInfo`, carrying the device info."""
    device_info: DeviceInfo


class GetChannelReply(t.NamedTuple):
    """Reply to `GetChannel`, carrying the requested channel."""
    channel: Channel


class SetChannelReply(t.NamedTuple):
    """Reply: a `SetChannel` command succeeded."""
    pass


class GetStatusReply(t.NamedTuple):
    """Reply to `GetStatus`, carrying the radio status."""
    status: Status


class GetSettingsReply(t.NamedTuple):
    """Reply to `GetSettings`, carrying the radio settings."""
    settings: Settings


class GetPositionReply(t.NamedTuple):
    """Reply to `GetPosition`, carrying the radio position."""
    position: Position
 678
 679
# Names a reply status can take. `MessageReplyError.reason` is typed with
# this alias and is filled from the `.name` of the protocol reply status.
ReplyStatus = t.Literal[
    "SUCCESS",
    "NOT_SUPPORTED",
    "NOT_AUTHENTICATED",
    "INSUFFICIENT_RESOURCES",
    "AUTHENTICATING",
    "INVALID_PARAMETER",
    "INCORRECT_STATE",
    "IN_PROGRESS",
]
 690
 691
 692class MessageReplyError(t.NamedTuple):
 693    message_type: t.Type[t.Any]
 694    reason: ReplyStatus
 695
 696    def as_exception(self):
 697        return ValueError(f"{self.message_type.__name__} failed: {self.reason}")
 698
 699
# Union of every message that can arrive in response to a command,
# including the generic failure reply `MessageReplyError`.
ReplyMessage = t.Union[
    GetBeaconSettingsReply,
    SetBeaconSettingsReply,
    GetBatteryLevelAsPercentageReply,
    GetRCBatteryLevelReply,
    GetBatteryLevelReply,
    GetBatteryVoltageReply,
    GetDeviceInfoReply,
    GetChannelReply,
    SetChannelReply,
    GetSettingsReply,
    SetSettingsReply,
    SendTncDataFragmentReply,
    GetStatusReply,
    GetPositionReply,
    MessageReplyError,
]
 717
 718#####################
 719# EventMessage
 720
 721
class StatusChangedEvent(t.NamedTuple):
    """Event: the radio reported a status change; carries the new status."""
    status: Status


class ChannelChangedEvent(t.NamedTuple):
    """Event: the radio reported a channel change; carries the channel."""
    channel: Channel


class TncDataFragmentReceivedEvent(t.NamedTuple):
    """Event: the radio received a TNC data fragment."""
    tnc_data_fragment: TncDataFragment


class SettingsChangedEvent(t.NamedTuple):
    """Event: the radio reported a settings change; carries the settings."""
    settings: Settings


class UnknownProtocolMessage(t.NamedTuple):
    """Wrapper for a protocol message that has no dedicated message type.

    `radio_message_from_protocol` returns this for any message it does not
    recognise; the original protocol message is kept in `message`.
    """
    message: p.Message


# Union of the messages delivered to handlers registered through
# `CommandConnection.add_event_handler`.
EventMessage = t.Union[
    TncDataFragmentReceivedEvent,
    SettingsChangedEvent,
    ChannelChangedEvent,
    StatusChangedEvent,
    UnknownProtocolMessage,
]
 749
 750EventType = t.Literal[
 751    "HT_STATUS_CHANGED",
 752    "DATA_RXD",
 753    "NEW_INQUIRY_DATA",
 754    "RESTORE_FACTORY_SETTINGS",
 755    "HT_CH_CHANGED",
 756    "HT_SETTINGS_CHANGED",
 757    "RINGING_STOPPED"
 758    "RADIO_STATUS_CHANGED",
 759    "USER_ACTION",
 760    "SYSTEM_EVENT",
 761    "BSS_SETTINGS_CHANGED",
 762    "DATA_TXD",
 763    "POSITION_CHANGED"
 764]
 765
# Any message that can be received from the radio: a reply or an event.
RadioMessage = ReplyMessage | EventMessage

# Type variable that lets `send_message_expect_reply` return the same reply
# type it was asked to expect.
RadioMessageT = t.TypeVar("RadioMessageT", bound=RadioMessage)

#####################
# Handlers

# Callback invoked with every message received from the radio.
RadioMessageHandler = t.Callable[[RadioMessage], None]
"""@private"""

# Callback invoked with event messages only (see `add_event_handler`).
EventHandler = t.Callable[[EventMessage], None]
"""@private"""
 778
 779#####################
 780# Protocol to data object conversions
 781
 782
 783class IntSplit(t.NamedTuple):
 784    """@private (A helper for working with integers split into upper and lower parts)"""
 785
 786    n_upper: int
 787    n_lower: int
 788
 789    def from_parts(self, upper: int, lower: int) -> int:
 790        if upper >= 1 << self.n_upper:
 791            raise ValueError(
 792                f"Upper part {upper} is too large for {self.n_upper} bits")
 793        if lower >= 1 << self.n_lower:
 794            raise ValueError(
 795                f"Lower part {lower} is too large for {self.n_lower} bits")
 796
 797        return (upper << self.n_lower) | lower
 798
 799    def get_upper(self, n: int) -> int:
 800        if n >= 1 << (self.n_upper + self.n_lower):
 801            raise ValueError(
 802                f"Value {n} is too large for {self.n_upper + self.n_lower} bits"
 803            )
 804        return n >> self.n_lower
 805
 806    def get_lower(self, n: int) -> int:
 807        if n >= 1 << (self.n_upper + self.n_lower):
 808            raise ValueError(
 809                f"Value {n} is too large for {self.n_upper + self.n_lower} bits"
 810            )
 811        return n & ((1 << self.n_lower) - 1)
 812
 813
 814class TncDataFragment(ImmutableBaseModel):
 815    """A data object representing a message packet"""
 816    is_final_fragment: bool
 817    fragment_id: int
 818    data: bytes
 819    channel_id: int | None = None
 820
 821    @classmethod
 822    def from_protocol(cls, mp: p.TncDataFragment) -> TncDataFragment:
 823        """@private (Protocol helper)"""
 824        return TncDataFragment(
 825            is_final_fragment=mp.is_final_fragment,
 826            fragment_id=mp.fragment_id,
 827            data=mp.data,
 828            channel_id=mp.channel_id
 829        )
 830
 831    def to_protocol(self) -> p.TncDataFragment:
 832        """@private (Protocol helper)"""
 833        return p.TncDataFragment(
 834            is_final_fragment=self.is_final_fragment,
 835            with_channel_id=self.channel_id is not None,
 836            fragment_id=self.fragment_id,
 837            data=self.data,
 838            channel_id=self.channel_id
 839        )
 840
 841
# Allowed modulation names for a channel's `tx_mod` / `rx_mod` fields.
ModulationType = t.Literal["AM", "FM", "DMR"]

# Allowed names for a channel's `bandwidth` field.
BandwidthType = t.Literal["NARROW", "WIDE"]
 845
 846
class DCS(t.NamedTuple):
    """A type for setting Digital Coded Squelch (DCS) on channels

    Used as one of the possible values of a channel's `tx_sub_audio` and
    `rx_sub_audio` fields, alongside `float` and `None`.
    """

    n: int
    """The DCS Normal (N) code"""
 852
 853
 854def sub_audio_from_protocol(x: float | p.DCS | None) -> float | DCS | None:
 855    """@private (Protocol helper)"""
 856    match x:
 857        case p.DCS(n):
 858            return DCS(n=n)
 859        case _:
 860            return x
 861
 862
 863def sub_audio_to_protocol(x: float | DCS | None) -> float | p.DCS | None:
 864    """@private (Protocol helper)"""
 865    match x:
 866        case DCS(n):
 867            return p.DCS(n=n)
 868        case _:
 869            return x
 870
 871
class ChannelArgs(t.TypedDict, total=False):
    """A dictionary of the parameters that can be set on a channel

    Declared with `total=False`, so every key is optional. The keys are the
    same as the fields of `Channel`, except that `channel_id` is absent.
    """
    tx_mod: ModulationType
    tx_freq: float
    rx_mod: ModulationType
    rx_freq: float
    tx_sub_audio: float | DCS | None
    rx_sub_audio: float | DCS | None
    scan: bool
    tx_at_max_power: bool
    talk_around: bool
    bandwidth: BandwidthType
    pre_de_emph_bypass: bool
    sign: bool
    tx_at_med_power: bool
    tx_disable: bool
    fixed_freq: bool
    fixed_bandwidth: bool
    fixed_tx_power: bool
    mute: bool
    name: str
 893
 894
 895class Channel(ImmutableBaseModel):
 896    """A data object representing a radio channel"""
 897    channel_id: int
 898    tx_mod: ModulationType
 899    tx_freq: float
 900    rx_mod: ModulationType
 901    rx_freq: float
 902    tx_sub_audio: float | DCS | None
 903    rx_sub_audio: float | DCS | None
 904    scan: bool
 905    tx_at_max_power: bool
 906    talk_around: bool
 907    bandwidth: BandwidthType
 908    pre_de_emph_bypass: bool
 909    sign: bool
 910    tx_at_med_power: bool
 911    tx_disable: bool
 912    fixed_freq: bool
 913    fixed_bandwidth: bool
 914    fixed_tx_power: bool
 915    mute: bool
 916    name: str
 917
 918    @classmethod
 919    def from_protocol(cls, cs: p.RfCh) -> Channel:
 920        """@private (Protocol helper)"""
 921        return Channel(
 922            channel_id=cs.channel_id,
 923            tx_mod=cs.tx_mod.name,
 924            tx_freq=cs.tx_freq,
 925            rx_mod=cs.rx_mod.name,
 926            rx_freq=cs.rx_freq,
 927            tx_sub_audio=sub_audio_from_protocol(cs.tx_sub_audio),
 928            rx_sub_audio=sub_audio_from_protocol(cs.rx_sub_audio),
 929            scan=cs.scan,
 930            tx_at_max_power=cs.tx_at_max_power,
 931            talk_around=cs.talk_around,
 932            bandwidth=cs.bandwidth.name,
 933            pre_de_emph_bypass=cs.pre_de_emph_bypass,
 934            sign=cs.sign,
 935            tx_at_med_power=cs.tx_at_med_power,
 936            tx_disable=cs.tx_disable,
 937            fixed_freq=cs.fixed_freq,
 938            fixed_bandwidth=cs.fixed_bandwidth,
 939            fixed_tx_power=cs.fixed_tx_power,
 940            mute=cs.mute,
 941            name=cs.name_str
 942        )
 943
 944    def to_protocol(self) -> p.RfCh:
 945        """@private (Protocol helper)"""
 946        return p.RfCh(
 947            channel_id=self.channel_id,
 948            tx_mod=p.ModulationType[self.tx_mod],
 949            tx_freq=self.tx_freq,
 950            rx_mod=p.ModulationType[self.rx_mod],
 951            rx_freq=self.rx_freq,
 952            tx_sub_audio=sub_audio_to_protocol(self.tx_sub_audio),
 953            rx_sub_audio=sub_audio_to_protocol(self.rx_sub_audio),
 954            scan=self.scan,
 955            tx_at_max_power=self.tx_at_max_power,
 956            talk_around=self.talk_around,
 957            bandwidth=p.BandwidthType[self.bandwidth],
 958            pre_de_emph_bypass=self.pre_de_emph_bypass,
 959            sign=self.sign,
 960            tx_at_med_power=self.tx_at_med_power,
 961            tx_disable=self.tx_disable,
 962            fixed_freq=self.fixed_freq,
 963            fixed_bandwidth=self.fixed_bandwidth,
 964            fixed_tx_power=self.fixed_tx_power,
 965            mute=self.mute,
 966            name_str=self.name
 967        )
 968
 969
class SettingsArgs(t.TypedDict, total=False):
    """A dictionary of the parameters that can be set in the radio settings"""
    # `total=False` makes every key optional, so a caller supplies only the
    # settings it wants to change. The keys mirror the fields of `Settings`.
    channel_a: int
    channel_b: int
    scan: bool
    aghfp_call_mode: int
    double_channel: int
    squelch_level: int
    tail_elim: bool
    auto_relay_en: bool
    auto_power_on: bool
    keep_aghfp_link: bool
    mic_gain: int
    tx_hold_time: int
    tx_time_limit: int
    local_speaker: int
    bt_mic_gain: int
    adaptive_response: bool
    dis_tone: bool
    power_saving_mode: bool
    auto_power_off: int
    # Either a channel number or the literal string "current".
    auto_share_loc_ch: int | t.Literal["current"]
    hm_speaker: int
    positioning_system: int
    time_offset: int
    use_freq_range_2: bool
    ptt_lock: bool
    leading_sync_bit_en: bool
    pairing_at_power_on: bool
    screen_timeout: int
    kiss_upload_tx_msg: bool
    kiss_en: bool
    imperial_unit: bool
    wx_mode: int
    noaa_ch: int
    # NOTE(review): spelled with a lowercase "l" ("vfol"), unlike the
    # neighbouring `vfo2_tx_power_x` / `vfo1_mod_freq_x`. It matches the
    # name used by `Settings`, so confirm before renaming.
    vfol_tx_power_x: int
    vfo2_tx_power_x: int
    dis_digital_mute: bool
    signaling_ecc_en: bool
    ch_data_lock: bool
    kiss_tx_delay: int
    kiss_tx_tail: int
    vox_en: bool
    vox_level: int
    dis_bt_mic: bool
    vox_delay: int
    ns_en: bool
    alarm_volume: int
    use_custom_location: bool
    gpwpl_upload_en: bool
    vfo1_mod_freq_x: int
    custom_location_lat: int
    custom_location_lon: int
1023
1024
class Settings(ImmutableBaseModel):
    """A data object representing the radio settings"""
    # The protocol carries the channel numbers and the location-share
    # channel as separate "upper" and "lower" parts; these helpers join and
    # split them. (Presumably the arguments are bit widths -- confirm
    # against `IntSplit`.)
    _channel_split: t.ClassVar[IntSplit] = IntSplit(4, 4)
    _auto_share_loc_ch_split: t.ClassVar[IntSplit] = IntSplit(3, 5)
    channel_a: int
    channel_b: int
    scan: bool
    aghfp_call_mode: int
    double_channel: int
    squelch_level: int
    tail_elim: bool
    auto_relay_en: bool
    auto_power_on: bool
    keep_aghfp_link: bool
    mic_gain: int
    tx_hold_time: int
    tx_time_limit: int
    local_speaker: int
    bt_mic_gain: int
    adaptive_response: bool
    dis_tone: bool
    power_saving_mode: bool
    auto_power_off: int
    # Either a channel number or the literal string "current".
    auto_share_loc_ch: int | t.Literal["current"]
    hm_speaker: int
    positioning_system: int
    time_offset: int
    use_freq_range_2: bool
    ptt_lock: bool
    leading_sync_bit_en: bool
    pairing_at_power_on: bool
    screen_timeout: int
    kiss_upload_tx_msg: bool
    kiss_en: bool
    imperial_unit: bool
    wx_mode: int
    noaa_ch: int
    # NOTE(review): spelled with a lowercase "l" ("vfol"); it has to match
    # the attribute of the same name on `p.Settings`, so confirm before
    # renaming.
    vfol_tx_power_x: int
    vfo2_tx_power_x: int
    dis_digital_mute: bool
    signaling_ecc_en: bool
    ch_data_lock: bool
    kiss_tx_delay: int
    kiss_tx_tail: int
    vox_en: bool
    vox_level: int
    dis_bt_mic: bool
    vox_delay: int
    ns_en: bool
    alarm_volume: int
    use_custom_location: bool
    gpwpl_upload_en: bool
    vfo1_mod_freq_x: int
    custom_location_lat: int
    custom_location_lon: int

    @classmethod
    def from_protocol(cls, rs: p.Settings) -> Settings:
        """@private (Protocol helper)

        Build a `Settings` from a protocol `p.Settings`, joining the split
        channel fields and decoding the location-share channel.
        """
        channels = cls._channel_split
        raw_share_ch = cls._auto_share_loc_ch_split.from_parts(
            rs.auto_share_loc_ch_upper, rs.auto_share_loc_ch
        )
        # Raw 0 stands for "current"; a raw value n > 0 stands for n - 1.
        if raw_share_ch > 0:
            share_ch = raw_share_ch - 1
        else:
            share_ch = "current"

        return Settings(
            channel_a=channels.from_parts(
                rs.channel_a_upper, rs.channel_a_lower
            ),
            channel_b=channels.from_parts(
                rs.channel_b_upper, rs.channel_b_lower
            ),
            scan=rs.scan,
            aghfp_call_mode=rs.aghfp_call_mode,
            double_channel=rs.double_channel,
            squelch_level=rs.squelch_level,
            tail_elim=rs.tail_elim,
            auto_relay_en=rs.auto_relay_en,
            auto_power_on=rs.auto_power_on,
            keep_aghfp_link=rs.keep_aghfp_link,
            mic_gain=rs.mic_gain,
            tx_hold_time=rs.tx_hold_time,
            tx_time_limit=rs.tx_time_limit,
            local_speaker=rs.local_speaker,
            bt_mic_gain=rs.bt_mic_gain,
            adaptive_response=rs.adaptive_response,
            dis_tone=rs.dis_tone,
            power_saving_mode=rs.power_saving_mode,
            auto_power_off=rs.auto_power_off,
            auto_share_loc_ch=share_ch,
            hm_speaker=rs.hm_speaker,
            positioning_system=rs.positioning_system,
            time_offset=rs.time_offset,
            use_freq_range_2=rs.use_freq_range_2,
            ptt_lock=rs.ptt_lock,
            leading_sync_bit_en=rs.leading_sync_bit_en,
            pairing_at_power_on=rs.pairing_at_power_on,
            screen_timeout=rs.screen_timeout,
            kiss_upload_tx_msg=rs.kiss_upload_tx_msg,
            kiss_en=rs.kiss_en,
            imperial_unit=rs.imperial_unit,
            wx_mode=rs.wx_mode,
            noaa_ch=rs.noaa_ch,
            vfol_tx_power_x=rs.vfol_tx_power_x,
            vfo2_tx_power_x=rs.vfo2_tx_power_x,
            dis_digital_mute=rs.dis_digital_mute,
            signaling_ecc_en=rs.signaling_ecc_en,
            ch_data_lock=rs.ch_data_lock,
            kiss_tx_delay=rs.kiss_tx_delay,
            kiss_tx_tail=rs.kiss_tx_tail,
            vox_en=rs.vox_en,
            vox_level=rs.vox_level,
            dis_bt_mic=rs.dis_bt_mic,
            vox_delay=rs.vox_delay,
            ns_en=rs.ns_en,
            alarm_volume=rs.alarm_volume,
            use_custom_location=rs.use_custom_location,
            gpwpl_upload_en=rs.gpwpl_upload_en,
            vfo1_mod_freq_x=rs.vfo1_mod_freq_x,
            custom_location_lat=rs.custom_location_lat,
            custom_location_lon=rs.custom_location_lon,
        )

    def to_protocol(self):
        """@private (Protocol helper)

        Build the protocol `p.Settings` for these settings, splitting the
        channel numbers and encoding the location-share channel.
        """
        # Inverse of the decoding in `from_protocol`: "current" is sent as
        # 0, and a channel number n is sent as n + 1.
        if self.auto_share_loc_ch == "current":
            raw_share_ch = 0
        else:
            raw_share_ch = self.auto_share_loc_ch + 1

        channels = self._channel_split
        share = self._auto_share_loc_ch_split

        return p.Settings(
            channel_a_lower=channels.get_lower(self.channel_a),
            channel_b_lower=channels.get_lower(self.channel_b),
            scan=self.scan,
            aghfp_call_mode=self.aghfp_call_mode,
            double_channel=self.double_channel,
            squelch_level=self.squelch_level,
            tail_elim=self.tail_elim,
            auto_relay_en=self.auto_relay_en,
            auto_power_on=self.auto_power_on,
            keep_aghfp_link=self.keep_aghfp_link,
            mic_gain=self.mic_gain,
            tx_hold_time=self.tx_hold_time,
            tx_time_limit=self.tx_time_limit,
            local_speaker=self.local_speaker,
            bt_mic_gain=self.bt_mic_gain,
            adaptive_response=self.adaptive_response,
            dis_tone=self.dis_tone,
            power_saving_mode=self.power_saving_mode,
            auto_power_off=self.auto_power_off,
            auto_share_loc_ch=share.get_lower(raw_share_ch),
            hm_speaker=self.hm_speaker,
            positioning_system=self.positioning_system,
            time_offset=self.time_offset,
            use_freq_range_2=self.use_freq_range_2,
            ptt_lock=self.ptt_lock,
            leading_sync_bit_en=self.leading_sync_bit_en,
            pairing_at_power_on=self.pairing_at_power_on,
            screen_timeout=self.screen_timeout,
            kiss_upload_tx_msg=self.kiss_upload_tx_msg,
            kiss_en=self.kiss_en,
            imperial_unit=self.imperial_unit,
            channel_a_upper=channels.get_upper(self.channel_a),
            channel_b_upper=channels.get_upper(self.channel_b),
            wx_mode=self.wx_mode,
            noaa_ch=self.noaa_ch,
            vfol_tx_power_x=self.vfol_tx_power_x,
            vfo2_tx_power_x=self.vfo2_tx_power_x,
            dis_digital_mute=self.dis_digital_mute,
            signaling_ecc_en=self.signaling_ecc_en,
            ch_data_lock=self.ch_data_lock,
            auto_share_loc_ch_upper=share.get_upper(raw_share_ch),
            kiss_tx_delay=self.kiss_tx_delay,
            kiss_tx_tail=self.kiss_tx_tail,
            vox_en=self.vox_en,
            vox_level=self.vox_level,
            dis_bt_mic=self.dis_bt_mic,
            vox_delay=self.vox_delay,
            ns_en=self.ns_en,
            alarm_volume=self.alarm_volume,
            use_custom_location=self.use_custom_location,
            gpwpl_upload_en=self.gpwpl_upload_en,
            vfo1_mod_freq_x=self.vfo1_mod_freq_x,
            custom_location_lat=self.custom_location_lat,
            custom_location_lon=self.custom_location_lon,
        )
1207
1208
class DeviceInfo(ImmutableBaseModel):
    """A data object representing the device information"""
    vendor_id: int
    product_id: int
    hardware_version: int
    firmware_version: int
    supports_radio: bool
    supports_medium_power: bool
    fixed_location_speaker_volume: bool
    has_speaker: bool
    has_hand_microphone_speaker: bool
    region_count: int
    supports_noaa: bool
    supports_gmrs: bool
    supports_vfo: bool
    supports_dmr: bool
    supports_software_power_control: bool
    channel_count: int
    frequency_range_count: int

    @classmethod
    def from_protocol(cls, info: p.DevInfo) -> DeviceInfo:
        """@private (Protocol helper)

        Build a `DeviceInfo` from a protocol `p.DevInfo`, renaming fields
        and flipping the two flags the protocol stores in negated form.
        """
        # `not_support_soft_power_ctrl` and `have_no_speaker` are negative
        # flags in the protocol; the API exposes their positive form.
        software_power_control = not info.not_support_soft_power_ctrl
        speaker_present = not info.have_no_speaker
        return DeviceInfo(
            vendor_id=info.vendor_id,
            product_id=info.product_id,
            hardware_version=info.hw_ver,
            firmware_version=info.soft_ver,
            supports_radio=info.support_radio,
            supports_medium_power=info.support_medium_power,
            fixed_location_speaker_volume=info.fixed_loc_speaker_vol,
            supports_software_power_control=software_power_control,
            has_speaker=speaker_present,
            has_hand_microphone_speaker=info.have_hm_speaker,
            region_count=info.region_count,
            supports_noaa=info.support_noaa,
            supports_gmrs=info.gmrs,
            supports_vfo=info.support_vfo,
            supports_dmr=info.support_dmr,
            channel_count=info.channel_count,
            frequency_range_count=info.freq_range_count,
        )

    def to_protocol(self) -> p.DevInfo:
        """@private (Protocol helper)

        Build the protocol `p.DevInfo` for this device info, restoring the
        protocol's field names and its two negated flags.
        """
        no_software_power_control = not self.supports_software_power_control
        no_speaker = not self.has_speaker
        return p.DevInfo(
            vendor_id=self.vendor_id,
            product_id=self.product_id,
            hw_ver=self.hardware_version,
            soft_ver=self.firmware_version,
            support_radio=self.supports_radio,
            support_medium_power=self.supports_medium_power,
            fixed_loc_speaker_vol=self.fixed_location_speaker_volume,
            not_support_soft_power_ctrl=no_software_power_control,
            have_no_speaker=no_speaker,
            have_hm_speaker=self.has_hand_microphone_speaker,
            region_count=self.region_count,
            support_noaa=self.supports_noaa,
            gmrs=self.supports_gmrs,
            support_vfo=self.supports_vfo,
            support_dmr=self.supports_dmr,
            channel_count=self.channel_count,
            freq_range_count=self.frequency_range_count,
        )
1273
1274
class BeaconSettingsArgs(t.TypedDict, total=False):
    """A dictionary of the parameters that can be set in the beacon settings"""
    # `total=False` makes every key optional, so a caller supplies only the
    # settings it wants to change. The keys mirror the fields of
    # `BeaconSettings`.
    max_fwd_times: int
    time_to_live: int
    ptt_release_send_location: bool
    ptt_release_send_id_info: bool
    ptt_release_send_bss_user_id: bool
    should_share_location: bool
    send_pwr_voltage: bool
    packet_format: t.Literal["BSS", "APRS"]
    allow_position_check: bool
    aprs_ssid: int
    location_share_interval: int
    bss_user_id: int
    ptt_release_id_info: str
    beacon_message: str
    aprs_symbol: str
    aprs_callsign: str
1293
1294
class BeaconSettings(ImmutableBaseModel):
    """A data object representing the beacon settings"""
    # The protocol carries `bss_user_id` as separate upper and lower parts;
    # this helper joins and splits them. (Presumably the arguments are bit
    # widths -- confirm against `IntSplit`.)
    _bss_user_id_split: t.ClassVar[IntSplit] = IntSplit(32, 32)
    max_fwd_times: int
    time_to_live: int
    ptt_release_send_location: bool
    ptt_release_send_id_info: bool
    ptt_release_send_bss_user_id: bool
    should_share_location: bool
    send_pwr_voltage: bool
    packet_format: t.Literal["BSS", "APRS"]
    allow_position_check: bool
    aprs_ssid: int
    location_share_interval: int
    bss_user_id: int
    ptt_release_id_info: str
    beacon_message: str
    aprs_symbol: str
    aprs_callsign: str

    @classmethod
    def from_protocol(cls, bs: p.BSSSettingsV2 | p.BSSSettings) -> BeaconSettings:
        """@private (Protocol helper)

        Build a `BeaconSettings` from either protocol settings version,
        joining the two halves of the BSS user id and exposing the packet
        format by its `.name`.
        """
        user_id = cls._bss_user_id_split.from_parts(
            bs.bss_user_id_upper, bs.bss_user_id_lower
        )
        return BeaconSettings(
            max_fwd_times=bs.max_fwd_times,
            time_to_live=bs.time_to_live,
            ptt_release_send_location=bs.ptt_release_send_location,
            ptt_release_send_id_info=bs.ptt_release_send_id_info,
            ptt_release_send_bss_user_id=bs.ptt_release_send_bss_user_id,
            should_share_location=bs.should_share_location,
            send_pwr_voltage=bs.send_pwr_voltage,
            packet_format=bs.packet_format.name,
            allow_position_check=bs.allow_position_check,
            aprs_ssid=bs.aprs_ssid,
            location_share_interval=bs.location_share_interval,
            bss_user_id=user_id,
            ptt_release_id_info=bs.ptt_release_id_info,
            beacon_message=bs.beacon_message,
            aprs_symbol=bs.aprs_symbol,
            aprs_callsign=bs.aprs_callsign,
        )

    def to_protocol(self) -> p.BSSSettingsV2:
        """@private (Protocol helper)

        Build the protocol `p.BSSSettingsV2` for these settings (always the
        V2 form), splitting the BSS user id into its two halves.
        """
        id_split = self._bss_user_id_split
        user_id_lower = id_split.get_lower(self.bss_user_id)
        user_id_upper = id_split.get_upper(self.bss_user_id)
        return p.BSSSettingsV2(
            max_fwd_times=self.max_fwd_times,
            time_to_live=self.time_to_live,
            ptt_release_send_location=self.ptt_release_send_location,
            ptt_release_send_id_info=self.ptt_release_send_id_info,
            ptt_release_send_bss_user_id=self.ptt_release_send_bss_user_id,
            should_share_location=self.should_share_location,
            send_pwr_voltage=self.send_pwr_voltage,
            packet_format=p.PacketFormat[self.packet_format],
            allow_position_check=self.allow_position_check,
            aprs_ssid=self.aprs_ssid,
            location_share_interval=self.location_share_interval,
            bss_user_id_lower=user_id_lower,
            ptt_release_id_info=self.ptt_release_id_info,
            beacon_message=self.beacon_message,
            aprs_symbol=self.aprs_symbol,
            aprs_callsign=self.aprs_callsign,
            bss_user_id_upper=user_id_upper,
        )
1364
1365
# Names accepted by `Status.double_channel`; they are converted to and from
# the members of `p.ChannelType` by name.
ChannelType = t.Literal["OFF", "A", "B"]
1367
1368
class Status(ImmutableBaseModel):
    """A data object representing the radio status"""
    # The protocol carries the current channel id as separate upper and
    # lower parts; this helper joins and splits them. (Presumably the
    # arguments are bit widths -- confirm against `IntSplit`.)
    _channel_split: t.ClassVar[IntSplit] = IntSplit(4, 4)
    is_power_on: bool
    is_in_tx: bool
    is_sq: bool
    is_in_rx: bool
    double_channel: ChannelType
    is_scan: bool
    is_radio: bool
    curr_ch_id: int
    is_gps_locked: bool
    is_hfp_connected: bool
    is_aoc_connected: bool
    rssi: float
    curr_region: int

    @classmethod
    def from_protocol(cls, s: p.Status | p.StatusExt) -> Status:
        """@private (Protocol helper)

        Build a `Status` from a protocol `p.StatusExt`.

        Raises:
            ValueError: if `s` is not a `p.StatusExt` (i.e. the radio sent
                the older `p.Status` form).
        """
        if not isinstance(s, p.StatusExt):
            raise ValueError(
                "Radio replied with old Status message version. Upgrade your firmware!"
            )

        channel_id = cls._channel_split.from_parts(
            s.curr_channel_id_upper, s.curr_ch_id_lower
        )
        return Status(
            is_power_on=s.is_power_on,
            is_in_tx=s.is_in_tx,
            is_sq=s.is_sq,
            is_in_rx=s.is_in_rx,
            double_channel=s.double_channel.name,
            is_scan=s.is_scan,
            is_radio=s.is_radio,
            curr_ch_id=channel_id,
            is_gps_locked=s.is_gps_locked,
            is_hfp_connected=s.is_hfp_connected,
            is_aoc_connected=s.is_aoc_connected,
            rssi=s.rssi,
            curr_region=s.curr_region,
        )

    def to_protocol(self) -> p.StatusExt:
        """@private (Protocol helper)

        Build the protocol `p.StatusExt` for this status, splitting the
        current channel id into its two parts.
        """
        id_split = self._channel_split
        channel_lower = id_split.get_lower(self.curr_ch_id)
        channel_upper = id_split.get_upper(self.curr_ch_id)
        return p.StatusExt(
            is_power_on=self.is_power_on,
            is_in_tx=self.is_in_tx,
            is_sq=self.is_sq,
            is_in_rx=self.is_in_rx,
            double_channel=p.ChannelType[self.double_channel],
            is_scan=self.is_scan,
            is_radio=self.is_radio,
            curr_ch_id_lower=channel_lower,
            is_gps_locked=self.is_gps_locked,
            is_hfp_connected=self.is_hfp_connected,
            is_aoc_connected=self.is_aoc_connected,
            rssi=self.rssi,
            curr_region=self.curr_region,
            curr_channel_id_upper=channel_upper,
        )
1431
1432
class Position(ImmutableBaseModel):
    """A data object for representing GPS positions"""
    latitude: float
    longitude: float
    # Altitude, speed and heading are optional and may be None.
    altitude: int | None
    speed: int | None
    heading: int | None
    time: datetime
    accuracy: int

    @classmethod
    def from_protocol(cls, x: p.Position) -> Position:
        """@private (Protocol helper)

        Build a `Position` by copying each field of the protocol
        `p.Position` unchanged.
        """
        return Position(
            latitude=x.latitude,
            longitude=x.longitude,
            altitude=x.altitude,
            speed=x.speed,
            heading=x.heading,
            time=x.time,
            accuracy=x.accuracy,
        )

    def to_protocol(self) -> p.Position:
        """@private (Protocol helper)

        Build the protocol `p.Position` by copying each field unchanged.
        """
        return p.Position(
            latitude=self.latitude,
            longitude=self.longitude,
            altitude=self.altitude,
            speed=self.speed,
            heading=self.heading,
            time=self.time,
            accuracy=self.accuracy,
        )
class CommandConnection:
 54class CommandConnection:
 55    _link: CommandLink
 56    _handlers: t.List[RadioMessageHandler] = []
 57
 58    def __init__(self, link: CommandLink):
 59        self._link = link
 60        self._handlers = []
 61
 62    @classmethod
 63    def new_ble(cls, device_uuid: str) -> CommandConnection:
 64        return cls(BleCommandLink(device_uuid))
 65
 66    @classmethod
 67    def new_rfcomm(cls, device_uuid: str, channel: int | t.Literal["auto"] = "auto") -> CommandConnection:
 68        return cls(RfcommCommandLink(device_uuid, channel))
 69
 70    def is_connected(self) -> bool:
 71        return self._link.is_connected()
 72
 73    async def connect(self) -> None:
 74        await self._link.connect(self._on_recv)
 75
 76    async def disconnect(self) -> None:
 77        self._handlers.clear()
 78        await self._link.disconnect()
 79
 80    async def send_bytes(self, data: bytes) -> None:
 81        await self._link.send_bytes(data)
 82
 83    async def send_message(self, command: CommandMessage) -> None:
 84        await self._link.send(command_message_to_protocol(command))
 85
 86    async def send_message_expect_reply(self, command: CommandMessage, expect: t.Type[RadioMessageT]) -> RadioMessageT | MessageReplyError:
 87        queue: asyncio.Queue[RadioMessageT |
 88                             MessageReplyError] = asyncio.Queue()
 89
 90        def reply_handler(reply: RadioMessage):
 91            if (
 92                isinstance(reply, expect) or
 93                (
 94                    isinstance(reply, MessageReplyError) and
 95                    reply.message_type is expect
 96                )
 97            ):
 98                queue.put_nowait(reply)
 99
100        remove_handler = self._add_message_handler(reply_handler)
101
102        await self.send_message(command)
103
104        out = await queue.get()
105
106        remove_handler()
107
108        return out
109
110    def add_event_handler(self, handler: EventHandler) -> t.Callable[[], None]:
111        def event_handler(msg: RadioMessage):
112            if isinstance(msg, EventMessage):
113                handler(msg)
114        return self._add_message_handler(event_handler)
115
116    def _add_message_handler(self, handler: RadioMessageHandler) -> t.Callable[[], None]:
117        self._handlers.append(handler)
118
119        def remove_handler():
120            self._handlers.remove(handler)
121
122        return remove_handler
123
124    def _on_recv(self, msg: p.Message) -> None:
125        radio_message = radio_message_from_protocol(msg)
126        for handler in self._handlers:
127            handler(radio_message)
128
129    # Command API
130
131    async def enable_event(self, event_type: EventType) -> None:
132        """Enable an event"""
133        # This event doesn't get a reply version of EnableEvent. It does, however
134        # does trigger a StatusChangedEvent... but we don't wait for it here.
135        # Instead, we just fire and forget
136        await self.send_message(EnableEvent(event_type))
137
138    async def send_tnc_data_fragment(self, tnc_data_fragment: TncDataFragment) -> None:
139        """Send Tnc data"""
140        reply = await self.send_message_expect_reply(SendTncDataFragment(tnc_data_fragment), SendTncDataFragmentReply)
141        if isinstance(reply, MessageReplyError):
142            raise reply.as_exception()
143
144    async def get_beacon_settings(self) -> BeaconSettings:
145        """Get the current packet settings"""
146        reply = await self.send_message_expect_reply(GetBeaconSettings(), GetBeaconSettingsReply)
147        if isinstance(reply, MessageReplyError):
148            raise reply.as_exception()
149        return reply.tnc_settings
150
151    async def set_beacon_settings(self, packet_settings: BeaconSettings):
152        """Set the packet settings"""
153        reply = await self.send_message_expect_reply(SetBeaconSettings(packet_settings), SetBeaconSettingsReply)
154        if isinstance(reply, MessageReplyError):
155            raise reply.as_exception()
156
157    async def get_battery_level(self) -> int:
158        """Get the battery level"""
159        reply = await self.send_message_expect_reply(GetBatteryLevel(), GetBatteryLevelReply)
160        if isinstance(reply, MessageReplyError):
161            raise reply.as_exception()
162        return reply.battery_level
163
164    async def get_battery_level_as_percentage(self) -> int:
165        """Get the battery level as a percentage"""
166        reply = await self.send_message_expect_reply(GetBatteryLevelAsPercentage(), GetBatteryLevelAsPercentageReply)
167        if isinstance(reply, MessageReplyError):
168            raise reply.as_exception()
169        return reply.battery_level_as_percentage
170
171    async def get_rc_battery_level(self) -> int:
172        """Get the RC battery level"""
173        reply = await self.send_message_expect_reply(GetRCBatteryLevel(), GetRCBatteryLevelReply)
174        if isinstance(reply, MessageReplyError):
175            raise reply.as_exception()
176        return reply.rc_battery_level
177
178    async def get_battery_voltage(self) -> float:
179        """Get the battery voltage"""
180        reply = await self.send_message_expect_reply(GetBatteryVoltage(), GetBatteryVoltageReply)
181        if isinstance(reply, MessageReplyError):
182            raise reply.as_exception()
183        return reply.battery_voltage
184
185    async def get_device_info(self) -> DeviceInfo:
186        """Get the device info"""
187        reply = await self.send_message_expect_reply(GetDeviceInfo(), GetDeviceInfoReply)
188        if isinstance(reply, MessageReplyError):
189            raise reply.as_exception()
190        return reply.device_info
191
192    async def get_settings(self) -> Settings:
193        """Get the settings"""
194        reply = await self.send_message_expect_reply(GetSettings(), GetSettingsReply)
195        if isinstance(reply, MessageReplyError):
196            raise reply.as_exception()
197        return reply.settings
198
199    async def set_settings(self, settings: Settings) -> None:
200        """Set the settings"""
201        reply = await self.send_message_expect_reply(SetSettings(settings), SetSettingsReply)
202        if isinstance(reply, MessageReplyError):
203            raise reply.as_exception()
204
205    async def get_channel(self, channel_id: int) -> Channel:
206        """Get a channel"""
207        reply = await self.send_message_expect_reply(GetChannel(channel_id), GetChannelReply)
208        if isinstance(reply, MessageReplyError):
209            raise reply.as_exception()
210        return reply.channel
211
212    async def set_channel(self, channel: Channel):
213        """Set a channel"""
214        reply = await self.send_message_expect_reply(SetChannel(channel), SetChannelReply)
215        if isinstance(reply, MessageReplyError):
216            raise reply.as_exception()
217
218    async def get_status(self) -> Status:
219        """Get the radio status"""
220        reply = await self.send_message_expect_reply(GetStatus(), GetStatusReply)
221        if isinstance(reply, MessageReplyError):
222            raise reply.as_exception()
223        return reply.status
224
225    async def get_position(self) -> Position:
226        """Get the radio position"""
227        reply = await self.send_message_expect_reply(GetPosition(), GetPositionReply)
228        if isinstance(reply, MessageReplyError):
229            raise reply.as_exception()
230        return reply.position
231
232    async def __aenter__(self):
233        await self.connect()
234        return self
235
236    async def __aexit__(
237        self,
238        exc_type: t.Any,
239        exc_value: t.Any,
240        traceback: t.Any,
241    ) -> None:
242        await self.disconnect()
CommandConnection(link: benlink.link.CommandLink)
58    def __init__(self, link: CommandLink):
59        self._link = link
60        self._handlers = []
@classmethod
def new_ble(cls, device_uuid: str) -> CommandConnection:
62    @classmethod
63    def new_ble(cls, device_uuid: str) -> CommandConnection:
64        return cls(BleCommandLink(device_uuid))
@classmethod
def new_rfcomm( cls, device_uuid: str, channel: Union[int, Literal['auto']] = 'auto') -> CommandConnection:
66    @classmethod
67    def new_rfcomm(cls, device_uuid: str, channel: int | t.Literal["auto"] = "auto") -> CommandConnection:
68        return cls(RfcommCommandLink(device_uuid, channel))
def is_connected(self) -> bool:
70    def is_connected(self) -> bool:
71        return self._link.is_connected()
async def connect(self) -> None:
73    async def connect(self) -> None:
74        await self._link.connect(self._on_recv)
async def disconnect(self) -> None:
76    async def disconnect(self) -> None:
77        self._handlers.clear()
78        await self._link.disconnect()
async def send_bytes(self, data: bytes) -> None:
80    async def send_bytes(self, data: bytes) -> None:
81        await self._link.send_bytes(data)
83    async def send_message(self, command: CommandMessage) -> None:
84        await self._link.send(command_message_to_protocol(command))
async def send_message_expect_reply( self, command: Union[GetBeaconSettings, SetBeaconSettings, GetRCBatteryLevel, GetBatteryLevelAsPercentage, GetBatteryLevel, GetBatteryVoltage, GetDeviceInfo, GetChannel, SetChannel, GetSettings, SetSettings, SendTncDataFragment, EnableEvent, GetStatus, GetPosition], expect: Type[~RadioMessageT]) -> Union[~RadioMessageT, MessageReplyError]:
 86    async def send_message_expect_reply(self, command: CommandMessage, expect: t.Type[RadioMessageT]) -> RadioMessageT | MessageReplyError:
 87        queue: asyncio.Queue[RadioMessageT |
 88                             MessageReplyError] = asyncio.Queue()
 89
 90        def reply_handler(reply: RadioMessage):
 91            if (
 92                isinstance(reply, expect) or
 93                (
 94                    isinstance(reply, MessageReplyError) and
 95                    reply.message_type is expect
 96                )
 97            ):
 98                queue.put_nowait(reply)
 99
100        remove_handler = self._add_message_handler(reply_handler)
101
102        await self.send_message(command)
103
104        out = await queue.get()
105
106        remove_handler()
107
108        return out
def add_event_handler( self, handler: Callable[[Union[TncDataFragmentReceivedEvent, SettingsChangedEvent, ChannelChangedEvent, StatusChangedEvent, UnknownProtocolMessage]], NoneType]) -> Callable[[], NoneType]:
110    def add_event_handler(self, handler: EventHandler) -> t.Callable[[], None]:
111        def event_handler(msg: RadioMessage):
112            if isinstance(msg, EventMessage):
113                handler(msg)
114        return self._add_message_handler(event_handler)
async def enable_event( self, event_type: Literal['HT_STATUS_CHANGED', 'DATA_RXD', 'NEW_INQUIRY_DATA', 'RESTORE_FACTORY_SETTINGS', 'HT_CH_CHANGED', 'HT_SETTINGS_CHANGED', 'RINGING_STOPPEDRADIO_STATUS_CHANGED', 'USER_ACTION', 'SYSTEM_EVENT', 'BSS_SETTINGS_CHANGED', 'DATA_TXD', 'POSITION_CHANGED']) -> None:
131    async def enable_event(self, event_type: EventType) -> None:
132        """Enable an event"""
133        # This message doesn't get a reply version of EnableEvent. It does,
134        # however, trigger a StatusChangedEvent... but we don't wait for it here.
135        # Instead, we just fire and forget
136        await self.send_message(EnableEvent(event_type))

Enable an event

async def send_tnc_data_fragment(self, tnc_data_fragment: TncDataFragment) -> None:
138    async def send_tnc_data_fragment(self, tnc_data_fragment: TncDataFragment) -> None:
139        """Send Tnc data"""
140        reply = await self.send_message_expect_reply(SendTncDataFragment(tnc_data_fragment), SendTncDataFragmentReply)
141        if isinstance(reply, MessageReplyError):
142            raise reply.as_exception()

Send Tnc data

async def get_beacon_settings(self) -> BeaconSettings:
144    async def get_beacon_settings(self) -> BeaconSettings:
145        """Get the current packet settings"""
146        reply = await self.send_message_expect_reply(GetBeaconSettings(), GetBeaconSettingsReply)
147        if isinstance(reply, MessageReplyError):
148            raise reply.as_exception()
149        return reply.tnc_settings

Get the current packet settings

async def set_beacon_settings(self, packet_settings: BeaconSettings):
151    async def set_beacon_settings(self, packet_settings: BeaconSettings):
152        """Set the packet settings"""
153        reply = await self.send_message_expect_reply(SetBeaconSettings(packet_settings), SetBeaconSettingsReply)
154        if isinstance(reply, MessageReplyError):
155            raise reply.as_exception()

Set the packet settings

async def get_battery_level(self) -> int:
157    async def get_battery_level(self) -> int:
158        """Get the battery level"""
159        reply = await self.send_message_expect_reply(GetBatteryLevel(), GetBatteryLevelReply)
160        if isinstance(reply, MessageReplyError):
161            raise reply.as_exception()
162        return reply.battery_level

Get the battery level

async def get_battery_level_as_percentage(self) -> int:
164    async def get_battery_level_as_percentage(self) -> int:
165        """Get the battery level as a percentage"""
166        reply = await self.send_message_expect_reply(GetBatteryLevelAsPercentage(), GetBatteryLevelAsPercentageReply)
167        if isinstance(reply, MessageReplyError):
168            raise reply.as_exception()
169        return reply.battery_level_as_percentage

Get the battery level as a percentage

async def get_rc_battery_level(self) -> int:
171    async def get_rc_battery_level(self) -> int:
172        """Get the RC battery level"""
173        reply = await self.send_message_expect_reply(GetRCBatteryLevel(), GetRCBatteryLevelReply)
174        if isinstance(reply, MessageReplyError):
175            raise reply.as_exception()
176        return reply.rc_battery_level

Get the RC battery level

async def get_battery_voltage(self) -> float:
178    async def get_battery_voltage(self) -> float:
179        """Get the battery voltage"""
180        reply = await self.send_message_expect_reply(GetBatteryVoltage(), GetBatteryVoltageReply)
181        if isinstance(reply, MessageReplyError):
182            raise reply.as_exception()
183        return reply.battery_voltage

Get the battery voltage

async def get_device_info(self) -> DeviceInfo:
185    async def get_device_info(self) -> DeviceInfo:
186        """Get the device info"""
187        reply = await self.send_message_expect_reply(GetDeviceInfo(), GetDeviceInfoReply)
188        if isinstance(reply, MessageReplyError):
189            raise reply.as_exception()
190        return reply.device_info

Get the device info

async def get_settings(self) -> Settings:
192    async def get_settings(self) -> Settings:
193        """Get the settings"""
194        reply = await self.send_message_expect_reply(GetSettings(), GetSettingsReply)
195        if isinstance(reply, MessageReplyError):
196            raise reply.as_exception()
197        return reply.settings

Get the settings

async def set_settings(self, settings: Settings) -> None:
199    async def set_settings(self, settings: Settings) -> None:
200        """Set the settings"""
201        reply = await self.send_message_expect_reply(SetSettings(settings), SetSettingsReply)
202        if isinstance(reply, MessageReplyError):
203            raise reply.as_exception()

Set the settings

async def get_channel(self, channel_id: int) -> Channel:
205    async def get_channel(self, channel_id: int) -> Channel:
206        """Get a channel"""
207        reply = await self.send_message_expect_reply(GetChannel(channel_id), GetChannelReply)
208        if isinstance(reply, MessageReplyError):
209            raise reply.as_exception()
210        return reply.channel

Get a channel

async def set_channel(self, channel: Channel):
212    async def set_channel(self, channel: Channel):
213        """Set a channel"""
214        reply = await self.send_message_expect_reply(SetChannel(channel), SetChannelReply)
215        if isinstance(reply, MessageReplyError):
216            raise reply.as_exception()

Set a channel

async def get_status(self) -> Status:
218    async def get_status(self) -> Status:
219        """Get the radio status"""
220        reply = await self.send_message_expect_reply(GetStatus(), GetStatusReply)
221        if isinstance(reply, MessageReplyError):
222            raise reply.as_exception()
223        return reply.status

Get the radio status

async def get_position(self) -> Position:
225    async def get_position(self) -> Position:
226        """Get the radio position"""
227        reply = await self.send_message_expect_reply(GetPosition(), GetPositionReply)
228        if isinstance(reply, MessageReplyError):
229            raise reply.as_exception()
230        return reply.position

Get the radio position

class EnableEvent(typing.NamedTuple):
543class EnableEvent(t.NamedTuple):
544    event_type: EventType

EnableEvent(event_type,)

EnableEvent(event_type: ForwardRef('EventType'))

Create new instance of EnableEvent(event_type,)

event_type: Literal['HT_STATUS_CHANGED', 'DATA_RXD', 'NEW_INQUIRY_DATA', 'RESTORE_FACTORY_SETTINGS', 'HT_CH_CHANGED', 'HT_SETTINGS_CHANGED', 'RINGING_STOPPEDRADIO_STATUS_CHANGED', 'USER_ACTION', 'SYSTEM_EVENT', 'BSS_SETTINGS_CHANGED', 'DATA_TXD', 'POSITION_CHANGED']

Alias for field number 0

class GetBeaconSettings(typing.NamedTuple):
547class GetBeaconSettings(t.NamedTuple):
548    pass

GetBeaconSettings()

GetBeaconSettings()

Create new instance of GetBeaconSettings()

class SetBeaconSettings(typing.NamedTuple):
551class SetBeaconSettings(t.NamedTuple):
552    tnc_settings: BeaconSettings

SetBeaconSettings(tnc_settings,)

SetBeaconSettings(tnc_settings: ForwardRef('BeaconSettings'))

Create new instance of SetBeaconSettings(tnc_settings,)

tnc_settings: BeaconSettings

Alias for field number 0

class SetSettings(typing.NamedTuple):
555class SetSettings(t.NamedTuple):
556    settings: Settings

SetSettings(settings,)

SetSettings(settings: ForwardRef('Settings'))

Create new instance of SetSettings(settings,)

settings: Settings

Alias for field number 0

class GetBatteryLevelAsPercentage(typing.NamedTuple):
559class GetBatteryLevelAsPercentage(t.NamedTuple):
560    pass

GetBatteryLevelAsPercentage()

GetBatteryLevelAsPercentage()

Create new instance of GetBatteryLevelAsPercentage()

class GetRCBatteryLevel(typing.NamedTuple):
563class GetRCBatteryLevel(t.NamedTuple):
564    pass

GetRCBatteryLevel()

GetRCBatteryLevel()

Create new instance of GetRCBatteryLevel()

class GetBatteryLevel(typing.NamedTuple):
567class GetBatteryLevel(t.NamedTuple):
568    pass

GetBatteryLevel()

GetBatteryLevel()

Create new instance of GetBatteryLevel()

class GetBatteryVoltage(typing.NamedTuple):
571class GetBatteryVoltage(t.NamedTuple):
572    pass

GetBatteryVoltage()

GetBatteryVoltage()

Create new instance of GetBatteryVoltage()

class GetDeviceInfo(typing.NamedTuple):
575class GetDeviceInfo(t.NamedTuple):
576    pass

GetDeviceInfo()

GetDeviceInfo()

Create new instance of GetDeviceInfo()

class GetChannel(typing.NamedTuple):
579class GetChannel(t.NamedTuple):
580    channel_id: int

GetChannel(channel_id,)

GetChannel(channel_id: int)

Create new instance of GetChannel(channel_id,)

channel_id: int

Alias for field number 0

class SetChannel(typing.NamedTuple):
583class SetChannel(t.NamedTuple):
584    channel: Channel

SetChannel(channel,)

SetChannel(channel: ForwardRef('Channel'))

Create new instance of SetChannel(channel,)

channel: Channel

Alias for field number 0

class GetSettings(typing.NamedTuple):
587class GetSettings(t.NamedTuple):
588    pass

GetSettings()

GetSettings()

Create new instance of GetSettings()

class GetStatus(typing.NamedTuple):
591class GetStatus(t.NamedTuple):
592    pass

GetStatus()

GetStatus()

Create new instance of GetStatus()

class GetPosition(typing.NamedTuple):
595class GetPosition(t.NamedTuple):
596    pass

GetPosition()

GetPosition()

Create new instance of GetPosition()

class SendTncDataFragment(typing.NamedTuple):
599class SendTncDataFragment(t.NamedTuple):
600    tnc_data_fragment: TncDataFragment

SendTncDataFragment(tnc_data_fragment,)

SendTncDataFragment(tnc_data_fragment: ForwardRef('TncDataFragment'))

Create new instance of SendTncDataFragment(tnc_data_fragment,)

tnc_data_fragment: TncDataFragment

Alias for field number 0

class SendTncDataFragmentReply(typing.NamedTuple):
625class SendTncDataFragmentReply(t.NamedTuple):
626    pass

SendTncDataFragmentReply()

SendTncDataFragmentReply()

Create new instance of SendTncDataFragmentReply()

class GetBeaconSettingsReply(typing.NamedTuple):
629class GetBeaconSettingsReply(t.NamedTuple):
630    tnc_settings: BeaconSettings

GetBeaconSettingsReply(tnc_settings,)

GetBeaconSettingsReply(tnc_settings: ForwardRef('BeaconSettings'))

Create new instance of GetBeaconSettingsReply(tnc_settings,)

tnc_settings: BeaconSettings

Alias for field number 0

class SetBeaconSettingsReply(typing.NamedTuple):
633class SetBeaconSettingsReply(t.NamedTuple):
634    pass

SetBeaconSettingsReply()

SetBeaconSettingsReply()

Create new instance of SetBeaconSettingsReply()

class SetSettingsReply(typing.NamedTuple):
637class SetSettingsReply(t.NamedTuple):
638    pass

SetSettingsReply()

SetSettingsReply()

Create new instance of SetSettingsReply()

class GetBatteryLevelAsPercentageReply(typing.NamedTuple):
641class GetBatteryLevelAsPercentageReply(t.NamedTuple):
642    battery_level_as_percentage: int

GetBatteryLevelAsPercentageReply(battery_level_as_percentage,)

GetBatteryLevelAsPercentageReply(battery_level_as_percentage: int)

Create new instance of GetBatteryLevelAsPercentageReply(battery_level_as_percentage,)

battery_level_as_percentage: int

Alias for field number 0

class GetRCBatteryLevelReply(typing.NamedTuple):
645class GetRCBatteryLevelReply(t.NamedTuple):
646    rc_battery_level: int

GetRCBatteryLevelReply(rc_battery_level,)

GetRCBatteryLevelReply(rc_battery_level: int)

Create new instance of GetRCBatteryLevelReply(rc_battery_level,)

rc_battery_level: int

Alias for field number 0

class GetBatteryLevelReply(typing.NamedTuple):
649class GetBatteryLevelReply(t.NamedTuple):
650    battery_level: int

GetBatteryLevelReply(battery_level,)

GetBatteryLevelReply(battery_level: int)

Create new instance of GetBatteryLevelReply(battery_level,)

battery_level: int

Alias for field number 0

class GetBatteryVoltageReply(typing.NamedTuple):
653class GetBatteryVoltageReply(t.NamedTuple):
654    battery_voltage: float

GetBatteryVoltageReply(battery_voltage,)

GetBatteryVoltageReply(battery_voltage: float)

Create new instance of GetBatteryVoltageReply(battery_voltage,)

battery_voltage: float

Alias for field number 0

class GetDeviceInfoReply(typing.NamedTuple):
657class GetDeviceInfoReply(t.NamedTuple):
658    device_info: DeviceInfo

GetDeviceInfoReply(device_info,)

GetDeviceInfoReply(device_info: ForwardRef('DeviceInfo'))

Create new instance of GetDeviceInfoReply(device_info,)

device_info: DeviceInfo

Alias for field number 0

class GetChannelReply(typing.NamedTuple):
661class GetChannelReply(t.NamedTuple):
662    channel: Channel

GetChannelReply(channel,)

GetChannelReply(channel: ForwardRef('Channel'))

Create new instance of GetChannelReply(channel,)

channel: Channel

Alias for field number 0

class SetChannelReply(typing.NamedTuple):
665class SetChannelReply(t.NamedTuple):
666    pass

SetChannelReply()

SetChannelReply()

Create new instance of SetChannelReply()

class GetStatusReply(typing.NamedTuple):
669class GetStatusReply(t.NamedTuple):
670    status: Status

GetStatusReply(status,)

GetStatusReply(status: ForwardRef('Status'))

Create new instance of GetStatusReply(status,)

status: Status

Alias for field number 0

class GetSettingsReply(typing.NamedTuple):
673class GetSettingsReply(t.NamedTuple):
674    settings: Settings

GetSettingsReply(settings,)

GetSettingsReply(settings: ForwardRef('Settings'))

Create new instance of GetSettingsReply(settings,)

settings: Settings

Alias for field number 0

class GetPositionReply(typing.NamedTuple):
677class GetPositionReply(t.NamedTuple):
678    position: Position

GetPositionReply(position,)

GetPositionReply(position: ForwardRef('Position'))

Create new instance of GetPositionReply(position,)

position: Position

Alias for field number 0

ReplyStatus = typing.Literal['SUCCESS', 'NOT_SUPPORTED', 'NOT_AUTHENTICATED', 'INSUFFICIENT_RESOURCES', 'AUTHENTICATING', 'INVALID_PARAMETER', 'INCORRECT_STATE', 'IN_PROGRESS']
class MessageReplyError(typing.NamedTuple):
693class MessageReplyError(t.NamedTuple):
694    message_type: t.Type[t.Any]
695    reason: ReplyStatus
696
697    def as_exception(self):
698        return ValueError(f"{self.message_type.__name__} failed: {self.reason}")

MessageReplyError(message_type, reason)

MessageReplyError( message_type: ForwardRef('t.Type[t.Any]'), reason: ForwardRef('ReplyStatus'))

Create new instance of MessageReplyError(message_type, reason)

message_type: Type[Any]

Alias for field number 0

reason: Literal['SUCCESS', 'NOT_SUPPORTED', 'NOT_AUTHENTICATED', 'INSUFFICIENT_RESOURCES', 'AUTHENTICATING', 'INVALID_PARAMETER', 'INCORRECT_STATE', 'IN_PROGRESS']

Alias for field number 1

def as_exception(self):
697    def as_exception(self):
698        return ValueError(f"{self.message_type.__name__} failed: {self.reason}")
class StatusChangedEvent(typing.NamedTuple):
723class StatusChangedEvent(t.NamedTuple):
724    status: Status

StatusChangedEvent(status,)

StatusChangedEvent(status: ForwardRef('Status'))

Create new instance of StatusChangedEvent(status,)

status: Status

Alias for field number 0

class ChannelChangedEvent(typing.NamedTuple):
727class ChannelChangedEvent(t.NamedTuple):
728    channel: Channel

ChannelChangedEvent(channel,)

ChannelChangedEvent(channel: ForwardRef('Channel'))

Create new instance of ChannelChangedEvent(channel,)

channel: Channel

Alias for field number 0

class TncDataFragmentReceivedEvent(typing.NamedTuple):
731class TncDataFragmentReceivedEvent(t.NamedTuple):
732    tnc_data_fragment: TncDataFragment

TncDataFragmentReceivedEvent(tnc_data_fragment,)

TncDataFragmentReceivedEvent(tnc_data_fragment: ForwardRef('TncDataFragment'))

Create new instance of TncDataFragmentReceivedEvent(tnc_data_fragment,)

tnc_data_fragment: TncDataFragment

Alias for field number 0

class SettingsChangedEvent(typing.NamedTuple):
735class SettingsChangedEvent(t.NamedTuple):
736    settings: Settings

SettingsChangedEvent(settings,)

SettingsChangedEvent(settings: ForwardRef('Settings'))

Create new instance of SettingsChangedEvent(settings,)

settings: Settings

Alias for field number 0

class UnknownProtocolMessage(typing.NamedTuple):
739class UnknownProtocolMessage(t.NamedTuple):
740    message: p.Message

UnknownProtocolMessage(message,)

UnknownProtocolMessage(message: ForwardRef('p.Message'))

Create new instance of UnknownProtocolMessage(message,)

message: benlink.protocol.command.message.Message

Alias for field number 0

EventType = typing.Literal['HT_STATUS_CHANGED', 'DATA_RXD', 'NEW_INQUIRY_DATA', 'RESTORE_FACTORY_SETTINGS', 'HT_CH_CHANGED', 'HT_SETTINGS_CHANGED', 'RINGING_STOPPEDRADIO_STATUS_CHANGED', 'USER_ACTION', 'SYSTEM_EVENT', 'BSS_SETTINGS_CHANGED', 'DATA_TXD', 'POSITION_CHANGED']
class TncDataFragment(ImmutableBaseModel):
815class TncDataFragment(ImmutableBaseModel):
816    """A data object representing a message packet"""
817    is_final_fragment: bool
818    fragment_id: int
819    data: bytes
820    channel_id: int | None = None
821
822    @classmethod
823    def from_protocol(cls, mp: p.TncDataFragment) -> TncDataFragment:
824        """@private (Protocol helper)"""
825        return TncDataFragment(
826            is_final_fragment=mp.is_final_fragment,
827            fragment_id=mp.fragment_id,
828            data=mp.data,
829            channel_id=mp.channel_id
830        )
831
832    def to_protocol(self) -> p.TncDataFragment:
833        """@private (Protocol helper)"""
834        return p.TncDataFragment(
835            is_final_fragment=self.is_final_fragment,
836            with_channel_id=self.channel_id is not None,
837            fragment_id=self.fragment_id,
838            data=self.data,
839            channel_id=self.channel_id
840        )

A data object representing a message packet

is_final_fragment: bool = PydanticUndefined
fragment_id: int = PydanticUndefined
data: bytes = PydanticUndefined
channel_id: int | None = None
ModulationType = typing.Literal['AM', 'FM', 'DMR']
BandwidthType = typing.Literal['NARROW', 'WIDE']
class DCS(typing.NamedTuple):
848class DCS(t.NamedTuple):
849    """A type for setting Digital Coded Squelch (DCS) on channels"""
850
851    n: int
852    """The DCS Normal (N) code"""

A type for setting Digital Coded Squelch (DCS) on channels

DCS(n: int)

Create new instance of DCS(n,)

n: int

The DCS Normal (N) code

class ChannelArgs(typing.TypedDict):
873class ChannelArgs(t.TypedDict, total=False):
874    """A dictionary of the parameters that can be set on a channel"""
875    tx_mod: ModulationType
876    tx_freq: float
877    rx_mod: ModulationType
878    rx_freq: float
879    tx_sub_audio: float | DCS | None
880    rx_sub_audio: float | DCS | None
881    scan: bool
882    tx_at_max_power: bool
883    talk_around: bool
884    bandwidth: BandwidthType
885    pre_de_emph_bypass: bool
886    sign: bool
887    tx_at_med_power: bool
888    tx_disable: bool
889    fixed_freq: bool
890    fixed_bandwidth: bool
891    fixed_tx_power: bool
892    mute: bool
893    name: str

A dictionary of the parameters that can be set on a channel

tx_mod: Literal['AM', 'FM', 'DMR']
tx_freq: float
rx_mod: Literal['AM', 'FM', 'DMR']
rx_freq: float
tx_sub_audio: float | DCS | None
rx_sub_audio: float | DCS | None
scan: bool
tx_at_max_power: bool
talk_around: bool
bandwidth: Literal['NARROW', 'WIDE']
pre_de_emph_bypass: bool
sign: bool
tx_at_med_power: bool
tx_disable: bool
fixed_freq: bool
fixed_bandwidth: bool
fixed_tx_power: bool
mute: bool
name: str
class Channel(ImmutableBaseModel):
896class Channel(ImmutableBaseModel):
897    """A data object representing a radio channel"""
898    channel_id: int
899    tx_mod: ModulationType
900    tx_freq: float
901    rx_mod: ModulationType
902    rx_freq: float
903    tx_sub_audio: float | DCS | None
904    rx_sub_audio: float | DCS | None
905    scan: bool
906    tx_at_max_power: bool
907    talk_around: bool
908    bandwidth: BandwidthType
909    pre_de_emph_bypass: bool
910    sign: bool
911    tx_at_med_power: bool
912    tx_disable: bool
913    fixed_freq: bool
914    fixed_bandwidth: bool
915    fixed_tx_power: bool
916    mute: bool
917    name: str
918
919    @classmethod
920    def from_protocol(cls, cs: p.RfCh) -> Channel:
921        """@private (Protocol helper)"""
922        return Channel(
923            channel_id=cs.channel_id,
924            tx_mod=cs.tx_mod.name,
925            tx_freq=cs.tx_freq,
926            rx_mod=cs.rx_mod.name,
927            rx_freq=cs.rx_freq,
928            tx_sub_audio=sub_audio_from_protocol(cs.tx_sub_audio),
929            rx_sub_audio=sub_audio_from_protocol(cs.rx_sub_audio),
930            scan=cs.scan,
931            tx_at_max_power=cs.tx_at_max_power,
932            talk_around=cs.talk_around,
933            bandwidth=cs.bandwidth.name,
934            pre_de_emph_bypass=cs.pre_de_emph_bypass,
935            sign=cs.sign,
936            tx_at_med_power=cs.tx_at_med_power,
937            tx_disable=cs.tx_disable,
938            fixed_freq=cs.fixed_freq,
939            fixed_bandwidth=cs.fixed_bandwidth,
940            fixed_tx_power=cs.fixed_tx_power,
941            mute=cs.mute,
942            name=cs.name_str
943        )
944
945    def to_protocol(self) -> p.RfCh:
946        """@private (Protocol helper)"""
947        return p.RfCh(
948            channel_id=self.channel_id,
949            tx_mod=p.ModulationType[self.tx_mod],
950            tx_freq=self.tx_freq,
951            rx_mod=p.ModulationType[self.rx_mod],
952            rx_freq=self.rx_freq,
953            tx_sub_audio=sub_audio_to_protocol(self.tx_sub_audio),
954            rx_sub_audio=sub_audio_to_protocol(self.rx_sub_audio),
955            scan=self.scan,
956            tx_at_max_power=self.tx_at_max_power,
957            talk_around=self.talk_around,
958            bandwidth=p.BandwidthType[self.bandwidth],
959            pre_de_emph_bypass=self.pre_de_emph_bypass,
960            sign=self.sign,
961            tx_at_med_power=self.tx_at_med_power,
962            tx_disable=self.tx_disable,
963            fixed_freq=self.fixed_freq,
964            fixed_bandwidth=self.fixed_bandwidth,
965            fixed_tx_power=self.fixed_tx_power,
966            mute=self.mute,
967            name_str=self.name
968        )

A data object representing a radio channel

channel_id: int = PydanticUndefined
tx_mod: Literal['AM', 'FM', 'DMR'] = PydanticUndefined
tx_freq: float = PydanticUndefined
rx_mod: Literal['AM', 'FM', 'DMR'] = PydanticUndefined
rx_freq: float = PydanticUndefined
tx_sub_audio: float | DCS | None = PydanticUndefined
rx_sub_audio: float | DCS | None = PydanticUndefined
scan: bool = PydanticUndefined
tx_at_max_power: bool = PydanticUndefined
talk_around: bool = PydanticUndefined
bandwidth: Literal['NARROW', 'WIDE'] = PydanticUndefined
pre_de_emph_bypass: bool = PydanticUndefined
sign: bool = PydanticUndefined
tx_at_med_power: bool = PydanticUndefined
tx_disable: bool = PydanticUndefined
fixed_freq: bool = PydanticUndefined
fixed_bandwidth: bool = PydanticUndefined
fixed_tx_power: bool = PydanticUndefined
mute: bool = PydanticUndefined
name: str = PydanticUndefined
class SettingsArgs(typing.TypedDict):
 971class SettingsArgs(t.TypedDict, total=False):
 972    """A dictionary of the parameters that can be set in the radio settings"""
 973    channel_a: int
 974    channel_b: int
 975    scan: bool
 976    aghfp_call_mode: int
 977    double_channel: int
 978    squelch_level: int
 979    tail_elim: bool
 980    auto_relay_en: bool
 981    auto_power_on: bool
 982    keep_aghfp_link: bool
 983    mic_gain: int
 984    tx_hold_time: int
 985    tx_time_limit: int
 986    local_speaker: int
 987    bt_mic_gain: int
 988    adaptive_response: bool
 989    dis_tone: bool
 990    power_saving_mode: bool
 991    auto_power_off: int
 992    auto_share_loc_ch: int | t.Literal["current"]
 993    hm_speaker: int
 994    positioning_system: int
 995    time_offset: int
 996    use_freq_range_2: bool
 997    ptt_lock: bool
 998    leading_sync_bit_en: bool
 999    pairing_at_power_on: bool
1000    screen_timeout: int
1001    kiss_upload_tx_msg: bool
1002    kiss_en: bool
1003    imperial_unit: bool
1004    wx_mode: int
1005    noaa_ch: int
1006    vfol_tx_power_x: int
1007    vfo2_tx_power_x: int
1008    dis_digital_mute: bool
1009    signaling_ecc_en: bool
1010    ch_data_lock: bool
1011    kiss_tx_delay: int
1012    kiss_tx_tail: int
1013    vox_en: bool
1014    vox_level: int
1015    dis_bt_mic: bool
1016    vox_delay: int
1017    ns_en: bool
1018    alarm_volume: int
1019    use_custom_location: bool
1020    gpwpl_upload_en: bool
1021    vfo1_mod_freq_x: int
1022    custom_location_lat: int
1023    custom_location_lon: int

A dictionary of the parameters that can be set in the radio settings

channel_a: int
channel_b: int
scan: bool
aghfp_call_mode: int
double_channel: int
squelch_level: int
tail_elim: bool
auto_relay_en: bool
auto_power_on: bool
keep_aghfp_link: bool
mic_gain: int
tx_hold_time: int
tx_time_limit: int
local_speaker: int
bt_mic_gain: int
adaptive_response: bool
dis_tone: bool
power_saving_mode: bool
auto_power_off: int
auto_share_loc_ch: Union[int, Literal['current']]
hm_speaker: int
positioning_system: int
time_offset: int
use_freq_range_2: bool
ptt_lock: bool
leading_sync_bit_en: bool
pairing_at_power_on: bool
screen_timeout: int
kiss_upload_tx_msg: bool
kiss_en: bool
imperial_unit: bool
wx_mode: int
noaa_ch: int
vfol_tx_power_x: int
vfo2_tx_power_x: int
dis_digital_mute: bool
signaling_ecc_en: bool
ch_data_lock: bool
kiss_tx_delay: int
kiss_tx_tail: int
vox_en: bool
vox_level: int
dis_bt_mic: bool
vox_delay: int
ns_en: bool
alarm_volume: int
use_custom_location: bool
gpwpl_upload_en: bool
vfo1_mod_freq_x: int
custom_location_lat: int
custom_location_lon: int
class Settings(ImmutableBaseModel):
1026class Settings(ImmutableBaseModel):
1027    """A data object representing the radio settings"""
1028    _channel_split: t.ClassVar[IntSplit] = IntSplit(4, 4)
1029    _auto_share_loc_ch_split: t.ClassVar[IntSplit] = IntSplit(3, 5)
1030    channel_a: int
1031    channel_b: int
1032    scan: bool
1033    aghfp_call_mode: int
1034    double_channel: int
1035    squelch_level: int
1036    tail_elim: bool
1037    auto_relay_en: bool
1038    auto_power_on: bool
1039    keep_aghfp_link: bool
1040    mic_gain: int
1041    tx_hold_time: int
1042    tx_time_limit: int
1043    local_speaker: int
1044    bt_mic_gain: int
1045    adaptive_response: bool
1046    dis_tone: bool
1047    power_saving_mode: bool
1048    auto_power_off: int
1049    auto_share_loc_ch: int | t.Literal["current"]
1050    hm_speaker: int
1051    positioning_system: int
1052    time_offset: int
1053    use_freq_range_2: bool
1054    ptt_lock: bool
1055    leading_sync_bit_en: bool
1056    pairing_at_power_on: bool
1057    screen_timeout: int
1058    kiss_upload_tx_msg: bool
1059    kiss_en: bool
1060    imperial_unit: bool
1061    wx_mode: int
1062    noaa_ch: int
1063    vfol_tx_power_x: int
1064    vfo2_tx_power_x: int
1065    dis_digital_mute: bool
1066    signaling_ecc_en: bool
1067    ch_data_lock: bool
1068    kiss_tx_delay: int
1069    kiss_tx_tail: int
1070    vox_en: bool
1071    vox_level: int
1072    dis_bt_mic: bool
1073    vox_delay: int
1074    ns_en: bool
1075    alarm_volume: int
1076    use_custom_location: bool
1077    gpwpl_upload_en: bool
1078    vfo1_mod_freq_x: int
1079    custom_location_lat: int
1080    custom_location_lon: int
1081
1082    @classmethod
1083    def from_protocol(cls, rs: p.Settings) -> Settings:
1084        """@private (Protocol helper)"""
1085        _raw_auto_share_loc_ch = cls._auto_share_loc_ch_split.from_parts(
1086            rs.auto_share_loc_ch_upper, rs.auto_share_loc_ch
1087        )
1088        return Settings(
1089            channel_a=cls._channel_split.from_parts(
1090                rs.channel_a_upper, rs.channel_a_lower
1091            ),
1092            channel_b=cls._channel_split.from_parts(
1093                rs.channel_b_upper, rs.channel_b_lower
1094            ),
1095            scan=rs.scan,
1096            aghfp_call_mode=rs.aghfp_call_mode,
1097            double_channel=rs.double_channel,
1098            squelch_level=rs.squelch_level,
1099            tail_elim=rs.tail_elim,
1100            auto_relay_en=rs.auto_relay_en,
1101            auto_power_on=rs.auto_power_on,
1102            keep_aghfp_link=rs.keep_aghfp_link,
1103            mic_gain=rs.mic_gain,
1104            tx_hold_time=rs.tx_hold_time,
1105            tx_time_limit=rs.tx_time_limit,
1106            local_speaker=rs.local_speaker,
1107            bt_mic_gain=rs.bt_mic_gain,
1108            adaptive_response=rs.adaptive_response,
1109            dis_tone=rs.dis_tone,
1110            power_saving_mode=rs.power_saving_mode,
1111            auto_power_off=rs.auto_power_off,
1112            auto_share_loc_ch=_raw_auto_share_loc_ch -
1113            1 if _raw_auto_share_loc_ch > 0 else "current",
1114            hm_speaker=rs.hm_speaker,
1115            positioning_system=rs.positioning_system,
1116            time_offset=rs.time_offset,
1117            use_freq_range_2=rs.use_freq_range_2,
1118            ptt_lock=rs.ptt_lock,
1119            leading_sync_bit_en=rs.leading_sync_bit_en,
1120            pairing_at_power_on=rs.pairing_at_power_on,
1121            screen_timeout=rs.screen_timeout,
1122            kiss_upload_tx_msg=rs.kiss_upload_tx_msg,
1123            kiss_en=rs.kiss_en,
1124            imperial_unit=rs.imperial_unit,
1125            wx_mode=rs.wx_mode,
1126            noaa_ch=rs.noaa_ch,
1127            vfol_tx_power_x=rs.vfol_tx_power_x,
1128            vfo2_tx_power_x=rs.vfo2_tx_power_x,
1129            dis_digital_mute=rs.dis_digital_mute,
1130            signaling_ecc_en=rs.signaling_ecc_en,
1131            ch_data_lock=rs.ch_data_lock,
1132            kiss_tx_delay=rs.kiss_tx_delay,
1133            kiss_tx_tail=rs.kiss_tx_tail,
1134            vox_en=rs.vox_en,
1135            vox_level=rs.vox_level,
1136            dis_bt_mic=rs.dis_bt_mic,
1137            vox_delay=rs.vox_delay,
1138            ns_en=rs.ns_en,
1139            alarm_volume=rs.alarm_volume,
1140            use_custom_location=rs.use_custom_location,
1141            gpwpl_upload_en=rs.gpwpl_upload_en,
1142            vfo1_mod_freq_x=rs.vfo1_mod_freq_x,
1143            custom_location_lat=rs.custom_location_lat,
1144            custom_location_lon=rs.custom_location_lon
1145        )
1146
1147    def to_protocol(self):
1148        """@private (Protocol helper)"""
1149        _raw_auto_share_loc_ch = 0 if self.auto_share_loc_ch == "current" else self.auto_share_loc_ch + 1
1150        return p.Settings(
1151            channel_a_lower=self._channel_split.get_lower(self.channel_a),
1152            channel_b_lower=self._channel_split.get_lower(self.channel_b),
1153            scan=self.scan,
1154            aghfp_call_mode=self.aghfp_call_mode,
1155            double_channel=self.double_channel,
1156            squelch_level=self.squelch_level,
1157            tail_elim=self.tail_elim,
1158            auto_relay_en=self.auto_relay_en,
1159            auto_power_on=self.auto_power_on,
1160            keep_aghfp_link=self.keep_aghfp_link,
1161            mic_gain=self.mic_gain,
1162            tx_hold_time=self.tx_hold_time,
1163            tx_time_limit=self.tx_time_limit,
1164            local_speaker=self.local_speaker,
1165            bt_mic_gain=self.bt_mic_gain,
1166            adaptive_response=self.adaptive_response,
1167            dis_tone=self.dis_tone,
1168            power_saving_mode=self.power_saving_mode,
1169            auto_power_off=self.auto_power_off,
1170            auto_share_loc_ch=self._auto_share_loc_ch_split.get_lower(
1171                _raw_auto_share_loc_ch),
1172            hm_speaker=self.hm_speaker,
1173            positioning_system=self.positioning_system,
1174            time_offset=self.time_offset,
1175            use_freq_range_2=self.use_freq_range_2,
1176            ptt_lock=self.ptt_lock,
1177            leading_sync_bit_en=self.leading_sync_bit_en,
1178            pairing_at_power_on=self.pairing_at_power_on,
1179            screen_timeout=self.screen_timeout,
1180            kiss_upload_tx_msg=self.kiss_upload_tx_msg,
1181            kiss_en=self.kiss_en,
1182            imperial_unit=self.imperial_unit,
1183            channel_a_upper=self._channel_split.get_upper(self.channel_a),
1184            channel_b_upper=self._channel_split.get_upper(self.channel_b),
1185            wx_mode=self.wx_mode,
1186            noaa_ch=self.noaa_ch,
1187            vfol_tx_power_x=self.vfol_tx_power_x,
1188            vfo2_tx_power_x=self.vfo2_tx_power_x,
1189            dis_digital_mute=self.dis_digital_mute,
1190            signaling_ecc_en=self.signaling_ecc_en,
1191            ch_data_lock=self.ch_data_lock,
1192            auto_share_loc_ch_upper=self._auto_share_loc_ch_split.get_upper(
1193                _raw_auto_share_loc_ch),
1194            kiss_tx_delay=self.kiss_tx_delay,
1195            kiss_tx_tail=self.kiss_tx_tail,
1196            vox_en=self.vox_en,
1197            vox_level=self.vox_level,
1198            dis_bt_mic=self.dis_bt_mic,
1199            vox_delay=self.vox_delay,
1200            ns_en=self.ns_en,
1201            alarm_volume=self.alarm_volume,
1202            use_custom_location=self.use_custom_location,
1203            gpwpl_upload_en=self.gpwpl_upload_en,
1204            vfo1_mod_freq_x=self.vfo1_mod_freq_x,
1205            custom_location_lat=self.custom_location_lat,
1206            custom_location_lon=self.custom_location_lon
1207        )

A data object representing the radio settings

channel_a: int = PydanticUndefined
channel_b: int = PydanticUndefined
scan: bool = PydanticUndefined
aghfp_call_mode: int = PydanticUndefined
double_channel: int = PydanticUndefined
squelch_level: int = PydanticUndefined
tail_elim: bool = PydanticUndefined
auto_relay_en: bool = PydanticUndefined
auto_power_on: bool = PydanticUndefined
mic_gain: int = PydanticUndefined
tx_hold_time: int = PydanticUndefined
tx_time_limit: int = PydanticUndefined
local_speaker: int = PydanticUndefined
bt_mic_gain: int = PydanticUndefined
adaptive_response: bool = PydanticUndefined
dis_tone: bool = PydanticUndefined
power_saving_mode: bool = PydanticUndefined
auto_power_off: int = PydanticUndefined
auto_share_loc_ch: Union[int, Literal['current']] = PydanticUndefined
hm_speaker: int = PydanticUndefined
positioning_system: int = PydanticUndefined
time_offset: int = PydanticUndefined
use_freq_range_2: bool = PydanticUndefined
ptt_lock: bool = PydanticUndefined
leading_sync_bit_en: bool = PydanticUndefined
pairing_at_power_on: bool = PydanticUndefined
screen_timeout: int = PydanticUndefined
kiss_upload_tx_msg: bool = PydanticUndefined
kiss_en: bool = PydanticUndefined
imperial_unit: bool = PydanticUndefined
wx_mode: int = PydanticUndefined
noaa_ch: int = PydanticUndefined
vfol_tx_power_x: int = PydanticUndefined
vfo2_tx_power_x: int = PydanticUndefined
dis_digital_mute: bool = PydanticUndefined
signaling_ecc_en: bool = PydanticUndefined
ch_data_lock: bool = PydanticUndefined
kiss_tx_delay: int = PydanticUndefined
kiss_tx_tail: int = PydanticUndefined
vox_en: bool = PydanticUndefined
vox_level: int = PydanticUndefined
dis_bt_mic: bool = PydanticUndefined
vox_delay: int = PydanticUndefined
ns_en: bool = PydanticUndefined
alarm_volume: int = PydanticUndefined
use_custom_location: bool = PydanticUndefined
gpwpl_upload_en: bool = PydanticUndefined
vfo1_mod_freq_x: int = PydanticUndefined
custom_location_lat: int = PydanticUndefined
custom_location_lon: int = PydanticUndefined
class DeviceInfo(ImmutableBaseModel):
class DeviceInfo(ImmutableBaseModel):
    """A data object representing the device information"""
    vendor_id: int
    product_id: int
    hardware_version: int  # protocol field: hw_ver
    firmware_version: int  # protocol field: soft_ver
    supports_radio: bool
    supports_medium_power: bool
    fixed_location_speaker_volume: bool  # protocol field: fixed_loc_speaker_vol
    has_speaker: bool  # inverse of protocol field have_no_speaker
    has_hand_microphone_speaker: bool  # protocol field: have_hm_speaker
    region_count: int
    supports_noaa: bool
    supports_gmrs: bool  # protocol field: gmrs
    supports_vfo: bool
    supports_dmr: bool
    # inverse of protocol field not_support_soft_power_ctrl
    supports_software_power_control: bool
    channel_count: int
    frequency_range_count: int  # protocol field: freq_range_count

    @classmethod
    def from_protocol(cls, info: p.DevInfo) -> DeviceInfo:
        """@private (Protocol helper)

        Create a `DeviceInfo` from a `p.DevInfo` protocol object.
        """
        # The protocol states these two capabilities negatively; the data
        # object exposes them as positives.
        soft_power_ctrl = not info.not_support_soft_power_ctrl
        speaker_present = not info.have_no_speaker

        return DeviceInfo(
            vendor_id=info.vendor_id,
            product_id=info.product_id,
            hardware_version=info.hw_ver,
            firmware_version=info.soft_ver,
            supports_radio=info.support_radio,
            supports_medium_power=info.support_medium_power,
            fixed_location_speaker_volume=info.fixed_loc_speaker_vol,
            has_speaker=speaker_present,
            has_hand_microphone_speaker=info.have_hm_speaker,
            region_count=info.region_count,
            supports_noaa=info.support_noaa,
            supports_gmrs=info.gmrs,
            supports_vfo=info.support_vfo,
            supports_dmr=info.support_dmr,
            supports_software_power_control=soft_power_ctrl,
            channel_count=info.channel_count,
            frequency_range_count=info.freq_range_count,
        )

    def to_protocol(self) -> p.DevInfo:
        """@private (Protocol helper)

        Convert this object back into a `p.DevInfo` protocol object.
        """
        # Flip the positive flags back into the protocol's negative form.
        no_soft_power_ctrl = not self.supports_software_power_control
        no_speaker = not self.has_speaker

        return p.DevInfo(
            vendor_id=self.vendor_id,
            product_id=self.product_id,
            hw_ver=self.hardware_version,
            soft_ver=self.firmware_version,
            support_radio=self.supports_radio,
            support_medium_power=self.supports_medium_power,
            fixed_loc_speaker_vol=self.fixed_location_speaker_volume,
            not_support_soft_power_ctrl=no_soft_power_ctrl,
            have_no_speaker=no_speaker,
            have_hm_speaker=self.has_hand_microphone_speaker,
            region_count=self.region_count,
            support_noaa=self.supports_noaa,
            gmrs=self.supports_gmrs,
            support_vfo=self.supports_vfo,
            support_dmr=self.supports_dmr,
            channel_count=self.channel_count,
            freq_range_count=self.frequency_range_count,
        )

A data object representing the device information

vendor_id: int = PydanticUndefined
product_id: int = PydanticUndefined
hardware_version: int = PydanticUndefined
firmware_version: int = PydanticUndefined
supports_radio: bool = PydanticUndefined
supports_medium_power: bool = PydanticUndefined
fixed_location_speaker_volume: bool = PydanticUndefined
has_speaker: bool = PydanticUndefined
has_hand_microphone_speaker: bool = PydanticUndefined
region_count: int = PydanticUndefined
supports_noaa: bool = PydanticUndefined
supports_gmrs: bool = PydanticUndefined
supports_vfo: bool = PydanticUndefined
supports_dmr: bool = PydanticUndefined
supports_software_power_control: bool = PydanticUndefined
channel_count: int = PydanticUndefined
frequency_range_count: int = PydanticUndefined
class BeaconSettingsArgs(typing.TypedDict):
class BeaconSettingsArgs(t.TypedDict, total=False):
    """A dictionary of the parameters that can be set in the beacon settings"""
    # total=False: every key is optional, so callers may pass any subset.
    # The keys and value types match the fields of `BeaconSettings`.

    # Integer and flag parameters
    max_fwd_times: int
    time_to_live: int
    ptt_release_send_location: bool
    ptt_release_send_id_info: bool
    ptt_release_send_bss_user_id: bool
    should_share_location: bool
    send_pwr_voltage: bool
    packet_format: t.Literal["BSS", "APRS"]
    allow_position_check: bool
    aprs_ssid: int
    location_share_interval: int
    bss_user_id: int

    # Text parameters
    ptt_release_id_info: str
    beacon_message: str
    aprs_symbol: str
    aprs_callsign: str

A dictionary of the parameters that can be set in the beacon settings

max_fwd_times: int
time_to_live: int
ptt_release_send_location: bool
ptt_release_send_id_info: bool
ptt_release_send_bss_user_id: bool
should_share_location: bool
send_pwr_voltage: bool
packet_format: Literal['BSS', 'APRS']
allow_position_check: bool
aprs_ssid: int
location_share_interval: int
bss_user_id: int
ptt_release_id_info: str
beacon_message: str
aprs_symbol: str
aprs_callsign: str
class BeaconSettings(ImmutableBaseModel):
class BeaconSettings(ImmutableBaseModel):
    """A data object representing the beacon settings"""
    # bss_user_id is exchanged with the protocol layer as separate
    # upper/lower parts; this splitter converts in both directions.
    _bss_user_id_split: t.ClassVar[IntSplit] = IntSplit(32, 32)
    max_fwd_times: int
    time_to_live: int
    ptt_release_send_location: bool
    ptt_release_send_id_info: bool
    ptt_release_send_bss_user_id: bool
    should_share_location: bool
    send_pwr_voltage: bool
    packet_format: t.Literal["BSS", "APRS"]
    allow_position_check: bool
    aprs_ssid: int
    location_share_interval: int
    bss_user_id: int
    ptt_release_id_info: str
    beacon_message: str
    aprs_symbol: str
    aprs_callsign: str

    @classmethod
    def from_protocol(cls, bs: p.BSSSettingsV2 | p.BSSSettings) -> BeaconSettings:
        """@private (Protocol helper)

        Create a `BeaconSettings` from a protocol beacon-settings object.
        """
        # Recombine the two protocol parts into one user id.
        user_id = cls._bss_user_id_split.from_parts(
            bs.bss_user_id_upper, bs.bss_user_id_lower
        )
        # The data object stores the packet format by its enum member name.
        format_name = bs.packet_format.name

        return BeaconSettings(
            max_fwd_times=bs.max_fwd_times,
            time_to_live=bs.time_to_live,
            ptt_release_send_location=bs.ptt_release_send_location,
            ptt_release_send_id_info=bs.ptt_release_send_id_info,
            ptt_release_send_bss_user_id=bs.ptt_release_send_bss_user_id,
            should_share_location=bs.should_share_location,
            send_pwr_voltage=bs.send_pwr_voltage,
            packet_format=format_name,
            allow_position_check=bs.allow_position_check,
            aprs_ssid=bs.aprs_ssid,
            location_share_interval=bs.location_share_interval,
            bss_user_id=user_id,
            ptt_release_id_info=bs.ptt_release_id_info,
            beacon_message=bs.beacon_message,
            aprs_symbol=bs.aprs_symbol,
            aprs_callsign=bs.aprs_callsign,
        )

    def to_protocol(self) -> p.BSSSettingsV2:
        """@private (Protocol helper)

        Convert these settings into a `p.BSSSettingsV2` protocol object.
        """
        id_split = self._bss_user_id_split
        user_id_lower = id_split.get_lower(self.bss_user_id)
        user_id_upper = id_split.get_upper(self.bss_user_id)
        # Look the enum member up by the stored name.
        packet_format = p.PacketFormat[self.packet_format]

        return p.BSSSettingsV2(
            max_fwd_times=self.max_fwd_times,
            time_to_live=self.time_to_live,
            ptt_release_send_location=self.ptt_release_send_location,
            ptt_release_send_id_info=self.ptt_release_send_id_info,
            ptt_release_send_bss_user_id=self.ptt_release_send_bss_user_id,
            should_share_location=self.should_share_location,
            send_pwr_voltage=self.send_pwr_voltage,
            packet_format=packet_format,
            allow_position_check=self.allow_position_check,
            aprs_ssid=self.aprs_ssid,
            location_share_interval=self.location_share_interval,
            bss_user_id_lower=user_id_lower,
            ptt_release_id_info=self.ptt_release_id_info,
            beacon_message=self.beacon_message,
            aprs_symbol=self.aprs_symbol,
            aprs_callsign=self.aprs_callsign,
            bss_user_id_upper=user_id_upper,
        )

A data object representing the beacon settings

max_fwd_times: int = PydanticUndefined
time_to_live: int = PydanticUndefined
ptt_release_send_location: bool = PydanticUndefined
ptt_release_send_id_info: bool = PydanticUndefined
ptt_release_send_bss_user_id: bool = PydanticUndefined
should_share_location: bool = PydanticUndefined
send_pwr_voltage: bool = PydanticUndefined
packet_format: Literal['BSS', 'APRS'] = PydanticUndefined
allow_position_check: bool = PydanticUndefined
aprs_ssid: int = PydanticUndefined
location_share_interval: int = PydanticUndefined
bss_user_id: int = PydanticUndefined
ptt_release_id_info: str = PydanticUndefined
beacon_message: str = PydanticUndefined
aprs_symbol: str = PydanticUndefined
aprs_callsign: str = PydanticUndefined
ChannelType = typing.Literal['OFF', 'A', 'B']
class Status(ImmutableBaseModel):
class Status(ImmutableBaseModel):
    """A data object representing the radio status"""
    # curr_ch_id is exchanged with the protocol layer as separate
    # upper/lower parts; this splitter converts in both directions.
    _channel_split: t.ClassVar[IntSplit] = IntSplit(4, 4)
    is_power_on: bool
    is_in_tx: bool
    is_sq: bool
    is_in_rx: bool
    double_channel: ChannelType
    is_scan: bool
    is_radio: bool
    curr_ch_id: int
    is_gps_locked: bool
    is_hfp_connected: bool
    is_aoc_connected: bool
    rssi: float
    curr_region: int

    @classmethod
    def from_protocol(cls, s: p.Status | p.StatusExt) -> Status:
        """@private (Protocol helper)

        Create a `Status` from a protocol status object.

        Raises:
            ValueError: If `s` is not a `p.StatusExt` instance.
        """
        if isinstance(s, p.StatusExt):
            # Recombine the two protocol parts into one channel id.
            channel_id = cls._channel_split.from_parts(
                s.curr_channel_id_upper, s.curr_ch_id_lower
            )
            return Status(
                is_power_on=s.is_power_on,
                is_in_tx=s.is_in_tx,
                is_sq=s.is_sq,
                is_in_rx=s.is_in_rx,
                # Stored by enum member name ('OFF', 'A' or 'B').
                double_channel=s.double_channel.name,
                is_scan=s.is_scan,
                is_radio=s.is_radio,
                curr_ch_id=channel_id,
                is_gps_locked=s.is_gps_locked,
                is_hfp_connected=s.is_hfp_connected,
                is_aoc_connected=s.is_aoc_connected,
                rssi=s.rssi,
                curr_region=s.curr_region,
            )

        # Only the extended status message is accepted here.
        raise ValueError(
            "Radio replied with old Status message version. Upgrade your firmware!"
        )

    def to_protocol(self) -> p.StatusExt:
        """@private (Protocol helper)

        Convert this status into a `p.StatusExt` protocol object.
        """
        ch_split = self._channel_split
        ch_lower = ch_split.get_lower(self.curr_ch_id)
        ch_upper = ch_split.get_upper(self.curr_ch_id)

        return p.StatusExt(
            is_power_on=self.is_power_on,
            is_in_tx=self.is_in_tx,
            is_sq=self.is_sq,
            is_in_rx=self.is_in_rx,
            # Look the enum member up by the stored name.
            double_channel=p.ChannelType[self.double_channel],
            is_scan=self.is_scan,
            is_radio=self.is_radio,
            curr_ch_id_lower=ch_lower,
            is_gps_locked=self.is_gps_locked,
            is_hfp_connected=self.is_hfp_connected,
            is_aoc_connected=self.is_aoc_connected,
            rssi=self.rssi,
            curr_region=self.curr_region,
            curr_channel_id_upper=ch_upper,
        )

A data object representing the radio status

is_power_on: bool = PydanticUndefined
is_in_tx: bool = PydanticUndefined
is_sq: bool = PydanticUndefined
is_in_rx: bool = PydanticUndefined
double_channel: Literal['OFF', 'A', 'B'] = PydanticUndefined
is_scan: bool = PydanticUndefined
is_radio: bool = PydanticUndefined
curr_ch_id: int = PydanticUndefined
is_gps_locked: bool = PydanticUndefined
is_hfp_connected: bool = PydanticUndefined
is_aoc_connected: bool = PydanticUndefined
rssi: float = PydanticUndefined
curr_region: int = PydanticUndefined
class Position(ImmutableBaseModel):
class Position(ImmutableBaseModel):
    """A data object for representing GPS positions"""
    latitude: float
    longitude: float
    # altitude, speed and heading may each be None.
    altitude: int | None
    speed: int | None
    heading: int | None
    time: datetime
    accuracy: int

    @classmethod
    def from_protocol(cls, x: p.Position) -> Position:
        """@private (Protocol helper)

        Create a `Position` from a `p.Position` protocol object.

        Every field is copied across under the same name.
        """
        return Position(
            latitude=x.latitude, longitude=x.longitude,
            altitude=x.altitude, speed=x.speed, heading=x.heading,
            time=x.time, accuracy=x.accuracy,
        )

    def to_protocol(self) -> p.Position:
        """@private (Protocol helper)

        Convert this position into a `p.Position` protocol object.

        Every field is copied across under the same name.
        """
        return p.Position(
            latitude=self.latitude, longitude=self.longitude,
            altitude=self.altitude, speed=self.speed, heading=self.heading,
            time=self.time, accuracy=self.accuracy,
        )

A data object for representing GPS positions

latitude: float = PydanticUndefined
longitude: float = PydanticUndefined
altitude: int | None = PydanticUndefined
speed: int | None = PydanticUndefined
heading: int | None = PydanticUndefined
time: datetime.datetime = PydanticUndefined
accuracy: int = PydanticUndefined