benlink.command
Overview
This module defines a low-level interface for communicating with a radio over BLE or Rfcomm.
Unless you know what you are doing, you probably want to use the
benlink.controller
module instead.
Messages
The RadioMessage
type is a union of all the possible messages that can
sent or received from the radio.
It RadioMessage
s are divided into three categories:
CommandMessage
: Messages that are sent to the radio to request information or to change settings.ReplyMessage
: Messages that are received in response to aCommandMessage
.EventMessage
: Messages that are received from the radio to indicate that an event has occurred. (e.g. a channel has changed, a packet has been received)
Data
The data objects (e.g. DeviceInfo
, Settings
) are used to represent
the data that is sent or received in the messages. Some of these data
objects have accompanying Args
types that are used in the API to allow
for functions that take keyword arguments to set these parameters.
1""" 2# Overview 3 4This module defines a low-level interface for communicating 5with a radio over BLE or Rfcomm. 6 7Unless you know what you are doing, you probably want to use the 8`benlink.controller` module instead. 9 10# Messages 11 12The `RadioMessage` type is a union of all the possible messages that can 13sent or received from the radio. 14 15It `RadioMessage`s are divided into three categories: 16 171. `CommandMessage`: Messages that are sent to the radio to request 18 information or to change settings. 19 202. `ReplyMessage`: Messages that are received in response to a 21 `CommandMessage`. 22 233. `EventMessage`: Messages that are received from the radio to indicate 24 that an event has occurred. (e.g. a channel has changed, a packet has 25 been received) 26 27# Data 28 29The data objects (e.g. `DeviceInfo`, `Settings`) are used to represent 30the data that is sent or received in the messages. Some of these data 31objects have accompanying `Args` types that are used in the API to allow 32for functions that take keyword arguments to set these parameters. 33""" 34 35from __future__ import annotations 36import typing as t 37import asyncio 38from pydantic import BaseModel, ConfigDict 39from . import protocol as p 40from .link import CommandLink, BleCommandLink, RfcommCommandLink 41from datetime import datetime 42 43RADIO_SERVICE_UUID = "00001100-d102-11e1-9b23-00025b00a5a5" 44"""@private""" 45 46RADIO_WRITE_UUID = "00001101-d102-11e1-9b23-00025b00a5a5" 47"""@private""" 48 49RADIO_INDICATE_UUID = "00001102-d102-11e1-9b23-00025b00a5a5" 50"""@private""" 51 52 53class CommandConnection: 54 _link: CommandLink 55 _handlers: t.List[RadioMessageHandler] = [] 56 57 def __init__(self, link: CommandLink): 58 self._link = link 59 self._handlers = [] 60 61 @classmethod 62 def new_ble(cls, device_uuid: str) -> CommandConnection: 63 return cls(BleCommandLink(device_uuid)) 64 65 @classmethod 66 def new_rfcomm(cls, device_uuid: str, channel: int | t.Literal["auto"] = "auto") -> CommandConnection: 67 return cls(RfcommCommandLink(device_uuid, channel)) 68 69 def is_connected(self) -> bool: 70 return self._link.is_connected() 71 72 async def connect(self) -> None: 73 await self._link.connect(self._on_recv) 74 75 async def disconnect(self) -> None: 76 self._handlers.clear() 77 await self._link.disconnect() 78 79 async def send_bytes(self, data: bytes) -> None: 80 await self._link.send_bytes(data) 81 82 async def send_message(self, command: CommandMessage) -> None: 83 await self._link.send(command_message_to_protocol(command)) 84 85 async def send_message_expect_reply(self, command: CommandMessage, expect: t.Type[RadioMessageT]) -> RadioMessageT | MessageReplyError: 86 queue: asyncio.Queue[RadioMessageT | 87 MessageReplyError] = asyncio.Queue() 88 89 def reply_handler(reply: RadioMessage): 90 if ( 91 isinstance(reply, expect) or 92 ( 93 isinstance(reply, MessageReplyError) and 94 reply.message_type is expect 95 ) 96 ): 97 queue.put_nowait(reply) 98 99 remove_handler = self._add_message_handler(reply_handler) 100 101 await self.send_message(command) 102 103 out = await queue.get() 104 105 remove_handler() 106 107 return out 108 109 def add_event_handler(self, handler: EventHandler) -> t.Callable[[], None]: 110 def event_handler(msg: RadioMessage): 111 if isinstance(msg, EventMessage): 112 handler(msg) 113 return self._add_message_handler(event_handler) 114 115 def _add_message_handler(self, handler: RadioMessageHandler) -> t.Callable[[], None]: 116 self._handlers.append(handler) 117 118 def remove_handler(): 119 self._handlers.remove(handler) 120 121 return remove_handler 122 123 def _on_recv(self, msg: p.Message) -> None: 124 radio_message = radio_message_from_protocol(msg) 125 for handler in self._handlers: 126 handler(radio_message) 127 128 # Command API 129 130 async def enable_event(self, event_type: EventType) -> None: 131 """Enable an event""" 132 # This event doesn't get a reply version of EnableEvent. It does, however 133 # does trigger a StatusChangedEvent... but we don't wait for it here. 134 # Instead, we just fire and forget 135 await self.send_message(EnableEvent(event_type)) 136 137 async def send_tnc_data_fragment(self, tnc_data_fragment: TncDataFragment) -> None: 138 """Send Tnc data""" 139 reply = await self.send_message_expect_reply(SendTncDataFragment(tnc_data_fragment), SendTncDataFragmentReply) 140 if isinstance(reply, MessageReplyError): 141 raise reply.as_exception() 142 143 async def get_beacon_settings(self) -> BeaconSettings: 144 """Get the current packet settings""" 145 reply = await self.send_message_expect_reply(GetBeaconSettings(), GetBeaconSettingsReply) 146 if isinstance(reply, MessageReplyError): 147 raise reply.as_exception() 148 return reply.tnc_settings 149 150 async def set_beacon_settings(self, packet_settings: BeaconSettings): 151 """Set the packet settings""" 152 reply = await self.send_message_expect_reply(SetBeaconSettings(packet_settings), SetBeaconSettingsReply) 153 if isinstance(reply, MessageReplyError): 154 raise reply.as_exception() 155 156 async def get_battery_level(self) -> int: 157 """Get the battery level""" 158 reply = await self.send_message_expect_reply(GetBatteryLevel(), GetBatteryLevelReply) 159 if isinstance(reply, MessageReplyError): 160 raise reply.as_exception() 161 return reply.battery_level 162 163 async def get_battery_level_as_percentage(self) -> int: 164 """Get the battery level as a percentage""" 165 reply = await self.send_message_expect_reply(GetBatteryLevelAsPercentage(), GetBatteryLevelAsPercentageReply) 166 if isinstance(reply, MessageReplyError): 167 raise reply.as_exception() 168 return reply.battery_level_as_percentage 169 170 async def get_rc_battery_level(self) -> int: 171 """Get the RC battery level""" 172 reply = await self.send_message_expect_reply(GetRCBatteryLevel(), GetRCBatteryLevelReply) 173 if isinstance(reply, MessageReplyError): 174 raise reply.as_exception() 175 return reply.rc_battery_level 176 177 async def get_battery_voltage(self) -> float: 178 """Get the battery voltage""" 179 reply = await self.send_message_expect_reply(GetBatteryVoltage(), GetBatteryVoltageReply) 180 if isinstance(reply, MessageReplyError): 181 raise reply.as_exception() 182 return reply.battery_voltage 183 184 async def get_device_info(self) -> DeviceInfo: 185 """Get the device info""" 186 reply = await self.send_message_expect_reply(GetDeviceInfo(), GetDeviceInfoReply) 187 if isinstance(reply, MessageReplyError): 188 raise reply.as_exception() 189 return reply.device_info 190 191 async def get_settings(self) -> Settings: 192 """Get the settings""" 193 reply = await self.send_message_expect_reply(GetSettings(), GetSettingsReply) 194 if isinstance(reply, MessageReplyError): 195 raise reply.as_exception() 196 return reply.settings 197 198 async def set_settings(self, settings: Settings) -> None: 199 """Set the settings""" 200 reply = await self.send_message_expect_reply(SetSettings(settings), SetSettingsReply) 201 if isinstance(reply, MessageReplyError): 202 raise reply.as_exception() 203 204 async def get_channel(self, channel_id: int) -> Channel: 205 """Get a channel""" 206 reply = await self.send_message_expect_reply(GetChannel(channel_id), GetChannelReply) 207 if isinstance(reply, MessageReplyError): 208 raise reply.as_exception() 209 return reply.channel 210 211 async def set_channel(self, channel: Channel): 212 """Set a channel""" 213 reply = await self.send_message_expect_reply(SetChannel(channel), SetChannelReply) 214 if isinstance(reply, MessageReplyError): 215 raise reply.as_exception() 216 217 async def get_status(self) -> Status: 218 """Get the radio status""" 219 reply = await self.send_message_expect_reply(GetStatus(), GetStatusReply) 220 if isinstance(reply, MessageReplyError): 221 raise reply.as_exception() 222 return reply.status 223 224 async def get_position(self) -> Position: 225 """Get the radio position""" 226 reply = await self.send_message_expect_reply(GetPosition(), GetPositionReply) 227 if isinstance(reply, MessageReplyError): 228 raise reply.as_exception() 229 return reply.position 230 231 async def __aenter__(self): 232 await self.connect() 233 return self 234 235 async def __aexit__( 236 self, 237 exc_type: t.Any, 238 exc_value: t.Any, 239 traceback: t.Any, 240 ) -> None: 241 await self.disconnect() 242 243 244class ImmutableBaseModel(BaseModel): 245 """@private (A base class for immutable data objects)""" 246 247 model_config = ConfigDict(frozen=True) 248 """@private""" 249 250 251def command_message_to_protocol(m: CommandMessage) -> p.Message: 252 """@private (Protocol helper)""" 253 match m: 254 case EnableEvent(event_type): 255 return p.Message( 256 command_group=p.CommandGroup.BASIC, 257 is_reply=False, 258 command=p.BasicCommand.REGISTER_NOTIFICATION, 259 body=p.RegisterNotificationBody( 260 event_type=p.EventType[event_type], 261 ) 262 ) 263 case SendTncDataFragment(tnc_data_fragment): 264 return p.Message( 265 command_group=p.CommandGroup.BASIC, 266 is_reply=False, 267 command=p.BasicCommand.HT_SEND_DATA, 268 body=p.HTSendDataBody( 269 tnc_data_fragment=tnc_data_fragment.to_protocol() 270 ) 271 ) 272 case GetBeaconSettings(): 273 return p.Message( 274 command_group=p.CommandGroup.BASIC, 275 is_reply=False, 276 command=p.BasicCommand.READ_BSS_SETTINGS, 277 body=p.ReadBSSSettingsBody() 278 ) 279 case SetBeaconSettings(packet_settings): 280 return p.Message( 281 command_group=p.CommandGroup.BASIC, 282 is_reply=False, 283 command=p.BasicCommand.WRITE_BSS_SETTINGS, 284 body=p.WriteBSSSettingsBody( 285 bss_settings=packet_settings.to_protocol() 286 ) 287 ) 288 case GetSettings(): 289 return p.Message( 290 command_group=p.CommandGroup.BASIC, 291 is_reply=False, 292 command=p.BasicCommand.READ_SETTINGS, 293 body=p.ReadSettingsBody() 294 ) 295 case SetSettings(settings): 296 return p.Message( 297 command_group=p.CommandGroup.BASIC, 298 is_reply=False, 299 command=p.BasicCommand.WRITE_SETTINGS, 300 body=p.WriteSettingsBody( 301 settings=settings.to_protocol() 302 ) 303 ) 304 case GetDeviceInfo(): 305 return p.Message( 306 command_group=p.CommandGroup.BASIC, 307 is_reply=False, 308 command=p.BasicCommand.GET_DEV_INFO, 309 body=p.GetDevInfoBody() 310 ) 311 case GetChannel(channel_id): 312 return p.Message( 313 command_group=p.CommandGroup.BASIC, 314 is_reply=False, 315 command=p.BasicCommand.READ_RF_CH, 316 body=p.ReadRFChBody(channel_id=channel_id) 317 ) 318 case SetChannel(channel): 319 return p.Message( 320 command_group=p.CommandGroup.BASIC, 321 is_reply=False, 322 command=p.BasicCommand.WRITE_RF_CH, 323 body=p.WriteRFChBody( 324 rf_ch=channel.to_protocol() 325 ) 326 ) 327 case GetBatteryVoltage(): 328 return p.Message( 329 command_group=p.CommandGroup.BASIC, 330 is_reply=False, 331 command=p.BasicCommand.READ_STATUS, 332 body=p.ReadPowerStatusBody( 333 status_type=p.PowerStatusType.BATTERY_VOLTAGE 334 ) 335 ) 336 case GetBatteryLevel(): 337 return p.Message( 338 command_group=p.CommandGroup.BASIC, 339 is_reply=False, 340 command=p.BasicCommand.READ_STATUS, 341 body=p.ReadPowerStatusBody( 342 status_type=p.PowerStatusType.BATTERY_LEVEL 343 ) 344 ) 345 case GetBatteryLevelAsPercentage(): 346 return p.Message( 347 command_group=p.CommandGroup.BASIC, 348 is_reply=False, 349 command=p.BasicCommand.READ_STATUS, 350 body=p.ReadPowerStatusBody( 351 status_type=p.PowerStatusType.BATTERY_LEVEL_AS_PERCENTAGE 352 ) 353 ) 354 case GetRCBatteryLevel(): 355 return p.Message( 356 command_group=p.CommandGroup.BASIC, 357 is_reply=False, 358 command=p.BasicCommand.READ_STATUS, 359 body=p.ReadPowerStatusBody( 360 status_type=p.PowerStatusType.RC_BATTERY_LEVEL 361 ) 362 ) 363 case GetStatus(): 364 return p.Message( 365 command_group=p.CommandGroup.BASIC, 366 is_reply=False, 367 command=p.BasicCommand.GET_HT_STATUS, 368 body=p.GetHtStatusBody() 369 ) 370 case GetPosition(): 371 return p.Message( 372 command_group=p.CommandGroup.BASIC, 373 is_reply=False, 374 command=p.BasicCommand.GET_POSITION, 375 body=p.GetPositionBody() 376 ) 377 378 379def radio_message_from_protocol(mf: p.Message) -> RadioMessage: 380 """@private (Protocol helper)""" 381 match mf.body: 382 case p.GetPositionReplyBody(reply_status=reply_status, position=position): 383 if position is None: 384 return MessageReplyError( 385 message_type=GetPositionReply, 386 reason=reply_status.name 387 ) 388 return GetPositionReply(Position.from_protocol(position)) 389 case p.GetHtStatusReplyBody(reply_status=reply_status, status=status): 390 if status is None: 391 return MessageReplyError( 392 message_type=GetStatusReply, 393 reason=reply_status.name, 394 ) 395 return GetStatusReply(Status.from_protocol(status)) 396 case p.HTSendDataReplyBody( 397 reply_status=reply_status 398 ): 399 if reply_status != p.ReplyStatus.SUCCESS: 400 return MessageReplyError( 401 message_type=SendTncDataFragmentReply, 402 reason=reply_status.name, 403 ) 404 return SendTncDataFragmentReply() 405 case p.ReadBSSSettingsReplyBody( 406 reply_status=reply_status, 407 bss_settings=bss_settings, 408 ): 409 if bss_settings is None: 410 return MessageReplyError( 411 message_type=GetBeaconSettingsReply, 412 reason=reply_status.name, 413 ) 414 415 return GetBeaconSettingsReply(BeaconSettings.from_protocol(bss_settings)) 416 case p.WriteBSSSettingsReplyBody( 417 reply_status=reply_status, 418 ): 419 if reply_status != p.ReplyStatus.SUCCESS: 420 return MessageReplyError( 421 message_type=SetBeaconSettingsReply, 422 reason=reply_status.name, 423 ) 424 return SetBeaconSettingsReply() 425 case p.ReadPowerStatusReplyBody( 426 reply_status=reply_status, 427 status=power_status 428 ): 429 if power_status is None: 430 return MessageReplyError( 431 message_type=GetBatteryVoltageReply, 432 reason=reply_status.name, 433 ) 434 435 match power_status.value: 436 case p.BatteryVoltageStatus( 437 battery_voltage=battery_voltage 438 ): 439 return GetBatteryVoltageReply( 440 battery_voltage=battery_voltage 441 ) 442 case p.BatteryLevelPercentageStatus( 443 battery_level_as_percentage=battery_level_as_percentage 444 ): 445 return GetBatteryLevelAsPercentageReply( 446 battery_level_as_percentage=battery_level_as_percentage 447 ) 448 case p.BatteryLevelStatus( 449 battery_level=battery_level 450 ): 451 return GetBatteryLevelReply( 452 battery_level=battery_level 453 ) 454 case p.RCBatteryLevelStatus( 455 rc_battery_level=rc_battery_level 456 ): 457 return GetRCBatteryLevelReply( 458 rc_battery_level=rc_battery_level 459 ) 460 case p.EventNotificationBody( 461 event=event 462 ): 463 match event: 464 case p.HTSettingsChangedEvent( 465 settings=settings 466 ): 467 return SettingsChangedEvent(Settings.from_protocol(settings)) 468 case p.DataRxdEvent( 469 tnc_data_fragment=tnc_data_fragment 470 ): 471 return TncDataFragmentReceivedEvent( 472 tnc_data_fragment=TncDataFragment.from_protocol( 473 tnc_data_fragment 474 ) 475 ) 476 case p.HTChChangedEvent( 477 rf_ch=rf_ch 478 ): 479 return ChannelChangedEvent(Channel.from_protocol(rf_ch)) 480 case p.HTStatusChangedEvent( 481 status=status 482 ): 483 return StatusChangedEvent(Status.from_protocol(status)) 484 case _: 485 return UnknownProtocolMessage(mf) 486 case p.ReadSettingsReplyBody( 487 reply_status=reply_status, 488 settings=settings 489 ): 490 if settings is None: 491 return MessageReplyError( 492 message_type=GetSettingsReply, 493 reason=reply_status.name, 494 ) 495 return GetSettingsReply(Settings.from_protocol(settings)) 496 case p.WriteSettingsReplyBody( 497 reply_status=reply_status, 498 ): 499 if reply_status != p.ReplyStatus.SUCCESS: 500 return MessageReplyError( 501 message_type=SetSettingsReply, 502 reason=reply_status.name, 503 ) 504 return SetSettingsReply() 505 case p.GetDevInfoReplyBody( 506 reply_status=reply_status, 507 dev_info=dev_info 508 ): 509 if dev_info is None: 510 return MessageReplyError( 511 message_type=GetDeviceInfoReply, 512 reason=reply_status.name, 513 ) 514 return GetDeviceInfoReply(DeviceInfo.from_protocol(dev_info)) 515 case p.ReadRFChReplyBody( 516 reply_status=reply_status, 517 rf_ch=rf_ch 518 ): 519 if rf_ch is None: 520 return MessageReplyError( 521 message_type=GetChannelReply, 522 reason=reply_status.name, 523 ) 524 return GetChannelReply(Channel.from_protocol(rf_ch)) 525 case p.WriteRFChReplyBody( 526 reply_status=reply_status, 527 ): 528 if reply_status != p.ReplyStatus.SUCCESS: 529 return MessageReplyError( 530 message_type=SetChannelReply, 531 reason=reply_status.name, 532 ) 533 return SetChannelReply() 534 535 case _: 536 return UnknownProtocolMessage(mf) 537 538 539##################### 540# CommandMessage 541 542class EnableEvent(t.NamedTuple): 543 event_type: EventType 544 545 546class GetBeaconSettings(t.NamedTuple): 547 pass 548 549 550class SetBeaconSettings(t.NamedTuple): 551 tnc_settings: BeaconSettings 552 553 554class SetSettings(t.NamedTuple): 555 settings: Settings 556 557 558class GetBatteryLevelAsPercentage(t.NamedTuple): 559 pass 560 561 562class GetRCBatteryLevel(t.NamedTuple): 563 pass 564 565 566class GetBatteryLevel(t.NamedTuple): 567 pass 568 569 570class GetBatteryVoltage(t.NamedTuple): 571 pass 572 573 574class GetDeviceInfo(t.NamedTuple): 575 pass 576 577 578class GetChannel(t.NamedTuple): 579 channel_id: int 580 581 582class SetChannel(t.NamedTuple): 583 channel: Channel 584 585 586class GetSettings(t.NamedTuple): 587 pass 588 589 590class GetStatus(t.NamedTuple): 591 pass 592 593 594class GetPosition(t.NamedTuple): 595 pass 596 597 598class SendTncDataFragment(t.NamedTuple): 599 tnc_data_fragment: TncDataFragment 600 601 602CommandMessage = t.Union[ 603 GetBeaconSettings, 604 SetBeaconSettings, 605 GetRCBatteryLevel, 606 GetBatteryLevelAsPercentage, 607 GetBatteryLevel, 608 GetBatteryVoltage, 609 GetDeviceInfo, 610 GetChannel, 611 SetChannel, 612 GetSettings, 613 SetSettings, 614 SendTncDataFragment, 615 EnableEvent, 616 GetStatus, 617 GetPosition, 618] 619 620##################### 621# ReplyMessage 622 623 624class SendTncDataFragmentReply(t.NamedTuple): 625 pass 626 627 628class GetBeaconSettingsReply(t.NamedTuple): 629 tnc_settings: BeaconSettings 630 631 632class SetBeaconSettingsReply(t.NamedTuple): 633 pass 634 635 636class SetSettingsReply(t.NamedTuple): 637 pass 638 639 640class GetBatteryLevelAsPercentageReply(t.NamedTuple): 641 battery_level_as_percentage: int 642 643 644class GetRCBatteryLevelReply(t.NamedTuple): 645 rc_battery_level: int 646 647 648class GetBatteryLevelReply(t.NamedTuple): 649 battery_level: int 650 651 652class GetBatteryVoltageReply(t.NamedTuple): 653 battery_voltage: float 654 655 656class GetDeviceInfoReply(t.NamedTuple): 657 device_info: DeviceInfo 658 659 660class GetChannelReply(t.NamedTuple): 661 channel: Channel 662 663 664class SetChannelReply(t.NamedTuple): 665 pass 666 667 668class GetStatusReply(t.NamedTuple): 669 status: Status 670 671 672class GetSettingsReply(t.NamedTuple): 673 settings: Settings 674 675 676class GetPositionReply(t.NamedTuple): 677 position: Position 678 679 680ReplyStatus = t.Literal[ 681 "SUCCESS", 682 "NOT_SUPPORTED", 683 "NOT_AUTHENTICATED", 684 "INSUFFICIENT_RESOURCES", 685 "AUTHENTICATING", 686 "INVALID_PARAMETER", 687 "INCORRECT_STATE", 688 "IN_PROGRESS", 689] 690 691 692class MessageReplyError(t.NamedTuple): 693 message_type: t.Type[t.Any] 694 reason: ReplyStatus 695 696 def as_exception(self): 697 return ValueError(f"{self.message_type.__name__} failed: {self.reason}") 698 699 700ReplyMessage = t.Union[ 701 GetBeaconSettingsReply, 702 SetBeaconSettingsReply, 703 GetBatteryLevelAsPercentageReply, 704 GetRCBatteryLevelReply, 705 GetBatteryLevelReply, 706 GetBatteryVoltageReply, 707 GetDeviceInfoReply, 708 GetChannelReply, 709 SetChannelReply, 710 GetSettingsReply, 711 SetSettingsReply, 712 SendTncDataFragmentReply, 713 GetStatusReply, 714 GetPositionReply, 715 MessageReplyError, 716] 717 718##################### 719# EventMessage 720 721 722class StatusChangedEvent(t.NamedTuple): 723 status: Status 724 725 726class ChannelChangedEvent(t.NamedTuple): 727 channel: Channel 728 729 730class TncDataFragmentReceivedEvent(t.NamedTuple): 731 tnc_data_fragment: TncDataFragment 732 733 734class SettingsChangedEvent(t.NamedTuple): 735 settings: Settings 736 737 738class UnknownProtocolMessage(t.NamedTuple): 739 message: p.Message 740 741 742EventMessage = t.Union[ 743 TncDataFragmentReceivedEvent, 744 SettingsChangedEvent, 745 ChannelChangedEvent, 746 StatusChangedEvent, 747 UnknownProtocolMessage, 748] 749 750EventType = t.Literal[ 751 "HT_STATUS_CHANGED", 752 "DATA_RXD", 753 "NEW_INQUIRY_DATA", 754 "RESTORE_FACTORY_SETTINGS", 755 "HT_CH_CHANGED", 756 "HT_SETTINGS_CHANGED", 757 "RINGING_STOPPED" 758 "RADIO_STATUS_CHANGED", 759 "USER_ACTION", 760 "SYSTEM_EVENT", 761 "BSS_SETTINGS_CHANGED", 762 "DATA_TXD", 763 "POSITION_CHANGED" 764] 765 766RadioMessage = ReplyMessage | EventMessage 767 768RadioMessageT = t.TypeVar("RadioMessageT", bound=RadioMessage) 769 770##################### 771# Handlers 772 773RadioMessageHandler = t.Callable[[RadioMessage], None] 774"""@private""" 775 776EventHandler = t.Callable[[EventMessage], None] 777"""@private""" 778 779##################### 780# Protocol to data object conversions 781 782 783class IntSplit(t.NamedTuple): 784 """@private (A helper for working with integers split into upper and lower parts)""" 785 786 n_upper: int 787 n_lower: int 788 789 def from_parts(self, upper: int, lower: int) -> int: 790 if upper >= 1 << self.n_upper: 791 raise ValueError( 792 f"Upper part {upper} is too large for {self.n_upper} bits") 793 if lower >= 1 << self.n_lower: 794 raise ValueError( 795 f"Lower part {lower} is too large for {self.n_lower} bits") 796 797 return (upper << self.n_lower) | lower 798 799 def get_upper(self, n: int) -> int: 800 if n >= 1 << (self.n_upper + self.n_lower): 801 raise ValueError( 802 f"Value {n} is too large for {self.n_upper + self.n_lower} bits" 803 ) 804 return n >> self.n_lower 805 806 def get_lower(self, n: int) -> int: 807 if n >= 1 << (self.n_upper + self.n_lower): 808 raise ValueError( 809 f"Value {n} is too large for {self.n_upper + self.n_lower} bits" 810 ) 811 return n & ((1 << self.n_lower) - 1) 812 813 814class TncDataFragment(ImmutableBaseModel): 815 """A data object representing a message packet""" 816 is_final_fragment: bool 817 fragment_id: int 818 data: bytes 819 channel_id: int | None = None 820 821 @classmethod 822 def from_protocol(cls, mp: p.TncDataFragment) -> TncDataFragment: 823 """@private (Protocol helper)""" 824 return TncDataFragment( 825 is_final_fragment=mp.is_final_fragment, 826 fragment_id=mp.fragment_id, 827 data=mp.data, 828 channel_id=mp.channel_id 829 ) 830 831 def to_protocol(self) -> p.TncDataFragment: 832 """@private (Protocol helper)""" 833 return p.TncDataFragment( 834 is_final_fragment=self.is_final_fragment, 835 with_channel_id=self.channel_id is not None, 836 fragment_id=self.fragment_id, 837 data=self.data, 838 channel_id=self.channel_id 839 ) 840 841 842ModulationType = t.Literal["AM", "FM", "DMR"] 843 844BandwidthType = t.Literal["NARROW", "WIDE"] 845 846 847class DCS(t.NamedTuple): 848 """A type for setting Digital Coded Squelch (DCS) on channels""" 849 850 n: int 851 """The DCS Normal (N) code""" 852 853 854def sub_audio_from_protocol(x: float | p.DCS | None) -> float | DCS | None: 855 """@private (Protocol helper)""" 856 match x: 857 case p.DCS(n): 858 return DCS(n=n) 859 case _: 860 return x 861 862 863def sub_audio_to_protocol(x: float | DCS | None) -> float | p.DCS | None: 864 """@private (Protocol helper)""" 865 match x: 866 case DCS(n): 867 return p.DCS(n=n) 868 case _: 869 return x 870 871 872class ChannelArgs(t.TypedDict, total=False): 873 """A dictionary of the parameters that can be set on a channel""" 874 tx_mod: ModulationType 875 tx_freq: float 876 rx_mod: ModulationType 877 rx_freq: float 878 tx_sub_audio: float | DCS | None 879 rx_sub_audio: float | DCS | None 880 scan: bool 881 tx_at_max_power: bool 882 talk_around: bool 883 bandwidth: BandwidthType 884 pre_de_emph_bypass: bool 885 sign: bool 886 tx_at_med_power: bool 887 tx_disable: bool 888 fixed_freq: bool 889 fixed_bandwidth: bool 890 fixed_tx_power: bool 891 mute: bool 892 name: str 893 894 895class Channel(ImmutableBaseModel): 896 """A data object representing a radio channel""" 897 channel_id: int 898 tx_mod: ModulationType 899 tx_freq: float 900 rx_mod: ModulationType 901 rx_freq: float 902 tx_sub_audio: float | DCS | None 903 rx_sub_audio: float | DCS | None 904 scan: bool 905 tx_at_max_power: bool 906 talk_around: bool 907 bandwidth: BandwidthType 908 pre_de_emph_bypass: bool 909 sign: bool 910 tx_at_med_power: bool 911 tx_disable: bool 912 fixed_freq: bool 913 fixed_bandwidth: bool 914 fixed_tx_power: bool 915 mute: bool 916 name: str 917 918 @classmethod 919 def from_protocol(cls, cs: p.RfCh) -> Channel: 920 """@private (Protocol helper)""" 921 return Channel( 922 channel_id=cs.channel_id, 923 tx_mod=cs.tx_mod.name, 924 tx_freq=cs.tx_freq, 925 rx_mod=cs.rx_mod.name, 926 rx_freq=cs.rx_freq, 927 tx_sub_audio=sub_audio_from_protocol(cs.tx_sub_audio), 928 rx_sub_audio=sub_audio_from_protocol(cs.rx_sub_audio), 929 scan=cs.scan, 930 tx_at_max_power=cs.tx_at_max_power, 931 talk_around=cs.talk_around, 932 bandwidth=cs.bandwidth.name, 933 pre_de_emph_bypass=cs.pre_de_emph_bypass, 934 sign=cs.sign, 935 tx_at_med_power=cs.tx_at_med_power, 936 tx_disable=cs.tx_disable, 937 fixed_freq=cs.fixed_freq, 938 fixed_bandwidth=cs.fixed_bandwidth, 939 fixed_tx_power=cs.fixed_tx_power, 940 mute=cs.mute, 941 name=cs.name_str 942 ) 943 944 def to_protocol(self) -> p.RfCh: 945 """@private (Protocol helper)""" 946 return p.RfCh( 947 channel_id=self.channel_id, 948 tx_mod=p.ModulationType[self.tx_mod], 949 tx_freq=self.tx_freq, 950 rx_mod=p.ModulationType[self.rx_mod], 951 rx_freq=self.rx_freq, 952 tx_sub_audio=sub_audio_to_protocol(self.tx_sub_audio), 953 rx_sub_audio=sub_audio_to_protocol(self.rx_sub_audio), 954 scan=self.scan, 955 tx_at_max_power=self.tx_at_max_power, 956 talk_around=self.talk_around, 957 bandwidth=p.BandwidthType[self.bandwidth], 958 pre_de_emph_bypass=self.pre_de_emph_bypass, 959 sign=self.sign, 960 tx_at_med_power=self.tx_at_med_power, 961 tx_disable=self.tx_disable, 962 fixed_freq=self.fixed_freq, 963 fixed_bandwidth=self.fixed_bandwidth, 964 fixed_tx_power=self.fixed_tx_power, 965 mute=self.mute, 966 name_str=self.name 967 ) 968 969 970class SettingsArgs(t.TypedDict, total=False): 971 """A dictionary of the parameters that can be set in the radio settings""" 972 channel_a: int 973 channel_b: int 974 scan: bool 975 aghfp_call_mode: int 976 double_channel: int 977 squelch_level: int 978 tail_elim: bool 979 auto_relay_en: bool 980 auto_power_on: bool 981 keep_aghfp_link: bool 982 mic_gain: int 983 tx_hold_time: int 984 tx_time_limit: int 985 local_speaker: int 986 bt_mic_gain: int 987 adaptive_response: bool 988 dis_tone: bool 989 power_saving_mode: bool 990 auto_power_off: int 991 auto_share_loc_ch: int | t.Literal["current"] 992 hm_speaker: int 993 positioning_system: int 994 time_offset: int 995 use_freq_range_2: bool 996 ptt_lock: bool 997 leading_sync_bit_en: bool 998 pairing_at_power_on: bool 999 screen_timeout: int 1000 vfo_x: int 1001 imperial_unit: bool 1002 wx_mode: int 1003 noaa_ch: int 1004 vfol_tx_power_x: int 1005 vfo2_tx_power_x: int 1006 dis_digital_mute: bool 1007 signaling_ecc_en: bool 1008 ch_data_lock: bool 1009 vfo1_mod_freq_x: int 1010 vfo2_mod_freq_x: int 1011 1012 1013class Settings(ImmutableBaseModel): 1014 """A data object representing the radio settings""" 1015 _channel_split: t.ClassVar[IntSplit] = IntSplit(4, 4) 1016 channel_a: int 1017 channel_b: int 1018 scan: bool 1019 aghfp_call_mode: int 1020 double_channel: int 1021 squelch_level: int 1022 tail_elim: bool 1023 auto_relay_en: bool 1024 auto_power_on: bool 1025 keep_aghfp_link: bool 1026 mic_gain: int 1027 tx_hold_time: int 1028 tx_time_limit: int 1029 local_speaker: int 1030 bt_mic_gain: int 1031 adaptive_response: bool 1032 dis_tone: bool 1033 power_saving_mode: bool 1034 auto_power_off: int 1035 auto_share_loc_ch: int | t.Literal["current"] 1036 hm_speaker: int 1037 positioning_system: int 1038 time_offset: int 1039 use_freq_range_2: bool 1040 ptt_lock: bool 1041 leading_sync_bit_en: bool 1042 pairing_at_power_on: bool 1043 screen_timeout: int 1044 vfo_x: int 1045 imperial_unit: bool 1046 wx_mode: int 1047 noaa_ch: int 1048 vfol_tx_power_x: int 1049 vfo2_tx_power_x: int 1050 dis_digital_mute: bool 1051 signaling_ecc_en: bool 1052 ch_data_lock: bool 1053 vfo1_mod_freq_x: int 1054 vfo2_mod_freq_x: int 1055 1056 @classmethod 1057 def from_protocol(cls, rs: p.Settings) -> Settings: 1058 """@private (Protocol helper)""" 1059 return Settings( 1060 channel_a=cls._channel_split.from_parts( 1061 rs.channel_a_upper, rs.channel_a_lower 1062 ), 1063 channel_b=cls._channel_split.from_parts( 1064 rs.channel_b_upper, rs.channel_b_lower 1065 ), 1066 scan=rs.scan, 1067 aghfp_call_mode=rs.aghfp_call_mode, 1068 double_channel=rs.double_channel, 1069 squelch_level=rs.squelch_level, 1070 tail_elim=rs.tail_elim, 1071 auto_relay_en=rs.auto_relay_en, 1072 auto_power_on=rs.auto_power_on, 1073 keep_aghfp_link=rs.keep_aghfp_link, 1074 mic_gain=rs.mic_gain, 1075 tx_hold_time=rs.tx_hold_time, 1076 tx_time_limit=rs.tx_time_limit, 1077 local_speaker=rs.local_speaker, 1078 bt_mic_gain=rs.bt_mic_gain, 1079 adaptive_response=rs.adaptive_response, 1080 dis_tone=rs.dis_tone, 1081 power_saving_mode=rs.power_saving_mode, 1082 auto_power_off=rs.auto_power_off, 1083 auto_share_loc_ch=rs.auto_share_loc_ch, 1084 hm_speaker=rs.hm_speaker, 1085 positioning_system=rs.positioning_system, 1086 time_offset=rs.time_offset, 1087 use_freq_range_2=rs.use_freq_range_2, 1088 ptt_lock=rs.ptt_lock, 1089 leading_sync_bit_en=rs.leading_sync_bit_en, 1090 pairing_at_power_on=rs.pairing_at_power_on, 1091 screen_timeout=rs.screen_timeout, 1092 vfo_x=rs.vfo_x, 1093 imperial_unit=rs.imperial_unit, 1094 wx_mode=rs.wx_mode, 1095 noaa_ch=rs.noaa_ch, 1096 vfol_tx_power_x=rs.vfol_tx_power_x, 1097 vfo2_tx_power_x=rs.vfo2_tx_power_x, 1098 dis_digital_mute=rs.dis_digital_mute, 1099 signaling_ecc_en=rs.signaling_ecc_en, 1100 ch_data_lock=rs.ch_data_lock, 1101 vfo1_mod_freq_x=rs.vfo1_mod_freq_x, 1102 vfo2_mod_freq_x=rs.vfo2_mod_freq_x 1103 ) 1104 1105 def to_protocol(self): 1106 """@private (Protocol helper)""" 1107 return p.Settings( 1108 channel_a_lower=self._channel_split.get_lower(self.channel_a), 1109 channel_b_lower=self._channel_split.get_lower(self.channel_b), 1110 scan=self.scan, 1111 aghfp_call_mode=self.aghfp_call_mode, 1112 double_channel=self.double_channel, 1113 squelch_level=self.squelch_level, 1114 tail_elim=self.tail_elim, 1115 auto_relay_en=self.auto_relay_en, 1116 auto_power_on=self.auto_power_on, 1117 keep_aghfp_link=self.keep_aghfp_link, 1118 mic_gain=self.mic_gain, 1119 tx_hold_time=self.tx_hold_time, 1120 tx_time_limit=self.tx_time_limit, 1121 local_speaker=self.local_speaker, 1122 bt_mic_gain=self.bt_mic_gain, 1123 adaptive_response=self.adaptive_response, 1124 dis_tone=self.dis_tone, 1125 power_saving_mode=self.power_saving_mode, 1126 auto_power_off=self.auto_power_off, 1127 auto_share_loc_ch=self.auto_share_loc_ch, 1128 hm_speaker=self.hm_speaker, 1129 positioning_system=self.positioning_system, 1130 time_offset=self.time_offset, 1131 use_freq_range_2=self.use_freq_range_2, 1132 ptt_lock=self.ptt_lock, 1133 leading_sync_bit_en=self.leading_sync_bit_en, 1134 pairing_at_power_on=self.pairing_at_power_on, 1135 screen_timeout=self.screen_timeout, 1136 vfo_x=self.vfo_x, 1137 imperial_unit=self.imperial_unit, 1138 channel_a_upper=self._channel_split.get_upper(self.channel_a), 1139 channel_b_upper=self._channel_split.get_upper(self.channel_b), 1140 wx_mode=self.wx_mode, 1141 noaa_ch=self.noaa_ch, 1142 vfol_tx_power_x=self.vfol_tx_power_x, 1143 vfo2_tx_power_x=self.vfo2_tx_power_x, 1144 dis_digital_mute=self.dis_digital_mute, 1145 signaling_ecc_en=self.signaling_ecc_en, 1146 ch_data_lock=self.ch_data_lock, 1147 vfo1_mod_freq_x=self.vfo1_mod_freq_x, 1148 vfo2_mod_freq_x=self.vfo2_mod_freq_x 1149 ) 1150 1151 1152class DeviceInfo(ImmutableBaseModel): 1153 """A data object representing the device information""" 1154 vendor_id: int 1155 product_id: int 1156 hardware_version: int 1157 firmware_version: int 1158 supports_radio: bool 1159 supports_medium_power: bool 1160 fixed_location_speaker_volume: bool 1161 has_speaker: bool 1162 has_hand_microphone_speaker: bool 1163 region_count: int 1164 supports_noaa: bool 1165 supports_gmrs: bool 1166 supports_vfo: bool 1167 supports_dmr: bool 1168 supports_software_power_control: bool 1169 channel_count: int 1170 frequency_range_count: int 1171 1172 @classmethod 1173 def from_protocol(cls, info: p.DevInfo) -> DeviceInfo: 1174 """@private (Protocol helper)""" 1175 return DeviceInfo( 1176 vendor_id=info.vendor_id, 1177 product_id=info.product_id, 1178 hardware_version=info.hw_ver, 1179 firmware_version=info.soft_ver, 1180 supports_radio=info.support_radio, 1181 supports_medium_power=info.support_medium_power, 1182 fixed_location_speaker_volume=info.fixed_loc_speaker_vol, 1183 supports_software_power_control=not info.not_support_soft_power_ctrl, 1184 has_speaker=not info.have_no_speaker, 1185 has_hand_microphone_speaker=info.have_hm_speaker, 1186 region_count=info.region_count, 1187 supports_noaa=info.support_noaa, 1188 supports_gmrs=info.gmrs, 1189 supports_vfo=info.support_vfo, 1190 supports_dmr=info.support_dmr, 1191 channel_count=info.channel_count, 1192 frequency_range_count=info.freq_range_count 1193 ) 1194 1195 def to_protocol(self) -> p.DevInfo: 1196 """@private (Protocol helper)""" 1197 return p.DevInfo( 1198 vendor_id=self.vendor_id, 1199 product_id=self.product_id, 1200 hw_ver=self.hardware_version, 1201 soft_ver=self.firmware_version, 1202 support_radio=self.supports_radio, 1203 support_medium_power=self.supports_medium_power, 1204 fixed_loc_speaker_vol=self.fixed_location_speaker_volume, 1205 not_support_soft_power_ctrl=not self.supports_software_power_control, 1206 have_no_speaker=not self.has_speaker, 1207 have_hm_speaker=self.has_hand_microphone_speaker, 1208 region_count=self.region_count, 1209 support_noaa=self.supports_noaa, 1210 gmrs=self.supports_gmrs, 1211 support_vfo=self.supports_vfo, 1212 support_dmr=self.supports_dmr, 1213 channel_count=self.channel_count, 1214 freq_range_count=self.frequency_range_count 1215 ) 1216 1217 1218class BeaconSettingsArgs(t.TypedDict, total=False): 1219 """A dictionary of the parameters that can be set in the beacon settings""" 1220 max_fwd_times: int 1221 time_to_live: int 1222 ptt_release_send_location: bool 1223 ptt_release_send_id_info: bool 1224 ptt_release_send_bss_user_id: bool 1225 should_share_location: bool 1226 send_pwr_voltage: bool 1227 packet_format: t.Literal["BSS", "APRS"] 1228 allow_position_check: bool 1229 aprs_ssid: int 1230 location_share_interval: int 1231 bss_user_id: int 1232 ptt_release_id_info: str 1233 beacon_message: str 1234 aprs_symbol: str 1235 aprs_callsign: str 1236 1237 1238class BeaconSettings(ImmutableBaseModel): 1239 """A data object representing the beacon settings""" 1240 _bss_user_id_split: t.ClassVar[IntSplit] = IntSplit(32, 32) 1241 max_fwd_times: int 1242 time_to_live: int 1243 ptt_release_send_location: bool 1244 ptt_release_send_id_info: bool 1245 ptt_release_send_bss_user_id: bool 1246 should_share_location: bool 1247 send_pwr_voltage: bool 1248 packet_format: t.Literal["BSS", "APRS"] 1249 allow_position_check: bool 1250 aprs_ssid: int 1251 location_share_interval: int 1252 bss_user_id: int 1253 ptt_release_id_info: str 1254 beacon_message: str 1255 aprs_symbol: str 1256 aprs_callsign: str 1257 1258 @classmethod 1259 def from_protocol(cls, bs: p.BSSSettingsExt | p.BSSSettings) -> BeaconSettings: 1260 """@private (Protocol helper)""" 1261 1262 if not isinstance(bs, p.BSSSettingsExt): 1263 raise ValueError( 1264 "Radio replied with old BSSSettings message version. Upgrade your firmware!" 1265 ) 1266 1267 return BeaconSettings( 1268 max_fwd_times=bs.max_fwd_times, 1269 time_to_live=bs.time_to_live, 1270 ptt_release_send_location=bs.ptt_release_send_location, 1271 ptt_release_send_id_info=bs.ptt_release_send_id_info, 1272 ptt_release_send_bss_user_id=bs.ptt_release_send_bss_user_id, 1273 should_share_location=bs.should_share_location, 1274 send_pwr_voltage=bs.send_pwr_voltage, 1275 packet_format=bs.packet_format.name, 1276 allow_position_check=bs.allow_position_check, 1277 aprs_ssid=bs.aprs_ssid, 1278 location_share_interval=bs.location_share_interval, 1279 bss_user_id=cls._bss_user_id_split.from_parts( 1280 bs.bss_user_id_upper, bs.bss_user_id_lower 1281 ), 1282 ptt_release_id_info=bs.ptt_release_id_info, 1283 beacon_message=bs.beacon_message, 1284 aprs_symbol=bs.aprs_symbol, 1285 aprs_callsign=bs.aprs_callsign 1286 ) 1287 1288 def to_protocol(self) -> p.BSSSettingsExt: 1289 """@private (Protocol helper)""" 1290 return p.BSSSettingsExt( 1291 max_fwd_times=self.max_fwd_times, 1292 time_to_live=self.time_to_live, 1293 ptt_release_send_location=self.ptt_release_send_location, 1294 ptt_release_send_id_info=self.ptt_release_send_id_info, 1295 ptt_release_send_bss_user_id=self.ptt_release_send_bss_user_id, 1296 should_share_location=self.should_share_location, 1297 send_pwr_voltage=self.send_pwr_voltage, 1298 packet_format=p.PacketFormat[self.packet_format], 1299 allow_position_check=self.allow_position_check, 1300 aprs_ssid=self.aprs_ssid, 1301 location_share_interval=self.location_share_interval, 1302 bss_user_id_lower=self._bss_user_id_split.get_lower( 1303 self.bss_user_id 1304 ), 1305 ptt_release_id_info=self.ptt_release_id_info, 1306 beacon_message=self.beacon_message, 1307 aprs_symbol=self.aprs_symbol, 1308 aprs_callsign=self.aprs_callsign, 1309 bss_user_id_upper=self._bss_user_id_split.get_upper( 1310 self.bss_user_id 1311 ), 1312 ) 1313 1314 1315ChannelType = t.Literal["OFF", "A", "B"] 1316 1317 1318class Status(ImmutableBaseModel): 1319 """A data object representing the radio status""" 1320 _channel_split: t.ClassVar[IntSplit] = IntSplit(4, 4) 1321 is_power_on: bool 1322 is_in_tx: bool 1323 is_sq: bool 1324 is_in_rx: bool 1325 double_channel: ChannelType 1326 is_scan: bool 1327 is_radio: bool 1328 curr_ch_id: int 1329 is_gps_locked: bool 1330 is_hfp_connected: bool 1331 is_aoc_connected: bool 1332 rssi: float 1333 curr_region: int 1334 1335 @classmethod 1336 def from_protocol(cls, s: p.Status | p.StatusExt) -> Status: 1337 """@private (Protocol helper)""" 1338 if not isinstance(s, p.StatusExt): 1339 raise ValueError( 1340 "Radio replied with old Status message version. Upgrade your firmware!" 1341 ) 1342 1343 return Status( 1344 is_power_on=s.is_power_on, 1345 is_in_tx=s.is_in_tx, 1346 is_sq=s.is_sq, 1347 is_in_rx=s.is_in_rx, 1348 double_channel=s.double_channel.name, 1349 is_scan=s.is_scan, 1350 is_radio=s.is_radio, 1351 curr_ch_id=cls._channel_split.from_parts( 1352 s.curr_channel_id_upper, s.curr_ch_id_lower 1353 ), 1354 is_gps_locked=s.is_gps_locked, 1355 is_hfp_connected=s.is_hfp_connected, 1356 is_aoc_connected=s.is_aoc_connected, 1357 rssi=s.rssi, 1358 curr_region=s.curr_region 1359 ) 1360 1361 def to_protocol(self) -> p.StatusExt: 1362 """@private (Protocol helper)""" 1363 return p.StatusExt( 1364 is_power_on=self.is_power_on, 1365 is_in_tx=self.is_in_tx, 1366 is_sq=self.is_sq, 1367 is_in_rx=self.is_in_rx, 1368 double_channel=p.ChannelType[self.double_channel], 1369 is_scan=self.is_scan, 1370 is_radio=self.is_radio, 1371 curr_ch_id_lower=self._channel_split.get_lower(self.curr_ch_id), 1372 is_gps_locked=self.is_gps_locked, 1373 is_hfp_connected=self.is_hfp_connected, 1374 is_aoc_connected=self.is_aoc_connected, 1375 rssi=self.rssi, 1376 curr_region=self.curr_region, 1377 curr_channel_id_upper=self._channel_split.get_upper( 1378 self.curr_ch_id) 1379 ) 1380 1381 1382class Position(ImmutableBaseModel): 1383 """A data object for representing GPS positions""" 1384 latitude: float 1385 longitude: float 1386 altitude: int | None 1387 speed: int | None 1388 heading: int | None 1389 time: datetime 1390 accuracy: int 1391 1392 @classmethod 1393 def from_protocol(cls, x: p.Position) -> Position: 1394 """@private (Protocol helper)""" 1395 return Position( 1396 latitude=x.latitude, 1397 longitude=x.longitude, 1398 altitude=x.altitude, 1399 speed=x.speed, 1400 heading=x.heading, 1401 time=x.time, 1402 accuracy=x.accuracy, 1403 ) 1404 1405 def to_protocol(self) -> p.Position: 1406 """@private (Protocol helper)""" 1407 return p.Position( 1408 latitude=self.latitude, 1409 longitude=self.longitude, 1410 altitude=self.altitude, 1411 speed=self.speed, 1412 heading=self.heading, 1413 time=self.time, 1414 accuracy=self.accuracy, 1415 )
54class CommandConnection: 55 _link: CommandLink 56 _handlers: t.List[RadioMessageHandler] = [] 57 58 def __init__(self, link: CommandLink): 59 self._link = link 60 self._handlers = [] 61 62 @classmethod 63 def new_ble(cls, device_uuid: str) -> CommandConnection: 64 return cls(BleCommandLink(device_uuid)) 65 66 @classmethod 67 def new_rfcomm(cls, device_uuid: str, channel: int | t.Literal["auto"] = "auto") -> CommandConnection: 68 return cls(RfcommCommandLink(device_uuid, channel)) 69 70 def is_connected(self) -> bool: 71 return self._link.is_connected() 72 73 async def connect(self) -> None: 74 await self._link.connect(self._on_recv) 75 76 async def disconnect(self) -> None: 77 self._handlers.clear() 78 await self._link.disconnect() 79 80 async def send_bytes(self, data: bytes) -> None: 81 await self._link.send_bytes(data) 82 83 async def send_message(self, command: CommandMessage) -> None: 84 await self._link.send(command_message_to_protocol(command)) 85 86 async def send_message_expect_reply(self, command: CommandMessage, expect: t.Type[RadioMessageT]) -> RadioMessageT | MessageReplyError: 87 queue: asyncio.Queue[RadioMessageT | 88 MessageReplyError] = asyncio.Queue() 89 90 def reply_handler(reply: RadioMessage): 91 if ( 92 isinstance(reply, expect) or 93 ( 94 isinstance(reply, MessageReplyError) and 95 reply.message_type is expect 96 ) 97 ): 98 queue.put_nowait(reply) 99 100 remove_handler = self._add_message_handler(reply_handler) 101 102 await self.send_message(command) 103 104 out = await queue.get() 105 106 remove_handler() 107 108 return out 109 110 def add_event_handler(self, handler: EventHandler) -> t.Callable[[], None]: 111 def event_handler(msg: RadioMessage): 112 if isinstance(msg, EventMessage): 113 handler(msg) 114 return self._add_message_handler(event_handler) 115 116 def _add_message_handler(self, handler: RadioMessageHandler) -> t.Callable[[], None]: 117 self._handlers.append(handler) 118 119 def remove_handler(): 120 self._handlers.remove(handler) 121 122 return remove_handler 123 124 def _on_recv(self, msg: p.Message) -> None: 125 radio_message = radio_message_from_protocol(msg) 126 for handler in self._handlers: 127 handler(radio_message) 128 129 # Command API 130 131 async def enable_event(self, event_type: EventType) -> None: 132 """Enable an event""" 133 # This event doesn't get a reply version of EnableEvent. It does, however 134 # does trigger a StatusChangedEvent... but we don't wait for it here. 135 # Instead, we just fire and forget 136 await self.send_message(EnableEvent(event_type)) 137 138 async def send_tnc_data_fragment(self, tnc_data_fragment: TncDataFragment) -> None: 139 """Send Tnc data""" 140 reply = await self.send_message_expect_reply(SendTncDataFragment(tnc_data_fragment), SendTncDataFragmentReply) 141 if isinstance(reply, MessageReplyError): 142 raise reply.as_exception() 143 144 async def get_beacon_settings(self) -> BeaconSettings: 145 """Get the current packet settings""" 146 reply = await self.send_message_expect_reply(GetBeaconSettings(), GetBeaconSettingsReply) 147 if isinstance(reply, MessageReplyError): 148 raise reply.as_exception() 149 return reply.tnc_settings 150 151 async def set_beacon_settings(self, packet_settings: BeaconSettings): 152 """Set the packet settings""" 153 reply = await self.send_message_expect_reply(SetBeaconSettings(packet_settings), SetBeaconSettingsReply) 154 if isinstance(reply, MessageReplyError): 155 raise reply.as_exception() 156 157 async def get_battery_level(self) -> int: 158 """Get the battery level""" 159 reply = await self.send_message_expect_reply(GetBatteryLevel(), GetBatteryLevelReply) 160 if isinstance(reply, MessageReplyError): 161 raise reply.as_exception() 162 return reply.battery_level 163 164 async def get_battery_level_as_percentage(self) -> int: 165 """Get the battery level as a percentage""" 166 reply = await self.send_message_expect_reply(GetBatteryLevelAsPercentage(), GetBatteryLevelAsPercentageReply) 167 if isinstance(reply, MessageReplyError): 168 raise reply.as_exception() 169 return reply.battery_level_as_percentage 170 171 async def get_rc_battery_level(self) -> int: 172 """Get the RC battery level""" 173 reply = await self.send_message_expect_reply(GetRCBatteryLevel(), GetRCBatteryLevelReply) 174 if isinstance(reply, MessageReplyError): 175 raise reply.as_exception() 176 return reply.rc_battery_level 177 178 async def get_battery_voltage(self) -> float: 179 """Get the battery voltage""" 180 reply = await self.send_message_expect_reply(GetBatteryVoltage(), GetBatteryVoltageReply) 181 if isinstance(reply, MessageReplyError): 182 raise reply.as_exception() 183 return reply.battery_voltage 184 185 async def get_device_info(self) -> DeviceInfo: 186 """Get the device info""" 187 reply = await self.send_message_expect_reply(GetDeviceInfo(), GetDeviceInfoReply) 188 if isinstance(reply, MessageReplyError): 189 raise reply.as_exception() 190 return reply.device_info 191 192 async def get_settings(self) -> Settings: 193 """Get the settings""" 194 reply = await self.send_message_expect_reply(GetSettings(), GetSettingsReply) 195 if isinstance(reply, MessageReplyError): 196 raise reply.as_exception() 197 return reply.settings 198 199 async def set_settings(self, settings: Settings) -> None: 200 """Set the settings""" 201 reply = await self.send_message_expect_reply(SetSettings(settings), SetSettingsReply) 202 if isinstance(reply, MessageReplyError): 203 raise reply.as_exception() 204 205 async def get_channel(self, channel_id: int) -> Channel: 206 """Get a channel""" 207 reply = await self.send_message_expect_reply(GetChannel(channel_id), GetChannelReply) 208 if isinstance(reply, MessageReplyError): 209 raise reply.as_exception() 210 return reply.channel 211 212 async def set_channel(self, channel: Channel): 213 """Set a channel""" 214 reply = await self.send_message_expect_reply(SetChannel(channel), SetChannelReply) 215 if isinstance(reply, MessageReplyError): 216 raise reply.as_exception() 217 218 async def get_status(self) -> Status: 219 """Get the radio status""" 220 reply = await self.send_message_expect_reply(GetStatus(), GetStatusReply) 221 if isinstance(reply, MessageReplyError): 222 raise reply.as_exception() 223 return reply.status 224 225 async def get_position(self) -> Position: 226 """Get the radio position""" 227 reply = await self.send_message_expect_reply(GetPosition(), GetPositionReply) 228 if isinstance(reply, MessageReplyError): 229 raise reply.as_exception() 230 return reply.position 231 232 async def __aenter__(self): 233 await self.connect() 234 return self 235 236 async def __aexit__( 237 self, 238 exc_type: t.Any, 239 exc_value: t.Any, 240 traceback: t.Any, 241 ) -> None: 242 await self.disconnect()
86 async def send_message_expect_reply(self, command: CommandMessage, expect: t.Type[RadioMessageT]) -> RadioMessageT | MessageReplyError: 87 queue: asyncio.Queue[RadioMessageT | 88 MessageReplyError] = asyncio.Queue() 89 90 def reply_handler(reply: RadioMessage): 91 if ( 92 isinstance(reply, expect) or 93 ( 94 isinstance(reply, MessageReplyError) and 95 reply.message_type is expect 96 ) 97 ): 98 queue.put_nowait(reply) 99 100 remove_handler = self._add_message_handler(reply_handler) 101 102 await self.send_message(command) 103 104 out = await queue.get() 105 106 remove_handler() 107 108 return out
131 async def enable_event(self, event_type: EventType) -> None: 132 """Enable an event""" 133 # This event doesn't get a reply version of EnableEvent. It does, however 134 # does trigger a StatusChangedEvent... but we don't wait for it here. 135 # Instead, we just fire and forget 136 await self.send_message(EnableEvent(event_type))
Enable an event
138 async def send_tnc_data_fragment(self, tnc_data_fragment: TncDataFragment) -> None: 139 """Send Tnc data""" 140 reply = await self.send_message_expect_reply(SendTncDataFragment(tnc_data_fragment), SendTncDataFragmentReply) 141 if isinstance(reply, MessageReplyError): 142 raise reply.as_exception()
Send Tnc data
144 async def get_beacon_settings(self) -> BeaconSettings: 145 """Get the current packet settings""" 146 reply = await self.send_message_expect_reply(GetBeaconSettings(), GetBeaconSettingsReply) 147 if isinstance(reply, MessageReplyError): 148 raise reply.as_exception() 149 return reply.tnc_settings
Get the current packet settings
151 async def set_beacon_settings(self, packet_settings: BeaconSettings): 152 """Set the packet settings""" 153 reply = await self.send_message_expect_reply(SetBeaconSettings(packet_settings), SetBeaconSettingsReply) 154 if isinstance(reply, MessageReplyError): 155 raise reply.as_exception()
Set the packet settings
157 async def get_battery_level(self) -> int: 158 """Get the battery level""" 159 reply = await self.send_message_expect_reply(GetBatteryLevel(), GetBatteryLevelReply) 160 if isinstance(reply, MessageReplyError): 161 raise reply.as_exception() 162 return reply.battery_level
Get the battery level
164 async def get_battery_level_as_percentage(self) -> int: 165 """Get the battery level as a percentage""" 166 reply = await self.send_message_expect_reply(GetBatteryLevelAsPercentage(), GetBatteryLevelAsPercentageReply) 167 if isinstance(reply, MessageReplyError): 168 raise reply.as_exception() 169 return reply.battery_level_as_percentage
Get the battery level as a percentage
171 async def get_rc_battery_level(self) -> int: 172 """Get the RC battery level""" 173 reply = await self.send_message_expect_reply(GetRCBatteryLevel(), GetRCBatteryLevelReply) 174 if isinstance(reply, MessageReplyError): 175 raise reply.as_exception() 176 return reply.rc_battery_level
Get the RC battery level
178 async def get_battery_voltage(self) -> float: 179 """Get the battery voltage""" 180 reply = await self.send_message_expect_reply(GetBatteryVoltage(), GetBatteryVoltageReply) 181 if isinstance(reply, MessageReplyError): 182 raise reply.as_exception() 183 return reply.battery_voltage
Get the battery voltage
185 async def get_device_info(self) -> DeviceInfo: 186 """Get the device info""" 187 reply = await self.send_message_expect_reply(GetDeviceInfo(), GetDeviceInfoReply) 188 if isinstance(reply, MessageReplyError): 189 raise reply.as_exception() 190 return reply.device_info
Get the device info
192 async def get_settings(self) -> Settings: 193 """Get the settings""" 194 reply = await self.send_message_expect_reply(GetSettings(), GetSettingsReply) 195 if isinstance(reply, MessageReplyError): 196 raise reply.as_exception() 197 return reply.settings
Get the settings
199 async def set_settings(self, settings: Settings) -> None: 200 """Set the settings""" 201 reply = await self.send_message_expect_reply(SetSettings(settings), SetSettingsReply) 202 if isinstance(reply, MessageReplyError): 203 raise reply.as_exception()
Set the settings
205 async def get_channel(self, channel_id: int) -> Channel: 206 """Get a channel""" 207 reply = await self.send_message_expect_reply(GetChannel(channel_id), GetChannelReply) 208 if isinstance(reply, MessageReplyError): 209 raise reply.as_exception() 210 return reply.channel
Get a channel
212 async def set_channel(self, channel: Channel): 213 """Set a channel""" 214 reply = await self.send_message_expect_reply(SetChannel(channel), SetChannelReply) 215 if isinstance(reply, MessageReplyError): 216 raise reply.as_exception()
Set a channel
218 async def get_status(self) -> Status: 219 """Get the radio status""" 220 reply = await self.send_message_expect_reply(GetStatus(), GetStatusReply) 221 if isinstance(reply, MessageReplyError): 222 raise reply.as_exception() 223 return reply.status
Get the radio status
EnableEvent(event_type,)
GetBeaconSettings()
SetBeaconSettings(tnc_settings,)
SetSettings(settings,)
GetBatteryLevelAsPercentage()
GetRCBatteryLevel()
GetBatteryLevel()
GetBatteryVoltage()
GetDeviceInfo()
GetChannel(channel_id,)
SetChannel(channel,)
GetSettings()
GetStatus()
GetPosition()
SendTncDataFragment(tnc_data_fragment,)
SendTncDataFragmentReply()
GetBeaconSettingsReply(tnc_settings,)
SetBeaconSettingsReply()
SetSettingsReply()
GetBatteryLevelAsPercentageReply(battery_level_as_percentage,)
GetRCBatteryLevelReply(rc_battery_level,)
GetBatteryLevelReply(battery_level,)
GetBatteryVoltageReply(battery_voltage,)
GetDeviceInfoReply(device_info,)
GetChannelReply(channel,)
SetChannelReply()
GetStatusReply(status,)
GetSettingsReply(settings,)
GetPositionReply(position,)
693class MessageReplyError(t.NamedTuple): 694 message_type: t.Type[t.Any] 695 reason: ReplyStatus 696 697 def as_exception(self): 698 return ValueError(f"{self.message_type.__name__} failed: {self.reason}")
MessageReplyError(message_type, reason)
Create new instance of MessageReplyError(message_type, reason)
StatusChangedEvent(status,)
ChannelChangedEvent(channel,)
TncDataFragmentReceivedEvent(tnc_data_fragment,)
SettingsChangedEvent(settings,)
UnknownProtocolMessage(message,)
815class TncDataFragment(ImmutableBaseModel): 816 """A data object representing a message packet""" 817 is_final_fragment: bool 818 fragment_id: int 819 data: bytes 820 channel_id: int | None = None 821 822 @classmethod 823 def from_protocol(cls, mp: p.TncDataFragment) -> TncDataFragment: 824 """@private (Protocol helper)""" 825 return TncDataFragment( 826 is_final_fragment=mp.is_final_fragment, 827 fragment_id=mp.fragment_id, 828 data=mp.data, 829 channel_id=mp.channel_id 830 ) 831 832 def to_protocol(self) -> p.TncDataFragment: 833 """@private (Protocol helper)""" 834 return p.TncDataFragment( 835 is_final_fragment=self.is_final_fragment, 836 with_channel_id=self.channel_id is not None, 837 fragment_id=self.fragment_id, 838 data=self.data, 839 channel_id=self.channel_id 840 )
A data object representing a message packet
848class DCS(t.NamedTuple): 849 """A type for setting Digital Coded Squelch (DCS) on channels""" 850 851 n: int 852 """The DCS Normal (N) code"""
A type for setting Digital Coded Squelch (DCS) on channels
873class ChannelArgs(t.TypedDict, total=False): 874 """A dictionary of the parameters that can be set on a channel""" 875 tx_mod: ModulationType 876 tx_freq: float 877 rx_mod: ModulationType 878 rx_freq: float 879 tx_sub_audio: float | DCS | None 880 rx_sub_audio: float | DCS | None 881 scan: bool 882 tx_at_max_power: bool 883 talk_around: bool 884 bandwidth: BandwidthType 885 pre_de_emph_bypass: bool 886 sign: bool 887 tx_at_med_power: bool 888 tx_disable: bool 889 fixed_freq: bool 890 fixed_bandwidth: bool 891 fixed_tx_power: bool 892 mute: bool 893 name: str
A dictionary of the parameters that can be set on a channel
896class Channel(ImmutableBaseModel): 897 """A data object representing a radio channel""" 898 channel_id: int 899 tx_mod: ModulationType 900 tx_freq: float 901 rx_mod: ModulationType 902 rx_freq: float 903 tx_sub_audio: float | DCS | None 904 rx_sub_audio: float | DCS | None 905 scan: bool 906 tx_at_max_power: bool 907 talk_around: bool 908 bandwidth: BandwidthType 909 pre_de_emph_bypass: bool 910 sign: bool 911 tx_at_med_power: bool 912 tx_disable: bool 913 fixed_freq: bool 914 fixed_bandwidth: bool 915 fixed_tx_power: bool 916 mute: bool 917 name: str 918 919 @classmethod 920 def from_protocol(cls, cs: p.RfCh) -> Channel: 921 """@private (Protocol helper)""" 922 return Channel( 923 channel_id=cs.channel_id, 924 tx_mod=cs.tx_mod.name, 925 tx_freq=cs.tx_freq, 926 rx_mod=cs.rx_mod.name, 927 rx_freq=cs.rx_freq, 928 tx_sub_audio=sub_audio_from_protocol(cs.tx_sub_audio), 929 rx_sub_audio=sub_audio_from_protocol(cs.rx_sub_audio), 930 scan=cs.scan, 931 tx_at_max_power=cs.tx_at_max_power, 932 talk_around=cs.talk_around, 933 bandwidth=cs.bandwidth.name, 934 pre_de_emph_bypass=cs.pre_de_emph_bypass, 935 sign=cs.sign, 936 tx_at_med_power=cs.tx_at_med_power, 937 tx_disable=cs.tx_disable, 938 fixed_freq=cs.fixed_freq, 939 fixed_bandwidth=cs.fixed_bandwidth, 940 fixed_tx_power=cs.fixed_tx_power, 941 mute=cs.mute, 942 name=cs.name_str 943 ) 944 945 def to_protocol(self) -> p.RfCh: 946 """@private (Protocol helper)""" 947 return p.RfCh( 948 channel_id=self.channel_id, 949 tx_mod=p.ModulationType[self.tx_mod], 950 tx_freq=self.tx_freq, 951 rx_mod=p.ModulationType[self.rx_mod], 952 rx_freq=self.rx_freq, 953 tx_sub_audio=sub_audio_to_protocol(self.tx_sub_audio), 954 rx_sub_audio=sub_audio_to_protocol(self.rx_sub_audio), 955 scan=self.scan, 956 tx_at_max_power=self.tx_at_max_power, 957 talk_around=self.talk_around, 958 bandwidth=p.BandwidthType[self.bandwidth], 959 pre_de_emph_bypass=self.pre_de_emph_bypass, 960 sign=self.sign, 961 tx_at_med_power=self.tx_at_med_power, 962 tx_disable=self.tx_disable, 963 fixed_freq=self.fixed_freq, 964 fixed_bandwidth=self.fixed_bandwidth, 965 fixed_tx_power=self.fixed_tx_power, 966 mute=self.mute, 967 name_str=self.name 968 )
A data object representing a radio channel
971class SettingsArgs(t.TypedDict, total=False): 972 """A dictionary of the parameters that can be set in the radio settings""" 973 channel_a: int 974 channel_b: int 975 scan: bool 976 aghfp_call_mode: int 977 double_channel: int 978 squelch_level: int 979 tail_elim: bool 980 auto_relay_en: bool 981 auto_power_on: bool 982 keep_aghfp_link: bool 983 mic_gain: int 984 tx_hold_time: int 985 tx_time_limit: int 986 local_speaker: int 987 bt_mic_gain: int 988 adaptive_response: bool 989 dis_tone: bool 990 power_saving_mode: bool 991 auto_power_off: int 992 auto_share_loc_ch: int | t.Literal["current"] 993 hm_speaker: int 994 positioning_system: int 995 time_offset: int 996 use_freq_range_2: bool 997 ptt_lock: bool 998 leading_sync_bit_en: bool 999 pairing_at_power_on: bool 1000 screen_timeout: int 1001 vfo_x: int 1002 imperial_unit: bool 1003 wx_mode: int 1004 noaa_ch: int 1005 vfol_tx_power_x: int 1006 vfo2_tx_power_x: int 1007 dis_digital_mute: bool 1008 signaling_ecc_en: bool 1009 ch_data_lock: bool 1010 vfo1_mod_freq_x: int 1011 vfo2_mod_freq_x: int
A dictionary of the parameters that can be set in the radio settings
1014class Settings(ImmutableBaseModel): 1015 """A data object representing the radio settings""" 1016 _channel_split: t.ClassVar[IntSplit] = IntSplit(4, 4) 1017 channel_a: int 1018 channel_b: int 1019 scan: bool 1020 aghfp_call_mode: int 1021 double_channel: int 1022 squelch_level: int 1023 tail_elim: bool 1024 auto_relay_en: bool 1025 auto_power_on: bool 1026 keep_aghfp_link: bool 1027 mic_gain: int 1028 tx_hold_time: int 1029 tx_time_limit: int 1030 local_speaker: int 1031 bt_mic_gain: int 1032 adaptive_response: bool 1033 dis_tone: bool 1034 power_saving_mode: bool 1035 auto_power_off: int 1036 auto_share_loc_ch: int | t.Literal["current"] 1037 hm_speaker: int 1038 positioning_system: int 1039 time_offset: int 1040 use_freq_range_2: bool 1041 ptt_lock: bool 1042 leading_sync_bit_en: bool 1043 pairing_at_power_on: bool 1044 screen_timeout: int 1045 vfo_x: int 1046 imperial_unit: bool 1047 wx_mode: int 1048 noaa_ch: int 1049 vfol_tx_power_x: int 1050 vfo2_tx_power_x: int 1051 dis_digital_mute: bool 1052 signaling_ecc_en: bool 1053 ch_data_lock: bool 1054 vfo1_mod_freq_x: int 1055 vfo2_mod_freq_x: int 1056 1057 @classmethod 1058 def from_protocol(cls, rs: p.Settings) -> Settings: 1059 """@private (Protocol helper)""" 1060 return Settings( 1061 channel_a=cls._channel_split.from_parts( 1062 rs.channel_a_upper, rs.channel_a_lower 1063 ), 1064 channel_b=cls._channel_split.from_parts( 1065 rs.channel_b_upper, rs.channel_b_lower 1066 ), 1067 scan=rs.scan, 1068 aghfp_call_mode=rs.aghfp_call_mode, 1069 double_channel=rs.double_channel, 1070 squelch_level=rs.squelch_level, 1071 tail_elim=rs.tail_elim, 1072 auto_relay_en=rs.auto_relay_en, 1073 auto_power_on=rs.auto_power_on, 1074 keep_aghfp_link=rs.keep_aghfp_link, 1075 mic_gain=rs.mic_gain, 1076 tx_hold_time=rs.tx_hold_time, 1077 tx_time_limit=rs.tx_time_limit, 1078 local_speaker=rs.local_speaker, 1079 bt_mic_gain=rs.bt_mic_gain, 1080 adaptive_response=rs.adaptive_response, 1081 dis_tone=rs.dis_tone, 1082 power_saving_mode=rs.power_saving_mode, 1083 auto_power_off=rs.auto_power_off, 1084 auto_share_loc_ch=rs.auto_share_loc_ch, 1085 hm_speaker=rs.hm_speaker, 1086 positioning_system=rs.positioning_system, 1087 time_offset=rs.time_offset, 1088 use_freq_range_2=rs.use_freq_range_2, 1089 ptt_lock=rs.ptt_lock, 1090 leading_sync_bit_en=rs.leading_sync_bit_en, 1091 pairing_at_power_on=rs.pairing_at_power_on, 1092 screen_timeout=rs.screen_timeout, 1093 vfo_x=rs.vfo_x, 1094 imperial_unit=rs.imperial_unit, 1095 wx_mode=rs.wx_mode, 1096 noaa_ch=rs.noaa_ch, 1097 vfol_tx_power_x=rs.vfol_tx_power_x, 1098 vfo2_tx_power_x=rs.vfo2_tx_power_x, 1099 dis_digital_mute=rs.dis_digital_mute, 1100 signaling_ecc_en=rs.signaling_ecc_en, 1101 ch_data_lock=rs.ch_data_lock, 1102 vfo1_mod_freq_x=rs.vfo1_mod_freq_x, 1103 vfo2_mod_freq_x=rs.vfo2_mod_freq_x 1104 ) 1105 1106 def to_protocol(self): 1107 """@private (Protocol helper)""" 1108 return p.Settings( 1109 channel_a_lower=self._channel_split.get_lower(self.channel_a), 1110 channel_b_lower=self._channel_split.get_lower(self.channel_b), 1111 scan=self.scan, 1112 aghfp_call_mode=self.aghfp_call_mode, 1113 double_channel=self.double_channel, 1114 squelch_level=self.squelch_level, 1115 tail_elim=self.tail_elim, 1116 auto_relay_en=self.auto_relay_en, 1117 auto_power_on=self.auto_power_on, 1118 keep_aghfp_link=self.keep_aghfp_link, 1119 mic_gain=self.mic_gain, 1120 tx_hold_time=self.tx_hold_time, 1121 tx_time_limit=self.tx_time_limit, 1122 local_speaker=self.local_speaker, 1123 bt_mic_gain=self.bt_mic_gain, 1124 adaptive_response=self.adaptive_response, 1125 dis_tone=self.dis_tone, 1126 power_saving_mode=self.power_saving_mode, 1127 auto_power_off=self.auto_power_off, 1128 auto_share_loc_ch=self.auto_share_loc_ch, 1129 hm_speaker=self.hm_speaker, 1130 positioning_system=self.positioning_system, 1131 time_offset=self.time_offset, 1132 use_freq_range_2=self.use_freq_range_2, 1133 ptt_lock=self.ptt_lock, 1134 leading_sync_bit_en=self.leading_sync_bit_en, 1135 pairing_at_power_on=self.pairing_at_power_on, 1136 screen_timeout=self.screen_timeout, 1137 vfo_x=self.vfo_x, 1138 imperial_unit=self.imperial_unit, 1139 channel_a_upper=self._channel_split.get_upper(self.channel_a), 1140 channel_b_upper=self._channel_split.get_upper(self.channel_b), 1141 wx_mode=self.wx_mode, 1142 noaa_ch=self.noaa_ch, 1143 vfol_tx_power_x=self.vfol_tx_power_x, 1144 vfo2_tx_power_x=self.vfo2_tx_power_x, 1145 dis_digital_mute=self.dis_digital_mute, 1146 signaling_ecc_en=self.signaling_ecc_en, 1147 ch_data_lock=self.ch_data_lock, 1148 vfo1_mod_freq_x=self.vfo1_mod_freq_x, 1149 vfo2_mod_freq_x=self.vfo2_mod_freq_x 1150 )
A data object representing the radio settings
1153class DeviceInfo(ImmutableBaseModel): 1154 """A data object representing the device information""" 1155 vendor_id: int 1156 product_id: int 1157 hardware_version: int 1158 firmware_version: int 1159 supports_radio: bool 1160 supports_medium_power: bool 1161 fixed_location_speaker_volume: bool 1162 has_speaker: bool 1163 has_hand_microphone_speaker: bool 1164 region_count: int 1165 supports_noaa: bool 1166 supports_gmrs: bool 1167 supports_vfo: bool 1168 supports_dmr: bool 1169 supports_software_power_control: bool 1170 channel_count: int 1171 frequency_range_count: int 1172 1173 @classmethod 1174 def from_protocol(cls, info: p.DevInfo) -> DeviceInfo: 1175 """@private (Protocol helper)""" 1176 return DeviceInfo( 1177 vendor_id=info.vendor_id, 1178 product_id=info.product_id, 1179 hardware_version=info.hw_ver, 1180 firmware_version=info.soft_ver, 1181 supports_radio=info.support_radio, 1182 supports_medium_power=info.support_medium_power, 1183 fixed_location_speaker_volume=info.fixed_loc_speaker_vol, 1184 supports_software_power_control=not info.not_support_soft_power_ctrl, 1185 has_speaker=not info.have_no_speaker, 1186 has_hand_microphone_speaker=info.have_hm_speaker, 1187 region_count=info.region_count, 1188 supports_noaa=info.support_noaa, 1189 supports_gmrs=info.gmrs, 1190 supports_vfo=info.support_vfo, 1191 supports_dmr=info.support_dmr, 1192 channel_count=info.channel_count, 1193 frequency_range_count=info.freq_range_count 1194 ) 1195 1196 def to_protocol(self) -> p.DevInfo: 1197 """@private (Protocol helper)""" 1198 return p.DevInfo( 1199 vendor_id=self.vendor_id, 1200 product_id=self.product_id, 1201 hw_ver=self.hardware_version, 1202 soft_ver=self.firmware_version, 1203 support_radio=self.supports_radio, 1204 support_medium_power=self.supports_medium_power, 1205 fixed_loc_speaker_vol=self.fixed_location_speaker_volume, 1206 not_support_soft_power_ctrl=not self.supports_software_power_control, 1207 have_no_speaker=not self.has_speaker, 1208 have_hm_speaker=self.has_hand_microphone_speaker, 1209 region_count=self.region_count, 1210 support_noaa=self.supports_noaa, 1211 gmrs=self.supports_gmrs, 1212 support_vfo=self.supports_vfo, 1213 support_dmr=self.supports_dmr, 1214 channel_count=self.channel_count, 1215 freq_range_count=self.frequency_range_count 1216 )
A data object representing the device information
1219class BeaconSettingsArgs(t.TypedDict, total=False): 1220 """A dictionary of the parameters that can be set in the beacon settings""" 1221 max_fwd_times: int 1222 time_to_live: int 1223 ptt_release_send_location: bool 1224 ptt_release_send_id_info: bool 1225 ptt_release_send_bss_user_id: bool 1226 should_share_location: bool 1227 send_pwr_voltage: bool 1228 packet_format: t.Literal["BSS", "APRS"] 1229 allow_position_check: bool 1230 aprs_ssid: int 1231 location_share_interval: int 1232 bss_user_id: int 1233 ptt_release_id_info: str 1234 beacon_message: str 1235 aprs_symbol: str 1236 aprs_callsign: str
A dictionary of the parameters that can be set in the beacon settings
1239class BeaconSettings(ImmutableBaseModel): 1240 """A data object representing the beacon settings""" 1241 _bss_user_id_split: t.ClassVar[IntSplit] = IntSplit(32, 32) 1242 max_fwd_times: int 1243 time_to_live: int 1244 ptt_release_send_location: bool 1245 ptt_release_send_id_info: bool 1246 ptt_release_send_bss_user_id: bool 1247 should_share_location: bool 1248 send_pwr_voltage: bool 1249 packet_format: t.Literal["BSS", "APRS"] 1250 allow_position_check: bool 1251 aprs_ssid: int 1252 location_share_interval: int 1253 bss_user_id: int 1254 ptt_release_id_info: str 1255 beacon_message: str 1256 aprs_symbol: str 1257 aprs_callsign: str 1258 1259 @classmethod 1260 def from_protocol(cls, bs: p.BSSSettingsExt | p.BSSSettings) -> BeaconSettings: 1261 """@private (Protocol helper)""" 1262 1263 if not isinstance(bs, p.BSSSettingsExt): 1264 raise ValueError( 1265 "Radio replied with old BSSSettings message version. Upgrade your firmware!" 1266 ) 1267 1268 return BeaconSettings( 1269 max_fwd_times=bs.max_fwd_times, 1270 time_to_live=bs.time_to_live, 1271 ptt_release_send_location=bs.ptt_release_send_location, 1272 ptt_release_send_id_info=bs.ptt_release_send_id_info, 1273 ptt_release_send_bss_user_id=bs.ptt_release_send_bss_user_id, 1274 should_share_location=bs.should_share_location, 1275 send_pwr_voltage=bs.send_pwr_voltage, 1276 packet_format=bs.packet_format.name, 1277 allow_position_check=bs.allow_position_check, 1278 aprs_ssid=bs.aprs_ssid, 1279 location_share_interval=bs.location_share_interval, 1280 bss_user_id=cls._bss_user_id_split.from_parts( 1281 bs.bss_user_id_upper, bs.bss_user_id_lower 1282 ), 1283 ptt_release_id_info=bs.ptt_release_id_info, 1284 beacon_message=bs.beacon_message, 1285 aprs_symbol=bs.aprs_symbol, 1286 aprs_callsign=bs.aprs_callsign 1287 ) 1288 1289 def to_protocol(self) -> p.BSSSettingsExt: 1290 """@private (Protocol helper)""" 1291 return p.BSSSettingsExt( 1292 max_fwd_times=self.max_fwd_times, 1293 time_to_live=self.time_to_live, 1294 ptt_release_send_location=self.ptt_release_send_location, 1295 ptt_release_send_id_info=self.ptt_release_send_id_info, 1296 ptt_release_send_bss_user_id=self.ptt_release_send_bss_user_id, 1297 should_share_location=self.should_share_location, 1298 send_pwr_voltage=self.send_pwr_voltage, 1299 packet_format=p.PacketFormat[self.packet_format], 1300 allow_position_check=self.allow_position_check, 1301 aprs_ssid=self.aprs_ssid, 1302 location_share_interval=self.location_share_interval, 1303 bss_user_id_lower=self._bss_user_id_split.get_lower( 1304 self.bss_user_id 1305 ), 1306 ptt_release_id_info=self.ptt_release_id_info, 1307 beacon_message=self.beacon_message, 1308 aprs_symbol=self.aprs_symbol, 1309 aprs_callsign=self.aprs_callsign, 1310 bss_user_id_upper=self._bss_user_id_split.get_upper( 1311 self.bss_user_id 1312 ), 1313 )
A data object representing the beacon settings
1319class Status(ImmutableBaseModel): 1320 """A data object representing the radio status""" 1321 _channel_split: t.ClassVar[IntSplit] = IntSplit(4, 4) 1322 is_power_on: bool 1323 is_in_tx: bool 1324 is_sq: bool 1325 is_in_rx: bool 1326 double_channel: ChannelType 1327 is_scan: bool 1328 is_radio: bool 1329 curr_ch_id: int 1330 is_gps_locked: bool 1331 is_hfp_connected: bool 1332 is_aoc_connected: bool 1333 rssi: float 1334 curr_region: int 1335 1336 @classmethod 1337 def from_protocol(cls, s: p.Status | p.StatusExt) -> Status: 1338 """@private (Protocol helper)""" 1339 if not isinstance(s, p.StatusExt): 1340 raise ValueError( 1341 "Radio replied with old Status message version. Upgrade your firmware!" 1342 ) 1343 1344 return Status( 1345 is_power_on=s.is_power_on, 1346 is_in_tx=s.is_in_tx, 1347 is_sq=s.is_sq, 1348 is_in_rx=s.is_in_rx, 1349 double_channel=s.double_channel.name, 1350 is_scan=s.is_scan, 1351 is_radio=s.is_radio, 1352 curr_ch_id=cls._channel_split.from_parts( 1353 s.curr_channel_id_upper, s.curr_ch_id_lower 1354 ), 1355 is_gps_locked=s.is_gps_locked, 1356 is_hfp_connected=s.is_hfp_connected, 1357 is_aoc_connected=s.is_aoc_connected, 1358 rssi=s.rssi, 1359 curr_region=s.curr_region 1360 ) 1361 1362 def to_protocol(self) -> p.StatusExt: 1363 """@private (Protocol helper)""" 1364 return p.StatusExt( 1365 is_power_on=self.is_power_on, 1366 is_in_tx=self.is_in_tx, 1367 is_sq=self.is_sq, 1368 is_in_rx=self.is_in_rx, 1369 double_channel=p.ChannelType[self.double_channel], 1370 is_scan=self.is_scan, 1371 is_radio=self.is_radio, 1372 curr_ch_id_lower=self._channel_split.get_lower(self.curr_ch_id), 1373 is_gps_locked=self.is_gps_locked, 1374 is_hfp_connected=self.is_hfp_connected, 1375 is_aoc_connected=self.is_aoc_connected, 1376 rssi=self.rssi, 1377 curr_region=self.curr_region, 1378 curr_channel_id_upper=self._channel_split.get_upper( 1379 self.curr_ch_id) 1380 )
A data object representing the radio status
1383class Position(ImmutableBaseModel): 1384 """A data object for representing GPS positions""" 1385 latitude: float 1386 longitude: float 1387 altitude: int | None 1388 speed: int | None 1389 heading: int | None 1390 time: datetime 1391 accuracy: int 1392 1393 @classmethod 1394 def from_protocol(cls, x: p.Position) -> Position: 1395 """@private (Protocol helper)""" 1396 return Position( 1397 latitude=x.latitude, 1398 longitude=x.longitude, 1399 altitude=x.altitude, 1400 speed=x.speed, 1401 heading=x.heading, 1402 time=x.time, 1403 accuracy=x.accuracy, 1404 ) 1405 1406 def to_protocol(self) -> p.Position: 1407 """@private (Protocol helper)""" 1408 return p.Position( 1409 latitude=self.latitude, 1410 longitude=self.longitude, 1411 altitude=self.altitude, 1412 speed=self.speed, 1413 heading=self.heading, 1414 time=self.time, 1415 accuracy=self.accuracy, 1416 )
A data object for representing GPS positions