# -*- coding: utf-8 -*-
"""
Copyright (C) 2024 Xiaomi Corporation.

The ownership and intellectual property rights of Xiaomi Home Assistant
Integration and related Xiaomi cloud service API interface provided under this
license, including source code and object code (collectively, "Licensed Work"),
are owned by Xiaomi. Subject to the terms and conditions of this License, Xiaomi
hereby grants you a personal, limited, non-exclusive, non-transferable,
non-sublicensable, and royalty-free license to reproduce, use, modify, and
distribute the Licensed Work only for your use of Home Assistant for
non-commercial purposes. For the avoidance of doubt, Xiaomi does not authorize
you to use the Licensed Work for any other purpose, including but not limited
to use Licensed Work to develop applications (APP), Web services, and other
forms of software.

You may reproduce and distribute copies of the Licensed Work, with or without
modifications, whether in source or object form, provided that you must give
any other recipients of the Licensed Work a copy of this License and retain all
copyright and disclaimers.

Xiaomi provides the Licensed Work on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied, including, without
limitation, any warranties, undertakes, or conditions of TITLE, NO ERROR OR
OMISSION, CONTINUITY, RELIABILITY, NON-INFRINGEMENT, MERCHANTABILITY, or
FITNESS FOR A PARTICULAR PURPOSE. In any event, you are solely responsible
for any direct, indirect, special, incidental, or consequential damages or
losses arising from the use or inability to use the Licensed Work.

Xiaomi reserves all rights not expressly granted to you in this License.
Except for the rights expressly granted by Xiaomi under this License, Xiaomi
does not authorize you in any form to use the trademarks, copyrights, or other
forms of intellectual property rights of Xiaomi and its affiliates, including,
without limitation, without obtaining other written permission from Xiaomi, you
shall not use "Xiaomi", "Mijia" and other words related to Xiaomi or words that
may make the public associate with Xiaomi in any form to publicize or promote
the software or hardware devices that use the Licensed Work.

Xiaomi has the right to immediately terminate all your authorization under this
License in the event:
1. You assert patent invalidation, litigation, or other claims against patents
or other intellectual property rights of Xiaomi or its affiliates; or,
2. You make, have made, manufacture, sell, or offer to sell products that knock
off Xiaomi or its affiliates' products.

MIoT Pub/Sub client.
"""
import asyncio
import json
import logging
import os
import queue
import random
import re
import ssl
import struct
import threading
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum, auto
from typing import Callable, Optional, final

from paho.mqtt.client import (
    MQTT_ERR_SUCCESS,
    MQTT_ERR_UNKNOWN,
    Client,
    MQTTv5)

# pylint: disable=relative-beyond-top-level
from .common import MIoTMatcher
from .const import MIHOME_MQTT_KEEPALIVE
from .miot_error import MIoTErrorCode, MIoTMipsError
from .miot_ev import MIoTEventLoop, TimeoutHandle

_LOGGER = logging.getLogger(__name__)


class MipsMsgTypeOptions(Enum):
    """MIoT Pub/Sub message type."""
    ID = 0
    RET_TOPIC = auto()
    PAYLOAD = auto()
    FROM = auto()
    MAX = auto()


class MipsMessage:
    """MIoT Pub/Sub message."""
    mid: int = 0
    msg_from: str = None
    ret_topic: str = None
    payload: str = None

    @staticmethod
    def unpack(data: bytes):
        mips_msg = MipsMessage()
        data_len = len(data)
        data_start = 0
        data_end = 0
        while data_start < data_len:
            data_end = data_start+5
            unpack_len, unpack_type = struct.unpack(
                '<IB', data[data_start:data_end])
            unpack_data = data[data_end:data_end+unpack_len]
            #  string end with \x00
            match unpack_type:
                case MipsMsgTypeOptions.ID.value:
                    mips_msg.mid = int.from_bytes(
                        unpack_data, byteorder='little')
                case MipsMsgTypeOptions.RET_TOPIC.value:
                    mips_msg.ret_topic = str(
                        unpack_data.strip(b'\x00'), 'utf-8')
                case MipsMsgTypeOptions.PAYLOAD.value:
                    mips_msg.payload = str(unpack_data.strip(b'\x00'), 'utf-8')
                case MipsMsgTypeOptions.FROM.value:
                    mips_msg.msg_from = str(
                        unpack_data.strip(b'\x00'), 'utf-8')
                case _:
                    pass
            data_start = data_end+unpack_len
        return mips_msg

    @staticmethod
    def pack(
        mid: int, payload: str, msg_from: str = None, ret_topic: str = None
    ) -> bytes:
        if mid is None or payload is None:
            raise MIoTMipsError('invalid mid or payload')
        pack_msg: bytes = b''
        # mid
        pack_msg += struct.pack('<IBI', 4, MipsMsgTypeOptions.ID.value, mid)
        # msg_from
        if msg_from:
            pack_len = len(msg_from)
            pack_msg += struct.pack(
                f'<IB{pack_len}sx', pack_len+1,
                MipsMsgTypeOptions.FROM.value, msg_from.encode('utf-8'))
        # ret_topic
        if ret_topic:
            pack_len = len(ret_topic)
            pack_msg += struct.pack(
                f'<IB{pack_len}sx', pack_len+1,
                MipsMsgTypeOptions.RET_TOPIC.value, ret_topic.encode('utf-8'))
        # payload
        pack_len = len(payload)
        pack_msg += struct.pack(
            f'<IB{pack_len}sx', pack_len+1,
            MipsMsgTypeOptions.PAYLOAD.value, payload.encode('utf-8'))
        return pack_msg

    def __str__(self) -> str:
        return f'{self.mid}, {self.msg_from}, {self.ret_topic}, {self.payload}'


class MipsCmdType(Enum):
    """Kinds of command that can be queued for the MIoT Pub/Sub worker.

    Values are consecutive integers starting at 0.
    """
    CONNECT = 0
    DISCONNECT = 1
    DEINIT = 2
    SUB = 3
    UNSUB = 4
    CALL_API = 5
    REG_BROADCAST = 6
    UNREG_BROADCAST = 7

    REG_MIPS_STATE = 8
    UNREG_MIPS_STATE = 9
    REG_DEVICE_STATE = 10
    UNREG_DEVICE_STATE = 11


@dataclass
class MipsCmd:
    """MIoT Pub/Sub command.

    Both fields are required; the dataclass-generated ``__init__`` takes
    them as ``type_`` and ``data``.
    """
    # Which operation is requested.
    type_: MipsCmdType
    # Command-specific argument; callers pass None when there is none.
    data: any


@dataclass
class MipsRequest:
    """MIoT Pub/Sub request.

    Every field is optional and defaults to None.
    """
    # Message id of the request.
    mid: Optional[int] = None
    # Reply callback, called with (str, any) per its signature.
    on_reply: Optional[Callable[[str, any], None]] = None
    # Opaque context object; presumably handed back to on_reply.
    on_reply_ctx: any = None
    # Timeout handle tied to this request (presumably the reply timeout
    # timer -- confirm against the code that fills it in).
    timer: Optional[TimeoutHandle] = None


@dataclass
class MipsRequestData:
    """MIoT Pub/Sub request data.

    Every field is optional and defaults to None.
    """
    # Topic the request is addressed to.
    topic: Optional[str] = None
    # Request payload.
    payload: Optional[str] = None
    # Reply callback, called with (str, any) per its signature.
    on_reply: Optional[Callable[[str, any], None]] = None
    # Opaque context object; presumably handed back to on_reply.
    on_reply_ctx: any = None
    # Timeout in milliseconds (unit taken from the field name).
    timeout_ms: Optional[int] = None


@dataclass
class MipsSendBroadcastData:
    """MIoT Pub/Sub send broadcast data.

    Both fields are optional and default to None.
    """
    # Topic of the broadcast.
    topic: Optional[str] = None
    # Payload of the broadcast.
    payload: Optional[str] = None


@dataclass
class MipsIncomingApiCall:
    """MIoT Pub/Sub incoming API call.

    Every field is optional and defaults to None.
    """
    # Message id of the incoming call.
    mid: Optional[int] = None
    # Topic on which the reply is expected (presumably taken from the
    # caller's message -- confirm against the code that fills it in).
    ret_topic: Optional[str] = None
    # Timeout handle associated with the call.
    timer: Optional[TimeoutHandle] = None


@dataclass
class MipsApi:
    """MIoT Pub/Sub API.

    Every field is optional and defaults to None.
    """
    # Topic of the API.
    topic: Optional[str] = None
    # Handler arguments, in order:
    #   param1: session
    #   param2: payload
    #   param3: handler_ctx
    handler: Optional[Callable[[MipsIncomingApiCall, str, any], None]] = None
    # Opaque context object passed to handler as its third argument.
    handler_ctx: any = None


class MipsRegApi(MipsApi):
    """MIoT Pub/Sub register API.

    Adds no fields or behaviour of its own to MipsApi.
    """


@dataclass
class MipsReplyData:
    """MIoT Pub/Sub reply data.

    Both fields are optional and default to None.
    """
    # The incoming call being answered.
    session: Optional[MipsIncomingApiCall] = None
    # Reply payload.
    payload: Optional[str] = None


@dataclass
class MipsBroadcast:
    """MIoT Pub/Sub broadcast.

    Every field is optional and defaults to None.
    """
    # Topic of the broadcast.
    topic: Optional[str] = None
    # Handler arguments, in order:
    #   param 1: msg topic
    #   param 2: msg payload
    #   param 3: handle_ctx
    handler: Optional[Callable[[str, str, any], None]] = None
    # Opaque context object passed to handler as its third argument.
    handler_ctx: any = None

    def __str__(self) -> str:
        """Return the topic plus the ids of the handler and its context."""
        return f'{self.topic}, {id(self.handler)}, {id(self.handler_ctx)}'


class MipsRegBroadcast(MipsBroadcast):
    """MIoT Pub/Sub register broadcast.

    Adds no fields or behaviour of its own to MipsBroadcast.
    """


@dataclass
class MipsState:
    """MIoT Pub/Sub state.

    Both fields are optional and default to None.
    """
    # Key identifying the state subscription.
    key: Optional[str] = None
    # Handler arguments, in order:
    #   str: key
    #   bool: mips connect state
    handler: Optional[Callable[[str, bool], asyncio.Future]] = None


class MipsRegState(MipsState):
    """MIoT Pub/Sub register state.

    Adds no fields or behaviour of its own to MipsState. Instances are used
    as the data of REG_MIPS_STATE / UNREG_MIPS_STATE commands.
    """


class MIoTDeviceState(Enum):
    """MIoT device state: disabled, offline or online."""
    DISABLE = 0
    OFFLINE = 1
    ONLINE = 2


@dataclass
class MipsDeviceState:
    """MIoT Pub/Sub device state.

    Every field is optional and defaults to None.
    """
    # Device id.
    did: Optional[str] = None
    # Handler arguments, in order:
    #   str: did
    #   MIoTDeviceState: online/offline/disable
    #   any: ctx
    handler: Optional[Callable[[str, MIoTDeviceState, any], None]] = None
    # Opaque context object passed to handler as its third argument.
    handler_ctx: any = None


