# -*- coding: utf-8 -*- """ Copyright (C) 2024 Xiaomi Corporation. The ownership and intellectual property rights of Xiaomi Home Assistant Integration and related Xiaomi cloud service API interface provided under this license, including source code and object code (collectively, "Licensed Work"), are owned by Xiaomi. Subject to the terms and conditions of this License, Xiaomi hereby grants you a personal, limited, non-exclusive, non-transferable, non-sublicensable, and royalty-free license to reproduce, use, modify, and distribute the Licensed Work only for your use of Home Assistant for non-commercial purposes. For the avoidance of doubt, Xiaomi does not authorize you to use the Licensed Work for any other purpose, including but not limited to use Licensed Work to develop applications (APP), Web services, and other forms of software. You may reproduce and distribute copies of the Licensed Work, with or without modifications, whether in source or object form, provided that you must give any other recipients of the Licensed Work a copy of this License and retain all copyright and disclaimers. Xiaomi provides the Licensed Work on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties, undertakes, or conditions of TITLE, NO ERROR OR OMISSION, CONTINUITY, RELIABILITY, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. In any event, you are solely responsible for any direct, indirect, special, incidental, or consequential damages or losses arising from the use or inability to use the Licensed Work. Xiaomi reserves all rights not expressly granted to you in this License. Except for the rights expressly granted by Xiaomi under this License, Xiaomi does not authorize you in any form to use the trademarks, copyrights, or other forms of intellectual property rights of Xiaomi and its affiliates, including, without limitation, without obtaining other written permission from Xiaomi, you shall not use "Xiaomi", "Mijia" and other words related to Xiaomi or words that may make the public associate with Xiaomi in any form to publicize or promote the software or hardware devices that use the Licensed Work. Xiaomi has the right to immediately terminate all your authorization under this License in the event: 1. You assert patent invalidation, litigation, or other claims against patents or other intellectual property rights of Xiaomi or its affiliates; or, 2. You make, have made, manufacture, sell, or offer to sell products that knock off Xiaomi or its affiliates' products. MIoT Pub/Sub client. """ import asyncio import json import logging import random import re import ssl import struct import threading from abc import ABC, abstractmethod from dataclasses import dataclass from enum import Enum, auto from typing import Any, Callable, Optional, final, Coroutine from paho.mqtt.client import ( MQTT_ERR_SUCCESS, MQTT_ERR_UNKNOWN, Client, MQTTv5, MQTTMessage) # pylint: disable=relative-beyond-top-level from .common import MIoTMatcher from .const import MIHOME_MQTT_KEEPALIVE from .miot_error import MIoTErrorCode, MIoTMipsError _LOGGER = logging.getLogger(__name__) class _MipsMsgTypeOptions(Enum): """MIoT Pub/Sub message type.""" ID = 0 RET_TOPIC = auto() PAYLOAD = auto() FROM = auto() MAX = auto() class _MipsMessage: """MIoT Pub/Sub message.""" mid: int = 0 msg_from: Optional[str] = None ret_topic: Optional[str] = None payload: Optional[str] = None @staticmethod def unpack(data: bytes) -> '_MipsMessage': mips_msg = _MipsMessage() data_len = len(data) data_start = 0 data_end = 0 while data_start < data_len: data_end = data_start+5 unpack_len, unpack_type = struct.unpack( ' bytes: if mid is None or payload is None: raise MIoTMipsError('invalid mid or payload') pack_msg: bytes = b'' # mid pack_msg += struct.pack(' str: return f'{self.mid}, {self.msg_from}, {self.ret_topic}, {self.payload}' @dataclass class _MipsRequest: """MIoT Pub/Sub request.""" mid: int on_reply: Callable[[str, Any], None] on_reply_ctx: Any timer: Optional[asyncio.TimerHandle] @dataclass class _MipsBroadcast: """MIoT Pub/Sub broadcast.""" topic: str """ param 1: msg topic param 2: msg payload param 3: handle_ctx """ handler: Callable[[str, str, Any], None] handler_ctx: Any def __str__(self) -> str: return f'{self.topic}, {id(self.handler)}, {id(self.handler_ctx)}' @dataclass class _MipsState: """MIoT Pub/Sub state.""" key: str """ str: key bool: mips connect state """ handler: Callable[[str, bool], Coroutine] class MIoTDeviceState(Enum): """MIoT device state define.""" DISABLE = 0 OFFLINE = auto() ONLINE = auto() @dataclass class MipsDeviceState: """MIoT Pub/Sub device state.""" did: Optional[str] = None """handler str: did MIoTDeviceState: online/offline/disable Any: ctx """ handler: Optional[Callable[[str, MIoTDeviceState, Any], None]] = None handler_ctx: Any = None class _MipsClient(ABC): """MIoT Pub/Sub client.""" # pylint: disable=unused-argument MQTT_INTERVAL_S = 1 MIPS_QOS: int = 2 UINT32_MAX: int = 0xFFFFFFFF MIPS_RECONNECT_INTERVAL_MIN: float = 30 MIPS_RECONNECT_INTERVAL_MAX: float = 600 MIPS_SUB_PATCH: int = 300 MIPS_SUB_INTERVAL: float = 1 main_loop: asyncio.AbstractEventLoop _logger: Optional[logging.Logger] _client_id: str _host: str _port: int _username: Optional[str] _password: Optional[str] _ca_file: Optional[str] _cert_file: Optional[str] _key_file: Optional[str] _mqtt_logger: Optional[logging.Logger] _mqtt: Optional[Client] _mqtt_fd: int _mqtt_timer: Optional[asyncio.TimerHandle] _mqtt_state: bool _event_connect: asyncio.Event _event_disconnect: asyncio.Event _internal_loop: asyncio.AbstractEventLoop _mips_thread: Optional[threading.Thread] _mips_reconnect_tag: bool _mips_reconnect_interval: float _mips_reconnect_timer: Optional[asyncio.TimerHandle] _mips_state_sub_map: dict[str, _MipsState] _mips_state_sub_map_lock: threading.Lock _mips_sub_pending_map: dict[str, int] _mips_sub_pending_timer: Optional[asyncio.TimerHandle] def __init__( self, client_id: str, host: str, port: int, username: Optional[str] = None, password: Optional[str] = None, ca_file: Optional[str] = None, cert_file: Optional[str] = None, key_file: Optional[str] = None, loop: Optional[asyncio.AbstractEventLoop] = None ) -> None: # MUST run with running loop self.main_loop = loop or asyncio.get_running_loop() self._logger = None self._client_id = client_id self._host = host self._port = port self._username = username self._password = password self._ca_file = ca_file self._cert_file = cert_file self._key_file = key_file self._mqtt_logger = None self._mqtt_fd = -1 self._mqtt_timer = None self._mqtt_state = False self._mqtt = None # Mips init self._event_connect = asyncio.Event() self._event_disconnect = asyncio.Event() self._mips_thread = None self._mips_reconnect_tag = False self._mips_reconnect_interval = 0 self._mips_reconnect_timer = None self._mips_state_sub_map = {} self._mips_state_sub_map_lock = threading.Lock() self._mips_sub_pending_map = {} self._mips_sub_pending_timer = None # DO NOT start the thread yet. Do that on connect @property def client_id(self) -> str: return self._client_id @property def host(self) -> str: return self._host @property def port(self) -> int: return self._port @final @property def mips_state(self) -> bool: """mips connect state. Returns: bool: True: connected, False: disconnected """ if self._mqtt: return self._mqtt.is_connected() return False def connect(self, thread_name: Optional[str] = None) -> None: """mips connect.""" # Start mips thread if self._mips_thread: return self._internal_loop = asyncio.new_event_loop() self._mips_thread = threading.Thread(target=self.__mips_loop_thread) self._mips_thread.daemon = True self._mips_thread.name = ( self._client_id if thread_name is None else thread_name) self._mips_thread.start() async def connect_async(self) -> None: """mips connect async.""" self.connect() await self._event_connect.wait() def disconnect(self) -> None: """mips disconnect.""" if not self._mips_thread: return self._internal_loop.call_soon_threadsafe(self.__mips_disconnect) self._mips_thread.join() self._mips_thread = None self._internal_loop.close() async def disconnect_async(self) -> None: """mips disconnect async.""" self.disconnect() await self._event_disconnect.wait() @final def deinit(self) -> None: self.disconnect() self._logger = None self._username = None self._password = None self._ca_file = None self._cert_file = None self._key_file = None self._mqtt_logger = None with self._mips_state_sub_map_lock: self._mips_state_sub_map.clear() self._mips_sub_pending_map.clear() self._mips_sub_pending_timer = None @final async def deinit_async(self) -> None: await self.disconnect_async() self._logger = None self._username = None self._password = None self._ca_file = None self._cert_file = None self._key_file = None self._mqtt_logger = None with self._mips_state_sub_map_lock: self._mips_state_sub_map.clear() self._mips_sub_pending_map.clear() self._mips_sub_pending_timer = None def update_mqtt_password(self, password: str) -> None: self._password = password if self._mqtt: self._mqtt.username_pw_set( username=self._username, password=self._password) def log_debug(self, msg, *args, **kwargs) -> None: if self._logger: self._logger.debug(f'{self._client_id}, '+msg, *args, **kwargs) def log_info(self, msg, *args, **kwargs) -> None: if self._logger: self._logger.info(f'{self._client_id}, '+msg, *args, **kwargs) def log_error(self, msg, *args, **kwargs) -> None: if self._logger: self._logger.error(f'{self._client_id}, '+msg, *args, **kwargs) def enable_logger(self, logger: Optional[logging.Logger] = None) -> None: self._logger = logger def enable_mqtt_logger( self, logger: Optional[logging.Logger] = None ) -> None: self._mqtt_logger = logger if self._mqtt: if logger: self._mqtt.enable_logger(logger=logger) else: self._mqtt.disable_logger() @final def sub_mips_state( self, key: str, handler: Callable[[str, bool], Coroutine] ) -> bool: """Subscribe mips state. NOTICE: callback to main loop thread This will be called before the client is connected. So use mutex instead of IPC. """ if isinstance(key, str) is False or handler is None: raise MIoTMipsError('invalid params') state = _MipsState(key=key, handler=handler) with self._mips_state_sub_map_lock: self._mips_state_sub_map[key] = state self.log_debug(f'mips register mips state, {key}') return True @final def unsub_mips_state(self, key: str) -> bool: """Unsubscribe mips state.""" if isinstance(key, str) is False: raise MIoTMipsError('invalid params') with self._mips_state_sub_map_lock: del self._mips_state_sub_map[key] self.log_debug(f'mips unregister mips state, {key}') return True @abstractmethod def sub_prop( self, did: str, handler: Callable[[dict, Any], None], siid: Optional[int] = None, piid: Optional[int] = None, handler_ctx: Any = None ) -> bool: ... @abstractmethod def unsub_prop( self, did: str, siid: Optional[int] = None, piid: Optional[int] = None ) -> bool: ... @abstractmethod def sub_event( self, did: str, handler: Callable[[dict, Any], None], siid: Optional[int] = None, eiid: Optional[int] = None, handler_ctx: Any = None ) -> bool: ... @abstractmethod def unsub_event( self, did: str, siid: Optional[int] = None, eiid: Optional[int] = None ) -> bool: ... @abstractmethod async def get_dev_list_async( self, payload: Optional[str] = None, timeout_ms: int = 10000 ) -> dict[str, dict]: ... @abstractmethod async def get_prop_async( self, did: str, siid: int, piid: int, timeout_ms: int = 10000 ) -> Any: ... @abstractmethod async def set_prop_async( self, did: str, siid: int, piid: int, value: Any, timeout_ms: int = 10000 ) -> dict: ... @abstractmethod async def action_async( self, did: str, siid: int, aiid: int, in_list: list, timeout_ms: int = 10000 ) -> dict: ... @abstractmethod def _on_mips_message(self, topic: str, payload: bytes) -> None: ... @abstractmethod def _on_mips_connect(self, rc: int, props: dict) -> None: ... @abstractmethod def _on_mips_disconnect(self, rc: int, props: dict) -> None: ... @final def _mips_sub_internal(self, topic: str) -> None: """mips subscribe. NOTICE: Internal function, only mips threads are allowed to call """ self.__thread_check() if not self._mqtt or not self._mqtt.is_connected(): return try: if topic not in self._mips_sub_pending_map: self._mips_sub_pending_map[topic] = 0 if not self._mips_sub_pending_timer: self._mips_sub_pending_timer = self._internal_loop.call_later( 0.01, self.__mips_sub_internal_pending_handler, topic) except Exception as err: # pylint: disable=broad-exception-caught # Catch all exception self.log_error(f'mips sub internal error, {topic}. {err}') @final def _mips_unsub_internal(self, topic: str) -> None: """mips unsubscribe. NOTICE: Internal function, only mips threads are allowed to call """ self.__thread_check() if not self._mqtt or not self._mqtt.is_connected(): return try: result, mid = self._mqtt.unsubscribe(topic=topic) if result == MQTT_ERR_SUCCESS: self.log_debug( f'mips unsub internal success, {result}, {mid}, {topic}') return self.log_error( f'mips unsub internal error, {result}, {mid}, {topic}') except Exception as err: # pylint: disable=broad-exception-caught # Catch all exception self.log_error(f'mips unsub internal error, {topic}, {err}') @final def _mips_publish_internal( self, topic: str, payload: str | bytes, wait_for_publish: bool = False, timeout_ms: int = 10000 ) -> bool: """mips publish message. NOTICE: Internal function, only mips threads are allowed to call """ self.__thread_check() if not self._mqtt or not self._mqtt.is_connected(): return False try: handle = self._mqtt.publish( topic=topic, payload=payload, qos=self.MIPS_QOS) # self.log_debug(f'_mips_publish_internal, {topic}, {payload}') if wait_for_publish is True: handle.wait_for_publish(timeout_ms/1000.0) return True except Exception as err: # pylint: disable=broad-exception-caught # Catch other exception self.log_error(f'mips publish internal error, {err}') return False def __thread_check(self) -> None: if threading.current_thread() is not self._mips_thread: raise MIoTMipsError('illegal call') def __mqtt_read_handler(self) -> None: self.__mqtt_loop_handler() def __mqtt_write_handler(self) -> None: self._internal_loop.remove_writer(self._mqtt_fd) self.__mqtt_loop_handler() def __mqtt_timer_handler(self) -> None: self.__mqtt_loop_handler() if self._mqtt: self._mqtt_timer = self._internal_loop.call_later( self.MQTT_INTERVAL_S, self.__mqtt_timer_handler) def __mqtt_loop_handler(self) -> None: try: if self._mqtt: self._mqtt.loop_read() if self._mqtt: self._mqtt.loop_write() if self._mqtt: self._mqtt.loop_misc() if self._mqtt and self._mqtt.want_write(): self._internal_loop.add_writer( self._mqtt_fd, self.__mqtt_write_handler) except Exception as err: # pylint: disable=broad-exception-caught # Catch all exception self.log_error(f'__mqtt_loop_handler, {err}') raise err def __mips_loop_thread(self) -> None: self.log_info('mips_loop_thread start') # mqtt init for API_VERSION2, # callback_api_version=CallbackAPIVersion.VERSION2, self._mqtt = Client(client_id=self._client_id, protocol=MQTTv5) self._mqtt.enable_logger(logger=self._mqtt_logger) # Set mqtt config if self._username: self._mqtt.username_pw_set( username=self._username, password=self._password) if ( self._ca_file and self._cert_file and self._key_file ): self._mqtt.tls_set( tls_version=ssl.PROTOCOL_TLS_CLIENT, ca_certs=self._ca_file, certfile=self._cert_file, keyfile=self._key_file) else: self._mqtt.tls_set(tls_version=ssl.PROTOCOL_TLS_CLIENT) self._mqtt.tls_insecure_set(True) self._mqtt.on_connect = self.__on_connect self._mqtt.on_connect_fail = self.__on_connect_failed self._mqtt.on_disconnect = self.__on_disconnect self._mqtt.on_message = self.__on_message # Connect to mips self.__mips_start_connect_tries() # Run event loop self._internal_loop.run_forever() self.log_info('mips_loop_thread exit!') def __on_connect(self, client, user_data, flags, rc, props) -> None: if not self._mqtt: _LOGGER.error('__on_connect, but mqtt is None') return if not self._mqtt.is_connected(): return self.log_info(f'mips connect, {flags}, {rc}, {props}') self._mqtt_state = True self._internal_loop.call_soon( self._on_mips_connect, rc, props) with self._mips_state_sub_map_lock: for item in self._mips_state_sub_map.values(): if item.handler is None: continue self.main_loop.call_soon_threadsafe( self.main_loop.create_task, item.handler(item.key, True)) # Resolve future self.main_loop.call_soon_threadsafe( self._event_connect.set) self.main_loop.call_soon_threadsafe( self._event_disconnect.clear) def __on_connect_failed(self, client: Client, user_data: Any) -> None: self.log_error('mips connect failed') # Try to reconnect self.__mips_try_reconnect() def __on_disconnect(self, client, user_data, rc, props) -> None: if self._mqtt_state: (self.log_info if rc == 0 else self.log_error)( f'mips disconnect, {rc}, {props}') self._mqtt_state = False if self._mqtt_timer: self._mqtt_timer.cancel() self._mqtt_timer = None if self._mqtt_fd != -1: self._internal_loop.remove_reader(self._mqtt_fd) self._internal_loop.remove_writer(self._mqtt_fd) self._mqtt_fd = -1 # Clear retry sub if self._mips_sub_pending_timer: self._mips_sub_pending_timer.cancel() self._mips_sub_pending_timer = None self._mips_sub_pending_map = {} self._internal_loop.call_soon( self._on_mips_disconnect, rc, props) # Call state sub handler with self._mips_state_sub_map_lock: for item in self._mips_state_sub_map.values(): if item.handler is None: continue self.main_loop.call_soon_threadsafe( self.main_loop.create_task, item.handler(item.key, False)) # Try to reconnect self.__mips_try_reconnect() # Set event self.main_loop.call_soon_threadsafe( self._event_disconnect.set) self.main_loop.call_soon_threadsafe( self._event_connect.clear) def __on_message( self, client: Client, user_data: Any, msg: MQTTMessage ) -> None: self._on_mips_message(topic=msg.topic, payload=msg.payload) def __mips_sub_internal_pending_handler(self, ctx: Any) -> None: if not self._mqtt or not self._mqtt.is_connected(): _LOGGER.error( 'mips sub internal pending, but mqtt is None or disconnected') return subbed_count = 1 for topic in list(self._mips_sub_pending_map.keys()): if subbed_count > self.MIPS_SUB_PATCH: break count = self._mips_sub_pending_map[topic] if count > 3: self._mips_sub_pending_map.pop(topic) self.log_error(f'retry mips sub internal error, {topic}') continue subbed_count += 1 result, mid = self._mqtt.subscribe(topic, qos=self.MIPS_QOS) if result == MQTT_ERR_SUCCESS: self._mips_sub_pending_map.pop(topic) self.log_debug(f'mips sub internal success, {topic}') continue self._mips_sub_pending_map[topic] = count+1 self.log_error( f'retry mips sub internal, {count}, {topic}, {result}, {mid}') if len(self._mips_sub_pending_map): self._mips_sub_pending_timer = self._internal_loop.call_later( self.MIPS_SUB_INTERVAL, self.__mips_sub_internal_pending_handler, None) else: self._mips_sub_pending_timer = None def __mips_connect(self) -> None: if not self._mqtt: _LOGGER.error('__mips_connect, but mqtt is None') return result = MQTT_ERR_UNKNOWN if self._mips_reconnect_timer: self._mips_reconnect_timer.cancel() self._mips_reconnect_timer = None try: # Try clean mqtt fd before mqtt connect if self._mqtt_timer: self._mqtt_timer.cancel() self._mqtt_timer = None if self._mqtt_fd != -1: self._internal_loop.remove_reader(self._mqtt_fd) self._internal_loop.remove_writer(self._mqtt_fd) self._mqtt_fd = -1 result = self._mqtt.connect( host=self._host, port=self._port, clean_start=True, keepalive=MIHOME_MQTT_KEEPALIVE) self.log_info(f'__mips_connect success, {result}') except (TimeoutError, OSError) as error: self.log_error('__mips_connect, connect error, %s', error) if result == MQTT_ERR_SUCCESS: socket = self._mqtt.socket() if socket is None: self.log_error( '__mips_connect, connect success, but socket is None') self.__mips_try_reconnect() return self._mqtt_fd = socket.fileno() self.log_debug(f'__mips_connect, _mqtt_fd, {self._mqtt_fd}') self._internal_loop.add_reader( self._mqtt_fd, self.__mqtt_read_handler) if self._mqtt.want_write(): self._internal_loop.add_writer( self._mqtt_fd, self.__mqtt_write_handler) self._mqtt_timer = self._internal_loop.call_later( self.MQTT_INTERVAL_S, self.__mqtt_timer_handler) else: self.log_error(f'__mips_connect error result, {result}') self.__mips_try_reconnect() def __mips_try_reconnect(self, immediately: bool = False) -> None: if self._mips_reconnect_timer: self._mips_reconnect_timer.cancel() self._mips_reconnect_timer = None if not self._mips_reconnect_tag: return interval: float = 0 if not immediately: interval = self.__get_next_reconnect_time() self.log_error( 'mips try reconnect after %ss', interval) self._mips_reconnect_timer = self._internal_loop.call_later( interval, self.__mips_connect) def __mips_start_connect_tries(self) -> None: self._mips_reconnect_tag = True self.__mips_try_reconnect(immediately=True) def __mips_disconnect(self) -> None: self._mips_reconnect_tag = False if self._mips_reconnect_timer: self._mips_reconnect_timer.cancel() self._mips_reconnect_timer = None if self._mqtt_timer: self._mqtt_timer.cancel() self._mqtt_timer = None if self._mqtt_fd != -1: self._internal_loop.remove_reader(self._mqtt_fd) self._internal_loop.remove_writer(self._mqtt_fd) self._mqtt_fd = -1 # Clear retry sub if self._mips_sub_pending_timer: self._mips_sub_pending_timer.cancel() self._mips_sub_pending_timer = None self._mips_sub_pending_map = {} if self._mqtt: self._mqtt.disconnect() self._mqtt = None self._internal_loop.stop() def __get_next_reconnect_time(self) -> float: if self._mips_reconnect_interval == 0: self._mips_reconnect_interval = self.MIPS_RECONNECT_INTERVAL_MIN else: self._mips_reconnect_interval = min( self._mips_reconnect_interval*2, self.MIPS_RECONNECT_INTERVAL_MAX) return self._mips_reconnect_interval class MipsCloudClient(_MipsClient): """MIoT Pub/Sub Cloud Client.""" # pylint: disable=unused-argument # pylint: disable=inconsistent-quotes _msg_matcher: MIoTMatcher def __init__( self, uuid: str, cloud_server: str, app_id: str, token: str, port: int = 8883, loop: Optional[asyncio.AbstractEventLoop] = None ) -> None: self._msg_matcher = MIoTMatcher() super().__init__( client_id=f'ha.{uuid}', host=f'{cloud_server}-ha.mqtt.io.mi.com', port=port, username=app_id, password=token, loop=loop) @final def disconnect(self) -> None: super().disconnect() self._msg_matcher = MIoTMatcher() def update_access_token(self, access_token: str) -> bool: if not isinstance(access_token, str): raise MIoTMipsError('invalid token') self.update_mqtt_password(password=access_token) return True @final def sub_prop( self, did: str, handler: Callable[[dict, Any], None], siid: Optional[int] = None, piid: Optional[int] = None, handler_ctx: Any = None ) -> bool: if not isinstance(did, str) or handler is None: raise MIoTMipsError('invalid params') topic: str = ( f'device/{did}/up/properties_changed/' f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}') def on_prop_msg(topic: str, payload: str, ctx: Any) -> None: try: msg: dict = json.loads(payload) except json.JSONDecodeError: self.log_error( f'on_prop_msg, invalid msg, {topic}, {payload}') return if ( not isinstance(msg.get('params', None), dict) or 'siid' not in msg['params'] or 'piid' not in msg['params'] or 'value' not in msg['params'] ): self.log_error( f'on_prop_msg, invalid msg, {topic}, {payload}') return if handler: self.log_debug('on properties_changed, %s', payload) handler(msg['params'], ctx) return self.__reg_broadcast_external( topic=topic, handler=on_prop_msg, handler_ctx=handler_ctx) @final def unsub_prop( self, did: str, siid: Optional[int] = None, piid: Optional[int] = None ) -> bool: if not isinstance(did, str): raise MIoTMipsError('invalid params') topic: str = ( f'device/{did}/up/properties_changed/' f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}') return self.__unreg_broadcast_external(topic=topic) @final def sub_event( self, did: str, handler: Callable[[dict, Any], None], siid: Optional[int] = None, eiid: Optional[int] = None, handler_ctx: Any = None ) -> bool: if not isinstance(did, str) or handler is None: raise MIoTMipsError('invalid params') # Spelling error: event_occured topic: str = ( f'device/{did}/up/event_occured/' f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}') def on_event_msg(topic: str, payload: str, ctx: Any) -> None: try: msg: dict = json.loads(payload) except json.JSONDecodeError: self.log_error( f'on_event_msg, invalid msg, {topic}, {payload}') return if ( not isinstance(msg.get('params', None), dict) or 'siid' not in msg['params'] or 'eiid' not in msg['params'] or 'arguments' not in msg['params'] ): self.log_error( f'on_event_msg, invalid msg, {topic}, {payload}') return if handler: self.log_debug('on on_event_msg, %s', payload) msg['params']['from'] = 'cloud' handler(msg['params'], ctx) return self.__reg_broadcast_external( topic=topic, handler=on_event_msg, handler_ctx=handler_ctx) @final def unsub_event( self, did: str, siid: Optional[int] = None, eiid: Optional[int] = None ) -> bool: if not isinstance(did, str): raise MIoTMipsError('invalid params') # Spelling error: event_occured topic: str = ( f'device/{did}/up/event_occured/' f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}') return self.__unreg_broadcast_external(topic=topic) @final def sub_device_state( self, did: str, handler: Callable[[str, MIoTDeviceState, Any], None], handler_ctx: Any = None ) -> bool: """subscribe online state.""" if not isinstance(did, str) or handler is None: raise MIoTMipsError('invalid params') topic: str = f'device/{did}/state/#' def on_state_msg(topic: str, payload: str, ctx: Any) -> None: msg: dict = json.loads(payload) # {"device_id":"xxxx","device_name":"米家智能插座3 ","event":"online", # "model": "cuco.plug.v3","timestamp":1709001070828,"uid":xxxx} if msg is None or 'device_id' not in msg or 'event' not in msg: self.log_error(f'on_state_msg, recv unknown msg, {payload}') return if msg['device_id'] != did: self.log_error( f'on_state_msg, err msg, {did}!={msg["device_id"]}') return if handler: self.log_debug('cloud, device state changed, %s', payload) handler( did, MIoTDeviceState.ONLINE if msg['event'] == 'online' else MIoTDeviceState.OFFLINE, ctx) return self.__reg_broadcast_external( topic=topic, handler=on_state_msg, handler_ctx=handler_ctx) @final def unsub_device_state(self, did: str) -> bool: if not isinstance(did, str): raise MIoTMipsError('invalid params') topic: str = f'device/{did}/state/#' return self.__unreg_broadcast_external(topic=topic) async def get_dev_list_async( self, payload: Optional[str] = None, timeout_ms: int = 10000 ) -> dict[str, dict]: raise NotImplementedError('please call in http client') async def get_prop_async( self, did: str, siid: int, piid: int, timeout_ms: int = 10000 ) -> Any: raise NotImplementedError('please call in http client') async def set_prop_async( self, did: str, siid: int, piid: int, value: Any, timeout_ms: int = 10000 ) -> dict: raise NotImplementedError('please call in http client') async def action_async( self, did: str, siid: int, aiid: int, in_list: list, timeout_ms: int = 10000 ) -> dict: raise NotImplementedError('please call in http client') def __reg_broadcast_external( self, topic: str, handler: Callable[[str, str, Any], None], handler_ctx: Any = None ) -> bool: self._internal_loop.call_soon_threadsafe( self.__reg_broadcast, topic, handler, handler_ctx) return True def __unreg_broadcast_external(self, topic: str) -> bool: self._internal_loop.call_soon_threadsafe( self.__unreg_broadcast, topic) return True def __reg_broadcast( self, topic: str, handler: Callable[[str, str, Any], None], handler_ctx: Any = None ) -> None: if not self._msg_matcher.get(topic=topic): sub_bc: _MipsBroadcast = _MipsBroadcast( topic=topic, handler=handler, handler_ctx=handler_ctx) self._msg_matcher[topic] = sub_bc self._mips_sub_internal(topic=topic) else: self.log_debug(f'mips cloud re-reg broadcast, {topic}') def __unreg_broadcast(self, topic: str) -> None: if self._msg_matcher.get(topic=topic): del self._msg_matcher[topic] self._mips_unsub_internal(topic=topic) def _on_mips_connect(self, rc: int, props: dict) -> None: """sub topic.""" for topic, _ in list( self._msg_matcher.iter_all_nodes()): self._mips_sub_internal(topic=topic) def _on_mips_disconnect(self, rc: int, props: dict) -> None: """unsub topic.""" pass def _on_mips_message(self, topic: str, payload: bytes) -> None: """ NOTICE thread safe, this function will be called at the **mips** thread """ # broadcast bc_list: list[_MipsBroadcast] = list( self._msg_matcher.iter_match(topic)) if not bc_list: return # The message from the cloud is not packed. payload_str: str = payload.decode('utf-8') # self.log_debug(f"on broadcast, {topic}, {payload}") for item in bc_list or []: if item.handler is None: continue # NOTICE: call threadsafe self.main_loop.call_soon_threadsafe( item.handler, topic, payload_str, item.handler_ctx) class MipsLocalClient(_MipsClient): """MIoT Pub/Sub Local Client.""" # pylint: disable=unused-argument # pylint: disable=inconsistent-quotes MIPS_RECONNECT_INTERVAL_MIN: float = 6 MIPS_RECONNECT_INTERVAL_MAX: float = 60 MIPS_SUB_PATCH: int = 1000 MIPS_SUB_INTERVAL: float = 0.1 _did: str _group_id: str _home_name: str _mips_seed_id: int _reply_topic: str _dev_list_change_topic: str _request_map: dict[str, _MipsRequest] _msg_matcher: MIoTMatcher _get_prop_queue: dict[str, list] _get_prop_timer: Optional[asyncio.TimerHandle] _on_dev_list_changed: Optional[Callable[[Any, list[str]], Coroutine]] def __init__( self, did: str, host: str, group_id: str, ca_file: str, cert_file: str, key_file: str, port: int = 8883, home_name: str = '', loop: Optional[asyncio.AbstractEventLoop] = None ) -> None: self._did = did self._group_id = group_id self._home_name = home_name self._mips_seed_id = random.randint(0, self.UINT32_MAX) self._reply_topic = f'{did}/reply' self._dev_list_change_topic = f'{did}/appMsg/devListChange' self._request_map = {} self._msg_matcher = MIoTMatcher() self._get_prop_queue = {} self._get_prop_timer = None self._on_dev_list_changed = None super().__init__( client_id=did, host=host, port=port, ca_file=ca_file, cert_file=cert_file, key_file=key_file, loop=loop) @property def group_id(self) -> str: return self._group_id def log_debug(self, msg, *args, **kwargs) -> None: if self._logger: self._logger.debug(f'{self._home_name}, '+msg, *args, **kwargs) def log_info(self, msg, *args, **kwargs) -> None: if self._logger: self._logger.info(f'{self._home_name}, '+msg, *args, **kwargs) def log_error(self, msg, *args, **kwargs) -> None: if self._logger: self._logger.error(f'{self._home_name}, '+msg, *args, **kwargs) @final def connect(self, thread_name: Optional[str] = None) -> None: # MIPS local thread name use group_id super().connect(self._group_id) @final def disconnect(self) -> None: super().disconnect() self._request_map = {} self._msg_matcher = MIoTMatcher() @final def sub_prop( self, did: str, handler: Callable[[dict, Any], None], siid: Optional[int] = None, piid: Optional[int] = None, handler_ctx: Any = None ) -> bool: topic: str = ( f'appMsg/notify/iot/{did}/property/' f'{"#" if siid is None or piid is None else f"{siid}.{piid}"}') def on_prop_msg(topic: str, payload: str, ctx: Any): msg: dict = json.loads(payload) if ( msg is None or 'did' not in msg or 'siid' not in msg or 'piid' not in msg or 'value' not in msg ): # self.log_error(f'on_prop_msg, recv unknown msg, {payload}') return if handler: self.log_debug('local, on properties_changed, %s', payload) handler(msg, ctx) return self.__reg_broadcast_external( topic=topic, handler=on_prop_msg, handler_ctx=handler_ctx) @final def unsub_prop( self, did: str, siid: Optional[int] = None, piid: Optional[int] = None ) -> bool: topic: str = ( f'appMsg/notify/iot/{did}/property/' f'{"#" if siid is None or piid is None else f"{siid}.{piid}"}') return self.__unreg_broadcast_external(topic=topic) @final def sub_event( self, did: str, handler: Callable[[dict, Any], None], siid: Optional[int] = None, eiid: Optional[int] = None, handler_ctx: Any = None ) -> bool: topic: str = ( f'appMsg/notify/iot/{did}/event/' f'{"#" if siid is None or eiid is None else f"{siid}.{eiid}"}') def on_event_msg(topic: str, payload: str, ctx: Any): msg: dict = json.loads(payload) if ( msg is None or 'did' not in msg or 'siid' not in msg or 'eiid' not in msg # or 'arguments' not in msg ): self.log_error('unknown event msg, %s', payload) return if 'arguments' not in msg: self.log_info('wrong event msg, %s', payload) msg['arguments'] = [] if handler: self.log_debug('local, on event_occurred, %s', payload) handler(msg, ctx) return self.__reg_broadcast_external( topic=topic, handler=on_event_msg, handler_ctx=handler_ctx) @final def unsub_event( self, did: str, siid: Optional[int] = None, eiid: Optional[int] = None ) -> bool: topic: str = ( f'appMsg/notify/iot/{did}/event/' f'{"#" if siid is None or eiid is None else f"{siid}.{eiid}"}') return self.__unreg_broadcast_external(topic=topic) @final async def get_prop_safe_async( self, did: str, siid: int, piid: int, timeout_ms: int = 10000 ) -> Any: self._get_prop_queue.setdefault(did, []) fut: asyncio.Future = self.main_loop.create_future() self._get_prop_queue[did].append({ 'param': json.dumps({ 'did': did, 'siid': siid, 'piid': piid }), 'fut': fut, 'timeout_ms': timeout_ms }) if self._get_prop_timer is None: self._get_prop_timer = self.main_loop.call_later( 0.1, self.main_loop.create_task, self.__get_prop_timer_handle()) return await fut @final async def get_prop_async( self, did: str, siid: int, piid: int, timeout_ms: int = 10000 ) -> Any: result_obj = await self.__request_async( topic='proxy/get', payload=json.dumps({ 'did': did, 'siid': siid, 'piid': piid }), timeout_ms=timeout_ms) if not isinstance(result_obj, dict) or 'value' not in result_obj: return None return result_obj['value'] @final async def set_prop_async( self, did: str, siid: int, piid: int, value: Any, timeout_ms: int = 10000 ) -> dict: payload_obj: dict = { 'did': did, 'rpc': { 'id': self.__gen_mips_id, 'method': 'set_properties', 'params': [{ 'did': did, 'siid': siid, 'piid': piid, 'value': value }] } } result_obj = await self.__request_async( topic='proxy/rpcReq', payload=json.dumps(payload_obj), timeout_ms=timeout_ms) if result_obj: if ( 'result' in result_obj and len(result_obj['result']) == 1 and 'did' in result_obj['result'][0] and result_obj['result'][0]['did'] == did and 'code' in result_obj['result'][0] ): return result_obj['result'][0] if 'error' in result_obj: return result_obj['error'] return { 'code': MIoTErrorCode.CODE_INTERNAL_ERROR.value, 'message': 'Invalid result'} @final async def action_async( self, did: str, siid: int, aiid: int, in_list: list, timeout_ms: int = 10000 ) -> dict: payload_obj: dict = { 'did': did, 'rpc': { 'id': self.__gen_mips_id, 'method': 'action', 'params': { 'did': did, 'siid': siid, 'aiid': aiid, 'in': in_list } } } result_obj = await self.__request_async( topic='proxy/rpcReq', payload=json.dumps(payload_obj), timeout_ms=timeout_ms) if result_obj: if 'result' in result_obj and 'code' in result_obj['result']: return result_obj['result'] if 'error' in result_obj: return result_obj['error'] return { 'code': MIoTErrorCode.CODE_INTERNAL_ERROR.value, 'message': 'Invalid result'} @final async def get_dev_list_async( self, payload: Optional[str] = None, timeout_ms: int = 10000 ) -> dict[str, dict]: result_obj = await self.__request_async( topic='proxy/getDevList', payload=payload or '{}', timeout_ms=timeout_ms) if not result_obj or 'devList' not in result_obj: raise MIoTMipsError('invalid result') device_list = {} for did, info in result_obj['devList'].items(): name: str = info.get('name', None) urn: str = info.get('urn', None) model: str = info.get('model', None) if name is None or urn is None or model is None: self.log_error(f'invalid device info, {did}, {info}') continue device_list[did] = { 'did': did, 'name': name, 'urn': urn, 'model': model, 'online': info.get('online', False), 'icon': info.get('icon', None), 'fw_version': None, 'home_id': '', 'home_name': '', 'room_id': info.get('roomId', ''), 'room_name': info.get('roomName', ''), 'specv2_access': info.get('specV2Access', False), 'push_available': info.get('pushAvailable', False), 'manufacturer': model.split('.')[0], } return device_list @final async def get_action_group_list_async( self, timeout_ms: int = 10000 ) -> list[str]: result_obj = await self.__request_async( topic='proxy/getMijiaActionGroupList', payload='{}', timeout_ms=timeout_ms) if not result_obj or 'result' not in result_obj: raise MIoTMipsError('invalid result') return result_obj['result'] @final async def exec_action_group_list_async( self, ag_id: str, timeout_ms: int = 10000 ) -> dict: result_obj = await self.__request_async( topic='proxy/execMijiaActionGroup', payload=f'{{"id":"{ag_id}"}}', timeout_ms=timeout_ms) if result_obj: if 'result' in result_obj: return result_obj['result'] if 'error' in result_obj: return result_obj['error'] return { 'code': MIoTErrorCode.CODE_MIPS_INVALID_RESULT.value, 'message': 'invalid result'} @final @property def on_dev_list_changed( self ) -> Optional[Callable[[Any, list[str]], Coroutine]]: return self._on_dev_list_changed @final @on_dev_list_changed.setter def on_dev_list_changed( self, func: Optional[Callable[[Any, list[str]], Coroutine]] ) -> None: """run in main loop.""" self._on_dev_list_changed = func def __request( self, topic: str, payload: str, on_reply: Callable[[str, Any], None], on_reply_ctx: Any = None, timeout_ms: int = 10000 ) -> None: req = _MipsRequest( mid=self.__gen_mips_id, on_reply=on_reply, on_reply_ctx=on_reply_ctx, timer=None) pub_topic: str = f'master/{topic}' result = self.__mips_publish( topic=pub_topic, payload=payload, mid=req.mid, ret_topic=self._reply_topic) self.log_debug( f'mips local call api, {result}, {req.mid}, {pub_topic}, ' f'{payload}') def on_request_timeout(req: _MipsRequest): self.log_error( f'on mips request timeout, {req.mid}, {pub_topic}' f', {payload}') self._request_map.pop(str(req.mid), None) req.on_reply( '{"error":{"code":-10006, "message":"timeout"}}', req.on_reply_ctx) req.timer = self._internal_loop.call_later( timeout_ms/1000, on_request_timeout, req) self._request_map[str(req.mid)] = req def __reg_broadcast( self, topic: str, handler: Callable[[str, str, Any], None], handler_ctx: Any ) -> None: sub_topic: str = f'{self._did}/{topic}' if not self._msg_matcher.get(sub_topic): sub_bc: _MipsBroadcast = _MipsBroadcast( topic=sub_topic, handler=handler, handler_ctx=handler_ctx) self._msg_matcher[sub_topic] = sub_bc self._mips_sub_internal(topic=f'master/{topic}') else: self.log_debug(f'mips re-reg broadcast, {sub_topic}') def __unreg_broadcast(self, topic) -> None: # Central hub gateway needs to add prefix unsub_topic: str = f'{self._did}/{topic}' if self._msg_matcher.get(unsub_topic): del self._msg_matcher[unsub_topic] self._mips_unsub_internal( topic=re.sub(f'^{self._did}', 'master', unsub_topic)) @final def _on_mips_connect(self, rc: int, props: dict) -> None: self.log_debug('__on_mips_connect_handler') # Sub did/#, include reply topic self._mips_sub_internal(f'{self._did}/#') # Sub device list change self._mips_sub_internal('master/appMsg/devListChange') # Do not need to subscribe api topics, for they are covered by did/# # Sub api topic. # Sub broadcast topic for topic, _ in list(self._msg_matcher.iter_all_nodes()): self._mips_sub_internal( topic=re.sub(f'^{self._did}', 'master', topic)) @final def _on_mips_disconnect(self, rc: int, props: dict) -> None: pass @final def _on_mips_message(self, topic: str, payload: bytes) -> None: mips_msg: _MipsMessage = _MipsMessage.unpack(payload) # self.log_debug( # f"mips local client, on_message, {topic} -> {mips_msg}") # Reply if topic == self._reply_topic: self.log_debug(f'on request reply, {mips_msg}') req: Optional[_MipsRequest] = self._request_map.pop( str(mips_msg.mid), None) if req: # Cancel timer if req.timer: req.timer.cancel() if req.on_reply: self.main_loop.call_soon_threadsafe( req.on_reply, mips_msg.payload or '{}', req.on_reply_ctx) return # Broadcast bc_list: list[_MipsBroadcast] = list(self._msg_matcher.iter_match( topic=topic)) if bc_list: self.log_debug(f'on broadcast, {topic}, {mips_msg}') for item in bc_list or []: if item.handler is None: continue self.main_loop.call_soon_threadsafe( item.handler, topic[topic.find('/')+1:], mips_msg.payload or '{}', item.handler_ctx) return # Device list change if topic == self._dev_list_change_topic: if mips_msg.payload is None: self.log_error('devListChange msg is None') return payload_obj: dict = json.loads(mips_msg.payload) dev_list = payload_obj.get('devList', None) if not isinstance(dev_list, list) or not dev_list: _LOGGER.error( 'unknown devListChange msg, %s', mips_msg.payload) return if self._on_dev_list_changed: self.main_loop.call_soon_threadsafe( self.main_loop.create_task, self._on_dev_list_changed(self, dev_list)) return self.log_debug( f'mips local client, recv unknown msg, {topic} -> {mips_msg}') @property def __gen_mips_id(self) -> int: mips_id: int = self._mips_seed_id self._mips_seed_id = int((self._mips_seed_id+1) % self.UINT32_MAX) return mips_id def __mips_publish( self, topic: str, payload: str, mid: Optional[int] = None, ret_topic: Optional[str] = None, wait_for_publish: bool = False, timeout_ms: int = 10000 ) -> bool: mips_msg: bytes = _MipsMessage.pack( mid=mid or self.__gen_mips_id, payload=payload, msg_from='local', ret_topic=ret_topic) return self._mips_publish_internal( topic=topic.strip(), payload=mips_msg, wait_for_publish=wait_for_publish, timeout_ms=timeout_ms) def __request_external( self, topic: str, payload: str, on_reply: Callable[[str, Any], None], on_reply_ctx: Any = None, timeout_ms: int = 10000 ) -> bool: if topic is None or payload is None or on_reply is None: raise MIoTMipsError('invalid params') self._internal_loop.call_soon_threadsafe( self.__request, topic, payload, on_reply, on_reply_ctx, timeout_ms) return True def __reg_broadcast_external( self, topic: str, handler: Callable[[str, str, Any], None], handler_ctx: Any ) -> bool: self._internal_loop.call_soon_threadsafe( self.__reg_broadcast, topic, handler, handler_ctx) return True def __unreg_broadcast_external(self, topic) -> bool: self._internal_loop.call_soon_threadsafe( self.__unreg_broadcast, topic) return True @final async def __request_async( self, topic: str, payload: str, timeout_ms: int = 10000 ) -> dict: fut_handler: asyncio.Future = self.main_loop.create_future() def on_msg_reply(payload: str, ctx: Any): fut: asyncio.Future = ctx if fut: self.main_loop.call_soon_threadsafe(fut.set_result, payload) if not self.__request_external( topic=topic, payload=payload, on_reply=on_msg_reply, on_reply_ctx=fut_handler, timeout_ms=timeout_ms): # Request error fut_handler.set_result('internal request error') result = await fut_handler try: return json.loads(result) except json.JSONDecodeError: return { 'code': MIoTErrorCode.CODE_MIPS_INVALID_RESULT.value, 'message': f'Error: {result}'} async def __get_prop_timer_handle(self) -> None: for did in list(self._get_prop_queue.keys()): item = self._get_prop_queue[did].pop() _LOGGER.debug('get prop, %s, %s', did, item) result_obj = await self.__request_async( topic='proxy/get', payload=item['param'], timeout_ms=item['timeout_ms']) if result_obj is None or 'value' not in result_obj: item['fut'].set_result(None) else: item['fut'].set_result(result_obj['value']) if not self._get_prop_queue[did]: self._get_prop_queue.pop(did, None) if self._get_prop_queue: self._get_prop_timer = self.main_loop.call_later( 0.1, lambda: self.main_loop.create_task( self.__get_prop_timer_handle())) else: self._get_prop_timer = None