# -*- coding: utf-8 -*-
"""
Copyright (C) 2024 Xiaomi Corporation.

The ownership and intellectual property rights of Xiaomi Home Assistant
Integration and related Xiaomi cloud service API interface provided under this
license, including source code and object code (collectively, "Licensed Work"),
are owned by Xiaomi. Subject to the terms and conditions of this License, Xiaomi
hereby grants you a personal, limited, non-exclusive, non-transferable,
non-sublicensable, and royalty-free license to reproduce, use, modify, and
distribute the Licensed Work only for your use of Home Assistant for
non-commercial purposes. For the avoidance of doubt, Xiaomi does not authorize
you to use the Licensed Work for any other purpose, including but not limited
to use Licensed Work to develop applications (APP), Web services, and other
forms of software.

You may reproduce and distribute copies of the Licensed Work, with or without
modifications, whether in source or object form, provided that you must give
any other recipients of the Licensed Work a copy of this License and retain all
copyright and disclaimers.

Xiaomi provides the Licensed Work on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied, including, without
limitation, any warranties, undertakes, or conditions of TITLE, NO ERROR OR
OMISSION, CONTINUITY, RELIABILITY, NON-INFRINGEMENT, MERCHANTABILITY, or
FITNESS FOR A PARTICULAR PURPOSE. In any event, you are solely responsible
for any direct, indirect, special, incidental, or consequential damages or
losses arising from the use or inability to use the Licensed Work.

Xiaomi reserves all rights not expressly granted to you in this License.
Except for the rights expressly granted by Xiaomi under this License, Xiaomi
does not authorize you in any form to use the trademarks, copyrights, or other
forms of intellectual property rights of Xiaomi and its affiliates, including,
without limitation, without obtaining other written permission from Xiaomi, you
shall not use "Xiaomi", "Mijia" and other words related to Xiaomi or words that
may make the public associate with Xiaomi in any form to publicize or promote
the software or hardware devices that use the Licensed Work.

Xiaomi has the right to immediately terminate all your authorization under this
License in the event:
1. You assert patent invalidation, litigation, or other claims against patents
or other intellectual property rights of Xiaomi or its affiliates; or,
2. You make, have made, manufacture, sell, or offer to sell products that knock
off Xiaomi or its affiliates' products.

MIoT lan device control, only support MIoT SPEC-v2 WiFi devices.
"""


import json
import time
import asyncio
from dataclasses import dataclass
from enum import Enum, auto
import logging
import os
import queue
import random
import secrets
import socket
import struct
import threading
from typing import Callable, Optional, final
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes

# pylint: disable=relative-beyond-top-level
from .miot_error import MIoTErrorCode
from .miot_ev import MIoTEventLoop, TimeoutHandle
from .miot_network import InterfaceStatus, MIoTNetwork, NetworkInfo
from .miot_mdns import MipsService, MipsServiceState
from .common import randomize_int, MIoTMatcher


_LOGGER = logging.getLogger(__name__)


class MIoTLanCmdType(Enum):
    """MIoT lan command."""
    DEINIT = 0
    CALL_API = auto()
    SUB_DEVICE_STATE = auto()
    UNSUB_DEVICE_STATE = auto()
    REG_BROADCAST = auto()
    UNREG_BROADCAST = auto()
    GET_DEV_LIST = auto()
    DEVICE_UPDATE = auto()
    DEVICE_DELETE = auto()
    NET_INFO_UPDATE = auto()
    NET_IFS_UPDATE = auto()
    OPTIONS_UPDATE = auto()


@dataclass
class MIoTLanCmd:
    """MIoT lan command."""
    type_: MIoTLanCmdType
    data: any


@dataclass
class MIoTLanCmdData:
    handler: Callable[[dict, any], None]
    handler_ctx: any
    timeout_ms: int


@dataclass
class MIoTLanGetDevListData(MIoTLanCmdData):
    ...


@dataclass
class MIoTLanCallApiData(MIoTLanCmdData):
    did: str
    msg: dict


class MIoTLanSendBroadcastData(MIoTLanCallApiData):
    ...


@dataclass
class MIoTLanUnregisterBroadcastData:
    key: str


@dataclass
class MIoTLanRegisterBroadcastData:
    key: str
    handler: Callable[[dict, any], None]
    handler_ctx: any


@dataclass
class MIoTLanUnsubDeviceState:
    key: str


@dataclass
class MIoTLanSubDeviceState:
    key: str
    handler: Callable[[str, dict, any], None]
    handler_ctx: any


@dataclass
class MIoTLanNetworkUpdateData:
    status: InterfaceStatus
    if_name: str


@dataclass
class MIoTLanRequestData:
    msg_id: int
    handler: Callable[[dict, any], None]
    handler_ctx: any
    timeout: TimeoutHandle


class MIoTLanDeviceState(Enum):
    FRESH = 0
    PING1 = auto()
    PING2 = auto()
    PING3 = auto()
    DEAD = auto()