class MipsRegDeviceState(MipsDeviceState):
    """MIoT Pub/Sub register device state.

    Adds no fields or behaviour of its own to MipsDeviceState.
    """


class MipsClient(ABC):
    """MIoT Pub/Sub client.

    Abstract base class wrapping a paho MQTT ``Client``. The MQTT socket is
    serviced by a dedicated worker thread (the "mips thread") that runs a
    ``MIoTEventLoop``; other threads talk to the worker by queueing
    ``MipsCmd`` objects through ``_mips_send_cmd``. Subclasses implement the
    abstract subscribe / get / set / action methods.
    """
    # pylint: disable=unused-argument
    # Period (ms) of the timer that pumps the paho network loop.
    MQTT_INTERVAL_MS = 1000
    # QoS used for every publish and subscribe issued by this class.
    MIPS_QOS: int = 2
    # Largest unsigned 32-bit value (not referenced inside this class body).
    UINT32_MAX: int = 0xFFFFFFFF
    # Reconnect back-off bounds (ms): first delay and upper limit.
    MIPS_RECONNECT_INTERVAL_MIN: int = 30000
    MIPS_RECONNECT_INTERVAL_MAX: int = 600000
    # Maximum number of subscribe attempts per pass over the pending map.
    MIPS_SUB_PATCH: int = 300
    # Delay (ms) before the next pass over the pending map.
    MIPS_SUB_INTERVAL: int = 1000
    # Loop on which state handlers are scheduled.
    main_loop: asyncio.AbstractEventLoop
    _logger: Optional[logging.Logger]
    _client_id: str
    _host: str
    _port: int
    _username: Optional[str]
    _password: Optional[str]
    _ca_file: Optional[str]
    _cert_file: Optional[str]
    _key_file: Optional[str]

    _mqtt_logger: Optional[logging.Logger]
    _mqtt: Client
    # Socket of the MQTT connection, -1 while there is none.
    _mqtt_fd: int
    _mqtt_timer: Optional[TimeoutHandle]
    # True between on_connect and on_disconnect.
    _mqtt_state: bool

    _event_connect: asyncio.Event
    _event_disconnect: asyncio.Event
    # Event loop run by the mips thread.
    _mev: MIoTEventLoop
    _mips_thread: threading.Thread
    # Commands waiting to be handled by the mips thread.
    _mips_queue: queue.Queue
    # eventfd descriptor written to signal that _mips_queue has data.
    _cmd_event_fd: Optional[int]
    # True while the client should (re)connect on its own.
    _mips_reconnect_tag: bool
    # Last reconnect delay (ms); 0 means none has been used yet.
    _mips_reconnect_interval: int
    _mips_reconnect_timer: Optional[TimeoutHandle]
    # Registered connection-state listeners, by key.
    _mips_state_sub_map: dict[str, MipsState]
    # Topics waiting to be subscribed -> number of failed attempts.
    _mips_sub_pending_map: dict[str, int]
    _mips_sub_pending_timer: Optional[TimeoutHandle]

    _on_mips_cmd: Optional[Callable[[MipsCmd], None]]
    _on_mips_message: Optional[Callable[[str, bytes], None]]
    _on_mips_connect: Optional[Callable[[int, dict], None]]
    _on_mips_disconnect: Optional[Callable[[int, dict], None]]

    def __init__(
            self, client_id: str, host: str, port: int,
            username: Optional[str] = None, password: Optional[str] = None,
            ca_file: Optional[str] = None, cert_file: Optional[str] = None,
            key_file: Optional[str] = None,
            loop: Optional[asyncio.AbstractEventLoop] = None
    ) -> None:
        """Create the MQTT client and start the mips worker thread.

        Args:
            client_id: MQTT client id; also used as the worker thread name.
            host: broker host, used when connecting.
            port: broker port, used when connecting.
            username: MQTT user name; credentials are only applied when set.
            password: MQTT password.
            ca_file: CA certificate file.
            cert_file: client certificate file.
            key_file: client key file. The three files are only handed to
                the MQTT client when all of them are given.
            loop: main asyncio loop; defaults to the running loop.

        Raises:
            RuntimeError: loop is not given and no event loop is running.
        """
        # MUST run with running loop
        self.main_loop = loop or asyncio.get_running_loop()
        self._logger = None
        self._client_id = client_id
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        self._ca_file = ca_file
        self._cert_file = cert_file
        self._key_file = key_file

        self._mqtt_logger = None
        self._mqtt_fd = -1
        self._mqtt_timer = None
        self._mqtt_state = False
        # mqtt init for API_VERSION2,
        # callback_api_version=CallbackAPIVersion.VERSION2,
        self._mqtt = Client(client_id=self._client_id, protocol=MQTTv5)
        self._mqtt.enable_logger(logger=self._mqtt_logger)

        # Mips init
        self._event_connect = asyncio.Event()
        self._event_disconnect = asyncio.Event()
        self._mips_reconnect_tag = False
        self._mips_reconnect_interval = 0
        self._mips_reconnect_timer = None
        self._mips_state_sub_map = {}
        self._mips_sub_pending_map = {}
        self._mips_sub_pending_timer = None
        self._mev = MIoTEventLoop()
        self._mips_queue = queue.Queue()
        # The eventfd wakes the worker's event loop whenever a command is
        # queued; its read handler is registered before the thread starts.
        self._cmd_event_fd = os.eventfd(0, os.O_NONBLOCK)
        self.mev_set_read_handler(
            self._cmd_event_fd, self.__mips_cmd_read_handler, None)
        self._mips_thread = threading.Thread(target=self.__mips_loop_thread)
        self._mips_thread.daemon = True
        self._mips_thread.name = self._client_id
        self._mips_thread.start()

        # Hooks are installed later through the on_mips_* properties.
        self._on_mips_cmd = None
        self._on_mips_message = None
        self._on_mips_connect = None
        self._on_mips_disconnect = None

    @property
    def client_id(self) -> str:
        """MQTT client id given to the constructor."""
        return self._client_id

    @property
    def host(self) -> str:
        """Broker host given to the constructor."""
        return self._host

    @property
    def port(self) -> int:
        """Broker port given to the constructor."""
        return self._port

    @final
    @property
    def mips_state(self) -> bool:
        """mips connect state.

        Returns:
            bool: True: connected, False: disconnected
        """
        # bool() keeps the declared return type even when _mqtt is falsy.
        return bool(self._mqtt and self._mqtt.is_connected())

    @final
    def mips_deinit(self) -> None:
        """Stop the mips thread and drop the client's references.

        Queues a DEINIT command, waits for the worker thread to exit and
        then resets the attributes listed below to None.
        """
        self._mips_send_cmd(type_=MipsCmdType.DEINIT, data=None)
        self._mips_thread.join()
        for attr_name in (
            '_mips_thread',
            '_logger', '_client_id', '_host', '_port',
            '_username', '_password',
            '_ca_file', '_cert_file', '_key_file',
            '_mqtt_logger',
            '_mips_state_sub_map',
            '_mips_sub_pending_map', '_mips_sub_pending_timer',
            '_event_connect', '_event_disconnect',
        ):
            setattr(self, attr_name, None)

    def update_mqtt_password(self, password: str) -> None:
        """Store a new password and hand the credentials to the MQTT client.

        Args:
            password: new MQTT password; the user name is left unchanged.
        """
        self._password = password
        self._mqtt.username_pw_set(self._username, password)

    def log_debug(self, msg, *args, **kwargs) -> None:
        """Log msg at DEBUG level, prefixed with the client id.

        Does nothing while no logger is installed (see enable_logger).
        """
        logger = self._logger
        if not logger:
            return
        logger.debug(f'{self._client_id}, '+msg, *args, **kwargs)

    def log_info(self, msg, *args, **kwargs) -> None:
        """Log msg at INFO level, prefixed with the client id.

        Does nothing while no logger is installed (see enable_logger).
        """
        logger = self._logger
        if not logger:
            return
        logger.info(f'{self._client_id}, '+msg, *args, **kwargs)

    def log_error(self, msg, *args, **kwargs) -> None:
        """Log msg at ERROR level, prefixed with the client id.

        Does nothing while no logger is installed (see enable_logger).
        """
        logger = self._logger
        if not logger:
            return
        logger.error(f'{self._client_id}, '+msg, *args, **kwargs)

    def enable_logger(self, logger: Optional[logging.Logger] = None) -> None:
        """Set the logger used by log_debug / log_info / log_error.

        Args:
            logger: logger to use; None turns the client's logging off.
        """
        self._logger = logger

    def enable_mqtt_logger(
        self, logger: Optional[logging.Logger] = None
    ) -> None:
        """Enable or disable logging of the underlying MQTT client.

        Args:
            logger: logger handed to the MQTT client; when falsy the MQTT
                client's logging is disabled instead.
        """
        if not logger:
            self._mqtt.disable_logger()
            return
        self._mqtt.enable_logger(logger=logger)

    @final
    def mips_connect(self) -> None:
        """mips connect.

        Queues a CONNECT command for the mips thread and returns at once;
        the result of ``_mips_send_cmd`` is passed through to the caller.
        """
        return self._mips_send_cmd(type_=MipsCmdType.CONNECT, data=None)

    @final
    async def mips_connect_async(self) -> None:
        """mips connect async.

        Queues a CONNECT command, then waits until the connect event is set
        (which the on_connect callback does).
        """
        self._mips_send_cmd(type_=MipsCmdType.CONNECT, data=None)
        return await self._event_connect.wait()

    @final
    def mips_disconnect(self) -> None:
        """mips disconnect.

        Queues a DISCONNECT command for the mips thread and returns at once;
        the result of ``_mips_send_cmd`` is passed through to the caller.
        """
        return self._mips_send_cmd(type_=MipsCmdType.DISCONNECT, data=None)

    @final
    async def mips_disconnect_async(self) -> None:
        """mips disconnect async.

        Queues a DISCONNECT command, then waits until the disconnect event
        is set (which the on_disconnect callback does).
        """
        self._mips_send_cmd(type_=MipsCmdType.DISCONNECT, data=None)
        return await self._event_disconnect.wait()

    @final
    def sub_mips_state(
        self, key: str, handler: Callable[[str, bool], asyncio.Future]
    ) -> bool:
        """Subscribe mips state.
        NOTICE: callback to main loop thread

        Args:
            key: key under which the listener is registered.
            handler: called with (key, connected) on every state change.

        Returns:
            The result of queueing the REG_MIPS_STATE command.

        Raises:
            MIoTMipsError: key is not a str or handler is None.
        """
        if handler is None or not isinstance(key, str):
            raise MIoTMipsError('invalid params')
        registration = MipsRegState(key=key, handler=handler)
        return self._mips_send_cmd(
            type_=MipsCmdType.REG_MIPS_STATE, data=registration)

    @final
    def unsub_mips_state(self, key: str) -> bool:
        """Unsubscribe mips state.

        Args:
            key: key the listener was registered under.

        Returns:
            The result of queueing the UNREG_MIPS_STATE command.

        Raises:
            MIoTMipsError: key is not a str.
        """
        if not isinstance(key, str):
            raise MIoTMipsError('invalid params')
        registration = MipsRegState(key=key)
        return self._mips_send_cmd(
            type_=MipsCmdType.UNREG_MIPS_STATE, data=registration)

    @final
    def mev_set_timeout(
        self, timeout_ms: int, handler: Callable[[any], None],
        handler_ctx: any = None
    ) -> Optional[TimeoutHandle]:
        """Schedule handler on the mips event loop.
        NOTICE: Internal function, only mips threads are allowed to call

        Returns:
            The handle returned by the event loop, or None when the event
            loop is no longer available.
        """
        if self._mev is not None:
            return self._mev.set_timeout(
                timeout_ms=timeout_ms, handler=handler,
                handler_ctx=handler_ctx)
        return None

    @final
    def mev_clear_timeout(self, handle: TimeoutHandle) -> None:
        """Cancel a timeout created with mev_set_timeout.
        NOTICE: Internal function, only mips threads are allowed to call

        Does nothing when the event loop is no longer available.
        """
        if self._mev is not None:
            self._mev.clear_timeout(handle)

    @final
    def mev_set_read_handler(
        self, fd: int, handler: Callable[[any], None], handler_ctx: any
    ) -> bool:
        """Install (or, with handler=None, clear) the read handler of fd.
        NOTICE: Internal function, only mips threads are allowed to call

        Returns:
            The event loop's result, or False when the event loop is no
            longer available.
        """
        if self._mev is not None:
            return self._mev.set_read_handler(
                fd=fd, handler=handler, handler_ctx=handler_ctx)
        return False

    @final
    def mev_set_write_handler(
        self, fd: int, handler: Callable[[any], None], handler_ctx: any
    ) -> bool:
        """Install (or, with handler=None, clear) the write handler of fd.
        NOTICE: Internal function, only mips threads are allowed to call

        Returns:
            The event loop's result, or False when the event loop is no
            longer available.
        """
        if self._mev is not None:
            return self._mev.set_write_handler(
                fd=fd, handler=handler, handler_ctx=handler_ctx)
        return False

    @property
    def on_mips_cmd(self) -> Callable[[MipsCmd], None]:
        """Handler for commands this base class does not process itself."""
        return self._on_mips_cmd

    @on_mips_cmd.setter
    def on_mips_cmd(self, handler: Callable[[MipsCmd], None]) -> None:
        """MUST set after __init__ done.
        NOTICE thread safe, the handler will be called at the **mips**
        thread, with the command passed as keyword argument ``mips_cmd``.
        """
        self._on_mips_cmd = handler

    @property
    def on_mips_message(self) -> Callable[[str, bytes], None]:
        """Handler for incoming MQTT messages."""
        return self._on_mips_message

    @on_mips_message.setter
    def on_mips_message(self, handler: Callable[[str, bytes], None]) -> None:
        """MUST set after __init__ done.
        NOTICE thread safe, the handler will be called at the **mips**
        thread, with keyword arguments ``topic`` and ``payload``.
        """
        self._on_mips_message = handler

    @property
    def on_mips_connect(self) -> Callable[[int, dict], None]:
        """Handler called with (rc, props) after the client connects."""
        return self._on_mips_connect

    @on_mips_connect.setter
    def on_mips_connect(self, handler: Callable[[int, dict], None]) -> None:
        """MUST set after __init__ done.
        NOTICE thread safe: the handler is scheduled with a zero delay via
        ``mev_set_timeout``, so it is dispatched by the event loop that the
        **mips** thread runs.
        NOTE(review): this docstring used to say **main loop** thread, which
        does not match the dispatch code -- confirm the intended thread.
        """
        self._on_mips_connect = handler

    @property
    def on_mips_disconnect(self) -> Callable[[int, dict], None]:
        """Handler called with (rc, props) after the client disconnects."""
        return self._on_mips_disconnect

    @on_mips_disconnect.setter
    def on_mips_disconnect(self, handler: Callable[[int, dict], None]) -> None:
        """MUST set after __init__ done.
        NOTICE thread safe: the handler is scheduled with a zero delay via
        ``mev_set_timeout``, so it is dispatched by the event loop that the
        **mips** thread runs.
        NOTE(review): this docstring used to say **main loop** thread, which
        does not match the dispatch code -- confirm the intended thread.
        """
        self._on_mips_disconnect = handler

    @abstractmethod
    def sub_prop(
        self, did: str, handler: Callable[[dict, any], None],
        siid: int = None, piid: int = None, handler_ctx: any = None
    ) -> bool:
        """Subscribe to property messages of a device (abstract).

        Args:
            did: device id.
            handler: callback taking (dict, any) per its signature;
                presumably (message, handler_ctx) -- confirm in subclasses.
            siid: presumably the service instance id; None for any.
            piid: presumably the property instance id; None for any.
            handler_ctx: opaque context object for handler.

        Returns:
            bool result defined by the implementing subclass.
        """
        ...

    @abstractmethod
    def unsub_prop(
        self, did: str, siid: int = None, piid: int = None
    ) -> bool:
        """Cancel a property subscription of a device (abstract).

        Args:
            did: device id.
            siid: presumably the service instance id; None for any.
            piid: presumably the property instance id; None for any.

        Returns:
            bool result defined by the implementing subclass.
        """
        ...

    @abstractmethod
    def sub_event(
        self, did: str, handler: Callable[[dict, any], None],
        siid: int = None, eiid: int = None, handler_ctx: any = None
    ) -> bool:
        """Subscribe to event messages of a device (abstract).

        Args:
            did: device id.
            handler: callback taking (dict, any) per its signature;
                presumably (message, handler_ctx) -- confirm in subclasses.
            siid: presumably the service instance id; None for any.
            eiid: presumably the event instance id; None for any.
            handler_ctx: opaque context object for handler.

        Returns:
            bool result defined by the implementing subclass.
        """
        ...

    @abstractmethod
    def unsub_event(
        self, did: str, siid: int = None, eiid: int = None
    ) -> bool:
        """Cancel an event subscription of a device (abstract).

        Args:
            did: device id.
            siid: presumably the service instance id; None for any.
            eiid: presumably the event instance id; None for any.

        Returns:
            bool result defined by the implementing subclass.
        """
        ...

    @abstractmethod
    async def get_dev_list_async(
        self, payload: str = None, timeout_ms: int = 10000
    ) -> dict[str, dict]:
        """Fetch the device list (abstract).

        Args:
            payload: optional request payload; meaning is defined by the
                implementing subclass.
            timeout_ms: timeout in milliseconds (unit from the name).

        Returns:
            A dict of dicts keyed by str; presumably keyed by device id --
            confirm in subclasses.
        """
        ...

    @abstractmethod
    async def get_prop_async(
        self, did: str, siid: int, piid: int, timeout_ms: int = 10000
    ) -> any:
        """Read one property of a device (abstract).

        Args:
            did: device id.
            siid: presumably the service instance id.
            piid: presumably the property instance id.
            timeout_ms: timeout in milliseconds (unit from the name).

        Returns:
            The property value, as defined by the implementing subclass.
        """
        ...

    @abstractmethod
    async def set_prop_async(
        self, did: str, siid: int, piid: int, value: any,
        timeout_ms: int = 10000
    ) -> bool:
        """Write one property of a device (abstract).

        Args:
            did: device id.
            siid: presumably the service instance id.
            piid: presumably the property instance id.
            value: value to write.
            timeout_ms: timeout in milliseconds (unit from the name).

        Returns:
            bool result defined by the implementing subclass.
        """
        ...

    @abstractmethod
    async def action_async(
        self, did: str, siid: int, aiid: int, in_list: list,
        timeout_ms: int = 10000
    ) -> tuple[bool, list]:
        """Run an action on a device (abstract).

        Args:
            did: device id.
            siid: presumably the service instance id.
            aiid: presumably the action instance id.
            in_list: input arguments of the action.
            timeout_ms: timeout in milliseconds (unit from the name).

        Returns:
            A (bool, list) pair; presumably (success, output values) --
            confirm in subclasses.
        """
        ...

    @final
    def _mips_sub_internal(self, topic: str) -> None:
        """Queue topic for subscription.
        NOTICE: Internal function, only mips threads are allowed to call

        The topic is added to the pending map and, unless a pending timer
        already exists, a 10 ms timer is started to process the map. Nothing
        is done while the MQTT client is not connected.

        Raises:
            MIoTMipsError: called from a thread other than the mips thread.
        """
        self.__thread_check()
        if not (self._mqtt and self._mqtt.is_connected()):
            return
        try:
            # Keep the retry count of a topic that is already pending.
            self._mips_sub_pending_map.setdefault(topic, 0)
            if self._mips_sub_pending_timer:
                return
            self._mips_sub_pending_timer = self.mev_set_timeout(
                10, self.__mips_sub_internal_pending_handler, topic)
        except Exception as err:  # pylint: disable=broad-exception-caught
            # Catch all exception
            self.log_error(f'mips sub internal error, {topic}. {err}')

    @final
    def _mips_unsub_internal(self, topic: str) -> None:
        """mips unsubscribe.
        NOTICE: Internal function, only mips threads are allowed to call

        Also cancels a subscription of topic that is still waiting in the
        pending map. The MQTT unsubscribe itself is only sent while the
        client is connected; failures are logged, not raised.

        Raises:
            MIoTMipsError: called from a thread other than the mips thread.
        """
        self.__thread_check()
        # Without this, the pending handler would subscribe the topic again
        # after it has been unsubscribed here.
        self._mips_sub_pending_map.pop(topic, None)
        if not self._mqtt or not self._mqtt.is_connected():
            return
        try:
            result, mid = self._mqtt.unsubscribe(topic=topic)
            if result == MQTT_ERR_SUCCESS:
                self.log_debug(
                    f'mips unsub internal success, {result}, {mid}, {topic}')
                return
            self.log_error(
                f'mips unsub internal error, {result}, {mid}, {topic}')
        except Exception as err:  # pylint: disable=broad-exception-caught
            # Catch all exception
            self.log_error(f'mips unsub internal error, {topic}, {err}')

    @final
    def _mips_publish_internal(
        self, topic: str, payload: str | bytes,
        wait_for_publish: bool = False, timeout_ms: int = 10000
    ) -> bool:
        """Publish a message with QoS ``MIPS_QOS``.
        NOTICE: Internal function, only mips threads are allowed to call

        Args:
            topic: topic to publish to.
            payload: message payload.
            wait_for_publish: when True, wait for the publish to complete.
            timeout_ms: how long to wait, in milliseconds.

        Returns:
            True when the publish call (and the optional wait) raised no
            exception; False when not connected or on any exception.

        Raises:
            MIoTMipsError: called from a thread other than the mips thread.
        """
        self.__thread_check()
        if not (self._mqtt and self._mqtt.is_connected()):
            return False
        try:
            msg_info = self._mqtt.publish(
                topic=topic, payload=payload, qos=self.MIPS_QOS)
            if wait_for_publish is True:
                msg_info.wait_for_publish(timeout_ms/1000.0)
        except Exception as err:  # pylint: disable=broad-exception-caught
            # Catch other exception
            self.log_error(f'mips publish internal error, {err}')
            return False
        return True

    @final
    def _mips_send_cmd(self, type_: MipsCmdType, data: any) -> bool:
        """Queue a command for the mips thread and wake its event loop.

        Args:
            type_: command type.
            data: command argument, may be None.

        Returns:
            Always True.

        Raises:
            MIoTMipsError: the command queue or its eventfd is gone.
        """
        cmd_queue = self._mips_queue
        event_fd = self._cmd_event_fd
        if cmd_queue is None or event_fd is None:
            raise MIoTMipsError('send mips cmd disable')
        # Enqueue first, then signal, so the reader always finds the command.
        cmd_queue.put(MipsCmd(type_=type_, data=data))
        os.eventfd_write(event_fd, 1)
        return True

    def __thread_check(self) -> None:
        """Ensure the caller is the mips thread.

        Raises:
            MIoTMipsError: called from any other thread.
        """
        if threading.current_thread() is self._mips_thread:
            return
        raise MIoTMipsError('illegal call')

    def __mips_cmd_read_handler(self, ctx: any) -> None:
        """Drain the command queue; read handler of the command eventfd.

        CONNECT / DISCONNECT / DEINIT and mips-state (un)registration are
        handled here; every other command is forwarded to ``on_mips_cmd``.
        DEINIT closes the eventfd, drops the queue, stops the event loop
        and leaves any remaining queued commands unprocessed.

        Args:
            ctx: handler context, unused.
        """
        fd_value = os.eventfd_read(self._cmd_event_fd)
        if fd_value == 0:
            return
        while self._mips_queue.empty() is False:
            mips_cmd: MipsCmd = self._mips_queue.get(block=False)
            if mips_cmd.type_ == MipsCmdType.CONNECT:
                self._mips_reconnect_tag = True
                self.__mips_try_reconnect(immediately=True)
            elif mips_cmd.type_ == MipsCmdType.DISCONNECT:
                self._mips_reconnect_tag = False
                self.__mips_disconnect()
            elif mips_cmd.type_ == MipsCmdType.DEINIT:
                self.log_info('mips client recv deinit cmd')
                self.__mips_disconnect()
                # Close cmd event fd. Compare with None: 0 is a valid fd.
                if self._cmd_event_fd is not None:
                    self.mev_set_read_handler(
                        self._cmd_event_fd, None, None)
                    os.close(self._cmd_event_fd)
                    self._cmd_event_fd = None
                if self._mips_queue:
                    self._mips_queue = None
                # ev loop stop
                if self._mev:
                    self._mev.loop_stop()
                    self._mev = None
                break
            elif mips_cmd.type_ == MipsCmdType.REG_MIPS_STATE:
                state: MipsState = mips_cmd.data
                self._mips_state_sub_map[state.key] = state
                self.log_debug(f'mips register mips state, {state.key}')
            elif mips_cmd.type_ == MipsCmdType.UNREG_MIPS_STATE:
                state: MipsState = mips_cmd.data
                # pop() instead of del: unregistering an unknown key must
                # not raise inside the event loop handler.
                self._mips_state_sub_map.pop(state.key, None)
                self.log_debug(f'mips unregister mips state, {state.key}')
            else:
                if self._on_mips_cmd:
                    self._on_mips_cmd(mips_cmd=mips_cmd)

    def __mqtt_read_handler(self, ctx: any) -> None:
        """Read handler of the MQTT socket: run one network loop pass."""
        self.__mqtt_loop_handler(ctx=ctx)

    def __mqtt_write_handler(self, ctx: any) -> None:
        """Write handler of the MQTT socket: run one network loop pass.

        The write handler removes itself first; the loop pass installs it
        again when the MQTT client still wants to write.
        """
        self.mev_set_write_handler(self._mqtt_fd, None, None)
        self.__mqtt_loop_handler(ctx=ctx)

    def __mqtt_timer_handler(self, ctx: any) -> None:
        """Periodic tick: run one network loop pass and re-arm the timer.

        The timer is re-armed with ``MQTT_INTERVAL_MS`` as long as the MQTT
        client object exists.
        """
        self.__mqtt_loop_handler(ctx=ctx)
        if not self._mqtt:
            return
        self._mqtt_timer = self.mev_set_timeout(
            self.MQTT_INTERVAL_MS, self.__mqtt_timer_handler, None)

    def __mqtt_loop_handler(self, ctx: any) -> None:
        """Run one pass of the MQTT client's network loop.

        Calls loop_read, loop_write and loop_misc in that order and installs
        the socket write handler when the client has data left to write.
        Any exception is logged and re-raised.
        """
        try:
            for step in ('loop_read', 'loop_write', 'loop_misc'):
                # The client is re-checked before every step.
                if self._mqtt:
                    getattr(self._mqtt, step)()
            if self._mqtt and self._mqtt.want_write():
                self.mev_set_write_handler(
                    self._mqtt_fd, self.__mqtt_write_handler, None)
        except Exception as err:  # pylint: disable=broad-exception-caught
            # Catch all exception
            self.log_error(f'__mqtt_loop_handler, {err}')
            raise err

    def __mips_loop_thread(self) -> None:
        """Body of the mips thread.

        Applies credentials and TLS settings to the MQTT client, installs
        the MQTT callbacks and then runs the event loop until it is stopped.
        """
        self.log_info('mips_loop_thread start')
        # Set mqtt config
        if self._username:
            self._mqtt.username_pw_set(
                username=self._username, password=self._password)
        # Client certificates are only used when all three files are given.
        if (
            self._ca_file
            and self._cert_file
            and self._key_file
        ):
            self._mqtt.tls_set(
                tls_version=ssl.PROTOCOL_TLS_CLIENT,
                ca_certs=self._ca_file,
                certfile=self._cert_file,
                keyfile=self._key_file)
        else:
            self._mqtt.tls_set(tls_version=ssl.PROTOCOL_TLS_CLIENT)
        # NOTE(review): per the paho documentation this turns off host name
        # verification of the server certificate -- confirm it is intended.
        self._mqtt.tls_insecure_set(True)
        self._mqtt.on_connect = self.__on_connect
        self._mqtt.on_connect_fail = self.__on_connect_failed
        self._mqtt.on_disconnect = self.__on_disconnect
        self._mqtt.on_message = self.__on_message
        # Run event loop
        self._mev.loop_forever()
        self.log_info('mips_loop_thread exit!')

    def __on_connect(self, client, user_data, flags, rc, props) -> None:
        """MQTT on_connect callback.

        Marks the client connected, schedules ``on_mips_connect`` on the
        mips event loop, notifies every registered state listener on the
        main loop and finally signals the connect / disconnect events.

        Args:
            client: MQTT client instance, unused.
            user_data: MQTT user data, unused.
            flags: connect flags, only logged.
            rc: connect result code, passed on to ``on_mips_connect``.
            props: MQTT properties, passed on to ``on_mips_connect``.
        """
        if not self._mqtt.is_connected():
            return
        self.log_info(f'mips connect, {flags}, {rc}, {props}')
        self._mqtt_state = True
        # Bind the handler now so that a later reset to None cannot make
        # the scheduled call fail.
        on_connect = self._on_mips_connect
        if on_connect:
            self.mev_set_timeout(
                timeout_ms=0,
                handler=lambda ctx:
                    on_connect(rc, props))
        for item in self._mips_state_sub_map.values():
            if item.handler is None:
                continue
            self.main_loop.call_soon_threadsafe(
                self.main_loop.create_task,
                item.handler(item.key, True))
        # Resolve future. asyncio.Event is not thread-safe, so it must be
        # changed from the main loop, not directly from this thread.
        try:
            self.main_loop.call_soon_threadsafe(self._event_connect.set)
            self.main_loop.call_soon_threadsafe(self._event_disconnect.clear)
        except RuntimeError as err:
            # The main loop is already closed: nobody can be waiting.
            self.log_error(f'mips connect, set event error, {err}')

    def __on_connect_failed(self, client, user_data, flags, rc) -> None:
        """MQTT on_connect_fail callback: log and schedule a reconnect.

        The reconnect only happens while automatic reconnect is enabled
        (see ``__mips_try_reconnect``).
        """
        self.log_error(f'mips connect failed, {flags}, {rc}')
        # Try to reconnect
        self.__mips_try_reconnect()

    def __on_disconnect(self,  client, user_data, rc, props) -> None:
        """MQTT on_disconnect callback.

        When the client was connected: stops the network loop timer,
        removes the socket handlers, drops all pending subscriptions,
        schedules ``on_mips_disconnect`` on the mips event loop and notifies
        every registered state listener on the main loop. In every case a
        reconnect is attempted and the connect / disconnect events are
        signalled.

        Args:
            client: MQTT client instance, unused.
            user_data: MQTT user data, unused.
            rc: disconnect result code, passed to ``on_mips_disconnect``.
            props: MQTT properties, passed to ``on_mips_disconnect``.
        """
        if self._mqtt_state:
            self.log_error(f'mips disconnect, {rc}, {props}')
            self._mqtt_state = False
            if self._mqtt_timer:
                self.mev_clear_timeout(self._mqtt_timer)
                self._mqtt_timer = None
            if self._mqtt_fd != -1:
                self.mev_set_read_handler(self._mqtt_fd, None, None)
                self.mev_set_write_handler(self._mqtt_fd, None, None)
                self._mqtt_fd = -1
            # Clear retry sub
            if self._mips_sub_pending_timer:
                self.mev_clear_timeout(self._mips_sub_pending_timer)
                self._mips_sub_pending_timer = None
            self._mips_sub_pending_map = {}
            # Bind the handler now so that a later reset to None cannot
            # make the scheduled call fail.
            on_disconnect = self._on_mips_disconnect
            if on_disconnect:
                self.mev_set_timeout(
                    timeout_ms=0,
                    handler=lambda ctx:
                        on_disconnect(rc, props))
            # Call state sub handler
            for item in self._mips_state_sub_map.values():
                if item.handler is None:
                    continue
                self.main_loop.call_soon_threadsafe(
                    self.main_loop.create_task,
                    item.handler(item.key, False))

        # Try to reconnect
        self.__mips_try_reconnect()
        # Set event. asyncio.Event is not thread-safe, so it must be
        # changed from the main loop, not directly from this thread.
        try:
            self.main_loop.call_soon_threadsafe(self._event_disconnect.set)
            self.main_loop.call_soon_threadsafe(self._event_connect.clear)
        except RuntimeError as err:
            # The main loop is already closed: nobody can be waiting, and
            # raising here would abort a DEINIT in progress.
            self.log_error(f'mips disconnect, set event error, {err}')

    def __on_message(self, client, user_data, msg) -> None:
        """MQTT on_message callback: forward the message to on_mips_message.

        Messages are dropped while no handler is installed, in line with
        how the other on_mips_* handlers are guarded.

        Args:
            client: MQTT client instance, unused.
            user_data: MQTT user data, unused.
            msg: received message; its topic and payload are forwarded.
        """
        on_message = self._on_mips_message
        if on_message:
            on_message(topic=msg.topic, payload=msg.payload)

    def __mips_try_reconnect(self, immediately: bool = False) -> None:
        """Schedule a (re)connect attempt on the mips event loop.

        An already scheduled attempt is cancelled first. Nothing is
        scheduled while automatic reconnect is disabled.

        Args:
            immediately: use a zero delay instead of the next back-off
                interval.
        """
        scheduled = self._mips_reconnect_timer
        if scheduled:
            self.mev_clear_timeout(scheduled)
            self._mips_reconnect_timer = None
        if not self._mips_reconnect_tag:
            return
        if immediately:
            delay_ms: int = 0
        else:
            delay_ms = self.__get_next_reconnect_time()
            self.log_error(
                'mips try reconnect after %sms', delay_ms)
        self._mips_reconnect_timer = self.mev_set_timeout(
            delay_ms, self.__mips_connect, None)

    def __mips_sub_internal_pending_handler(self, ctx: any) -> None:
        """Subscribe the topics waiting in the pending map.

        At most ``MIPS_SUB_PATCH`` subscribe calls are made per pass. A
        topic leaves the map once its subscribe call succeeds or once its
        failure count exceeds 3. While topics remain, another pass is
        scheduled after ``MIPS_SUB_INTERVAL`` ms.

        Args:
            ctx: handler context, unused.
        """
        attempts = 0
        # Iterate over a snapshot: entries are removed while looping.
        for topic, failures in list(self._mips_sub_pending_map.items()):
            if attempts >= self.MIPS_SUB_PATCH:
                break
            if failures > 3:
                self._mips_sub_pending_map.pop(topic)
                self.log_error(f'retry mips sub internal error, {topic}')
                continue
            attempts += 1
            result, mid = self._mqtt.subscribe(topic, qos=self.MIPS_QOS)
            if result != MQTT_ERR_SUCCESS:
                self._mips_sub_pending_map[topic] = failures+1
                self.log_error(
                    f'retry mips sub internal, {failures}, {topic}, '
                    f'{result}, {mid}')
                continue
            self._mips_sub_pending_map.pop(topic)
            self.log_debug(f'mips sub internal success, {topic}')

        if self._mips_sub_pending_map:
            self._mips_sub_pending_timer = self.mev_set_timeout(
                self.MIPS_SUB_INTERVAL,
                self.__mips_sub_internal_pending_handler, None)
        else:
            self._mips_sub_pending_timer = None

    def __mips_connect(self, ctx: any = None) -> None:
        """Connect the MQTT client and hook its socket into the event loop.

        Runs as the reconnect timer handler. Any previous network loop
        timer and socket handlers are removed before connecting. On success
        the new socket gets its read (and, if needed, write) handler and the
        network loop timer is started; otherwise a reconnect is scheduled.

        Args:
            ctx: handler context, unused.
        """
        result = MQTT_ERR_UNKNOWN
        if self._mips_reconnect_timer:
            self.mev_clear_timeout(self._mips_reconnect_timer)
            self._mips_reconnect_timer = None
        try:
            # Try clean mqtt fd before mqtt connect
            if self._mqtt_timer:
                self.mev_clear_timeout(self._mqtt_timer)
                self._mqtt_timer = None
            if self._mqtt_fd != -1:
                self.mev_set_read_handler(self._mqtt_fd, None, None)
                self.mev_set_write_handler(self._mqtt_fd, None, None)
                self._mqtt_fd = -1
            result = self._mqtt.connect(
                host=self._host, port=self._port,
                clean_start=True, keepalive=MIHOME_MQTT_KEEPALIVE)
            # NOTE(review): logged whenever connect() returns without
            # raising, i.e. before result is compared with success below.
            self.log_info(f'__mips_connect success, {result}')
        except (TimeoutError, OSError) as error:
            # result keeps MQTT_ERR_UNKNOWN, so the error branch below runs.
            self.log_error('__mips_connect, connect error, %s', error)

        if result == MQTT_ERR_SUCCESS:
            self._mqtt_fd = self._mqtt.socket()
            self.log_debug(f'__mips_connect, _mqtt_fd, {self._mqtt_fd}')
            self.mev_set_read_handler(
                self._mqtt_fd, self.__mqtt_read_handler, None)
            if self._mqtt.want_write():
                self.mev_set_write_handler(
                    self._mqtt_fd, self.__mqtt_write_handler, None)
            self._mqtt_timer = self.mev_set_timeout(
                self.MQTT_INTERVAL_MS, self.__mqtt_timer_handler, None)
        else:
            self.log_error(f'__mips_connect error result, {result}')
            self.__mips_try_reconnect()

    def __mips_disconnect(self) -> None:
        """Stop all timers and socket handlers, then disconnect MQTT.

        Cancels the reconnect timer and the network loop timer, removes the
        read / write handlers of the MQTT socket and finally asks the MQTT
        client to disconnect.
        """
        for timer_attr in ('_mips_reconnect_timer', '_mqtt_timer'):
            active = getattr(self, timer_attr)
            if active:
                self.mev_clear_timeout(active)
                setattr(self, timer_attr, None)
        mqtt_fd = self._mqtt_fd
        if mqtt_fd != -1:
            self.mev_set_read_handler(mqtt_fd, None, None)
            self.mev_set_write_handler(mqtt_fd, None, None)
            self._mqtt_fd = -1
        self._mqtt.disconnect()

    def __get_next_reconnect_time(self) -> int:
        """Advance the reconnect back-off and return the new delay.

        The first delay is ``MIPS_RECONNECT_INTERVAL_MIN``; each later one
        doubles the previous delay, capped at
        ``MIPS_RECONNECT_INTERVAL_MAX``.

        Returns:
            The delay, which is also stored as the current interval.
        """
        previous = self._mips_reconnect_interval
        if previous == 0:
            upcoming = self.MIPS_RECONNECT_INTERVAL_MIN
        else:
            upcoming = min(previous*2, self.MIPS_RECONNECT_INTERVAL_MAX)
        self._mips_reconnect_interval = upcoming
        return upcoming


