benlink.command
Overview
This module defines a low-level interface for communicating with a radio over BLE or Rfcomm.
Unless you know what you are doing, you probably want to use the
benlink.controller module instead.
Messages
The RadioMessage type is a union of all the possible messages that can
sent or received from the radio.
It RadioMessages are divided into three categories:
CommandMessage: Messages that are sent to the radio to request information or to change settings.ReplyMessage: Messages that are received in response to aCommandMessage.EventMessage: Messages that are received from the radio to indicate that an event has occurred. (e.g. a channel has changed, a packet has been received)
Data
The data objects (e.g. DeviceInfo, Settings) are used to represent
the data that is sent or received in the messages. Some of these data
objects have accompanying Args types that are used in the API to allow
for functions that take keyword arguments to set these parameters.
1""" 2# Overview 3 4This module defines a low-level interface for communicating 5with a radio over BLE or Rfcomm. 6 7Unless you know what you are doing, you probably want to use the 8`benlink.controller` module instead. 9 10# Messages 11 12The `RadioMessage` type is a union of all the possible messages that can 13sent or received from the radio. 14 15It `RadioMessage`s are divided into three categories: 16 171. `CommandMessage`: Messages that are sent to the radio to request 18 information or to change settings. 19 202. `ReplyMessage`: Messages that are received in response to a 21 `CommandMessage`. 22 233. `EventMessage`: Messages that are received from the radio to indicate 24 that an event has occurred. (e.g. a channel has changed, a packet has 25 been received) 26 27# Data 28 29The data objects (e.g. `DeviceInfo`, `Settings`) are used to represent 30the data that is sent or received in the messages. Some of these data 31objects have accompanying `Args` types that are used in the API to allow 32for functions that take keyword arguments to set these parameters. 33""" 34 35from __future__ import annotations 36import typing as t 37import asyncio 38from pydantic import BaseModel, ConfigDict 39from . import protocol as p 40from .link import CommandLink, BleCommandLink, RfcommCommandLink 41from datetime import datetime 42 43RADIO_SERVICE_UUID = "00001100-d102-11e1-9b23-00025b00a5a5" 44"""@private""" 45 46RADIO_WRITE_UUID = "00001101-d102-11e1-9b23-00025b00a5a5" 47"""@private""" 48 49RADIO_INDICATE_UUID = "00001102-d102-11e1-9b23-00025b00a5a5" 50"""@private""" 51 52 53class CommandConnection: 54 _link: CommandLink 55 _handlers: t.List[RadioMessageHandler] = [] 56 57 def __init__(self, link: CommandLink): 58 self._link = link 59 self._handlers = [] 60 61 @classmethod 62 def new_ble(cls, device_uuid: str) -> CommandConnection: 63 return cls(BleCommandLink(device_uuid)) 64 65 @classmethod 66 def new_rfcomm(cls, device_uuid: str, channel: int | t.Literal["auto"] = "auto") -> CommandConnection: 67 return cls(RfcommCommandLink(device_uuid, channel)) 68 69 def is_connected(self) -> bool: 70 return self._link.is_connected() 71 72 async def connect(self) -> None: 73 await self._link.connect(self._on_recv) 74 75 async def disconnect(self) -> None: 76 self._handlers.clear() 77 await self._link.disconnect() 78 79 async def send_bytes(self, data: bytes) -> None: 80 await self._link.send_bytes(data) 81 82 async def send_message(self, command: CommandMessage) -> None: 83 await self._link.send(command_message_to_protocol(command)) 84 85 async def send_message_expect_reply(self, command: CommandMessage, expect: t.Type[RadioMessageT]) -> RadioMessageT | MessageReplyError: 86 queue: asyncio.Queue[RadioMessageT | 87 MessageReplyError] = asyncio.Queue() 88 89 def reply_handler(reply: RadioMessage): 90 if ( 91 isinstance(reply, expect) or 92 ( 93 isinstance(reply, MessageReplyError) and 94 reply.message_type is expect 95 ) 96 ): 97 queue.put_nowait(reply) 98 99 remove_handler = self._add_message_handler(reply_handler) 100 101 await self.send_message(command) 102 103 out = await queue.get() 104 105 remove_handler() 106 107 return out 108 109 def add_event_handler(self, handler: EventHandler) -> t.Callable[[], None]: 110 def event_handler(msg: RadioMessage): 111 if isinstance(msg, EventMessage): 112 handler(msg) 113 return self._add_message_handler(event_handler) 114 115 def _add_message_handler(self, handler: RadioMessageHandler) -> t.Callable[[], None]: 116 self._handlers.append(handler) 117 118 def remove_handler(): 119 self._handlers.remove(handler) 120 121 return remove_handler 122 123 def _on_recv(self, msg: p.Message) -> None: 124 radio_message = radio_message_from_protocol(msg) 125 for handler in self._handlers: 126 handler(radio_message) 127 128 # Command API 129 130 async def enable_event(self, event_type: EventType) -> None: 131 """Enable an event""" 132 # This event doesn't get a reply version of EnableEvent. It does, however 133 # does trigger a StatusChangedEvent... but we don't wait for it here. 134 # Instead, we just fire and forget 135 await self.send_message(EnableEvent(event_type)) 136 137 async def send_tnc_data_fragment(self, tnc_data_fragment: TncDataFragment) -> None: 138 """Send Tnc data""" 139 reply = await self.send_message_expect_reply(SendTncDataFragment(tnc_data_fragment), SendTncDataFragmentReply) 140 if isinstance(reply, MessageReplyError): 141 raise reply.as_exception() 142 143 async def get_beacon_settings(self) -> BeaconSettings: 144 """Get the current packet settings""" 145 reply = await self.send_message_expect_reply(GetBeaconSettings(), GetBeaconSettingsReply) 146 if isinstance(reply, MessageReplyError): 147 raise reply.as_exception() 148 return reply.tnc_settings 149 150 async def set_beacon_settings(self, packet_settings: BeaconSettings): 151 """Set the packet settings""" 152 reply = await self.send_message_expect_reply(SetBeaconSettings(packet_settings), SetBeaconSettingsReply) 153 if isinstance(reply, MessageReplyError): 154 raise reply.as_exception() 155 156 async def get_battery_level(self) -> int: 157 """Get the battery level""" 158 reply = await self.send_message_expect_reply(GetBatteryLevel(), GetBatteryLevelReply) 159 if isinstance(reply, MessageReplyError): 160 raise reply.as_exception() 161 return reply.battery_level 162 163 async def get_battery_level_as_percentage(self) -> int: 164 """Get the battery level as a percentage""" 165 reply = await self.send_message_expect_reply(GetBatteryLevelAsPercentage(), GetBatteryLevelAsPercentageReply) 166 if isinstance(reply, MessageReplyError): 167 raise reply.as_exception() 168 return reply.battery_level_as_percentage 169 170 async def get_rc_battery_level(self) -> int: 171 """Get the RC battery level""" 172 reply = await self.send_message_expect_reply(GetRCBatteryLevel(), GetRCBatteryLevelReply) 173 if isinstance(reply, MessageReplyError): 174 raise reply.as_exception() 175 return reply.rc_battery_level 176 177 async def get_battery_voltage(self) -> float: 178 """Get the battery voltage""" 179 reply = await self.send_message_expect_reply(GetBatteryVoltage(), GetBatteryVoltageReply) 180 if isinstance(reply, MessageReplyError): 181 raise reply.as_exception() 182 return reply.battery_voltage 183 184 async def get_device_info(self) -> DeviceInfo: 185 """Get the device info""" 186 reply = await self.send_message_expect_reply(GetDeviceInfo(), GetDeviceInfoReply) 187 if isinstance(reply, MessageReplyError): 188 raise reply.as_exception() 189 return reply.device_info 190 191 async def get_settings(self) -> Settings: 192 """Get the settings""" 193 reply = await self.send_message_expect_reply(GetSettings(), GetSettingsReply) 194 if isinstance(reply, MessageReplyError): 195 raise reply.as_exception() 196 return reply.settings 197 198 async def set_settings(self, settings: Settings) -> None: 199 """Set the settings""" 200 reply = await self.send_message_expect_reply(SetSettings(settings), SetSettingsReply) 201 if isinstance(reply, MessageReplyError): 202 raise reply.as_exception() 203 204 async def get_channel(self, channel_id: int) -> Channel: 205 """Get a channel""" 206 reply = await self.send_message_expect_reply(GetChannel(channel_id), GetChannelReply) 207 if isinstance(reply, MessageReplyError): 208 raise reply.as_exception() 209 return reply.channel 210 211 async def set_channel(self, channel: Channel): 212 """Set a channel""" 213 reply = await self.send_message_expect_reply(SetChannel(channel), SetChannelReply) 214 if isinstance(reply, MessageReplyError): 215 raise reply.as_exception() 216 217 async def get_status(self) -> Status: 218 """Get the radio status""" 219 reply = await self.send_message_expect_reply(GetStatus(), GetStatusReply) 220 if isinstance(reply, MessageReplyError): 221 raise reply.as_exception() 222 return reply.status 223 224 async def get_position(self) -> Position: 225 """Get the radio position""" 226 reply = await self.send_message_expect_reply(GetPosition(), GetPositionReply) 227 if isinstance(reply, MessageReplyError): 228 raise reply.as_exception() 229 return reply.position 230 231 async def __aenter__(self): 232 await self.connect() 233 return self 234 235 async def __aexit__( 236 self, 237 exc_type: t.Any, 238 exc_value: t.Any, 239 traceback: t.Any, 240 ) -> None: 241 await self.disconnect() 242 243 244class ImmutableBaseModel(BaseModel): 245 """@private (A base class for immutable data objects)""" 246 247 model_config = ConfigDict(frozen=True) 248 """@private""" 249 250 251def command_message_to_protocol(m: CommandMessage) -> p.Message: 252 """@private (Protocol helper)""" 253 match m: 254 case EnableEvent(event_type): 255 return p.Message( 256 command_group=p.CommandGroup.BASIC, 257 is_reply=False, 258 command=p.BasicCommand.REGISTER_NOTIFICATION, 259 body=p.RegisterNotificationBody( 260 event_type=p.EventType[event_type], 261 ) 262 ) 263 case SendTncDataFragment(tnc_data_fragment): 264 return p.Message( 265 command_group=p.CommandGroup.BASIC, 266 is_reply=False, 267 command=p.BasicCommand.HT_SEND_DATA, 268 body=p.HTSendDataBody( 269 tnc_data_fragment=tnc_data_fragment.to_protocol() 270 ) 271 ) 272 case GetBeaconSettings(): 273 return p.Message( 274 command_group=p.CommandGroup.BASIC, 275 is_reply=False, 276 command=p.BasicCommand.READ_BSS_SETTINGS, 277 body=p.ReadBSSSettingsBody() 278 ) 279 case SetBeaconSettings(packet_settings): 280 return p.Message( 281 command_group=p.CommandGroup.BASIC, 282 is_reply=False, 283 command=p.BasicCommand.WRITE_BSS_SETTINGS, 284 body=p.WriteBSSSettingsBody( 285 bss_settings=packet_settings.to_protocol() 286 ) 287 ) 288 case GetSettings(): 289 return p.Message( 290 command_group=p.CommandGroup.BASIC, 291 is_reply=False, 292 command=p.BasicCommand.READ_SETTINGS, 293 body=p.ReadSettingsBody() 294 ) 295 case SetSettings(settings): 296 return p.Message( 297 command_group=p.CommandGroup.BASIC, 298 is_reply=False, 299 command=p.BasicCommand.WRITE_SETTINGS, 300 body=p.WriteSettingsBody( 301 settings=settings.to_protocol() 302 ) 303 ) 304 case GetDeviceInfo(): 305 return p.Message( 306 command_group=p.CommandGroup.BASIC, 307 is_reply=False, 308 command=p.BasicCommand.GET_DEV_INFO, 309 body=p.GetDevInfoBody() 310 ) 311 case GetChannel(channel_id): 312 return p.Message( 313 command_group=p.CommandGroup.BASIC, 314 is_reply=False, 315 command=p.BasicCommand.READ_RF_CH, 316 body=p.ReadRFChBody(channel_id=channel_id) 317 ) 318 case SetChannel(channel): 319 return p.Message( 320 command_group=p.CommandGroup.BASIC, 321 is_reply=False, 322 command=p.BasicCommand.WRITE_RF_CH, 323 body=p.WriteRFChBody( 324 rf_ch=channel.to_protocol() 325 ) 326 ) 327 case GetBatteryVoltage(): 328 return p.Message( 329 command_group=p.CommandGroup.BASIC, 330 is_reply=False, 331 command=p.BasicCommand.READ_STATUS, 332 body=p.ReadPowerStatusBody( 333 status_type=p.PowerStatusType.BATTERY_VOLTAGE 334 ) 335 ) 336 case GetBatteryLevel(): 337 return p.Message( 338 command_group=p.CommandGroup.BASIC, 339 is_reply=False, 340 command=p.BasicCommand.READ_STATUS, 341 body=p.ReadPowerStatusBody( 342 status_type=p.PowerStatusType.BATTERY_LEVEL 343 ) 344 ) 345 case GetBatteryLevelAsPercentage(): 346 return p.Message( 347 command_group=p.CommandGroup.BASIC, 348 is_reply=False, 349 command=p.BasicCommand.READ_STATUS, 350 body=p.ReadPowerStatusBody( 351 status_type=p.PowerStatusType.BATTERY_LEVEL_AS_PERCENTAGE 352 ) 353 ) 354 case GetRCBatteryLevel(): 355 return p.Message( 356 command_group=p.CommandGroup.BASIC, 357 is_reply=False, 358 command=p.BasicCommand.READ_STATUS, 359 body=p.ReadPowerStatusBody( 360 status_type=p.PowerStatusType.RC_BATTERY_LEVEL 361 ) 362 ) 363 case GetStatus(): 364 return p.Message( 365 command_group=p.CommandGroup.BASIC, 366 is_reply=False, 367 command=p.BasicCommand.GET_HT_STATUS, 368 body=p.GetHtStatusBody() 369 ) 370 case GetPosition(): 371 return p.Message( 372 command_group=p.CommandGroup.BASIC, 373 is_reply=False, 374 command=p.BasicCommand.GET_POSITION, 375 body=p.GetPositionBody() 376 ) 377 378 379def radio_message_from_protocol(mf: p.Message) -> RadioMessage: 380 """@private (Protocol helper)""" 381 match mf.body: 382 case p.GetPositionReplyBody(reply_status=reply_status, position=position): 383 if position is None: 384 return MessageReplyError( 385 message_type=GetPositionReply, 386 reason=reply_status.name 387 ) 388 return GetPositionReply(Position.from_protocol(position)) 389 case p.GetHtStatusReplyBody(reply_status=reply_status, status=status): 390 if status is None: 391 return MessageReplyError( 392 message_type=GetStatusReply, 393 reason=reply_status.name, 394 ) 395 return GetStatusReply(Status.from_protocol(status)) 396 case p.HTSendDataReplyBody( 397 reply_status=reply_status 398 ): 399 if reply_status != p.ReplyStatus.SUCCESS: 400 return MessageReplyError( 401 message_type=SendTncDataFragmentReply, 402 reason=reply_status.name, 403 ) 404 return SendTncDataFragmentReply() 405 case p.ReadBSSSettingsReplyBody( 406 reply_status=reply_status, 407 bss_settings=bss_settings, 408 ): 409 if bss_settings is None: 410 return MessageReplyError( 411 message_type=GetBeaconSettingsReply, 412 reason=reply_status.name, 413 ) 414 415 return GetBeaconSettingsReply(BeaconSettings.from_protocol(bss_settings)) 416 case p.WriteBSSSettingsReplyBody( 417 reply_status=reply_status, 418 ): 419 if reply_status != p.ReplyStatus.SUCCESS: 420 return MessageReplyError( 421 message_type=SetBeaconSettingsReply, 422 reason=reply_status.name, 423 ) 424 return SetBeaconSettingsReply() 425 case p.ReadPowerStatusReplyBody( 426 reply_status=reply_status, 427 status=power_status 428 ): 429 if power_status is None: 430 return MessageReplyError( 431 message_type=GetBatteryVoltageReply, 432 reason=reply_status.name, 433 ) 434 435 match power_status.value: 436 case p.BatteryVoltageStatus( 437 battery_voltage=battery_voltage 438 ): 439 return GetBatteryVoltageReply( 440 battery_voltage=battery_voltage 441 ) 442 case p.BatteryLevelPercentageStatus( 443 battery_level_as_percentage=battery_level_as_percentage 444 ): 445 return GetBatteryLevelAsPercentageReply( 446 battery_level_as_percentage=battery_level_as_percentage 447 ) 448 case p.BatteryLevelStatus( 449 battery_level=battery_level 450 ): 451 return GetBatteryLevelReply( 452 battery_level=battery_level 453 ) 454 case p.RCBatteryLevelStatus( 455 rc_battery_level=rc_battery_level 456 ): 457 return GetRCBatteryLevelReply( 458 rc_battery_level=rc_battery_level 459 ) 460 case p.EventNotificationBody( 461 event=event 462 ): 463 match event: 464 case p.HTSettingsChangedEvent( 465 settings=settings 466 ): 467 return SettingsChangedEvent(Settings.from_protocol(settings)) 468 case p.DataRxdEvent( 469 tnc_data_fragment=tnc_data_fragment 470 ): 471 return TncDataFragmentReceivedEvent( 472 tnc_data_fragment=TncDataFragment.from_protocol( 473 tnc_data_fragment 474 ) 475 ) 476 case p.HTChChangedEvent( 477 rf_ch=rf_ch 478 ): 479 return ChannelChangedEvent(Channel.from_protocol(rf_ch)) 480 case p.HTStatusChangedEvent( 481 status=status 482 ): 483 return StatusChangedEvent(Status.from_protocol(status)) 484 case _: 485 return UnknownProtocolMessage(mf) 486 case p.ReadSettingsReplyBody( 487 reply_status=reply_status, 488 settings=settings 489 ): 490 if settings is None: 491 return MessageReplyError( 492 message_type=GetSettingsReply, 493 reason=reply_status.name, 494 ) 495 return GetSettingsReply(Settings.from_protocol(settings)) 496 case p.WriteSettingsReplyBody( 497 reply_status=reply_status, 498 ): 499 if reply_status != p.ReplyStatus.SUCCESS: 500 return MessageReplyError( 501 message_type=SetSettingsReply, 502 reason=reply_status.name, 503 ) 504 return SetSettingsReply() 505 case p.GetDevInfoReplyBody( 506 reply_status=reply_status, 507 dev_info=dev_info 508 ): 509 if dev_info is None: 510 return MessageReplyError( 511 message_type=GetDeviceInfoReply, 512 reason=reply_status.name, 513 ) 514 return GetDeviceInfoReply(DeviceInfo.from_protocol(dev_info)) 515 case p.ReadRFChReplyBody( 516 reply_status=reply_status, 517 rf_ch=rf_ch 518 ): 519 if rf_ch is None: 520 return MessageReplyError( 521 message_type=GetChannelReply, 522 reason=reply_status.name, 523 ) 524 return GetChannelReply(Channel.from_protocol(rf_ch)) 525 case p.WriteRFChReplyBody( 526 reply_status=reply_status, 527 ): 528 if reply_status != p.ReplyStatus.SUCCESS: 529 return MessageReplyError( 530 message_type=SetChannelReply, 531 reason=reply_status.name, 532 ) 533 return SetChannelReply() 534 535 case _: 536 return UnknownProtocolMessage(mf) 537 538 539##################### 540# CommandMessage 541 542class EnableEvent(t.NamedTuple): 543 event_type: EventType 544 545 546class GetBeaconSettings(t.NamedTuple): 547 pass 548 549 550class SetBeaconSettings(t.NamedTuple): 551 tnc_settings: BeaconSettings 552 553 554class SetSettings(t.NamedTuple): 555 settings: Settings 556 557 558class GetBatteryLevelAsPercentage(t.NamedTuple): 559 pass 560 561 562class GetRCBatteryLevel(t.NamedTuple): 563 pass 564 565 566class GetBatteryLevel(t.NamedTuple): 567 pass 568 569 570class GetBatteryVoltage(t.NamedTuple): 571 pass 572 573 574class GetDeviceInfo(t.NamedTuple): 575 pass 576 577 578class GetChannel(t.NamedTuple): 579 channel_id: int 580 581 582class SetChannel(t.NamedTuple): 583 channel: Channel 584 585 586class GetSettings(t.NamedTuple): 587 pass 588 589 590class GetStatus(t.NamedTuple): 591 pass 592 593 594class GetPosition(t.NamedTuple): 595 pass 596 597 598class SendTncDataFragment(t.NamedTuple): 599 tnc_data_fragment: TncDataFragment 600 601 602CommandMessage = t.Union[ 603 GetBeaconSettings, 604 SetBeaconSettings, 605 GetRCBatteryLevel, 606 GetBatteryLevelAsPercentage, 607 GetBatteryLevel, 608 GetBatteryVoltage, 609 GetDeviceInfo, 610 GetChannel, 611 SetChannel, 612 GetSettings, 613 SetSettings, 614 SendTncDataFragment, 615 EnableEvent, 616 GetStatus, 617 GetPosition, 618] 619 620##################### 621# ReplyMessage 622 623 624class SendTncDataFragmentReply(t.NamedTuple): 625 pass 626 627 628class GetBeaconSettingsReply(t.NamedTuple): 629 tnc_settings: BeaconSettings 630 631 632class SetBeaconSettingsReply(t.NamedTuple): 633 pass 634 635 636class SetSettingsReply(t.NamedTuple): 637 pass 638 639 640class GetBatteryLevelAsPercentageReply(t.NamedTuple): 641 battery_level_as_percentage: int 642 643 644class GetRCBatteryLevelReply(t.NamedTuple): 645 rc_battery_level: int 646 647 648class GetBatteryLevelReply(t.NamedTuple): 649 battery_level: int 650 651 652class GetBatteryVoltageReply(t.NamedTuple): 653 battery_voltage: float 654 655 656class GetDeviceInfoReply(t.NamedTuple): 657 device_info: DeviceInfo 658 659 660class GetChannelReply(t.NamedTuple): 661 channel: Channel 662 663 664class SetChannelReply(t.NamedTuple): 665 pass 666 667 668class GetStatusReply(t.NamedTuple): 669 status: Status 670 671 672class GetSettingsReply(t.NamedTuple): 673 settings: Settings 674 675 676class GetPositionReply(t.NamedTuple): 677 position: Position 678 679 680ReplyStatus = t.Literal[ 681 "SUCCESS", 682 "NOT_SUPPORTED", 683 "NOT_AUTHENTICATED", 684 "INSUFFICIENT_RESOURCES", 685 "AUTHENTICATING", 686 "INVALID_PARAMETER", 687 "INCORRECT_STATE", 688 "IN_PROGRESS", 689] 690 691 692class MessageReplyError(t.NamedTuple): 693 message_type: t.Type[t.Any] 694 reason: ReplyStatus 695 696 def as_exception(self): 697 return ValueError(f"{self.message_type.__name__} failed: {self.reason}") 698 699 700ReplyMessage = t.Union[ 701 GetBeaconSettingsReply, 702 SetBeaconSettingsReply, 703 GetBatteryLevelAsPercentageReply, 704 GetRCBatteryLevelReply, 705 GetBatteryLevelReply, 706 GetBatteryVoltageReply, 707 GetDeviceInfoReply, 708 GetChannelReply, 709 SetChannelReply, 710 GetSettingsReply, 711 SetSettingsReply, 712 SendTncDataFragmentReply, 713 GetStatusReply, 714 GetPositionReply, 715 MessageReplyError, 716] 717 718##################### 719# EventMessage 720 721 722class StatusChangedEvent(t.NamedTuple): 723 status: Status 724 725 726class ChannelChangedEvent(t.NamedTuple): 727 channel: Channel 728 729 730class TncDataFragmentReceivedEvent(t.NamedTuple): 731 tnc_data_fragment: TncDataFragment 732 733 734class SettingsChangedEvent(t.NamedTuple): 735 settings: Settings 736 737 738class UnknownProtocolMessage(t.NamedTuple): 739 message: p.Message 740 741 742EventMessage = t.Union[ 743 TncDataFragmentReceivedEvent, 744 SettingsChangedEvent, 745 ChannelChangedEvent, 746 StatusChangedEvent, 747 UnknownProtocolMessage, 748] 749 750EventType = t.Literal[ 751 "HT_STATUS_CHANGED", 752 "DATA_RXD", 753 "NEW_INQUIRY_DATA", 754 "RESTORE_FACTORY_SETTINGS", 755 "HT_CH_CHANGED", 756 "HT_SETTINGS_CHANGED", 757 "RINGING_STOPPED" 758 "RADIO_STATUS_CHANGED", 759 "USER_ACTION", 760 "SYSTEM_EVENT", 761 "BSS_SETTINGS_CHANGED", 762 "DATA_TXD", 763 "POSITION_CHANGED" 764] 765 766RadioMessage = ReplyMessage | EventMessage 767 768RadioMessageT = t.TypeVar("RadioMessageT", bound=RadioMessage) 769 770##################### 771# Handlers 772 773RadioMessageHandler = t.Callable[[RadioMessage], None] 774"""@private""" 775 776EventHandler = t.Callable[[EventMessage], None] 777"""@private""" 778 779##################### 780# Protocol to data object conversions 781 782 783class IntSplit(t.NamedTuple): 784 """@private (A helper for working with integers split into upper and lower parts)""" 785 786 n_upper: int 787 n_lower: int 788 789 def from_parts(self, upper: int, lower: int) -> int: 790 if upper >= 1 << self.n_upper: 791 raise ValueError( 792 f"Upper part {upper} is too large for {self.n_upper} bits") 793 if lower >= 1 << self.n_lower: 794 raise ValueError( 795 f"Lower part {lower} is too large for {self.n_lower} bits") 796 797 return (upper << self.n_lower) | lower 798 799 def get_upper(self, n: int) -> int: 800 if n >= 1 << (self.n_upper + self.n_lower): 801 raise ValueError( 802 f"Value {n} is too large for {self.n_upper + self.n_lower} bits" 803 ) 804 return n >> self.n_lower 805 806 def get_lower(self, n: int) -> int: 807 if n >= 1 << (self.n_upper + self.n_lower): 808 raise ValueError( 809 f"Value {n} is too large for {self.n_upper + self.n_lower} bits" 810 ) 811 return n & ((1 << self.n_lower) - 1) 812 813 814class TncDataFragment(ImmutableBaseModel): 815 """A data object representing a message packet""" 816 is_final_fragment: bool 817 fragment_id: int 818 data: bytes 819 channel_id: int | None = None 820 821 @classmethod 822 def from_protocol(cls, mp: p.TncDataFragment) -> TncDataFragment: 823 """@private (Protocol helper)""" 824 return TncDataFragment( 825 is_final_fragment=mp.is_final_fragment, 826 fragment_id=mp.fragment_id, 827 data=mp.data, 828 channel_id=mp.channel_id 829 ) 830 831 def to_protocol(self) -> p.TncDataFragment: 832 """@private (Protocol helper)""" 833 return p.TncDataFragment( 834 is_final_fragment=self.is_final_fragment, 835 with_channel_id=self.channel_id is not None, 836 fragment_id=self.fragment_id, 837 data=self.data, 838 channel_id=self.channel_id 839 ) 840 841 842ModulationType = t.Literal["AM", "FM", "DMR"] 843 844BandwidthType = t.Literal["NARROW", "WIDE"] 845 846 847class DCS(t.NamedTuple): 848 """A type for setting Digital Coded Squelch (DCS) on channels""" 849 850 n: int 851 """The DCS Normal (N) code""" 852 853 854def sub_audio_from_protocol(x: float | p.DCS | None) -> float | DCS | None: 855 """@private (Protocol helper)""" 856 match x: 857 case p.DCS(n): 858 return DCS(n=n) 859 case _: 860 return x 861 862 863def sub_audio_to_protocol(x: float | DCS | None) -> float | p.DCS | None: 864 """@private (Protocol helper)""" 865 match x: 866 case DCS(n): 867 return p.DCS(n=n) 868 case _: 869 return x 870 871 872class ChannelArgs(t.TypedDict, total=False): 873 """A dictionary of the parameters that can be set on a channel""" 874 tx_mod: ModulationType 875 tx_freq: float 876 rx_mod: ModulationType 877 rx_freq: float 878 tx_sub_audio: float | DCS | None 879 rx_sub_audio: float | DCS | None 880 scan: bool 881 tx_at_max_power: bool 882 talk_around: bool 883 bandwidth: BandwidthType 884 pre_de_emph_bypass: bool 885 sign: bool 886 tx_at_med_power: bool 887 tx_disable: bool 888 fixed_freq: bool 889 fixed_bandwidth: bool 890 fixed_tx_power: bool 891 mute: bool 892 name: str 893 894 895class Channel(ImmutableBaseModel): 896 """A data object representing a radio channel""" 897 channel_id: int 898 tx_mod: ModulationType 899 tx_freq: float 900 rx_mod: ModulationType 901 rx_freq: float 902 tx_sub_audio: float | DCS | None 903 rx_sub_audio: float | DCS | None 904 scan: bool 905 tx_at_max_power: bool 906 talk_around: bool 907 bandwidth: BandwidthType 908 pre_de_emph_bypass: bool 909 sign: bool 910 tx_at_med_power: bool 911 tx_disable: bool 912 fixed_freq: bool 913 fixed_bandwidth: bool 914 fixed_tx_power: bool 915 mute: bool 916 name: str 917 918 @classmethod 919 def from_protocol(cls, cs: p.RfCh) -> Channel: 920 """@private (Protocol helper)""" 921 return Channel( 922 channel_id=cs.channel_id, 923 tx_mod=cs.tx_mod.name, 924 tx_freq=cs.tx_freq, 925 rx_mod=cs.rx_mod.name, 926 rx_freq=cs.rx_freq, 927 tx_sub_audio=sub_audio_from_protocol(cs.tx_sub_audio), 928 rx_sub_audio=sub_audio_from_protocol(cs.rx_sub_audio), 929 scan=cs.scan, 930 tx_at_max_power=cs.tx_at_max_power, 931 talk_around=cs.talk_around, 932 bandwidth=cs.bandwidth.name, 933 pre_de_emph_bypass=cs.pre_de_emph_bypass, 934 sign=cs.sign, 935 tx_at_med_power=cs.tx_at_med_power, 936 tx_disable=cs.tx_disable, 937 fixed_freq=cs.fixed_freq, 938 fixed_bandwidth=cs.fixed_bandwidth, 939 fixed_tx_power=cs.fixed_tx_power, 940 mute=cs.mute, 941 name=cs.name_str 942 ) 943 944 def to_protocol(self) -> p.RfCh: 945 """@private (Protocol helper)""" 946 return p.RfCh( 947 channel_id=self.channel_id, 948 tx_mod=p.ModulationType[self.tx_mod], 949 tx_freq=self.tx_freq, 950 rx_mod=p.ModulationType[self.rx_mod], 951 rx_freq=self.rx_freq, 952 tx_sub_audio=sub_audio_to_protocol(self.tx_sub_audio), 953 rx_sub_audio=sub_audio_to_protocol(self.rx_sub_audio), 954 scan=self.scan, 955 tx_at_max_power=self.tx_at_max_power, 956 talk_around=self.talk_around, 957 bandwidth=p.BandwidthType[self.bandwidth], 958 pre_de_emph_bypass=self.pre_de_emph_bypass, 959 sign=self.sign, 960 tx_at_med_power=self.tx_at_med_power, 961 tx_disable=self.tx_disable, 962 fixed_freq=self.fixed_freq, 963 fixed_bandwidth=self.fixed_bandwidth, 964 fixed_tx_power=self.fixed_tx_power, 965 mute=self.mute, 966 name_str=self.name 967 ) 968 969 970class SettingsArgs(t.TypedDict, total=False): 971 """A dictionary of the parameters that can be set in the radio settings""" 972 channel_a: int 973 channel_b: int 974 scan: bool 975 aghfp_call_mode: int 976 double_channel: int 977 squelch_level: int 978 tail_elim: bool 979 auto_relay_en: bool 980 auto_power_on: bool 981 keep_aghfp_link: bool 982 mic_gain: int 983 tx_hold_time: int 984 tx_time_limit: int 985 local_speaker: int 986 bt_mic_gain: int 987 adaptive_response: bool 988 dis_tone: bool 989 power_saving_mode: bool 990 auto_power_off: int 991 auto_share_loc_ch: int | t.Literal["current"] 992 hm_speaker: int 993 positioning_system: int 994 time_offset: int 995 use_freq_range_2: bool 996 ptt_lock: bool 997 leading_sync_bit_en: bool 998 pairing_at_power_on: bool 999 screen_timeout: int 1000 kiss_upload_tx_msg: bool 1001 kiss_en: bool 1002 imperial_unit: bool 1003 wx_mode: int 1004 noaa_ch: int 1005 vfol_tx_power_x: int 1006 vfo2_tx_power_x: int 1007 dis_digital_mute: bool 1008 signaling_ecc_en: bool 1009 ch_data_lock: bool 1010 kiss_tx_delay: int 1011 kiss_tx_tail: int 1012 vox_en: bool 1013 vox_level: int 1014 dis_bt_mic: bool 1015 vox_delay: int 1016 ns_en: bool 1017 alarm_volume: int 1018 use_custom_location: bool 1019 gpwpl_upload_en: bool 1020 vfo1_mod_freq_x: int 1021 custom_location_lat: int 1022 custom_location_lon: int 1023 1024 1025class Settings(ImmutableBaseModel): 1026 """A data object representing the radio settings""" 1027 _channel_split: t.ClassVar[IntSplit] = IntSplit(4, 4) 1028 _auto_share_loc_ch_split: t.ClassVar[IntSplit] = IntSplit(3, 5) 1029 channel_a: int 1030 channel_b: int 1031 scan: bool 1032 aghfp_call_mode: int 1033 double_channel: int 1034 squelch_level: int 1035 tail_elim: bool 1036 auto_relay_en: bool 1037 auto_power_on: bool 1038 keep_aghfp_link: bool 1039 mic_gain: int 1040 tx_hold_time: int 1041 tx_time_limit: int 1042 local_speaker: int 1043 bt_mic_gain: int 1044 adaptive_response: bool 1045 dis_tone: bool 1046 power_saving_mode: bool 1047 auto_power_off: int 1048 auto_share_loc_ch: int | t.Literal["current"] 1049 hm_speaker: int 1050 positioning_system: int 1051 time_offset: int 1052 use_freq_range_2: bool 1053 ptt_lock: bool 1054 leading_sync_bit_en: bool 1055 pairing_at_power_on: bool 1056 screen_timeout: int 1057 kiss_upload_tx_msg: bool 1058 kiss_en: bool 1059 imperial_unit: bool 1060 wx_mode: int 1061 noaa_ch: int 1062 vfol_tx_power_x: int 1063 vfo2_tx_power_x: int 1064 dis_digital_mute: bool 1065 signaling_ecc_en: bool 1066 ch_data_lock: bool 1067 kiss_tx_delay: int 1068 kiss_tx_tail: int 1069 vox_en: bool 1070 vox_level: int 1071 dis_bt_mic: bool 1072 vox_delay: int 1073 ns_en: bool 1074 alarm_volume: int 1075 use_custom_location: bool 1076 gpwpl_upload_en: bool 1077 vfo1_mod_freq_x: int 1078 custom_location_lat: int 1079 custom_location_lon: int 1080 1081 @classmethod 1082 def from_protocol(cls, rs: p.Settings) -> Settings: 1083 """@private (Protocol helper)""" 1084 _raw_auto_share_loc_ch = cls._auto_share_loc_ch_split.from_parts( 1085 rs.auto_share_loc_ch_upper, rs.auto_share_loc_ch 1086 ) 1087 return Settings( 1088 channel_a=cls._channel_split.from_parts( 1089 rs.channel_a_upper, rs.channel_a_lower 1090 ), 1091 channel_b=cls._channel_split.from_parts( 1092 rs.channel_b_upper, rs.channel_b_lower 1093 ), 1094 scan=rs.scan, 1095 aghfp_call_mode=rs.aghfp_call_mode, 1096 double_channel=rs.double_channel, 1097 squelch_level=rs.squelch_level, 1098 tail_elim=rs.tail_elim, 1099 auto_relay_en=rs.auto_relay_en, 1100 auto_power_on=rs.auto_power_on, 1101 keep_aghfp_link=rs.keep_aghfp_link, 1102 mic_gain=rs.mic_gain, 1103 tx_hold_time=rs.tx_hold_time, 1104 tx_time_limit=rs.tx_time_limit, 1105 local_speaker=rs.local_speaker, 1106 bt_mic_gain=rs.bt_mic_gain, 1107 adaptive_response=rs.adaptive_response, 1108 dis_tone=rs.dis_tone, 1109 power_saving_mode=rs.power_saving_mode, 1110 auto_power_off=rs.auto_power_off, 1111 auto_share_loc_ch=_raw_auto_share_loc_ch - 1112 1 if _raw_auto_share_loc_ch > 0 else "current", 1113 hm_speaker=rs.hm_speaker, 1114 positioning_system=rs.positioning_system, 1115 time_offset=rs.time_offset, 1116 use_freq_range_2=rs.use_freq_range_2, 1117 ptt_lock=rs.ptt_lock, 1118 leading_sync_bit_en=rs.leading_sync_bit_en, 1119 pairing_at_power_on=rs.pairing_at_power_on, 1120 screen_timeout=rs.screen_timeout, 1121 kiss_upload_tx_msg=rs.kiss_upload_tx_msg, 1122 kiss_en=rs.kiss_en, 1123 imperial_unit=rs.imperial_unit, 1124 wx_mode=rs.wx_mode, 1125 noaa_ch=rs.noaa_ch, 1126 vfol_tx_power_x=rs.vfol_tx_power_x, 1127 vfo2_tx_power_x=rs.vfo2_tx_power_x, 1128 dis_digital_mute=rs.dis_digital_mute, 1129 signaling_ecc_en=rs.signaling_ecc_en, 1130 ch_data_lock=rs.ch_data_lock, 1131 kiss_tx_delay=rs.kiss_tx_delay, 1132 kiss_tx_tail=rs.kiss_tx_tail, 1133 vox_en=rs.vox_en, 1134 vox_level=rs.vox_level, 1135 dis_bt_mic=rs.dis_bt_mic, 1136 vox_delay=rs.vox_delay, 1137 ns_en=rs.ns_en, 1138 alarm_volume=rs.alarm_volume, 1139 use_custom_location=rs.use_custom_location, 1140 gpwpl_upload_en=rs.gpwpl_upload_en, 1141 vfo1_mod_freq_x=rs.vfo1_mod_freq_x, 1142 custom_location_lat=rs.custom_location_lat, 1143 custom_location_lon=rs.custom_location_lon 1144 ) 1145 1146 def to_protocol(self): 1147 """@private (Protocol helper)""" 1148 _raw_auto_share_loc_ch = 0 if self.auto_share_loc_ch == "current" else self.auto_share_loc_ch + 1 1149 return p.Settings( 1150 channel_a_lower=self._channel_split.get_lower(self.channel_a), 1151 channel_b_lower=self._channel_split.get_lower(self.channel_b), 1152 scan=self.scan, 1153 aghfp_call_mode=self.aghfp_call_mode, 1154 double_channel=self.double_channel, 1155 squelch_level=self.squelch_level, 1156 tail_elim=self.tail_elim, 1157 auto_relay_en=self.auto_relay_en, 1158 auto_power_on=self.auto_power_on, 1159 keep_aghfp_link=self.keep_aghfp_link, 1160 mic_gain=self.mic_gain, 1161 tx_hold_time=self.tx_hold_time, 1162 tx_time_limit=self.tx_time_limit, 1163 local_speaker=self.local_speaker, 1164 bt_mic_gain=self.bt_mic_gain, 1165 adaptive_response=self.adaptive_response, 1166 dis_tone=self.dis_tone, 1167 power_saving_mode=self.power_saving_mode, 1168 auto_power_off=self.auto_power_off, 1169 auto_share_loc_ch=self._auto_share_loc_ch_split.get_lower( 1170 _raw_auto_share_loc_ch), 1171 hm_speaker=self.hm_speaker, 1172 positioning_system=self.positioning_system, 1173 time_offset=self.time_offset, 1174 use_freq_range_2=self.use_freq_range_2, 1175 ptt_lock=self.ptt_lock, 1176 leading_sync_bit_en=self.leading_sync_bit_en, 1177 pairing_at_power_on=self.pairing_at_power_on, 1178 screen_timeout=self.screen_timeout, 1179 kiss_upload_tx_msg=self.kiss_upload_tx_msg, 1180 kiss_en=self.kiss_en, 1181 imperial_unit=self.imperial_unit, 1182 channel_a_upper=self._channel_split.get_upper(self.channel_a), 1183 channel_b_upper=self._channel_split.get_upper(self.channel_b), 1184 wx_mode=self.wx_mode, 1185 noaa_ch=self.noaa_ch, 1186 vfol_tx_power_x=self.vfol_tx_power_x, 1187 vfo2_tx_power_x=self.vfo2_tx_power_x, 1188 dis_digital_mute=self.dis_digital_mute, 1189 signaling_ecc_en=self.signaling_ecc_en, 1190 ch_data_lock=self.ch_data_lock, 1191 auto_share_loc_ch_upper=self._auto_share_loc_ch_split.get_upper( 1192 _raw_auto_share_loc_ch), 1193 kiss_tx_delay=self.kiss_tx_delay, 1194 kiss_tx_tail=self.kiss_tx_tail, 1195 vox_en=self.vox_en, 1196 vox_level=self.vox_level, 1197 dis_bt_mic=self.dis_bt_mic, 1198 vox_delay=self.vox_delay, 1199 ns_en=self.ns_en, 1200 alarm_volume=self.alarm_volume, 1201 use_custom_location=self.use_custom_location, 1202 gpwpl_upload_en=self.gpwpl_upload_en, 1203 vfo1_mod_freq_x=self.vfo1_mod_freq_x, 1204 custom_location_lat=self.custom_location_lat, 1205 custom_location_lon=self.custom_location_lon 1206 ) 1207 1208 1209class DeviceInfo(ImmutableBaseModel): 1210 """A data object representing the device information""" 1211 vendor_id: int 1212 product_id: int 1213 hardware_version: int 1214 firmware_version: int 1215 supports_radio: bool 1216 supports_medium_power: bool 1217 fixed_location_speaker_volume: bool 1218 has_speaker: bool 1219 has_hand_microphone_speaker: bool 1220 region_count: int 1221 supports_noaa: bool 1222 supports_gmrs: bool 1223 supports_vfo: bool 1224 supports_dmr: bool 1225 supports_software_power_control: bool 1226 channel_count: int 1227 frequency_range_count: int 1228 1229 @classmethod 1230 def from_protocol(cls, info: p.DevInfo) -> DeviceInfo: 1231 """@private (Protocol helper)""" 1232 return DeviceInfo( 1233 vendor_id=info.vendor_id, 1234 product_id=info.product_id, 1235 hardware_version=info.hw_ver, 1236 firmware_version=info.soft_ver, 1237 supports_radio=info.support_radio, 1238 supports_medium_power=info.support_medium_power, 1239 fixed_location_speaker_volume=info.fixed_loc_speaker_vol, 1240 supports_software_power_control=not info.not_support_soft_power_ctrl, 1241 has_speaker=not info.have_no_speaker, 1242 has_hand_microphone_speaker=info.have_hm_speaker, 1243 region_count=info.region_count, 1244 supports_noaa=info.support_noaa, 1245 supports_gmrs=info.gmrs, 1246 supports_vfo=info.support_vfo, 1247 supports_dmr=info.support_dmr, 1248 channel_count=info.channel_count, 1249 frequency_range_count=info.freq_range_count 1250 ) 1251 1252 def to_protocol(self) -> p.DevInfo: 1253 """@private (Protocol helper)""" 1254 return p.DevInfo( 1255 vendor_id=self.vendor_id, 1256 product_id=self.product_id, 1257 hw_ver=self.hardware_version, 1258 soft_ver=self.firmware_version, 1259 support_radio=self.supports_radio, 1260 support_medium_power=self.supports_medium_power, 1261 fixed_loc_speaker_vol=self.fixed_location_speaker_volume, 1262 not_support_soft_power_ctrl=not self.supports_software_power_control, 1263 have_no_speaker=not self.has_speaker, 1264 have_hm_speaker=self.has_hand_microphone_speaker, 1265 region_count=self.region_count, 1266 support_noaa=self.supports_noaa, 1267 gmrs=self.supports_gmrs, 1268 support_vfo=self.supports_vfo, 1269 support_dmr=self.supports_dmr, 1270 channel_count=self.channel_count, 1271 freq_range_count=self.frequency_range_count 1272 ) 1273 1274 1275class BeaconSettingsArgs(t.TypedDict, total=False): 1276 """A dictionary of the parameters that can be set in the beacon settings""" 1277 max_fwd_times: int 1278 time_to_live: int 1279 ptt_release_send_location: bool 1280 ptt_release_send_id_info: bool 1281 ptt_release_send_bss_user_id: bool 1282 should_share_location: bool 1283 send_pwr_voltage: bool 1284 packet_format: t.Literal["BSS", "APRS"] 1285 allow_position_check: bool 1286 aprs_ssid: int 1287 location_share_interval: int 1288 bss_user_id: int 1289 ptt_release_id_info: str 1290 beacon_message: str 1291 aprs_symbol: str 1292 aprs_callsign: str 1293 1294 1295class BeaconSettings(ImmutableBaseModel): 1296 """A data object representing the beacon settings""" 1297 _bss_user_id_split: t.ClassVar[IntSplit] = IntSplit(32, 32) 1298 max_fwd_times: int 1299 time_to_live: int 1300 ptt_release_send_location: bool 1301 ptt_release_send_id_info: bool 1302 ptt_release_send_bss_user_id: bool 1303 should_share_location: bool 1304 send_pwr_voltage: bool 1305 packet_format: t.Literal["BSS", "APRS"] 1306 allow_position_check: bool 1307 aprs_ssid: int 1308 location_share_interval: int 1309 bss_user_id: int 1310 ptt_release_id_info: str 1311 beacon_message: str 1312 aprs_symbol: str 1313 aprs_callsign: str 1314 1315 @classmethod 1316 def from_protocol(cls, bs: p.BSSSettingsV2 | p.BSSSettings) -> BeaconSettings: 1317 """@private (Protocol helper)""" 1318 return BeaconSettings( 1319 max_fwd_times=bs.max_fwd_times, 1320 time_to_live=bs.time_to_live, 1321 ptt_release_send_location=bs.ptt_release_send_location, 1322 ptt_release_send_id_info=bs.ptt_release_send_id_info, 1323 ptt_release_send_bss_user_id=bs.ptt_release_send_bss_user_id, 1324 should_share_location=bs.should_share_location, 1325 send_pwr_voltage=bs.send_pwr_voltage, 1326 packet_format=bs.packet_format.name, 1327 allow_position_check=bs.allow_position_check, 1328 aprs_ssid=bs.aprs_ssid, 1329 location_share_interval=bs.location_share_interval, 1330 bss_user_id=cls._bss_user_id_split.from_parts( 1331 bs.bss_user_id_upper, bs.bss_user_id_lower 1332 ), 1333 ptt_release_id_info=bs.ptt_release_id_info, 1334 beacon_message=bs.beacon_message, 1335 aprs_symbol=bs.aprs_symbol, 1336 aprs_callsign=bs.aprs_callsign 1337 ) 1338 1339 def to_protocol(self) -> p.BSSSettingsV2: 1340 """@private (Protocol helper)""" 1341 return p.BSSSettingsV2( 1342 max_fwd_times=self.max_fwd_times, 1343 time_to_live=self.time_to_live, 1344 ptt_release_send_location=self.ptt_release_send_location, 1345 ptt_release_send_id_info=self.ptt_release_send_id_info, 1346 ptt_release_send_bss_user_id=self.ptt_release_send_bss_user_id, 1347 should_share_location=self.should_share_location, 1348 send_pwr_voltage=self.send_pwr_voltage, 1349 packet_format=p.PacketFormat[self.packet_format], 1350 allow_position_check=self.allow_position_check, 1351 aprs_ssid=self.aprs_ssid, 1352 location_share_interval=self.location_share_interval, 1353 bss_user_id_lower=self._bss_user_id_split.get_lower( 1354 self.bss_user_id 1355 ), 1356 ptt_release_id_info=self.ptt_release_id_info, 1357 beacon_message=self.beacon_message, 1358 aprs_symbol=self.aprs_symbol, 1359 aprs_callsign=self.aprs_callsign, 1360 bss_user_id_upper=self._bss_user_id_split.get_upper( 1361 self.bss_user_id 1362 ), 1363 ) 1364 1365 1366ChannelType = t.Literal["OFF", "A", "B"] 1367 1368 1369class Status(ImmutableBaseModel): 1370 """A data object representing the radio status""" 1371 _channel_split: t.ClassVar[IntSplit] = IntSplit(4, 4) 1372 is_power_on: bool 1373 is_in_tx: bool 1374 is_sq: bool 1375 is_in_rx: bool 1376 double_channel: ChannelType 1377 is_scan: bool 1378 is_radio: bool 1379 curr_ch_id: int 1380 is_gps_locked: bool 1381 is_hfp_connected: bool 1382 is_aoc_connected: bool 1383 rssi: float 1384 curr_region: int 1385 1386 @classmethod 1387 def from_protocol(cls, s: p.Status | p.StatusExt) -> Status: 1388 """@private (Protocol helper)""" 1389 if not isinstance(s, p.StatusExt): 1390 raise ValueError( 1391 "Radio replied with old Status message version. Upgrade your firmware!" 1392 ) 1393 1394 return Status( 1395 is_power_on=s.is_power_on, 1396 is_in_tx=s.is_in_tx, 1397 is_sq=s.is_sq, 1398 is_in_rx=s.is_in_rx, 1399 double_channel=s.double_channel.name, 1400 is_scan=s.is_scan, 1401 is_radio=s.is_radio, 1402 curr_ch_id=cls._channel_split.from_parts( 1403 s.curr_channel_id_upper, s.curr_ch_id_lower 1404 ), 1405 is_gps_locked=s.is_gps_locked, 1406 is_hfp_connected=s.is_hfp_connected, 1407 is_aoc_connected=s.is_aoc_connected, 1408 rssi=s.rssi, 1409 curr_region=s.curr_region 1410 ) 1411 1412 def to_protocol(self) -> p.StatusExt: 1413 """@private (Protocol helper)""" 1414 return p.StatusExt( 1415 is_power_on=self.is_power_on, 1416 is_in_tx=self.is_in_tx, 1417 is_sq=self.is_sq, 1418 is_in_rx=self.is_in_rx, 1419 double_channel=p.ChannelType[self.double_channel], 1420 is_scan=self.is_scan, 1421 is_radio=self.is_radio, 1422 curr_ch_id_lower=self._channel_split.get_lower(self.curr_ch_id), 1423 is_gps_locked=self.is_gps_locked, 1424 is_hfp_connected=self.is_hfp_connected, 1425 is_aoc_connected=self.is_aoc_connected, 1426 rssi=self.rssi, 1427 curr_region=self.curr_region, 1428 curr_channel_id_upper=self._channel_split.get_upper( 1429 self.curr_ch_id) 1430 ) 1431 1432 1433class Position(ImmutableBaseModel): 1434 """A data object for representing GPS positions""" 1435 latitude: float 1436 longitude: float 1437 altitude: int | None 1438 speed: int | None 1439 heading: int | None 1440 time: datetime 1441 accuracy: int 1442 1443 @classmethod 1444 def from_protocol(cls, x: p.Position) -> Position: 1445 """@private (Protocol helper)""" 1446 return Position( 1447 latitude=x.latitude, 1448 longitude=x.longitude, 1449 altitude=x.altitude, 1450 speed=x.speed, 1451 heading=x.heading, 1452 time=x.time, 1453 accuracy=x.accuracy, 1454 ) 1455 1456 def to_protocol(self) -> p.Position: 1457 """@private (Protocol helper)""" 1458 return p.Position( 1459 latitude=self.latitude, 1460 longitude=self.longitude, 1461 altitude=self.altitude, 1462 speed=self.speed, 1463 heading=self.heading, 1464 time=self.time, 1465 accuracy=self.accuracy, 1466 )
54class CommandConnection: 55 _link: CommandLink 56 _handlers: t.List[RadioMessageHandler] = [] 57 58 def __init__(self, link: CommandLink): 59 self._link = link 60 self._handlers = [] 61 62 @classmethod 63 def new_ble(cls, device_uuid: str) -> CommandConnection: 64 return cls(BleCommandLink(device_uuid)) 65 66 @classmethod 67 def new_rfcomm(cls, device_uuid: str, channel: int | t.Literal["auto"] = "auto") -> CommandConnection: 68 return cls(RfcommCommandLink(device_uuid, channel)) 69 70 def is_connected(self) -> bool: 71 return self._link.is_connected() 72 73 async def connect(self) -> None: 74 await self._link.connect(self._on_recv) 75 76 async def disconnect(self) -> None: 77 self._handlers.clear() 78 await self._link.disconnect() 79 80 async def send_bytes(self, data: bytes) -> None: 81 await self._link.send_bytes(data) 82 83 async def send_message(self, command: CommandMessage) -> None: 84 await self._link.send(command_message_to_protocol(command)) 85 86 async def send_message_expect_reply(self, command: CommandMessage, expect: t.Type[RadioMessageT]) -> RadioMessageT | MessageReplyError: 87 queue: asyncio.Queue[RadioMessageT | 88 MessageReplyError] = asyncio.Queue() 89 90 def reply_handler(reply: RadioMessage): 91 if ( 92 isinstance(reply, expect) or 93 ( 94 isinstance(reply, MessageReplyError) and 95 reply.message_type is expect 96 ) 97 ): 98 queue.put_nowait(reply) 99 100 remove_handler = self._add_message_handler(reply_handler) 101 102 await self.send_message(command) 103 104 out = await queue.get() 105 106 remove_handler() 107 108 return out 109 110 def add_event_handler(self, handler: EventHandler) -> t.Callable[[], None]: 111 def event_handler(msg: RadioMessage): 112 if isinstance(msg, EventMessage): 113 handler(msg) 114 return self._add_message_handler(event_handler) 115 116 def _add_message_handler(self, handler: RadioMessageHandler) -> t.Callable[[], None]: 117 self._handlers.append(handler) 118 119 def remove_handler(): 120 self._handlers.remove(handler) 121 122 return remove_handler 123 124 def _on_recv(self, msg: p.Message) -> None: 125 radio_message = radio_message_from_protocol(msg) 126 for handler in self._handlers: 127 handler(radio_message) 128 129 # Command API 130 131 async def enable_event(self, event_type: EventType) -> None: 132 """Enable an event""" 133 # This event doesn't get a reply version of EnableEvent. It does, however 134 # does trigger a StatusChangedEvent... but we don't wait for it here. 135 # Instead, we just fire and forget 136 await self.send_message(EnableEvent(event_type)) 137 138 async def send_tnc_data_fragment(self, tnc_data_fragment: TncDataFragment) -> None: 139 """Send Tnc data""" 140 reply = await self.send_message_expect_reply(SendTncDataFragment(tnc_data_fragment), SendTncDataFragmentReply) 141 if isinstance(reply, MessageReplyError): 142 raise reply.as_exception() 143 144 async def get_beacon_settings(self) -> BeaconSettings: 145 """Get the current packet settings""" 146 reply = await self.send_message_expect_reply(GetBeaconSettings(), GetBeaconSettingsReply) 147 if isinstance(reply, MessageReplyError): 148 raise reply.as_exception() 149 return reply.tnc_settings 150 151 async def set_beacon_settings(self, packet_settings: BeaconSettings): 152 """Set the packet settings""" 153 reply = await self.send_message_expect_reply(SetBeaconSettings(packet_settings), SetBeaconSettingsReply) 154 if isinstance(reply, MessageReplyError): 155 raise reply.as_exception() 156 157 async def get_battery_level(self) -> int: 158 """Get the battery level""" 159 reply = await self.send_message_expect_reply(GetBatteryLevel(), GetBatteryLevelReply) 160 if isinstance(reply, MessageReplyError): 161 raise reply.as_exception() 162 return reply.battery_level 163 164 async def get_battery_level_as_percentage(self) -> int: 165 """Get the battery level as a percentage""" 166 reply = await self.send_message_expect_reply(GetBatteryLevelAsPercentage(), GetBatteryLevelAsPercentageReply) 167 if isinstance(reply, MessageReplyError): 168 raise reply.as_exception() 169 return reply.battery_level_as_percentage 170 171 async def get_rc_battery_level(self) -> int: 172 """Get the RC battery level""" 173 reply = await self.send_message_expect_reply(GetRCBatteryLevel(), GetRCBatteryLevelReply) 174 if isinstance(reply, MessageReplyError): 175 raise reply.as_exception() 176 return reply.rc_battery_level 177 178 async def get_battery_voltage(self) -> float: 179 """Get the battery voltage""" 180 reply = await self.send_message_expect_reply(GetBatteryVoltage(), GetBatteryVoltageReply) 181 if isinstance(reply, MessageReplyError): 182 raise reply.as_exception() 183 return reply.battery_voltage 184 185 async def get_device_info(self) -> DeviceInfo: 186 """Get the device info""" 187 reply = await self.send_message_expect_reply(GetDeviceInfo(), GetDeviceInfoReply) 188 if isinstance(reply, MessageReplyError): 189 raise reply.as_exception() 190 return reply.device_info 191 192 async def get_settings(self) -> Settings: 193 """Get the settings""" 194 reply = await self.send_message_expect_reply(GetSettings(), GetSettingsReply) 195 if isinstance(reply, MessageReplyError): 196 raise reply.as_exception() 197 return reply.settings 198 199 async def set_settings(self, settings: Settings) -> None: 200 """Set the settings""" 201 reply = await self.send_message_expect_reply(SetSettings(settings), SetSettingsReply) 202 if isinstance(reply, MessageReplyError): 203 raise reply.as_exception() 204 205 async def get_channel(self, channel_id: int) -> Channel: 206 """Get a channel""" 207 reply = await self.send_message_expect_reply(GetChannel(channel_id), GetChannelReply) 208 if isinstance(reply, MessageReplyError): 209 raise reply.as_exception() 210 return reply.channel 211 212 async def set_channel(self, channel: Channel): 213 """Set a channel""" 214 reply = await self.send_message_expect_reply(SetChannel(channel), SetChannelReply) 215 if isinstance(reply, MessageReplyError): 216 raise reply.as_exception() 217 218 async def get_status(self) -> Status: 219 """Get the radio status""" 220 reply = await self.send_message_expect_reply(GetStatus(), GetStatusReply) 221 if isinstance(reply, MessageReplyError): 222 raise reply.as_exception() 223 return reply.status 224 225 async def get_position(self) -> Position: 226 """Get the radio position""" 227 reply = await self.send_message_expect_reply(GetPosition(), GetPositionReply) 228 if isinstance(reply, MessageReplyError): 229 raise reply.as_exception() 230 return reply.position 231 232 async def __aenter__(self): 233 await self.connect() 234 return self 235 236 async def __aexit__( 237 self, 238 exc_type: t.Any, 239 exc_value: t.Any, 240 traceback: t.Any, 241 ) -> None: 242 await self.disconnect()
86 async def send_message_expect_reply(self, command: CommandMessage, expect: t.Type[RadioMessageT]) -> RadioMessageT | MessageReplyError: 87 queue: asyncio.Queue[RadioMessageT | 88 MessageReplyError] = asyncio.Queue() 89 90 def reply_handler(reply: RadioMessage): 91 if ( 92 isinstance(reply, expect) or 93 ( 94 isinstance(reply, MessageReplyError) and 95 reply.message_type is expect 96 ) 97 ): 98 queue.put_nowait(reply) 99 100 remove_handler = self._add_message_handler(reply_handler) 101 102 await self.send_message(command) 103 104 out = await queue.get() 105 106 remove_handler() 107 108 return out
131 async def enable_event(self, event_type: EventType) -> None: 132 """Enable an event""" 133 # This event doesn't get a reply version of EnableEvent. It does, however 134 # does trigger a StatusChangedEvent... but we don't wait for it here. 135 # Instead, we just fire and forget 136 await self.send_message(EnableEvent(event_type))
Enable an event
138 async def send_tnc_data_fragment(self, tnc_data_fragment: TncDataFragment) -> None: 139 """Send Tnc data""" 140 reply = await self.send_message_expect_reply(SendTncDataFragment(tnc_data_fragment), SendTncDataFragmentReply) 141 if isinstance(reply, MessageReplyError): 142 raise reply.as_exception()
Send Tnc data
144 async def get_beacon_settings(self) -> BeaconSettings: 145 """Get the current packet settings""" 146 reply = await self.send_message_expect_reply(GetBeaconSettings(), GetBeaconSettingsReply) 147 if isinstance(reply, MessageReplyError): 148 raise reply.as_exception() 149 return reply.tnc_settings
Get the current packet settings
151 async def set_beacon_settings(self, packet_settings: BeaconSettings): 152 """Set the packet settings""" 153 reply = await self.send_message_expect_reply(SetBeaconSettings(packet_settings), SetBeaconSettingsReply) 154 if isinstance(reply, MessageReplyError): 155 raise reply.as_exception()
Set the packet settings
157 async def get_battery_level(self) -> int: 158 """Get the battery level""" 159 reply = await self.send_message_expect_reply(GetBatteryLevel(), GetBatteryLevelReply) 160 if isinstance(reply, MessageReplyError): 161 raise reply.as_exception() 162 return reply.battery_level
Get the battery level
164 async def get_battery_level_as_percentage(self) -> int: 165 """Get the battery level as a percentage""" 166 reply = await self.send_message_expect_reply(GetBatteryLevelAsPercentage(), GetBatteryLevelAsPercentageReply) 167 if isinstance(reply, MessageReplyError): 168 raise reply.as_exception() 169 return reply.battery_level_as_percentage
Get the battery level as a percentage
171 async def get_rc_battery_level(self) -> int: 172 """Get the RC battery level""" 173 reply = await self.send_message_expect_reply(GetRCBatteryLevel(), GetRCBatteryLevelReply) 174 if isinstance(reply, MessageReplyError): 175 raise reply.as_exception() 176 return reply.rc_battery_level
Get the RC battery level
178 async def get_battery_voltage(self) -> float: 179 """Get the battery voltage""" 180 reply = await self.send_message_expect_reply(GetBatteryVoltage(), GetBatteryVoltageReply) 181 if isinstance(reply, MessageReplyError): 182 raise reply.as_exception() 183 return reply.battery_voltage
Get the battery voltage
185 async def get_device_info(self) -> DeviceInfo: 186 """Get the device info""" 187 reply = await self.send_message_expect_reply(GetDeviceInfo(), GetDeviceInfoReply) 188 if isinstance(reply, MessageReplyError): 189 raise reply.as_exception() 190 return reply.device_info
Get the device info
192 async def get_settings(self) -> Settings: 193 """Get the settings""" 194 reply = await self.send_message_expect_reply(GetSettings(), GetSettingsReply) 195 if isinstance(reply, MessageReplyError): 196 raise reply.as_exception() 197 return reply.settings
Get the settings
199 async def set_settings(self, settings: Settings) -> None: 200 """Set the settings""" 201 reply = await self.send_message_expect_reply(SetSettings(settings), SetSettingsReply) 202 if isinstance(reply, MessageReplyError): 203 raise reply.as_exception()
Set the settings
205 async def get_channel(self, channel_id: int) -> Channel: 206 """Get a channel""" 207 reply = await self.send_message_expect_reply(GetChannel(channel_id), GetChannelReply) 208 if isinstance(reply, MessageReplyError): 209 raise reply.as_exception() 210 return reply.channel
Get a channel
212 async def set_channel(self, channel: Channel): 213 """Set a channel""" 214 reply = await self.send_message_expect_reply(SetChannel(channel), SetChannelReply) 215 if isinstance(reply, MessageReplyError): 216 raise reply.as_exception()
Set a channel
218 async def get_status(self) -> Status: 219 """Get the radio status""" 220 reply = await self.send_message_expect_reply(GetStatus(), GetStatusReply) 221 if isinstance(reply, MessageReplyError): 222 raise reply.as_exception() 223 return reply.status
Get the radio status
EnableEvent(event_type,)
GetBeaconSettings()
SetBeaconSettings(tnc_settings,)
SetSettings(settings,)
GetBatteryLevelAsPercentage()
GetRCBatteryLevel()
GetBatteryLevel()
GetBatteryVoltage()
GetDeviceInfo()
GetChannel(channel_id,)
SetChannel(channel,)
GetSettings()
GetStatus()
GetPosition()
SendTncDataFragment(tnc_data_fragment,)
SendTncDataFragmentReply()
GetBeaconSettingsReply(tnc_settings,)
SetBeaconSettingsReply()
SetSettingsReply()
GetBatteryLevelAsPercentageReply(battery_level_as_percentage,)
GetRCBatteryLevelReply(rc_battery_level,)
GetBatteryLevelReply(battery_level,)
GetBatteryVoltageReply(battery_voltage,)
GetDeviceInfoReply(device_info,)
GetChannelReply(channel,)
SetChannelReply()
GetStatusReply(status,)
GetSettingsReply(settings,)
GetPositionReply(position,)
693class MessageReplyError(t.NamedTuple): 694 message_type: t.Type[t.Any] 695 reason: ReplyStatus 696 697 def as_exception(self): 698 return ValueError(f"{self.message_type.__name__} failed: {self.reason}")
MessageReplyError(message_type, reason)
Create new instance of MessageReplyError(message_type, reason)
StatusChangedEvent(status,)
ChannelChangedEvent(channel,)
TncDataFragmentReceivedEvent(tnc_data_fragment,)
SettingsChangedEvent(settings,)
UnknownProtocolMessage(message,)
815class TncDataFragment(ImmutableBaseModel): 816 """A data object representing a message packet""" 817 is_final_fragment: bool 818 fragment_id: int 819 data: bytes 820 channel_id: int | None = None 821 822 @classmethod 823 def from_protocol(cls, mp: p.TncDataFragment) -> TncDataFragment: 824 """@private (Protocol helper)""" 825 return TncDataFragment( 826 is_final_fragment=mp.is_final_fragment, 827 fragment_id=mp.fragment_id, 828 data=mp.data, 829 channel_id=mp.channel_id 830 ) 831 832 def to_protocol(self) -> p.TncDataFragment: 833 """@private (Protocol helper)""" 834 return p.TncDataFragment( 835 is_final_fragment=self.is_final_fragment, 836 with_channel_id=self.channel_id is not None, 837 fragment_id=self.fragment_id, 838 data=self.data, 839 channel_id=self.channel_id 840 )
A data object representing a message packet
848class DCS(t.NamedTuple): 849 """A type for setting Digital Coded Squelch (DCS) on channels""" 850 851 n: int 852 """The DCS Normal (N) code"""
A type for setting Digital Coded Squelch (DCS) on channels
873class ChannelArgs(t.TypedDict, total=False): 874 """A dictionary of the parameters that can be set on a channel""" 875 tx_mod: ModulationType 876 tx_freq: float 877 rx_mod: ModulationType 878 rx_freq: float 879 tx_sub_audio: float | DCS | None 880 rx_sub_audio: float | DCS | None 881 scan: bool 882 tx_at_max_power: bool 883 talk_around: bool 884 bandwidth: BandwidthType 885 pre_de_emph_bypass: bool 886 sign: bool 887 tx_at_med_power: bool 888 tx_disable: bool 889 fixed_freq: bool 890 fixed_bandwidth: bool 891 fixed_tx_power: bool 892 mute: bool 893 name: str
A dictionary of the parameters that can be set on a channel
896class Channel(ImmutableBaseModel): 897 """A data object representing a radio channel""" 898 channel_id: int 899 tx_mod: ModulationType 900 tx_freq: float 901 rx_mod: ModulationType 902 rx_freq: float 903 tx_sub_audio: float | DCS | None 904 rx_sub_audio: float | DCS | None 905 scan: bool 906 tx_at_max_power: bool 907 talk_around: bool 908 bandwidth: BandwidthType 909 pre_de_emph_bypass: bool 910 sign: bool 911 tx_at_med_power: bool 912 tx_disable: bool 913 fixed_freq: bool 914 fixed_bandwidth: bool 915 fixed_tx_power: bool 916 mute: bool 917 name: str 918 919 @classmethod 920 def from_protocol(cls, cs: p.RfCh) -> Channel: 921 """@private (Protocol helper)""" 922 return Channel( 923 channel_id=cs.channel_id, 924 tx_mod=cs.tx_mod.name, 925 tx_freq=cs.tx_freq, 926 rx_mod=cs.rx_mod.name, 927 rx_freq=cs.rx_freq, 928 tx_sub_audio=sub_audio_from_protocol(cs.tx_sub_audio), 929 rx_sub_audio=sub_audio_from_protocol(cs.rx_sub_audio), 930 scan=cs.scan, 931 tx_at_max_power=cs.tx_at_max_power, 932 talk_around=cs.talk_around, 933 bandwidth=cs.bandwidth.name, 934 pre_de_emph_bypass=cs.pre_de_emph_bypass, 935 sign=cs.sign, 936 tx_at_med_power=cs.tx_at_med_power, 937 tx_disable=cs.tx_disable, 938 fixed_freq=cs.fixed_freq, 939 fixed_bandwidth=cs.fixed_bandwidth, 940 fixed_tx_power=cs.fixed_tx_power, 941 mute=cs.mute, 942 name=cs.name_str 943 ) 944 945 def to_protocol(self) -> p.RfCh: 946 """@private (Protocol helper)""" 947 return p.RfCh( 948 channel_id=self.channel_id, 949 tx_mod=p.ModulationType[self.tx_mod], 950 tx_freq=self.tx_freq, 951 rx_mod=p.ModulationType[self.rx_mod], 952 rx_freq=self.rx_freq, 953 tx_sub_audio=sub_audio_to_protocol(self.tx_sub_audio), 954 rx_sub_audio=sub_audio_to_protocol(self.rx_sub_audio), 955 scan=self.scan, 956 tx_at_max_power=self.tx_at_max_power, 957 talk_around=self.talk_around, 958 bandwidth=p.BandwidthType[self.bandwidth], 959 pre_de_emph_bypass=self.pre_de_emph_bypass, 960 sign=self.sign, 961 tx_at_med_power=self.tx_at_med_power, 962 tx_disable=self.tx_disable, 963 fixed_freq=self.fixed_freq, 964 fixed_bandwidth=self.fixed_bandwidth, 965 fixed_tx_power=self.fixed_tx_power, 966 mute=self.mute, 967 name_str=self.name 968 )
A data object representing a radio channel
971class SettingsArgs(t.TypedDict, total=False): 972 """A dictionary of the parameters that can be set in the radio settings""" 973 channel_a: int 974 channel_b: int 975 scan: bool 976 aghfp_call_mode: int 977 double_channel: int 978 squelch_level: int 979 tail_elim: bool 980 auto_relay_en: bool 981 auto_power_on: bool 982 keep_aghfp_link: bool 983 mic_gain: int 984 tx_hold_time: int 985 tx_time_limit: int 986 local_speaker: int 987 bt_mic_gain: int 988 adaptive_response: bool 989 dis_tone: bool 990 power_saving_mode: bool 991 auto_power_off: int 992 auto_share_loc_ch: int | t.Literal["current"] 993 hm_speaker: int 994 positioning_system: int 995 time_offset: int 996 use_freq_range_2: bool 997 ptt_lock: bool 998 leading_sync_bit_en: bool 999 pairing_at_power_on: bool 1000 screen_timeout: int 1001 kiss_upload_tx_msg: bool 1002 kiss_en: bool 1003 imperial_unit: bool 1004 wx_mode: int 1005 noaa_ch: int 1006 vfol_tx_power_x: int 1007 vfo2_tx_power_x: int 1008 dis_digital_mute: bool 1009 signaling_ecc_en: bool 1010 ch_data_lock: bool 1011 kiss_tx_delay: int 1012 kiss_tx_tail: int 1013 vox_en: bool 1014 vox_level: int 1015 dis_bt_mic: bool 1016 vox_delay: int 1017 ns_en: bool 1018 alarm_volume: int 1019 use_custom_location: bool 1020 gpwpl_upload_en: bool 1021 vfo1_mod_freq_x: int 1022 custom_location_lat: int 1023 custom_location_lon: int
A dictionary of the parameters that can be set in the radio settings
1026class Settings(ImmutableBaseModel): 1027 """A data object representing the radio settings""" 1028 _channel_split: t.ClassVar[IntSplit] = IntSplit(4, 4) 1029 _auto_share_loc_ch_split: t.ClassVar[IntSplit] = IntSplit(3, 5) 1030 channel_a: int 1031 channel_b: int 1032 scan: bool 1033 aghfp_call_mode: int 1034 double_channel: int 1035 squelch_level: int 1036 tail_elim: bool 1037 auto_relay_en: bool 1038 auto_power_on: bool 1039 keep_aghfp_link: bool 1040 mic_gain: int 1041 tx_hold_time: int 1042 tx_time_limit: int 1043 local_speaker: int 1044 bt_mic_gain: int 1045 adaptive_response: bool 1046 dis_tone: bool 1047 power_saving_mode: bool 1048 auto_power_off: int 1049 auto_share_loc_ch: int | t.Literal["current"] 1050 hm_speaker: int 1051 positioning_system: int 1052 time_offset: int 1053 use_freq_range_2: bool 1054 ptt_lock: bool 1055 leading_sync_bit_en: bool 1056 pairing_at_power_on: bool 1057 screen_timeout: int 1058 kiss_upload_tx_msg: bool 1059 kiss_en: bool 1060 imperial_unit: bool 1061 wx_mode: int 1062 noaa_ch: int 1063 vfol_tx_power_x: int 1064 vfo2_tx_power_x: int 1065 dis_digital_mute: bool 1066 signaling_ecc_en: bool 1067 ch_data_lock: bool 1068 kiss_tx_delay: int 1069 kiss_tx_tail: int 1070 vox_en: bool 1071 vox_level: int 1072 dis_bt_mic: bool 1073 vox_delay: int 1074 ns_en: bool 1075 alarm_volume: int 1076 use_custom_location: bool 1077 gpwpl_upload_en: bool 1078 vfo1_mod_freq_x: int 1079 custom_location_lat: int 1080 custom_location_lon: int 1081 1082 @classmethod 1083 def from_protocol(cls, rs: p.Settings) -> Settings: 1084 """@private (Protocol helper)""" 1085 _raw_auto_share_loc_ch = cls._auto_share_loc_ch_split.from_parts( 1086 rs.auto_share_loc_ch_upper, rs.auto_share_loc_ch 1087 ) 1088 return Settings( 1089 channel_a=cls._channel_split.from_parts( 1090 rs.channel_a_upper, rs.channel_a_lower 1091 ), 1092 channel_b=cls._channel_split.from_parts( 1093 rs.channel_b_upper, rs.channel_b_lower 1094 ), 1095 scan=rs.scan, 1096 aghfp_call_mode=rs.aghfp_call_mode, 1097 double_channel=rs.double_channel, 1098 squelch_level=rs.squelch_level, 1099 tail_elim=rs.tail_elim, 1100 auto_relay_en=rs.auto_relay_en, 1101 auto_power_on=rs.auto_power_on, 1102 keep_aghfp_link=rs.keep_aghfp_link, 1103 mic_gain=rs.mic_gain, 1104 tx_hold_time=rs.tx_hold_time, 1105 tx_time_limit=rs.tx_time_limit, 1106 local_speaker=rs.local_speaker, 1107 bt_mic_gain=rs.bt_mic_gain, 1108 adaptive_response=rs.adaptive_response, 1109 dis_tone=rs.dis_tone, 1110 power_saving_mode=rs.power_saving_mode, 1111 auto_power_off=rs.auto_power_off, 1112 auto_share_loc_ch=_raw_auto_share_loc_ch - 1113 1 if _raw_auto_share_loc_ch > 0 else "current", 1114 hm_speaker=rs.hm_speaker, 1115 positioning_system=rs.positioning_system, 1116 time_offset=rs.time_offset, 1117 use_freq_range_2=rs.use_freq_range_2, 1118 ptt_lock=rs.ptt_lock, 1119 leading_sync_bit_en=rs.leading_sync_bit_en, 1120 pairing_at_power_on=rs.pairing_at_power_on, 1121 screen_timeout=rs.screen_timeout, 1122 kiss_upload_tx_msg=rs.kiss_upload_tx_msg, 1123 kiss_en=rs.kiss_en, 1124 imperial_unit=rs.imperial_unit, 1125 wx_mode=rs.wx_mode, 1126 noaa_ch=rs.noaa_ch, 1127 vfol_tx_power_x=rs.vfol_tx_power_x, 1128 vfo2_tx_power_x=rs.vfo2_tx_power_x, 1129 dis_digital_mute=rs.dis_digital_mute, 1130 signaling_ecc_en=rs.signaling_ecc_en, 1131 ch_data_lock=rs.ch_data_lock, 1132 kiss_tx_delay=rs.kiss_tx_delay, 1133 kiss_tx_tail=rs.kiss_tx_tail, 1134 vox_en=rs.vox_en, 1135 vox_level=rs.vox_level, 1136 dis_bt_mic=rs.dis_bt_mic, 1137 vox_delay=rs.vox_delay, 1138 ns_en=rs.ns_en, 1139 alarm_volume=rs.alarm_volume, 1140 use_custom_location=rs.use_custom_location, 1141 gpwpl_upload_en=rs.gpwpl_upload_en, 1142 vfo1_mod_freq_x=rs.vfo1_mod_freq_x, 1143 custom_location_lat=rs.custom_location_lat, 1144 custom_location_lon=rs.custom_location_lon 1145 ) 1146 1147 def to_protocol(self): 1148 """@private (Protocol helper)""" 1149 _raw_auto_share_loc_ch = 0 if self.auto_share_loc_ch == "current" else self.auto_share_loc_ch + 1 1150 return p.Settings( 1151 channel_a_lower=self._channel_split.get_lower(self.channel_a), 1152 channel_b_lower=self._channel_split.get_lower(self.channel_b), 1153 scan=self.scan, 1154 aghfp_call_mode=self.aghfp_call_mode, 1155 double_channel=self.double_channel, 1156 squelch_level=self.squelch_level, 1157 tail_elim=self.tail_elim, 1158 auto_relay_en=self.auto_relay_en, 1159 auto_power_on=self.auto_power_on, 1160 keep_aghfp_link=self.keep_aghfp_link, 1161 mic_gain=self.mic_gain, 1162 tx_hold_time=self.tx_hold_time, 1163 tx_time_limit=self.tx_time_limit, 1164 local_speaker=self.local_speaker, 1165 bt_mic_gain=self.bt_mic_gain, 1166 adaptive_response=self.adaptive_response, 1167 dis_tone=self.dis_tone, 1168 power_saving_mode=self.power_saving_mode, 1169 auto_power_off=self.auto_power_off, 1170 auto_share_loc_ch=self._auto_share_loc_ch_split.get_lower( 1171 _raw_auto_share_loc_ch), 1172 hm_speaker=self.hm_speaker, 1173 positioning_system=self.positioning_system, 1174 time_offset=self.time_offset, 1175 use_freq_range_2=self.use_freq_range_2, 1176 ptt_lock=self.ptt_lock, 1177 leading_sync_bit_en=self.leading_sync_bit_en, 1178 pairing_at_power_on=self.pairing_at_power_on, 1179 screen_timeout=self.screen_timeout, 1180 kiss_upload_tx_msg=self.kiss_upload_tx_msg, 1181 kiss_en=self.kiss_en, 1182 imperial_unit=self.imperial_unit, 1183 channel_a_upper=self._channel_split.get_upper(self.channel_a), 1184 channel_b_upper=self._channel_split.get_upper(self.channel_b), 1185 wx_mode=self.wx_mode, 1186 noaa_ch=self.noaa_ch, 1187 vfol_tx_power_x=self.vfol_tx_power_x, 1188 vfo2_tx_power_x=self.vfo2_tx_power_x, 1189 dis_digital_mute=self.dis_digital_mute, 1190 signaling_ecc_en=self.signaling_ecc_en, 1191 ch_data_lock=self.ch_data_lock, 1192 auto_share_loc_ch_upper=self._auto_share_loc_ch_split.get_upper( 1193 _raw_auto_share_loc_ch), 1194 kiss_tx_delay=self.kiss_tx_delay, 1195 kiss_tx_tail=self.kiss_tx_tail, 1196 vox_en=self.vox_en, 1197 vox_level=self.vox_level, 1198 dis_bt_mic=self.dis_bt_mic, 1199 vox_delay=self.vox_delay, 1200 ns_en=self.ns_en, 1201 alarm_volume=self.alarm_volume, 1202 use_custom_location=self.use_custom_location, 1203 gpwpl_upload_en=self.gpwpl_upload_en, 1204 vfo1_mod_freq_x=self.vfo1_mod_freq_x, 1205 custom_location_lat=self.custom_location_lat, 1206 custom_location_lon=self.custom_location_lon 1207 )
A data object representing the radio settings
1210class DeviceInfo(ImmutableBaseModel): 1211 """A data object representing the device information""" 1212 vendor_id: int 1213 product_id: int 1214 hardware_version: int 1215 firmware_version: int 1216 supports_radio: bool 1217 supports_medium_power: bool 1218 fixed_location_speaker_volume: bool 1219 has_speaker: bool 1220 has_hand_microphone_speaker: bool 1221 region_count: int 1222 supports_noaa: bool 1223 supports_gmrs: bool 1224 supports_vfo: bool 1225 supports_dmr: bool 1226 supports_software_power_control: bool 1227 channel_count: int 1228 frequency_range_count: int 1229 1230 @classmethod 1231 def from_protocol(cls, info: p.DevInfo) -> DeviceInfo: 1232 """@private (Protocol helper)""" 1233 return DeviceInfo( 1234 vendor_id=info.vendor_id, 1235 product_id=info.product_id, 1236 hardware_version=info.hw_ver, 1237 firmware_version=info.soft_ver, 1238 supports_radio=info.support_radio, 1239 supports_medium_power=info.support_medium_power, 1240 fixed_location_speaker_volume=info.fixed_loc_speaker_vol, 1241 supports_software_power_control=not info.not_support_soft_power_ctrl, 1242 has_speaker=not info.have_no_speaker, 1243 has_hand_microphone_speaker=info.have_hm_speaker, 1244 region_count=info.region_count, 1245 supports_noaa=info.support_noaa, 1246 supports_gmrs=info.gmrs, 1247 supports_vfo=info.support_vfo, 1248 supports_dmr=info.support_dmr, 1249 channel_count=info.channel_count, 1250 frequency_range_count=info.freq_range_count 1251 ) 1252 1253 def to_protocol(self) -> p.DevInfo: 1254 """@private (Protocol helper)""" 1255 return p.DevInfo( 1256 vendor_id=self.vendor_id, 1257 product_id=self.product_id, 1258 hw_ver=self.hardware_version, 1259 soft_ver=self.firmware_version, 1260 support_radio=self.supports_radio, 1261 support_medium_power=self.supports_medium_power, 1262 fixed_loc_speaker_vol=self.fixed_location_speaker_volume, 1263 not_support_soft_power_ctrl=not self.supports_software_power_control, 1264 have_no_speaker=not self.has_speaker, 1265 have_hm_speaker=self.has_hand_microphone_speaker, 1266 region_count=self.region_count, 1267 support_noaa=self.supports_noaa, 1268 gmrs=self.supports_gmrs, 1269 support_vfo=self.supports_vfo, 1270 support_dmr=self.supports_dmr, 1271 channel_count=self.channel_count, 1272 freq_range_count=self.frequency_range_count 1273 )
A data object representing the device information
1276class BeaconSettingsArgs(t.TypedDict, total=False): 1277 """A dictionary of the parameters that can be set in the beacon settings""" 1278 max_fwd_times: int 1279 time_to_live: int 1280 ptt_release_send_location: bool 1281 ptt_release_send_id_info: bool 1282 ptt_release_send_bss_user_id: bool 1283 should_share_location: bool 1284 send_pwr_voltage: bool 1285 packet_format: t.Literal["BSS", "APRS"] 1286 allow_position_check: bool 1287 aprs_ssid: int 1288 location_share_interval: int 1289 bss_user_id: int 1290 ptt_release_id_info: str 1291 beacon_message: str 1292 aprs_symbol: str 1293 aprs_callsign: str
A dictionary of the parameters that can be set in the beacon settings
1296class BeaconSettings(ImmutableBaseModel): 1297 """A data object representing the beacon settings""" 1298 _bss_user_id_split: t.ClassVar[IntSplit] = IntSplit(32, 32) 1299 max_fwd_times: int 1300 time_to_live: int 1301 ptt_release_send_location: bool 1302 ptt_release_send_id_info: bool 1303 ptt_release_send_bss_user_id: bool 1304 should_share_location: bool 1305 send_pwr_voltage: bool 1306 packet_format: t.Literal["BSS", "APRS"] 1307 allow_position_check: bool 1308 aprs_ssid: int 1309 location_share_interval: int 1310 bss_user_id: int 1311 ptt_release_id_info: str 1312 beacon_message: str 1313 aprs_symbol: str 1314 aprs_callsign: str 1315 1316 @classmethod 1317 def from_protocol(cls, bs: p.BSSSettingsV2 | p.BSSSettings) -> BeaconSettings: 1318 """@private (Protocol helper)""" 1319 return BeaconSettings( 1320 max_fwd_times=bs.max_fwd_times, 1321 time_to_live=bs.time_to_live, 1322 ptt_release_send_location=bs.ptt_release_send_location, 1323 ptt_release_send_id_info=bs.ptt_release_send_id_info, 1324 ptt_release_send_bss_user_id=bs.ptt_release_send_bss_user_id, 1325 should_share_location=bs.should_share_location, 1326 send_pwr_voltage=bs.send_pwr_voltage, 1327 packet_format=bs.packet_format.name, 1328 allow_position_check=bs.allow_position_check, 1329 aprs_ssid=bs.aprs_ssid, 1330 location_share_interval=bs.location_share_interval, 1331 bss_user_id=cls._bss_user_id_split.from_parts( 1332 bs.bss_user_id_upper, bs.bss_user_id_lower 1333 ), 1334 ptt_release_id_info=bs.ptt_release_id_info, 1335 beacon_message=bs.beacon_message, 1336 aprs_symbol=bs.aprs_symbol, 1337 aprs_callsign=bs.aprs_callsign 1338 ) 1339 1340 def to_protocol(self) -> p.BSSSettingsV2: 1341 """@private (Protocol helper)""" 1342 return p.BSSSettingsV2( 1343 max_fwd_times=self.max_fwd_times, 1344 time_to_live=self.time_to_live, 1345 ptt_release_send_location=self.ptt_release_send_location, 1346 ptt_release_send_id_info=self.ptt_release_send_id_info, 1347 ptt_release_send_bss_user_id=self.ptt_release_send_bss_user_id, 1348 should_share_location=self.should_share_location, 1349 send_pwr_voltage=self.send_pwr_voltage, 1350 packet_format=p.PacketFormat[self.packet_format], 1351 allow_position_check=self.allow_position_check, 1352 aprs_ssid=self.aprs_ssid, 1353 location_share_interval=self.location_share_interval, 1354 bss_user_id_lower=self._bss_user_id_split.get_lower( 1355 self.bss_user_id 1356 ), 1357 ptt_release_id_info=self.ptt_release_id_info, 1358 beacon_message=self.beacon_message, 1359 aprs_symbol=self.aprs_symbol, 1360 aprs_callsign=self.aprs_callsign, 1361 bss_user_id_upper=self._bss_user_id_split.get_upper( 1362 self.bss_user_id 1363 ), 1364 )
A data object representing the beacon settings
1370class Status(ImmutableBaseModel): 1371 """A data object representing the radio status""" 1372 _channel_split: t.ClassVar[IntSplit] = IntSplit(4, 4) 1373 is_power_on: bool 1374 is_in_tx: bool 1375 is_sq: bool 1376 is_in_rx: bool 1377 double_channel: ChannelType 1378 is_scan: bool 1379 is_radio: bool 1380 curr_ch_id: int 1381 is_gps_locked: bool 1382 is_hfp_connected: bool 1383 is_aoc_connected: bool 1384 rssi: float 1385 curr_region: int 1386 1387 @classmethod 1388 def from_protocol(cls, s: p.Status | p.StatusExt) -> Status: 1389 """@private (Protocol helper)""" 1390 if not isinstance(s, p.StatusExt): 1391 raise ValueError( 1392 "Radio replied with old Status message version. Upgrade your firmware!" 1393 ) 1394 1395 return Status( 1396 is_power_on=s.is_power_on, 1397 is_in_tx=s.is_in_tx, 1398 is_sq=s.is_sq, 1399 is_in_rx=s.is_in_rx, 1400 double_channel=s.double_channel.name, 1401 is_scan=s.is_scan, 1402 is_radio=s.is_radio, 1403 curr_ch_id=cls._channel_split.from_parts( 1404 s.curr_channel_id_upper, s.curr_ch_id_lower 1405 ), 1406 is_gps_locked=s.is_gps_locked, 1407 is_hfp_connected=s.is_hfp_connected, 1408 is_aoc_connected=s.is_aoc_connected, 1409 rssi=s.rssi, 1410 curr_region=s.curr_region 1411 ) 1412 1413 def to_protocol(self) -> p.StatusExt: 1414 """@private (Protocol helper)""" 1415 return p.StatusExt( 1416 is_power_on=self.is_power_on, 1417 is_in_tx=self.is_in_tx, 1418 is_sq=self.is_sq, 1419 is_in_rx=self.is_in_rx, 1420 double_channel=p.ChannelType[self.double_channel], 1421 is_scan=self.is_scan, 1422 is_radio=self.is_radio, 1423 curr_ch_id_lower=self._channel_split.get_lower(self.curr_ch_id), 1424 is_gps_locked=self.is_gps_locked, 1425 is_hfp_connected=self.is_hfp_connected, 1426 is_aoc_connected=self.is_aoc_connected, 1427 rssi=self.rssi, 1428 curr_region=self.curr_region, 1429 curr_channel_id_upper=self._channel_split.get_upper( 1430 self.curr_ch_id) 1431 )
A data object representing the radio status
1434class Position(ImmutableBaseModel): 1435 """A data object for representing GPS positions""" 1436 latitude: float 1437 longitude: float 1438 altitude: int | None 1439 speed: int | None 1440 heading: int | None 1441 time: datetime 1442 accuracy: int 1443 1444 @classmethod 1445 def from_protocol(cls, x: p.Position) -> Position: 1446 """@private (Protocol helper)""" 1447 return Position( 1448 latitude=x.latitude, 1449 longitude=x.longitude, 1450 altitude=x.altitude, 1451 speed=x.speed, 1452 heading=x.heading, 1453 time=x.time, 1454 accuracy=x.accuracy, 1455 ) 1456 1457 def to_protocol(self) -> p.Position: 1458 """@private (Protocol helper)""" 1459 return p.Position( 1460 latitude=self.latitude, 1461 longitude=self.longitude, 1462 altitude=self.altitude, 1463 speed=self.speed, 1464 heading=self.heading, 1465 time=self.time, 1466 accuracy=self.accuracy, 1467 )
A data object for representing GPS positions