class MIoTLanDevice:
    """MIoT lan device."""
    # pylint: disable=unused-argument
    OT_HEADER: int = 0x2131
    OT_HEADER_LEN: int = 32
    NETWORK_UNSTABLE_CNT_TH: int = 10
    NETWORK_UNSTABLE_TIME_TH: int = 120000
    NETWORK_UNSTABLE_RESUME_TH: int = 300
    FAST_PING_INTERVAL: int = 5000
    CONSTRUCT_STATE_PENDING: int = 15000
    KA_INTERVAL_MIN = 10000
    KA_INTERVAL_MAX = 50000

    did: str
    token: bytes
    cipher: Cipher
    ip: Optional[str]

    offset: int
    subscribed: bool
    sub_ts: int
    supported_wildcard_sub: bool

    _manager: any
    _if_name: Optional[str]
    _sub_locked: bool
    _state: MIoTLanDeviceState
    _online: bool
    _online_offline_history: list[dict[str, any]]
    _online_offline_timer: Optional[TimeoutHandle]

    _ka_timer: TimeoutHandle
    _ka_internal: int

    def __init__(
            self, manager: any,  did: str, token: str, ip: Optional[str] = None
    ) -> None:
        self._manager: MIoTLan = manager
        self.did = did
        self.token = bytes.fromhex(token)
        aes_key: bytes = self.__md5(self.token)
        aex_iv: bytes = self.__md5(aes_key + self.token)
        self.cipher = Cipher(
            algorithms.AES128(aes_key), modes.CBC(aex_iv), default_backend())
        self.ip = ip
        self.offset = 0
        self.subscribed = False
        self.sub_ts = 0
        self.supported_wildcard_sub = False
        self._if_name = None
        self._sub_locked = False
        self._state = MIoTLanDeviceState.DEAD
        self._online = False
        self._online_offline_history = []
        self._online_offline_timer = None

        def ka_init_handler(ctx: any) -> None:
            self._ka_internal = self.KA_INTERVAL_MIN
            self.__update_keep_alive(state=MIoTLanDeviceState.DEAD)
        self._ka_timer = self._manager.mev.set_timeout(
            randomize_int(self.CONSTRUCT_STATE_PENDING, 0.5),
            ka_init_handler, None)
        _LOGGER.debug('miot lan device add, %s', self.did)

    def keep_alive(self, ip: str, if_name: str) -> None:
        self.ip = ip
        if self._if_name != if_name:
            self._if_name = if_name
            _LOGGER.info(
                'device if_name change, %s, %s', self._if_name, self.did)
        self.__update_keep_alive(state=MIoTLanDeviceState.FRESH)

    @property
    def online(self) -> bool:
        return self._online

    @online.setter
    def online(self, online: bool) -> None:
        if self._online == online:
            return
        self._online = online
        self._manager.broadcast_device_state(
            did=self.did, state={
                'online': self._online, 'push_available': self.subscribed})

    @property
    def if_name(self) -> Optional[str]:
        return self._if_name

    def gen_packet(
        self, out_buffer: bytearray, clear_data: dict, did: str, offset: int
    ) -> int:
        clear_bytes = json.dumps(clear_data).encode('utf-8')
        padder = padding.PKCS7(algorithms.AES128.block_size).padder()
        padded_data = padder.update(clear_bytes) + padder.finalize()
        if len(padded_data) + self.OT_HEADER_LEN > len(out_buffer):
            raise ValueError('rpc too long')
        encryptor = self.cipher.encryptor()
        encrypted_data = encryptor.update(padded_data) + encryptor.finalize()
        data_len: int = len(encrypted_data)+self.OT_HEADER_LEN
        out_buffer[:32] = struct.pack(
            '>HHQI16s', self.OT_HEADER, data_len, int(did), offset,
            self.token)
        out_buffer[32:data_len] = encrypted_data
        msg_md5: bytes = self.__md5(out_buffer[0:data_len])
        out_buffer[16:32] = msg_md5
        return data_len

    def decrypt_packet(self, encrypted_data: bytearray) -> dict:
        data_len: int = struct.unpack('>H', encrypted_data[2:4])[0]
        md5_orig: bytes = encrypted_data[16:32]
        encrypted_data[16:32] = self.token
        md5_calc: bytes = self.__md5(encrypted_data[0:data_len])
        if md5_orig != md5_calc:
            raise ValueError(f'invalid md5, {md5_orig}, {md5_calc}')
        decryptor = self.cipher.decryptor()
        decrypted_padded_data = decryptor.update(
            encrypted_data[32:data_len]) + decryptor.finalize()
        unpadder = padding.PKCS7(algorithms.AES128.block_size).unpadder()
        decrypted_data = unpadder.update(
            decrypted_padded_data) + unpadder.finalize()
        # Some device will add a redundant \0 at the end of JSON string
        decrypted_data = decrypted_data.rstrip(b'\x00')
        return json.loads(decrypted_data)

    def subscribe(self) -> None:
        if self._sub_locked:
            return
        self._sub_locked = True
        try:
            sub_ts: int = int(time.time())
            self._manager.send2device(
                did=self.did,
                msg={
                    'method': 'miIO.sub',
                    'params': {
                        'version': '2.0',
                        'did': self._manager.virtual_did,
                        'update_ts': sub_ts,
                        'sub_method': '.'
                    }
                },
                handler=self.__subscribe_handler,
                handler_ctx=sub_ts,
                timeout_ms=5000)
        except Exception as err:  # pylint: disable=broad-exception-caught
            _LOGGER.error('subscribe device error, %s', err)

        self._sub_locked = False

    def unsubscribe(self) -> None:
        if not self.subscribed:
            return
        self._manager.send2device(
            did=self.did,
            msg={
                'method': 'miIO.unsub',
                'params': {
                    'version': '2.0',
                    'did': self._manager.virtual_did,
                    'update_ts': self.sub_ts or 0,
                    'sub_method': '.'
                }
            },
            handler=self.__unsubscribe_handler,
            timeout_ms=5000)
        self.subscribed = False
        self._manager.broadcast_device_state(
            did=self.did, state={
                'online': self._online, 'push_available': self.subscribed})

    def on_delete(self) -> None:
        if self._ka_timer:
            self._manager.mev.clear_timeout(self._ka_timer)
        if self._online_offline_timer:
            self._manager.mev.clear_timeout(self._online_offline_timer)
        self._manager = None
        self.cipher = None
        _LOGGER.debug('miot lan device delete, %s', self.did)

    def update_info(self, info: dict) -> None:
        if (
            'token' in info
            and len(info['token']) == 32
            and info['token'].upper() != self.token.hex().upper()
        ):
            # Update token
            self.token = bytes.fromhex(info['token'])
            aes_key: bytes = self.__md5(self.token)
            aex_iv: bytes = self.__md5(aes_key + self.token)
            self.cipher = Cipher(
                algorithms.AES128(aes_key),
                modes.CBC(aex_iv), default_backend())
            _LOGGER.debug('update token, %s', self.did)

    def __subscribe_handler(self, msg: dict, sub_ts: int) -> None:
        if (
            'result' not in msg
            or 'code' not in msg['result']
            or msg['result']['code'] != 0
        ):
            _LOGGER.error('subscribe device error, %s, %s', self.did, msg)
            return
        self.subscribed = True
        self.sub_ts = sub_ts
        self._manager.broadcast_device_state(
            did=self.did, state={
                'online': self._online, 'push_available': self.subscribed})
        _LOGGER.info('subscribe success, %s, %s', self._if_name, self.did)

    def __unsubscribe_handler(self, msg: dict, ctx: any) -> None:
        if (
            'result' not in msg
            or 'code' not in msg['result']
            or msg['result']['code'] != 0
        ):
            _LOGGER.error('unsubscribe device error, %s, %s', self.did, msg)
            return
        _LOGGER.info('unsubscribe success, %s, %s', self._if_name, self.did)

    def __update_keep_alive(self, state: MIoTLanDeviceState) -> None:
        last_state: MIoTLanDeviceState = self._state
        self._state = state
        if self._state != MIoTLanDeviceState.FRESH:
            _LOGGER.debug('device status, %s, %s', self.did, self._state)
        if self._ka_timer:
            self._manager.mev.clear_timeout(self._ka_timer)
            self._ka_timer = None
        match state:
            case MIoTLanDeviceState.FRESH:
                if last_state == MIoTLanDeviceState.DEAD:
                    self._ka_internal = self.KA_INTERVAL_MIN
                    self.__change_online(True)
                self._ka_timer = self._manager.mev.set_timeout(
                    self.__get_next_ka_timeout(), self.__update_keep_alive,
                    MIoTLanDeviceState.PING1)
            case (
                    MIoTLanDeviceState.PING1
                    | MIoTLanDeviceState.PING2
                    | MIoTLanDeviceState.PING3
            ):
                self._manager.ping(if_name=self._if_name, target_ip=self.ip)
                # Fast ping
                self._ka_timer = self._manager.mev.set_timeout(
                    self.FAST_PING_INTERVAL, self.__update_keep_alive,
                    MIoTLanDeviceState(state.value+1))
            case MIoTLanDeviceState.DEAD:
                if last_state == MIoTLanDeviceState.PING3:
                    self._ka_internal = self.KA_INTERVAL_MIN
                    self.__change_online(False)
            case _:
                _LOGGER.error('invalid state, %s', state)

    def __get_next_ka_timeout(self) -> int:
        self._ka_internal = min(self._ka_internal*2, self.KA_INTERVAL_MAX)
        return randomize_int(self._ka_internal, 0.1)

    def __change_online(self, online: bool) -> None:
        _LOGGER.info('change online, %s, %s', self.did, online)
        ts_now: int = int(time.time())
        self._online_offline_history.append({'ts': ts_now, 'online': online})
        if len(self._online_offline_history) > self.NETWORK_UNSTABLE_CNT_TH:
            self._online_offline_history.pop(0)
        if self._online_offline_timer:
            self._manager.mev.clear_timeout(self._online_offline_timer)
        if not online:
            self.online = False
        else:
            if (
                len(self._online_offline_history) < self.NETWORK_UNSTABLE_CNT_TH
                or (
                    ts_now - self._online_offline_history[0]['ts'] >
                    self.NETWORK_UNSTABLE_TIME_TH)
            ):
                self.online = True
            else:
                _LOGGER.info('unstable device detected, %s', self.did)
                self._online_offline_timer = self._manager.mev.set_timeout(
                    self.NETWORK_UNSTABLE_RESUME_TH,
                    self.__online_resume_handler, None)

    def __online_resume_handler(self, ctx: any) -> None:
        _LOGGER.info('unstable resume threshold past, %s', self.did)
        self.online = True

    def __md5(self, data: bytes) -> bytes:
        hasher = hashes.Hash(hashes.MD5(), default_backend())
        hasher.update(data)
        return hasher.finalize()