class MipsCloudClient(MipsClient):
    """MIoT Pub/Sub Cloud Client.

    Keeps a topic -> broadcast registry in ``_msg_matcher``, subscribes the
    registered topics on the broker and forwards every matching message to
    its handler through ``main_loop.call_soon_threadsafe``.
    """
    # pylint: disable=unused-argument
    # pylint: disable=inconsistent-quotes
    _msg_matcher: MIoTMatcher

    def __init__(
            self, uuid: str, cloud_server: str, app_id: str,
            token: str, port: int = 8883,
            loop: Optional[asyncio.AbstractEventLoop] = None
    ) -> None:
        """Create a cloud client.

        Args:
            uuid: used to build the MQTT client id ``ha.{uuid}``.
            cloud_server: prefix of the broker host
                ``{cloud_server}-ha.mqtt.io.mi.com``.
            app_id: forwarded to the base client as ``username``.
            token: forwarded to the base client as ``password``.
            port: broker port.
            loop: event loop forwarded to the base client.
        """
        # Broadcast registry; created before the base initialiser runs.
        self._msg_matcher = MIoTMatcher()
        super().__init__(
            client_id=f'ha.{uuid}', host=f'{cloud_server}-ha.mqtt.io.mi.com',
            port=port, username=app_id, password=token, loop=loop)

        self.on_mips_cmd = self.__on_mips_cmd_handler
        self.on_mips_message = self.__on_mips_message_handler
        self.on_mips_connect = self.__on_mips_connect_handler
        self.on_mips_disconnect = self.__on_mips_disconnect_handler

    def deinit(self) -> None:
        """Deinitialise the base client and drop every held reference."""
        self.mips_deinit()
        self._msg_matcher = None
        self.on_mips_cmd = None
        self.on_mips_message = None
        self.on_mips_connect = None
        # __init__ registers four callbacks; release the fourth one as well.
        self.on_mips_disconnect = None

    @final
    def connect(self) -> None:
        """Connect by delegating to ``mips_connect``."""
        self.mips_connect()

    @final
    async def connect_async(self) -> None:
        """Connect by awaiting ``mips_connect_async``."""
        await self.mips_connect_async()

    @final
    def disconnect(self) -> None:
        """Disconnect and reset the broadcast registry to an empty one."""
        self.mips_disconnect()
        self._msg_matcher = MIoTMatcher()

    @final
    async def disconnect_async(self) -> None:
        """Await the disconnect and reset the broadcast registry."""
        await self.mips_disconnect_async()
        self._msg_matcher = MIoTMatcher()

    def update_access_token(self, access_token: str) -> bool:
        """Replace the MQTT password with a new access token.

        Raises:
            MIoTMipsError: ``access_token`` is not a string.
        """
        if not isinstance(access_token, str):
            raise MIoTMipsError('invalid token')
        return self.update_mqtt_password(password=access_token)

    @staticmethod
    def __broadcast_topic(did: str, kind: str, siid: int, iid: int) -> str:
        """Build ``device/{did}/up/{kind}/{siid}/{iid}``.

        When ``siid`` or ``iid`` is None the trailing part is the wildcard
        ``#``. Shared by the sub/unsub pairs so both always agree on the
        topic string.
        """
        suffix = '#' if siid is None or iid is None else f'{siid}/{iid}'
        return f'device/{did}/up/{kind}/{suffix}'

    @final
    def sub_prop(
        self, did: str, handler: Callable[[dict, any], None],
        siid: int = None, piid: int = None, handler_ctx: any = None
    ) -> bool:
        """Subscribe to property-changed messages of a device.

        Args:
            did: device id.
            handler: called as ``handler(params, ctx)`` with the ``params``
                object of every well-formed message.
            siid: service id; with ``piid`` selects one property.
            piid: property id. If either id is None, ``#`` is subscribed.
            handler_ctx: opaque value handed back to ``handler``.

        Returns:
            The result of queuing the registration command.

        Raises:
            MIoTMipsError: ``did`` is not a string or ``handler`` is None.
        """
        if not isinstance(did, str) or handler is None:
            raise MIoTMipsError('invalid params')

        topic: str = self.__broadcast_topic(
            did=did, kind='properties_changed', siid=siid, iid=piid)

        def on_prop_msg(topic: str, payload: str, ctx: any) -> None:
            try:
                msg = json.loads(payload)
            except (json.JSONDecodeError, UnicodeDecodeError):
                self.log_error(
                    f'on_prop_msg, invalid msg, {topic}, {payload}')
                return
            # Valid JSON is not necessarily an object; guard before .get().
            params = msg.get('params', None) if isinstance(msg, dict) else None
            if (
                not isinstance(params, dict)
                or 'siid' not in params
                or 'piid' not in params
                or 'value' not in params
            ):
                self.log_error(
                    f'on_prop_msg, invalid msg, {topic}, {payload}')
                return
            if handler:
                self.log_debug('on properties_changed, %s', payload)
                handler(params, ctx)
        return self.__reg_broadcast(
            topic=topic, handler=on_prop_msg, handler_ctx=handler_ctx)

    @final
    def unsub_prop(self, did: str, siid: int = None, piid: int = None) -> bool:
        """Unsubscribe the topic that ``sub_prop`` builds for these ids.

        Raises:
            MIoTMipsError: ``did`` is not a string.
        """
        if not isinstance(did, str):
            raise MIoTMipsError('invalid params')
        topic: str = self.__broadcast_topic(
            did=did, kind='properties_changed', siid=siid, iid=piid)
        return self.__unreg_broadcast(topic=topic)

    @final
    def sub_event(
        self, did: str, handler: Callable[[dict, any], None],
        siid: int = None, eiid: int = None, handler_ctx: any = None
    ) -> bool:
        """Subscribe to event messages of a device.

        Args:
            did: device id.
            handler: called as ``handler(params, ctx)``; ``params['from']``
                is set to ``'cloud'`` before the call.
            siid: service id; with ``eiid`` selects one event.
            eiid: event id. If either id is None, ``#`` is subscribed.
            handler_ctx: opaque value handed back to ``handler``.

        Returns:
            The result of queuing the registration command.

        Raises:
            MIoTMipsError: ``did`` is not a string or ``handler`` is None.
        """
        if not isinstance(did, str) or handler is None:
            raise MIoTMipsError('invalid params')
        # 'event_occured' is misspelled, but it is the topic name in use.
        topic: str = self.__broadcast_topic(
            did=did, kind='event_occured', siid=siid, iid=eiid)

        def on_event_msg(topic: str, payload: str, ctx: any) -> None:
            try:
                msg = json.loads(payload)
            except (json.JSONDecodeError, UnicodeDecodeError):
                self.log_error(
                    f'on_event_msg, invalid msg, {topic}, {payload}')
                return
            # Valid JSON is not necessarily an object; guard before .get().
            params = msg.get('params', None) if isinstance(msg, dict) else None
            if (
                not isinstance(params, dict)
                or 'siid' not in params
                or 'eiid' not in params
                or 'arguments' not in params
            ):
                self.log_error(
                    f'on_event_msg, invalid msg, {topic}, {payload}')
                return
            if handler:
                self.log_debug('on on_event_msg, %s', payload)
                params['from'] = 'cloud'
                handler(params, ctx)
        return self.__reg_broadcast(
            topic=topic, handler=on_event_msg, handler_ctx=handler_ctx)

    @final
    def unsub_event(self, did: str, siid: int = None, eiid: int = None) -> bool:
        """Unsubscribe the topic that ``sub_event`` builds for these ids.

        Raises:
            MIoTMipsError: ``did`` is not a string.
        """
        if not isinstance(did, str):
            raise MIoTMipsError('invalid params')
        # 'event_occured' is misspelled, but it is the topic name in use.
        topic: str = self.__broadcast_topic(
            did=did, kind='event_occured', siid=siid, iid=eiid)
        return self.__unreg_broadcast(topic=topic)

    @final
    def sub_device_state(
        self, did: str, handler: Callable[[str, MIoTDeviceState, any], None],
        handler_ctx: any = None
    ) -> bool:
        """Subscribe to the online state of a device.

        ``handler(did, state, ctx)`` receives ``MIoTDeviceState.ONLINE`` when
        the message's ``event`` is ``'online'`` and ``OFFLINE`` otherwise.

        Raises:
            MIoTMipsError: ``did`` is not a string or ``handler`` is None.
        """
        if not isinstance(did, str) or handler is None:
            raise MIoTMipsError('invalid params')
        topic: str = f'device/{did}/state/#'

        def on_state_msg(topic: str, payload: str, ctx: any) -> None:
            try:
                msg = json.loads(payload)
            except (json.JSONDecodeError, UnicodeDecodeError):
                self.log_error(f'on_state_msg, recv unknown msg, {payload}')
                return
            # Example message:
            # {"device_id":"xxxx","device_name":"Mijia Smart Plug 3",
            # "event":"online","model": "cuco.plug.v3",
            # "timestamp":1709001070828,"uid":xxxx}
            if (
                not isinstance(msg, dict)
                or 'device_id' not in msg
                or 'event' not in msg
            ):
                self.log_error(f'on_state_msg, recv unknown msg, {payload}')
                return
            if msg['device_id'] != did:
                self.log_error(
                    f'on_state_msg, err msg, {did}!={msg["device_id"]}')
                return
            if handler:
                self.log_debug('cloud, device state changed, %s', payload)
                handler(
                    did, MIoTDeviceState.ONLINE if msg['event'] == 'online'
                    else MIoTDeviceState.OFFLINE, ctx)
        return self.__reg_broadcast(
            topic=topic, handler=on_state_msg, handler_ctx=handler_ctx)

    @final
    def unsub_device_state(self, did: str) -> bool:
        """Unsubscribe the online-state topic of a device.

        Raises:
            MIoTMipsError: ``did`` is not a string.
        """
        if not isinstance(did, str):
            raise MIoTMipsError('invalid params')
        topic: str = f'device/{did}/state/#'
        return self.__unreg_broadcast(topic=topic)

    async def get_dev_list_async(
        self, payload: str = None, timeout_ms: int = 10000
    ) -> dict[str, dict]:
        """Not available on the cloud client; always raises."""
        raise NotImplementedError('please call in http client')

    async def get_prop_async(
        self, did: str, siid: int, piid: int, timeout_ms: int = 10000
    ) -> any:
        """Not available on the cloud client; always raises."""
        raise NotImplementedError('please call in http client')

    async def set_prop_async(
        self, did: str, siid: int, piid: int, value: any,
        timeout_ms: int = 10000
    ) -> bool:
        """Not available on the cloud client; always raises."""
        raise NotImplementedError('please call in http client')

    async def action_async(
        self, did: str, siid: int, aiid: int, in_list: list,
        timeout_ms: int = 10000
    ) -> tuple[bool, list]:
        """Not available on the cloud client; always raises."""
        raise NotImplementedError('please call in http client')

    def __on_mips_cmd_handler(self, mips_cmd: MipsCmd) -> None:
        """
        NOTICE thread safe, this function will be called at the **mips** thread

        REG_BROADCAST stores the broadcast and subscribes its topic unless the
        topic is already registered; UNREG_BROADCAST removes a registered
        topic and unsubscribes it. Other command types are ignored here.
        """
        if mips_cmd.type_ == MipsCmdType.REG_BROADCAST:
            reg_bc: MipsRegBroadcast = mips_cmd.data
            if not self._msg_matcher.get(topic=reg_bc.topic):
                sub_bc: MipsBroadcast = MipsBroadcast(
                    topic=reg_bc.topic, handler=reg_bc.handler,
                    handler_ctx=reg_bc.handler_ctx)
                self._msg_matcher[reg_bc.topic] = sub_bc
                self._mips_sub_internal(topic=reg_bc.topic)
            else:
                self.log_debug(f'mips cloud re-reg broadcast, {reg_bc.topic}')
        elif mips_cmd.type_ == MipsCmdType.UNREG_BROADCAST:
            unreg_bc: MipsRegBroadcast = mips_cmd.data
            if self._msg_matcher.get(topic=unreg_bc.topic):
                del self._msg_matcher[unreg_bc.topic]
                self._mips_unsub_internal(topic=unreg_bc.topic)

    def __reg_broadcast(
        self, topic: str, handler: Callable[[str, str, any], None],
        handler_ctx: any = None
    ) -> bool:
        """Queue a REG_BROADCAST command for ``topic``."""
        return self._mips_send_cmd(
            type_=MipsCmdType.REG_BROADCAST,
            data=MipsRegBroadcast(
                topic=topic, handler=handler, handler_ctx=handler_ctx))

    def __unreg_broadcast(self, topic: str) -> bool:
        """Queue an UNREG_BROADCAST command for ``topic``."""
        return self._mips_send_cmd(
            type_=MipsCmdType.UNREG_BROADCAST,
            data=MipsRegBroadcast(topic=topic))

    def __on_mips_connect_handler(self, rc, props) -> None:
        """Subscribe every topic currently held in the registry."""
        for topic, _ in list(
                self._msg_matcher.iter_all_nodes()):
            self._mips_sub_internal(topic=topic)

    def __on_mips_disconnect_handler(self, rc, props) -> None:
        """Intentionally a no-op; nothing is unsubscribed on disconnect."""

    def __on_mips_message_handler(self, topic: str, payload) -> None:
        """
        NOTICE thread safe, this function will be called at the **mips** thread

        Every registered broadcast matching ``topic`` gets its handler
        scheduled on the main loop as ``handler(topic, payload, handler_ctx)``.
        """
        # broadcast
        bc_list: list[MipsBroadcast] = list(
            self._msg_matcher.iter_match(topic))
        if not bc_list:
            return
        for item in bc_list:
            if item.handler is None:
                continue
            # NOTICE: call threadsafe
            self.main_loop.call_soon_threadsafe(
                item.handler, topic, payload, item.handler_ctx)


class MipsLocalClient(MipsClient):
    """MIoT Pub/Sub Local Client."""
    # pylint: disable=unused-argument
    # pylint: disable=inconsistent-quotes
    MIPS_RECONNECT_INTERVAL_MIN: int = 6000
    MIPS_RECONNECT_INTERVAL_MAX: int = 60000
    MIPS_SUB_PATCH: int = 1000
    MIPS_SUB_INTERVAL: int = 100
    _did: str
    _group_id: str
    _home_name: str
    _mips_seed_id: int
    _reply_topic: str
    _dev_list_change_topic: str
    _request_map: dict[str, MipsRequest]
    _msg_matcher: MIoTMatcher
    _device_state_sub_map: dict[str, MipsDeviceState]
    _get_prop_queue: dict[str, list]
    _get_prop_timer: asyncio.TimerHandle
    _on_dev_list_changed: Callable[[any, list[str]], asyncio.Future]

    def __init__(
        self, did: str, host: str, group_id: str,
        ca_file: str, cert_file: str, key_file: str,
        port: int = 8883, home_name: str = '',
        loop: Optional[asyncio.AbstractEventLoop] = None
    ) -> None:
        """Create a local client.

        Args:
            did: used as the MQTT client id and as the prefix of the reply
                and device-list-change topics.
            host: broker host forwarded to the base client.
            group_id: stored and used as the name of the mips thread.
            ca_file: CA file forwarded to the base client.
            cert_file: certificate file forwarded to the base client.
            key_file: key file forwarded to the base client.
            port: broker port.
            home_name: prefix added to every log line of this client.
            loop: event loop forwarded to the base client.
        """
        # Identity of this client.
        self._did = did
        self._group_id = group_id
        self._home_name = home_name
        # Topics derived from the did.
        self._reply_topic = f'{did}/reply'
        self._dev_list_change_topic = f'{did}/appMsg/devListChange'
        # Runtime bookkeeping, all empty at start.
        self._mips_seed_id = random.randint(0, self.UINT32_MAX)
        self._request_map = {}
        self._device_state_sub_map = {}
        self._get_prop_queue = {}
        self._msg_matcher = MIoTMatcher()
        self._get_prop_timer = None
        self._on_dev_list_changed = None

        super().__init__(
            client_id=did, host=host, port=port,
            ca_file=ca_file, cert_file=cert_file, key_file=key_file, loop=loop)
        # MIPS local thread name use group_id
        self._mips_thread.name = group_id

        self.on_mips_cmd = self.__on_mips_cmd_handler
        self.on_mips_message = self.__on_mips_message_handler
        self.on_mips_connect = self.__on_mips_connect_handler

    @property
    def group_id(self) -> str:
        """Return the group id passed to the constructor."""
        return self._group_id

    def deinit(self) -> None:
        self.mips_deinit()
        self._did = None
        self._mips_seed_id = None
        self._reply_topic = None
        self._dev_list_change_topic = None
        self._request_map = None
        self._msg_matcher = None
        self._device_state_sub_map = None
        self._get_prop_queue = None
        self._get_prop_timer = None
        self._on_dev_list_changed = None

        self.on_mips_cmd = None
        self.on_mips_message = None
        self.on_mips_connect = None

    def log_debug(self, msg, *args, **kwargs) -> None:
        if self._logger:
            self._logger.debug(f'{self._home_name}, '+msg, *args, **kwargs)

    def log_info(self, msg, *args, **kwargs) -> None:
        if self._logger:
            self._logger.info(f'{self._home_name}, '+msg, *args, **kwargs)

    def log_error(self, msg, *args, **kwargs) -> None:
        if self._logger:
            self._logger.error(f'{self._home_name}, '+msg, *args, **kwargs)

    @final
    def connect(self) -> None:
        """Connect by delegating to ``mips_connect``."""
        self.mips_connect()

    @final
    async def connect_async(self) -> None:
        """Connect by awaiting ``mips_connect_async``."""
        await self.mips_connect_async()

    @final
    def disconnect(self) -> None:
        """Disconnect and start over with empty bookkeeping.

        The request map, the device-state subscription map and the broadcast
        registry are all replaced by fresh, empty objects.
        """
        self.mips_disconnect()
        self._request_map, self._device_state_sub_map = {}, {}
        self._msg_matcher = MIoTMatcher()

    @final
    async def disconnect_async(self) -> None:
        """Await the disconnect, then start over with empty bookkeeping.

        The request map, the device-state subscription map and the broadcast
        registry are all replaced by fresh, empty objects.
        """
        await self.mips_disconnect_async()
        self._request_map, self._device_state_sub_map = {}, {}
        self._msg_matcher = MIoTMatcher()

    @final
    def sub_prop(
        self, did: str, handler: Callable[[dict, any], None],
        siid: int = None, piid: int = None, handler_ctx: any = None
    ) -> bool:
        topic: str = (
            f'appMsg/notify/iot/{did}/property/'
            f'{"#" if siid is None or piid is None else f"{siid}.{piid}"}')

        def on_prop_msg(topic: str, payload: str, ctx: any):
            msg: dict = json.loads(payload)
            if (
                msg is None
                or 'did' not in msg
                or 'siid' not in msg
                or 'piid' not in msg
                or 'value' not in msg
            ):
                # self.log_error(f'on_prop_msg, recv unknown msg, {payload}')
                return
            if handler:
                self.log_debug('local, on properties_changed, %s', payload)
                handler(msg, ctx)
        return self.__reg_broadcast(
            topic=topic, handler=on_prop_msg, handler_ctx=handler_ctx)

    @final
    def unsub_prop(self, did: str, siid: int = None, piid: int = None) -> bool:
        topic: str = (
            f'appMsg/notify/iot/{did}/property/'
            f'{"#" if siid is None or piid is None else f"{siid}.{piid}"}')
        return self.__unreg_broadcast(topic=topic)

    @final
    def sub_event(
        self, did: str, handler: Callable[[dict, any], None],
        siid: int = None, eiid: int = None, handler_ctx: any = None
    ) -> bool:
        topic: str = (
            f'appMsg/notify/iot/{did}/event/'
            f'{"#" if siid is None or eiid is None else f"{siid}.{eiid}"}')

        def on_event_msg(topic: str, payload: str, ctx: any):
            msg: dict = json.loads(payload)
            if (
                msg is None
                or 'did' not in msg
                or 'siid' not in msg
                or 'eiid' not in msg
                or 'arguments' not in msg
            ):
                # self.log_error(f'on_event_msg, recv unknown msg, {payload}')
                return
            if handler:
                self.log_debug('local, on event_occurred, %s', payload)
                handler(msg, ctx)
        return self.__reg_broadcast(
            topic=topic, handler=on_event_msg, handler_ctx=handler_ctx)

    @final
    def unsub_event(self, did: str, siid: int = None, eiid: int = None) -> bool:
        topic: str = (
            f'appMsg/notify/iot/{did}/event/'
            f'{"#" if siid is None or eiid is None else f"{siid}.{eiid}"}')
        return self.__unreg_broadcast(topic=topic)

    @final
    async def get_prop_safe_async(
        self, did: str, siid: int, piid: int, timeout_ms: int = 10000
    ) -> any:
        self._get_prop_queue.setdefault(did, [])
        fut: asyncio.Future = self.main_loop.create_future()
        self._get_prop_queue[did].append({
            'param': json.dumps({
                'did': did,
                'siid': siid,
                'piid': piid
            }),
            'fut': fut,
            'timeout_ms': timeout_ms
        })
        if self._get_prop_timer is None:
            self._get_prop_timer = self.main_loop.create_task(
                self.__get_prop_timer_handle())
        return await fut

    @final
    async def get_prop_async(
        self, did: str, siid: int, piid: int, timeout_ms: int = 10000
    ) -> any:
        result_obj = await self.__request_async(
            topic='proxy/get',
            payload=json.dumps({
                'did': did,
                'siid': siid,
                'piid': piid
            }),
            timeout_ms=timeout_ms)
        if not isinstance(result_obj, dict) or 'value' not in result_obj:
            return None
        return result_obj['value']

    @final
    async def set_prop_async(
        self, did: str, siid: int, piid: int, value: any,
        timeout_ms: int = 10000
    ) -> dict:
        payload_obj: dict = {
            'did': did,
            'rpc': {
                'id': self.__gen_mips_id,
                'method': 'set_properties',
                'params': [{
                    'did': did,
                    'siid': siid,
                    'piid': piid,
                    'value': value
                }]
            }
        }
        result_obj = await self.__request_async(
            topic='proxy/rpcReq',
            payload=json.dumps(payload_obj),
            timeout_ms=timeout_ms)
        if result_obj:
            if (
                'result' in result_obj
                and len(result_obj['result']) == 1
                and 'did' in result_obj['result'][0]
                and result_obj['result'][0]['did'] == did
                and 'code' in result_obj['result'][0]
            ):
                return result_obj['result'][0]
            if 'error' in result_obj:
                return result_obj['error']
        return {
            'code': MIoTErrorCode.CODE_INTERNAL_ERROR.value,
            'message': 'Invalid result'}

    @final
    async def action_async(
        self, did: str, siid: int, aiid: int, in_list: list,
        timeout_ms: int = 10000
    ) -> dict:
        payload_obj: dict = {
            'did': did,
            'rpc': {
                'id': self.__gen_mips_id,
                'method': 'action',
                'params': {
                    'did': did,
                    'siid': siid,
                    'aiid': aiid,
                    'in': in_list
                }
            }
        }
        result_obj = await self.__request_async(
            topic='proxy/rpcReq', payload=json.dumps(payload_obj),
            timeout_ms=timeout_ms)
        if result_obj:
            if 'result' in result_obj and 'code' in result_obj['result']:
                return result_obj['result']
            if 'error' in result_obj:
                return result_obj['error']
        return {
            'code': MIoTErrorCode.CODE_INTERNAL_ERROR.value,
            'message': 'Invalid result'}

    @final
    async def get_dev_list_async(
        self, payload: str = None, timeout_ms: int = 10000
    ) -> dict[str, dict]:
        result_obj = await self.__request_async(
            topic='proxy/getDevList', payload=payload or '{}',
            timeout_ms=timeout_ms)
        if not result_obj or 'devList' not in result_obj:
            return None
        device_list = {}
        for did, info in result_obj['devList'].items():
            name: str = info.get('name', None)
            urn: str = info.get('urn', None)
            model: str = info.get('model', None)
            if name is None or urn is None or model is None:
                self.log_error(f'invalid device info, {did}, {info}')
                continue
            device_list[did] = {
                'did': did,
                'name': name,
                'urn': urn,
                'model': model,
                'online': info.get('online', False),
                'icon': info.get('icon', None),
                'fw_version': None,
                'home_id': '',
                'home_name': '',
                'room_id': info.get('roomId', ''),
                'room_name': info.get('roomName', ''),
                'specv2_access': info.get('specV2Access', False),
                'push_available': info.get('pushAvailable', False),
                'manufacturer': model.split('.')[0],
            }
        return device_list

    @final
    async def get_action_group_list_async(
        self, timeout_ms: int = 10000
    ) -> list[str]:
        result_obj = await self.__request_async(
            topic='proxy/getMijiaActionGroupList',
            payload='{}',
            timeout_ms=timeout_ms)
        if not result_obj or 'result' not in result_obj:
            return None
        return result_obj['result']

    @final
    async def exec_action_group_list_async(
        self, ag_id: str, timeout_ms: int = 10000
    ) -> dict:
        result_obj = await self.__request_async(
            topic='proxy/execMijiaActionGroup',
            payload=f'{{"id":"{ag_id}"}}',
            timeout_ms=timeout_ms)
        if result_obj:
            if 'result' in result_obj:
                return result_obj['result']
            if 'error' in result_obj:
                return result_obj['error']
        return {
            'code': MIoTErrorCode.CODE_MIPS_INVALID_RESULT.value,
            'message': 'invalid result'}

    @final
    @property
    def on_dev_list_changed(self) -> Callable[[any, list[str]], asyncio.Future]:
        """Return the callback registered for device-list changes, if any."""
        return self._on_dev_list_changed

    @final
    @on_dev_list_changed.setter
    def on_dev_list_changed(
        self, func: Callable[[any, list[str]], asyncio.Future]
    ) -> None:
        """Register the device-list-change callback (run in main loop).

        The message handler calls ``func(client, dev_list)`` and hands the
        returned object to ``main_loop.create_task``.
        """
        self._on_dev_list_changed = func

    @final
    def __on_mips_cmd_handler(self, mips_cmd: MipsCmd) -> None:
        """Execute one queued command.

        Handled command types:
            CALL_API: publish the request under ``master/{topic}``, arm a
                timeout timer and remember the request by message id.
            REG_BROADCAST / UNREG_BROADCAST: add or remove a broadcast in the
                registry (keyed ``{did}/{topic}``) and subscribe/unsubscribe
                ``master/{topic}``.
            REG_DEVICE_STATE / UNREG_DEVICE_STATE: add or remove an entry in
                the device-state subscription map.
        Any other type is logged as an error.
        """
        if mips_cmd.type_ == MipsCmdType.CALL_API:
            req_data: MipsRequestData = mips_cmd.data
            req = MipsRequest()
            req.mid = self.__gen_mips_id
            req.on_reply = req_data.on_reply
            req.on_reply_ctx = req_data.on_reply_ctx
            pub_topic: str = f'master/{req_data.topic}'
            result = self.__mips_publish(
                topic=pub_topic, payload=req_data.payload, mid=req.mid,
                ret_topic=self._reply_topic)
            self.log_debug(
                f'mips local call api, {result}, {req.mid}, {pub_topic}, '
                f'{req_data.payload}')

            def on_request_timeout(req: MipsRequest):
                self.log_error(
                    f'on mips request timeout, {req.mid}, {pub_topic}'
                    f', {req_data.payload}')
                # Forget the request so a late reply is ignored.
                self._request_map.pop(str(req.mid), None)
                req.on_reply(
                    '{"error":{"code":-10006, "message":"timeout"}}',
                    req.on_reply_ctx)
            req.timer = self.mev_set_timeout(
                req_data.timeout_ms, on_request_timeout, req)
            self._request_map[str(req.mid)] = req
        elif mips_cmd.type_ == MipsCmdType.REG_BROADCAST:
            reg_bc: MipsRegBroadcast = mips_cmd.data
            sub_topic: str = f'{self._did}/{reg_bc.topic}'
            if not self._msg_matcher.get(sub_topic):
                sub_bc: MipsBroadcast = MipsBroadcast(
                    topic=sub_topic, handler=reg_bc.handler,
                    handler_ctx=reg_bc.handler_ctx)
                self._msg_matcher[sub_topic] = sub_bc
                self._mips_sub_internal(topic=f'master/{reg_bc.topic}')
            else:
                self.log_debug(f'mips re-reg broadcast, {sub_topic}')
        elif mips_cmd.type_ == MipsCmdType.UNREG_BROADCAST:
            unreg_bc: MipsRegBroadcast = mips_cmd.data
            # Central hub gateway needs to add prefix
            unsub_topic: str = f'{self._did}/{unreg_bc.topic}'
            if self._msg_matcher.get(unsub_topic):
                del self._msg_matcher[unsub_topic]
                # Build the broker topic exactly as REG_BROADCAST does rather
                # than rewriting the prefix with an unescaped regex.
                self._mips_unsub_internal(topic=f'master/{unreg_bc.topic}')
        elif mips_cmd.type_ == MipsCmdType.REG_DEVICE_STATE:
            reg_dev_state: MipsRegDeviceState = mips_cmd.data
            self._device_state_sub_map[reg_dev_state.did] = reg_dev_state
            self.log_debug(
                f'mips local reg device state, {reg_dev_state.did}')
        elif mips_cmd.type_ == MipsCmdType.UNREG_DEVICE_STATE:
            unreg_dev_state: MipsRegDeviceState = mips_cmd.data
            # Unregistering an unknown did must not raise KeyError.
            self._device_state_sub_map.pop(unreg_dev_state.did, None)
            self.log_debug(
                f'mips local unreg device state, {unreg_dev_state.did}')
        else:
            self.log_error(
                f'mips local recv unknown cmd, {mips_cmd.type_}, '
                f'{mips_cmd.data}')

    def __on_mips_connect_handler(self, rc, props) -> None:
        self.log_debug('__on_mips_connect_handler')
        # Sub did/#, include reply topic
        self._mips_sub_internal(f'{self._did}/#')
        # Sub device list change
        self._mips_sub_internal('master/appMsg/devListChange')
        # Do not need to subscribe api topics, for they are covered by did/#
        # Sub api topic.
        # Sub broadcast topic
        for topic, _ in list(self._msg_matcher.iter_all_nodes()):
            self._mips_sub_internal(
                topic=re.sub(f'^{self._did}', 'master', topic))

    @final
    def __on_mips_message_handler(self, topic: str, payload: bytes) -> None:
        """Dispatch one received message.

        The payload is unpacked with ``MipsMessage.unpack`` and then handled
        as the first matching case:
            1. reply topic: complete the pending request with the same mid;
            2. registered broadcast: schedule every matching handler;
            3. device-list-change topic: schedule ``on_dev_list_changed``.
        Anything else is logged at debug level.
        """
        mips_msg: MipsMessage = MipsMessage.unpack(payload)
        # Reply
        if topic == self._reply_topic:
            self.log_debug(f'on request reply, {mips_msg}')
            req: MipsRequest = self._request_map.pop(str(mips_msg.mid), None)
            if req:
                # Cancel timer
                self.mev_clear_timeout(req.timer)
                if req.on_reply:
                    self.main_loop.call_soon_threadsafe(
                        req.on_reply, mips_msg.payload or '{}',
                        req.on_reply_ctx)
            return
        # Broadcast
        bc_list: list[MipsBroadcast] = list(self._msg_matcher.iter_match(
            topic=topic))
        if bc_list:
            self.log_debug(f'on broadcast, {topic}, {mips_msg}')
            for item in bc_list:
                if item.handler is None:
                    continue
                # Handlers get the topic without its leading '{did}/' part.
                self.main_loop.call_soon_threadsafe(
                    item.handler, topic[topic.find('/')+1:],
                    mips_msg.payload or '{}', item.handler_ctx)
            return
        # Device list change
        if topic == self._dev_list_change_topic:
            try:
                payload_obj = json.loads(mips_msg.payload or '{}')
            except (json.JSONDecodeError, UnicodeDecodeError):
                # Treated like any other unknown message below.
                payload_obj = None
            dev_list = (
                payload_obj.get('devList', None)
                if isinstance(payload_obj, dict) else None)
            if not isinstance(dev_list, list) or not dev_list:
                _LOGGER.error(
                    'unknown devListChange msg, %s', mips_msg.payload)
                return
            if self._on_dev_list_changed:
                self.main_loop.call_soon_threadsafe(
                    self.main_loop.create_task,
                    self._on_dev_list_changed(self, dev_list))
            return

        self.log_debug(
            f'mips local client, recv unknown msg, {topic} -> {mips_msg}')

    @property
    def __gen_mips_id(self) -> int:
        mips_id: int = self._mips_seed_id
        self._mips_seed_id = int((self._mips_seed_id+1) % self.UINT32_MAX)
        return mips_id

    def __mips_publish(
            self, topic: str, payload: str | bytes, mid: int = None,
            ret_topic: str = None, wait_for_publish: bool = False,
            timeout_ms: int = 10000
    ) -> bool:
        """Pack ``payload`` into a MipsMessage and publish it.

        A falsy ``mid`` (None or 0) is replaced by a freshly generated id.
        The topic is stripped of surrounding whitespace before publishing.
        """
        message_id = mid or self.__gen_mips_id
        packed: bytes = MipsMessage.pack(
            mid=message_id, payload=payload,
            msg_from='local', ret_topic=ret_topic)
        clean_topic = topic.strip()
        return self._mips_publish_internal(
            topic=clean_topic, payload=packed,
            wait_for_publish=wait_for_publish, timeout_ms=timeout_ms)

    def __request(
            self, topic: str, payload: str,
            on_reply: Callable[[str, any], None],
            on_reply_ctx: any = None, timeout_ms: int = 10000
    ) -> bool:
        """Queue a CALL_API command for the given request.

        Returns:
            The result of ``_mips_send_cmd``.

        Raises:
            MIoTMipsError: ``topic``, ``payload`` or ``on_reply`` is None.
        """
        if any(arg is None for arg in (topic, payload, on_reply)):
            raise MIoTMipsError('invalid params')
        request: MipsRequestData = MipsRequestData()
        request.topic = topic
        request.payload = payload
        request.on_reply = on_reply
        request.on_reply_ctx = on_reply_ctx
        request.timeout_ms = timeout_ms
        return self._mips_send_cmd(type_=MipsCmdType.CALL_API, data=request)

    def __reg_broadcast(
        self, topic: str, handler: Callable[[str, str, any], None],
        handler_ctx: any
    ) -> bool:
        """Queue a REG_BROADCAST command for ``topic`` and its handler."""
        registration = MipsRegBroadcast(
            topic=topic, handler=handler, handler_ctx=handler_ctx)
        return self._mips_send_cmd(
            type_=MipsCmdType.REG_BROADCAST, data=registration)

    def __unreg_broadcast(self, topic) -> bool:
        """Queue an UNREG_BROADCAST command for ``topic``."""
        removal = MipsRegBroadcast(topic=topic)
        return self._mips_send_cmd(
            type_=MipsCmdType.UNREG_BROADCAST, data=removal)

    @final
    async def __request_async(
        self, topic: str, payload: str, timeout_ms: int = 10000
    ) -> dict:
        fut_handler: asyncio.Future = self.main_loop.create_future()

        def on_msg_reply(payload: str, ctx: any):
            fut: asyncio.Future = ctx
            if fut:
                self.main_loop.call_soon_threadsafe(fut.set_result, payload)
        if not self.__request(
                topic=topic,
                payload=payload,
                on_reply=on_msg_reply,
                on_reply_ctx=fut_handler,
                timeout_ms=timeout_ms):
            # Request error
            fut_handler.set_result('internal request error')

        result = await fut_handler
        try:
            return json.loads(result)
        except json.JSONDecodeError:
            return {
                'code': MIoTErrorCode.CODE_MIPS_INVALID_RESULT.value,
                'message': f'Error: {result}'}

    async def __get_prop_timer_handle(self) -> None:
        """Serve one queued property read per device, then reschedule.

        For every device with queued requests the most recently queued one is
        popped, sent as a ``proxy/get`` request, and its future is completed
        with the reply's ``value`` (or None when the reply has none). While
        requests remain, another pass is scheduled 0.1 s later; otherwise the
        timer slot is cleared so the next request starts a new pass.
        """
        for did in list(self._get_prop_queue.keys()):
            # NOTE(review): pop() takes the newest request first (LIFO);
            # confirm this ordering is intended.
            item = self._get_prop_queue[did].pop()
            _LOGGER.debug('get prop, %s, %s', did, item)
            result_obj = await self.__request_async(
                topic='proxy/get',
                payload=item['param'],
                timeout_ms=item['timeout_ms'])
            fut: asyncio.Future = item['fut']
            # The waiting caller may have been cancelled meanwhile; setting a
            # result on a finished future would raise and end this pass with
            # the timer slot still occupied, stalling the queue.
            if not fut.done():
                if isinstance(result_obj, dict) and 'value' in result_obj:
                    fut.set_result(result_obj['value'])
                else:
                    fut.set_result(None)

            if not self._get_prop_queue[did]:
                self._get_prop_queue.pop(did, None)

        if self._get_prop_queue:
            self._get_prop_timer = self.main_loop.call_later(
                0.1, lambda: self.main_loop.create_task(
                    self.__get_prop_timer_handle()))
        else:
            self._get_prop_timer = None