class MIoTLan:
    """MIoT lan device control."""
    # pylint: disable=unused-argument
    # pylint: disable=inconsistent-quotes
    OT_HEADER: bytes = b'\x21\x31'
    OT_PORT: int = 54321
    OT_PROBE_LEN: int = 32
    OT_MSG_LEN: int = 1400
    OT_SUPPORT_WILDCARD_SUB: int = 0xFE

    OT_PROBE_INTERVAL_MIN: int = 5000
    OT_PROBE_INTERVAL_MAX: int = 45000

    _main_loop: asyncio.AbstractEventLoop
    _net_ifs: set[str]
    _network: MIoTNetwork
    _mips_service: MipsService
    _enable_subscribe: bool
    _lan_devices: dict[str, MIoTLanDevice]
    _virtual_did: str
    _probe_msg: bytes
    _write_buffer: bytearray
    _read_buffer: bytearray

    _mev: MIoTEventLoop
    _thread: threading.Thread
    _queue: queue.Queue
    _cmd_event_fd: os.eventfd

    _available_net_ifs: set[str]
    _broadcast_socks: dict[str, socket.socket]
    _local_port: Optional[int]
    _scan_timer: TimeoutHandle
    _last_scan_interval: Optional[int]
    _msg_id_counter: int
    _pending_requests: dict[int, MIoTLanRequestData]
    _device_msg_matcher: MIoTMatcher
    _device_state_sub_map: dict[str, MIoTLanSubDeviceState]
    _reply_msg_buffer: dict[str, TimeoutHandle]

    _lan_state_sub_map: dict[str, Callable[[bool], asyncio.Future]]
    _lan_ctrl_vote_map: dict[str, bool]

    _init_done: bool

    def __init__(
            self,
            net_ifs: list[str],
            network: MIoTNetwork,
            mips_service: MipsService,
            enable_subscribe: bool = False,
            virtual_did: Optional[int] = None,
            loop: Optional[asyncio.AbstractEventLoop] = None
    ) -> None:
        if not network:
            raise ValueError('network is required')
        if not mips_service:
            raise ValueError('mips_service is required')
        self._main_loop = loop or asyncio.get_event_loop()
        self._net_ifs = set(net_ifs)
        self._network = network
        self._network.sub_network_info(
            key='miot_lan', handler=self.__on_network_info_change)
        self._mips_service = mips_service
        self._mips_service.sub_service_change(
            key='miot_lan', group_id='*',
            handler=self.__on_mips_service_change)
        self._enable_subscribe = enable_subscribe
        self._virtual_did = virtual_did or str(secrets.randbits(64))
        # Init socket probe message
        probe_bytes = bytearray(self.OT_PROBE_LEN)
        probe_bytes[:20] = (
            b'!1\x00\x20\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFFMDID')
        probe_bytes[20:28] = struct.pack('>Q', int(self._virtual_did))
        probe_bytes[28:32] = b'\x00\x00\x00\x00'
        self._probe_msg = bytes(probe_bytes)
        self._read_buffer = bytearray(self.OT_MSG_LEN)
        self._write_buffer = bytearray(self.OT_MSG_LEN)

        self._lan_devices = {}
        self._available_net_ifs = set()
        self._broadcast_socks = {}
        self._local_port = None
        self._scan_timer = None
        self._last_scan_interval = None
        self._msg_id_counter = int(random.random()*0x7FFFFFFF)
        self._pending_requests = {}
        self._device_msg_matcher = MIoTMatcher()
        self._device_state_sub_map = {}
        self._reply_msg_buffer = {}

        self._lan_state_sub_map = {}
        self._lan_ctrl_vote_map = {}

        self._init_done = False

        if (
            len(self._mips_service.get_services()) == 0
            and len(self._net_ifs) > 0
        ):
            _LOGGER.info('no central hub gateway service, init miot lan')
            self._main_loop.call_later(
                0, lambda: self._main_loop.create_task(
                    self.init_async()))

    @ property
    def virtual_did(self) -> str:
        return self._virtual_did

    @ property
    def mev(self) -> MIoTEventLoop:
        return self._mev

    @property
    def init_done(self) -> bool:
        return self._init_done

    async def init_async(self) -> None:
        if self._init_done:
            _LOGGER.info('miot lan already init')
            return
        if len(self._net_ifs) == 0:
            _LOGGER.info('no net_ifs')
            return
        if not any(self._lan_ctrl_vote_map.values()):
            _LOGGER.info('no vote for lan ctrl')
            return
        if len(self._mips_service.get_services()) > 0:
            _LOGGER.info('central hub gateway service exist')
            return
        for if_name in list(self._network.network_info.keys()):
            self._available_net_ifs.add(if_name)
        if len(self._available_net_ifs) == 0:
            _LOGGER.info('no available net_ifs')
            return
        if self._net_ifs.isdisjoint(self._available_net_ifs):
            _LOGGER.info('no valid net_ifs')
            return
        self._mev = MIoTEventLoop()
        self._queue = queue.Queue()
        self._cmd_event_fd = os.eventfd(0, os.O_NONBLOCK)
        self._mev.set_read_handler(
            self._cmd_event_fd, self.__cmd_read_handler, None)
        self._thread = threading.Thread(target=self.__lan_thread_handler)
        self._thread.name = 'miot_lan'
        self._thread.daemon = True
        self._thread.start()
        self._init_done = True
        for handler in list(self._lan_state_sub_map.values()):
            self._main_loop.create_task(handler(True))
        _LOGGER.info(
            'miot lan init, %s ,%s', self._net_ifs, self._available_net_ifs)

    async def deinit_async(self) -> None:
        if not self._init_done:
            _LOGGER.info('miot lan not init')
            return
        self._init_done = False
        self.__lan_send_cmd(MIoTLanCmdType.DEINIT, None)
        self._thread.join()

        self._lan_devices = {}
        self._broadcast_socks = {}
        self._local_port = None
        self._scan_timer = None
        self._last_scan_interval = None
        self._msg_id_counter = int(random.random()*0x7FFFFFFF)
        self._pending_requests = {}
        self._device_msg_matcher = MIoTMatcher()
        self._device_state_sub_map = {}
        self._reply_msg_buffer = {}
        for handler in list(self._lan_state_sub_map.values()):
            self._main_loop.create_task(handler(False))
        _LOGGER.info('miot lan deinit')

    async def update_net_ifs_async(self, net_ifs: list[str]) -> None:
        _LOGGER.info('update net_ifs, %s', net_ifs)
        if not isinstance(net_ifs, list):
            _LOGGER.error('invalid net_ifs, %s', net_ifs)
            return
        if len(net_ifs) == 0:
            # Deinit lan
            await self.deinit_async()
            self._net_ifs = set(net_ifs)
            return
        available_net_ifs = set()
        for if_name in list(self._network.network_info.keys()):
            available_net_ifs.add(if_name)
        if set(net_ifs).isdisjoint(available_net_ifs):
            _LOGGER.error('no valid net_ifs, %s', net_ifs)
            await self.deinit_async()
            self._net_ifs = set(net_ifs)
            self._available_net_ifs = available_net_ifs
            return
        if not self._init_done:
            self._net_ifs = set(net_ifs)
            await self.init_async()
            return
        self.__lan_send_cmd(
            cmd=MIoTLanCmdType.NET_IFS_UPDATE,
            data=net_ifs)

    async def vote_for_lan_ctrl_async(self, key: str, vote: bool) -> None:
        _LOGGER.info('vote for lan ctrl, %s, %s', key, vote)
        self._lan_ctrl_vote_map[key] = vote
        if not any(self._lan_ctrl_vote_map.values()):
            await self.deinit_async()
            return
        await self.init_async()

    async def update_subscribe_option(self, enable_subscribe: bool) -> None:
        _LOGGER.info('update subscribe option, %s', enable_subscribe)
        if not self._init_done:
            self._enable_subscribe = enable_subscribe
            return
        return self.__lan_send_cmd(
            cmd=MIoTLanCmdType.OPTIONS_UPDATE,
            data={
                'enable_subscribe': enable_subscribe, })

    def update_devices(self, devices: dict[str, dict]) -> bool:
        _LOGGER.info('update devices, %s', devices)
        return self.__lan_send_cmd(
            cmd=MIoTLanCmdType.DEVICE_UPDATE,
            data=devices)

    def delete_devices(self, devices: list[str]) -> bool:
        _LOGGER.info('delete devices, %s', devices)
        return self.__lan_send_cmd(
            cmd=MIoTLanCmdType.DEVICE_DELETE,
            data=devices)

    def sub_lan_state(
        self, key: str, handler: Callable[[bool], asyncio.Future]
    ) -> None:
        self._lan_state_sub_map[key] = handler

    def unsub_lan_state(self, key: str) -> None:
        self._lan_state_sub_map.pop(key, None)

    @final
    def sub_device_state(
        self, key: str, handler: Callable[[str, dict, any], None],
        handler_ctx: any = None
    ) -> bool:
        return self.__lan_send_cmd(
            cmd=MIoTLanCmdType.SUB_DEVICE_STATE,
            data=MIoTLanSubDeviceState(
                key=key, handler=handler, handler_ctx=handler_ctx))

    @final
    def unsub_device_state(self, key: str) -> bool:
        return self.__lan_send_cmd(
            cmd=MIoTLanCmdType.UNSUB_DEVICE_STATE,
            data=MIoTLanUnsubDeviceState(key=key))

    @final
    def sub_prop(
        self, did: str, handler: Callable[[dict, any], None],
        siid: int = None, piid: int = None, handler_ctx: any = None
    ) -> bool:
        if not self._enable_subscribe:
            return False
        key = (
            f'{did}/p/'
            f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}')
        return self.__lan_send_cmd(
            cmd=MIoTLanCmdType.REG_BROADCAST,
            data=MIoTLanRegisterBroadcastData(
                key=key, handler=handler, handler_ctx=handler_ctx))

    @final
    def unsub_prop(self, did: str, siid: int = None, piid: int = None) -> bool:
        if not self._enable_subscribe:
            return False
        key = (
            f'{did}/p/'
            f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}')
        return self.__lan_send_cmd(
            cmd=MIoTLanCmdType.UNREG_BROADCAST,
            data=MIoTLanUnregisterBroadcastData(key=key))

    @final
    def sub_event(
        self, did: str, handler: Callable[[dict, any], None],
        siid: int = None, eiid: int = None, handler_ctx: any = None
    ) -> bool:
        if not self._enable_subscribe:
            return False
        key = (
            f'{did}/e/'
            f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}')
        return self.__lan_send_cmd(
            cmd=MIoTLanCmdType.REG_BROADCAST,
            data=MIoTLanRegisterBroadcastData(
                key=key, handler=handler, handler_ctx=handler_ctx))

    @final
    def unsub_event(self, did: str, siid: int = None, eiid: int = None) -> bool:
        if not self._enable_subscribe:
            return False
        key = (
            f'{did}/e/'
            f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}')
        return self.__lan_send_cmd(
            cmd=MIoTLanCmdType.UNREG_BROADCAST,
            data=MIoTLanUnregisterBroadcastData(key=key))

    @final
    async def get_prop_async(
        self, did: str, siid: int, piid: int, timeout_ms: int = 10000
    ) -> any:
        result_obj = await self.__call_api_async(
            did=did, msg={
                'method': 'get_properties',
                'params': [{'did': did, 'siid': siid, 'piid': piid}]
            }, timeout_ms=timeout_ms)

        if (
            result_obj and 'result' in result_obj
            and len(result_obj['result']) == 1
            and 'did' in result_obj['result'][0]
            and result_obj['result'][0]['did'] == did
        ):
            return result_obj['result'][0].get('value', None)
        return None

    @final
    async def set_prop_async(
        self, did: str, siid: int, piid: int, value: any,
        timeout_ms: int = 10000
    ) -> dict:
        result_obj = await self.__call_api_async(
            did=did, msg={
                'method': 'set_properties',
                'params': [{
                    'did': did, 'siid': siid, 'piid': piid, 'value': value}]
            }, timeout_ms=timeout_ms)
        if result_obj:
            if (
                'result' in result_obj
                and len(result_obj['result']) == 1
                and 'did' in result_obj['result'][0]
                and result_obj['result'][0]['did'] == did
                and 'code' in result_obj['result'][0]
            ):
                return result_obj['result'][0]
            if 'code' in result_obj:
                return result_obj
        return {
            'code': MIoTErrorCode.CODE_INTERNAL_ERROR.value,
            'message': 'Invalid result'}

    @final
    async def action_async(
        self, did: str, siid: int, aiid: int, in_list: list,
        timeout_ms: int = 10000
    ) -> dict:
        result_obj = await self.__call_api_async(
            did=did, msg={
                'method': 'action',
                'params': {
                    'did': did, 'siid': siid, 'aiid': aiid, 'in': in_list}
            }, timeout_ms=timeout_ms)
        if result_obj:
            if 'result' in result_obj and 'code' in result_obj['result']:
                return result_obj['result']
            if 'code' in result_obj:
                return result_obj
        return {
            'code': MIoTErrorCode.CODE_INTERNAL_ERROR.value,
            'message': 'Invalid result'}

    @final
    async def get_dev_list_async(
        self, timeout_ms: int = 10000
    ) -> dict[str, dict]:
        if not self._init_done:
            return {}

        def get_device_list_handler(msg: dict, fut: asyncio.Future):
            self._main_loop.call_soon_threadsafe(
                fut.set_result, msg)

        fut: asyncio.Future = self._main_loop.create_future()
        if self.__lan_send_cmd(
            MIoTLanCmdType.GET_DEV_LIST,
            MIoTLanGetDevListData(
                handler=get_device_list_handler,
                handler_ctx=fut,
                timeout_ms=timeout_ms)):
            return await fut
        _LOGGER.error('get_dev_list_async error, send cmd failed')
        fut.set_result({})
        return await fut

    def ping(self, if_name: str, target_ip: str) -> None:
        if not target_ip:
            return
        self.__sendto(
            if_name=if_name, data=self._probe_msg, address=target_ip,
            port=self.OT_PORT)

    def send2device(
        self, did: str,
        msg: dict,
        handler: Optional[Callable[[dict, any], None]] = None,
        handler_ctx: any = None,
        timeout_ms: Optional[int] = None
    ) -> None:
        if timeout_ms and not handler:
            raise ValueError('handler is required when timeout_ms is set')
        device: MIoTLanDevice = self._lan_devices.get(did)
        if not device:
            raise ValueError('invalid device')
        if not device.cipher:
            raise ValueError('invalid device cipher')
        if not device.if_name:
            raise ValueError('invalid device if_name')
        if not device.ip:
            raise ValueError('invalid device ip')
        in_msg = {'id': self.__gen_msg_id(), **msg}
        msg_len = device.gen_packet(
            out_buffer=self._write_buffer,
            clear_data=in_msg,
            did=did,
            offset=int(time.time())-device.offset)

        return self.make_request(
            msg_id=in_msg['id'],
            msg=self._write_buffer[0: msg_len],
            if_name=device.if_name,
            ip=device.ip,
            handler=handler,
            handler_ctx=handler_ctx,
            timeout_ms=timeout_ms)

    def make_request(
        self,
        msg_id: int,
        msg: bytearray,
        if_name: str,
        ip: str,
        handler: Callable[[dict, any], None],
        handler_ctx: any = None,
        timeout_ms: Optional[int] = None
    ) -> None:
        def request_timeout_handler(req_data: MIoTLanRequestData):
            self._pending_requests.pop(req_data.msg_id, None)
            if req_data:
                req_data.handler({
                    'code': MIoTErrorCode.CODE_TIMEOUT.value,
                    'error': 'timeout'},
                    req_data.handler_ctx)

        timer: Optional[TimeoutHandle] = None
        request_data = MIoTLanRequestData(
            msg_id=msg_id,
            handler=handler,
            handler_ctx=handler_ctx,
            timeout=timer)
        if timeout_ms:
            timer = self._mev.set_timeout(
                timeout_ms, request_timeout_handler, request_data)
            request_data.timeout = timer
        self._pending_requests[msg_id] = request_data
        self.__sendto(if_name=if_name, data=msg, address=ip, port=self.OT_PORT)

    def broadcast_device_state(self, did: str, state: dict) -> None:
        for handler in self._device_state_sub_map.values():
            self._main_loop.call_soon_threadsafe(
                self._main_loop.create_task,
                handler.handler(did, state, handler.handler_ctx))

    def __gen_msg_id(self) -> int:
        if not self._msg_id_counter:
            self._msg_id_counter = int(random.random()*0x7FFFFFFF)
        self._msg_id_counter += 1
        if self._msg_id_counter > 0x80000000:
            self._msg_id_counter = 1
        return self._msg_id_counter

    def __lan_send_cmd(self, cmd: MIoTLanCmd, data: any) -> bool:
        try:
            self._queue.put(MIoTLanCmd(type_=cmd, data=data))
            os.eventfd_write(self._cmd_event_fd, 1)
            return True
        except Exception as err:  # pylint: disable=broad-exception-caught
            _LOGGER.error('send cmd error, %s, %s', cmd, err)
        return False

    async def __call_api_async(
        self, did: str, msg: dict, timeout_ms: int = 10000
    ) -> dict:
        def call_api_handler(msg: dict, fut: asyncio.Future):
            self._main_loop.call_soon_threadsafe(
                fut.set_result, msg)

        fut: asyncio.Future = self._main_loop.create_future()
        if self.__lan_send_cmd(
                cmd=MIoTLanCmdType.CALL_API,
                data=MIoTLanCallApiData(
                    did=did,
                    msg=msg,
                    handler=call_api_handler,
                    handler_ctx=fut,
                    timeout_ms=timeout_ms)):
            return await fut

        fut.set_result({
            'code': MIoTErrorCode.CODE_UNAVAILABLE.value,
            'error': 'send cmd error'})
        return await fut

    def __lan_thread_handler(self) -> None:
        _LOGGER.info('miot lan thread start')
        self.__init_socket()
        # Create scan devices timer
        self._scan_timer = self._mev.set_timeout(
            int(3000*random.random()), self.__scan_devices, None)
        self._mev.loop_forever()
        _LOGGER.info('miot lan thread exit')

    def __cmd_read_handler(self, ctx: any) -> None:
        fd_value = os.eventfd_read(self._cmd_event_fd)
        if fd_value == 0:
            return
        while not self._queue.empty():
            mips_cmd: MIoTLanCmd = self._queue.get(block=False)
            if mips_cmd.type_ == MIoTLanCmdType.CALL_API:
                call_api_data: MIoTLanCallApiData = mips_cmd.data
                try:
                    self.send2device(
                        did=call_api_data.did,
                        msg={'from': 'ha.xiaomi_home', **call_api_data.msg},
                        handler=call_api_data.handler,
                        handler_ctx=call_api_data.handler_ctx,
                        timeout_ms=call_api_data.timeout_ms)
                except Exception as err:  # pylint: disable=broad-exception-caught
                    _LOGGER.error('send2device error, %s', err)
                    call_api_data.handler({
                        'code': MIoTErrorCode.CODE_INTERNAL_ERROR.value,
                        'error': str(err)},
                        call_api_data.handler_ctx)
            elif mips_cmd.type_ == MIoTLanCmdType.SUB_DEVICE_STATE:
                sub_data: MIoTLanSubDeviceState = mips_cmd.data
                self._device_state_sub_map[sub_data.key] = sub_data
            elif mips_cmd.type_ == MIoTLanCmdType.UNSUB_DEVICE_STATE:
                sub_data: MIoTLanUnsubDeviceState = mips_cmd.data
                self._device_state_sub_map.pop(sub_data.key, None)
            elif mips_cmd.type_ == MIoTLanCmdType.REG_BROADCAST:
                reg_data: MIoTLanRegisterBroadcastData = mips_cmd.data
                self._device_msg_matcher[reg_data.key] = reg_data
                _LOGGER.debug('lan register broadcast, %s', reg_data.key)
            elif mips_cmd.type_ == MIoTLanCmdType.UNREG_BROADCAST:
                unreg_data: MIoTLanUnregisterBroadcastData = mips_cmd.data
                if self._device_msg_matcher.get(topic=unreg_data.key):
                    del self._device_msg_matcher[unreg_data.key]
                _LOGGER.debug('lan unregister broadcast, %s', unreg_data.key)
            elif mips_cmd.type_ == MIoTLanCmdType.GET_DEV_LIST:
                get_dev_list_data: MIoTLanGetDevListData = mips_cmd.data
                dev_list = {
                    device.did: {
                        'online': device.online,
                        'push_available': device.subscribed
                    }
                    for device in self._lan_devices.values()
                    if device.online}
                get_dev_list_data.handler(
                    dev_list, get_dev_list_data.handler_ctx)
            elif mips_cmd.type_ == MIoTLanCmdType.DEVICE_UPDATE:
                devices: dict[str, dict] = mips_cmd.data
                for did, info in devices.items():
                    if did not in self._lan_devices:
                        if 'token' not in info:
                            _LOGGER.error(
                                'token not found, %s, %s', did, info)
                            continue
                        if len(info['token']) != 32:
                            _LOGGER.error(
                                'invalid device token, %s, %s', did, info)
                            continue
                        self._lan_devices[did] = MIoTLanDevice(
                            manager=self, did=did, token=info['token'],
                            ip=info.get('ip', None))
                    else:
                        self._lan_devices[did].update_info(info)
            elif mips_cmd.type_ == MIoTLanCmdType.DEVICE_DELETE:
                device_dids: list[str] = mips_cmd.data
                for did in device_dids:
                    lan_device = self._lan_devices.pop(did, None)
                    if not lan_device:
                        continue
                    lan_device.on_delete()
            elif mips_cmd.type_ == MIoTLanCmdType.NET_INFO_UPDATE:
                net_data: MIoTLanNetworkUpdateData = mips_cmd.data
                if net_data.status == InterfaceStatus.ADD:
                    self._available_net_ifs.add(net_data.if_name)
                    if net_data.if_name in self._net_ifs:
                        self.__create_socket(if_name=net_data.if_name)
                elif net_data.status == InterfaceStatus.REMOVE:
                    self._available_net_ifs.remove(net_data.if_name)
                    self.__destroy_socket(if_name=net_data.if_name)
            elif mips_cmd.type_ == MIoTLanCmdType.NET_IFS_UPDATE:
                net_ifs: list[str] = mips_cmd.data
                if self._net_ifs != set(net_ifs):
                    self._net_ifs = set(net_ifs)
                    for if_name in self._net_ifs:
                        self.__create_socket(if_name=if_name)
                    for if_name in list(self._broadcast_socks.keys()):
                        if if_name not in self._net_ifs:
                            self.__destroy_socket(if_name=if_name)
            elif mips_cmd.type_ == MIoTLanCmdType.OPTIONS_UPDATE:
                options: dict = mips_cmd.data
                if 'enable_subscribe' in options:
                    if options['enable_subscribe'] != self._enable_subscribe:
                        self._enable_subscribe = options['enable_subscribe']
                        if not self._enable_subscribe:
                            # Unsubscribe all
                            for device in self._lan_devices.values():
                                device.unsubscribe()
            elif mips_cmd.type_ == MIoTLanCmdType.DEINIT:
                # stop the thread
                if self._scan_timer:
                    self._mev.clear_timeout(self._scan_timer)
                    self._scan_timer = None
                for device in self._lan_devices.values():
                    device.on_delete()
                self._lan_devices.clear()
                for req_data in self._pending_requests.values():
                    self._mev.clear_timeout(req_data.timeout)
                self._pending_requests.clear()
                for timer in self._reply_msg_buffer.values():
                    self._mev.clear_timeout(timer)
                self._reply_msg_buffer.clear()
                self._device_msg_matcher = MIoTMatcher()
                self.__deinit_socket()
                self._mev.loop_stop()

    def __init_socket(self) -> None:
        self.__deinit_socket()
        for if_name in self._net_ifs:
            if if_name not in self._available_net_ifs:
                return
            self.__create_socket(if_name=if_name)

    def __create_socket(self, if_name: str) -> None:
        if if_name in self._broadcast_socks:
            _LOGGER.info('socket already created, %s', if_name)
            return
        # Create socket
        try:
            sock = socket.socket(
                socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # Set SO_BINDTODEVICE
            sock.setsockopt(
                socket.SOL_SOCKET, socket.SO_BINDTODEVICE, if_name.encode())
            sock.bind(('', self._local_port or 0))
            self._mev.set_read_handler(
                sock.fileno(), self.__socket_read_handler, (if_name, sock))
            self._broadcast_socks[if_name] = sock
            self._local_port = self._local_port or sock.getsockname()[1]
            _LOGGER.info(
                'created socket, %s, %s', if_name, self._local_port)
        except Exception as err:  # pylint: disable=broad-exception-caught
            _LOGGER.error('create socket error, %s, %s', if_name, err)

    def __deinit_socket(self) -> None:
        for if_name in list(self._broadcast_socks.keys()):
            self.__destroy_socket(if_name)
        self._broadcast_socks.clear()

    def __destroy_socket(self, if_name: str) -> None:
        sock = self._broadcast_socks.pop(if_name, None)
        if not sock:
            return
        self._mev.set_read_handler(sock.fileno(), None, None)
        sock.close()
        _LOGGER.info('destroyed socket, %s', if_name)

    def __socket_read_handler(self, ctx: tuple[str, socket.socket]) -> None:
        try:
            data_len, addr = ctx[1].recvfrom_into(
                self._read_buffer, self.OT_MSG_LEN, socket.MSG_DONTWAIT)
            if data_len < 0:
                # Socket error
                _LOGGER.error('socket read error, %s, %s', ctx[0], data_len)
                return
            if addr[1] != self.OT_PORT:
                # Not ot msg
                return
            self.__raw_message_handler(
                self._read_buffer[:data_len], data_len, addr[0], ctx[0])
        except Exception as err:  # pylint: disable=broad-exception-caught
            _LOGGER.error('socket read handler error, %s', err)

    def __raw_message_handler(
        self, data: bytearray, data_len: int, ip: str, if_name: str
    ) -> None:
        if data[:2] != self.OT_HEADER:
            return
        # Keep alive message
        did: str = str(struct.unpack('>Q', data[4:12])[0])
        device: MIoTLanDevice = self._lan_devices.get(did)
        if not device:
            return
        timestamp: int = struct.unpack('>I', data[12:16])[0]
        device.offset = int(time.time()) - timestamp
        # Keep alive if this is a probe
        if data_len == self.OT_PROBE_LEN or device.subscribed:
            device.keep_alive(ip=ip, if_name=if_name)
        # Manage device subscribe status
        if (
            self._enable_subscribe
            and data_len == self.OT_PROBE_LEN
            and data[16:20] == b'MSUB'
            and data[24:27] == b'PUB'
        ):
            device.supported_wildcard_sub = (
                int(data[28]) == self.OT_SUPPORT_WILDCARD_SUB)
            sub_ts = struct.unpack('>I', data[20:24])[0]
            sub_type = int(data[27])
            if (
                device.supported_wildcard_sub
                and sub_type in [0, 1, 4]
                and sub_ts != device.sub_ts
            ):
                device.subscribed = False
                device.subscribe()
        if data_len > self.OT_PROBE_LEN:
            # handle device message
            try:
                decrypted_data = device.decrypt_packet(data)
                self.__message_handler(did, decrypted_data)
            except Exception as err:   # pylint: disable=broad-exception-caught
                _LOGGER.error('decrypt packet error, %s, %s', did, err)
                return

    def __message_handler(self, did: str, msg: dict) -> None:
        if 'id' not in msg:
            _LOGGER.warning('invalid message, no id, %s, %s', did, msg)
            return
        # Reply
        req: MIoTLanRequestData = self._pending_requests.pop(msg['id'], None)
        if req:
            self._mev.clear_timeout(req.timeout)
            self._main_loop.call_soon_threadsafe(
                req.handler, msg, req.handler_ctx)
            return
        # Handle up link message
        if 'method' not in msg or 'params' not in msg:
            _LOGGER.debug(
                'invalid message, no method or params, %s, %s', did, msg)
            return
        # Filter dup message
        if self.__filter_dup_message(did, msg['id']):
            self.send2device(
                did=did, msg={'id': msg['id'], 'result': {'code': 0}})
            return
        _LOGGER.debug('lan message, %s, %s', did, msg)
        if msg['method'] == 'properties_changed':
            for param in msg['params']:
                if 'siid' not in param and 'piid' not in param:
                    _LOGGER.debug(
                        'invalid message, no siid or piid, %s, %s', did, msg)
                    continue
                key = f'{did}/p/{param["siid"]}/{param["piid"]}'
                subs: list[MIoTLanRegisterBroadcastData] = list(
                    self._device_msg_matcher.iter_match(key))
                for sub in subs:
                    self._main_loop.call_soon_threadsafe(
                        sub.handler, param, sub.handler_ctx)
        elif (
                msg['method'] == 'event_occured'
                and 'siid' in msg['params']
                and 'eiid' in msg['params']
        ):
            key = f'{did}/e/{msg["params"]["siid"]}/{msg["params"]["eiid"]}'
            subs: list[MIoTLanRegisterBroadcastData] = list(
                self._device_msg_matcher.iter_match(key))
            for sub in subs:
                self._main_loop.call_soon_threadsafe(
                    sub.handler, msg['params'], sub.handler_ctx)
        else:
            _LOGGER.debug(
                'invalid message, unknown method, %s, %s', did, msg)
        # Reply
        self.send2device(
            did=did, msg={'id': msg['id'], 'result': {'code': 0}})

    def __filter_dup_message(self, did: str, msg_id: int) -> bool:
        filter_id = f'{did}.{msg_id}'
        if filter_id in self._reply_msg_buffer:
            return True
        self._reply_msg_buffer[filter_id] = self._mev.set_timeout(
            5000,
            lambda filter_id: self._reply_msg_buffer.pop(filter_id, None),
            filter_id)

    def __sendto(
        self, if_name: str, data: bytes, address: str, port: int
    ) -> None:
        if address == '255.255.255.255':
            # Broadcast
            for if_n, sock in self._broadcast_socks.items():
                _LOGGER.debug('send broadcast, %s', if_n)
                sock.sendto(data, socket.MSG_DONTWAIT, (address, port))
        else:
            # Unicast
            sock = self._broadcast_socks.get(if_name, None)
            if not sock:
                _LOGGER.error('invalid socket, %s', if_name)
                return
            sock.sendto(data, socket.MSG_DONTWAIT, (address, port))

    def __scan_devices(self, ctx: any) -> None:
        if self._scan_timer:
            self._mev.clear_timeout(self._scan_timer)
        # Scan devices
        self.ping(if_name=None, target_ip='255.255.255.255')
        scan_time = self.__get_next_scan_time()
        self._scan_timer = self._mev.set_timeout(
            scan_time, self.__scan_devices, None)
        _LOGGER.debug('next scan time: %sms', scan_time)

    def __get_next_scan_time(self) -> int:
        if not self._last_scan_interval:
            self._last_scan_interval = self.OT_PROBE_INTERVAL_MIN
        self._last_scan_interval = min(
            self._last_scan_interval*2, self.OT_PROBE_INTERVAL_MAX)
        return self._last_scan_interval

    async def __on_network_info_change(
        self,
        status: InterfaceStatus,
        info: NetworkInfo
    ) -> None:
        _LOGGER.info(
            'on network info change, status: %s, info: %s', status, info)
        available_net_ifs = set()
        for if_name in list(self._network.network_info.keys()):
            available_net_ifs.add(if_name)
        if len(available_net_ifs) == 0:
            await self.deinit_async()
            self._available_net_ifs = available_net_ifs
            return
        if self._net_ifs.isdisjoint(available_net_ifs):
            _LOGGER.info('no valid net_ifs')
            await self.deinit_async()
            self._available_net_ifs = available_net_ifs
            return
        if not self._init_done:
            self._available_net_ifs = available_net_ifs
            await self.init_async()
            return
        self.__lan_send_cmd(
            MIoTLanCmdType.NET_INFO_UPDATE, MIoTLanNetworkUpdateData(
                status=status, if_name=info.name))

    async def __on_mips_service_change(
        self, group_id: str,  state: MipsServiceState, data: dict
    ) -> None:
        _LOGGER.info(
            'on mips service change, %s, %s, %s',  group_id, state, data)
        if len(self._mips_service.get_services()) > 0:
            _LOGGER.info('find central service, deinit miot lan')
            await self.deinit_async()
        else:
            _LOGGER.info('no central service, init miot lan')
            await self.init_async()