fix: fix type error, wrong use of any and Any (#338)

* fix: fix type error, wrong use of any and Any

* fix: wrong use of session close

* fix: fix test_lan type error

* fix: remove __del__

* feat: oauth, http add deinit_async
This commit is contained in:
Paul Shawn 2024-12-22 10:46:58 +08:00 committed by GitHub
parent afef709839
commit c1867e2baf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 237 additions and 173 deletions

View File

@ -47,7 +47,7 @@ Climate entities for Xiaomi Home.
"""
from __future__ import annotations
import logging
from typing import Optional
from typing import Any, Optional
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
@ -415,7 +415,7 @@ class AirConditioner(MIoTServiceEntity, ClimateEntity):
return SWING_OFF
return None
def __ac_state_changed(self, prop: MIoTSpecProperty, value: any) -> None:
def __ac_state_changed(self, prop: MIoTSpecProperty, value: Any) -> None:
del prop
if not isinstance(value, str):
_LOGGER.error(

View File

@ -315,6 +315,9 @@ class XiaomiMihomeConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
_LOGGER.error('task_oauth exception, %s', error)
self._config_error_reason = str(error)
return self.async_show_progress_done(next_step_id='oauth_error')
if self._miot_oauth:
await self._miot_oauth.deinit_async()
self._miot_oauth = None
return self.async_show_progress_done(next_step_id='homes_select')
return self.async_show_progress(
step_id='oauth',
@ -336,10 +339,16 @@ class XiaomiMihomeConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
try:
auth_info = await self._miot_oauth.get_access_token_async(
code=oauth_code)
self._miot_http = MIoTHttpClient(
cloud_server=self._cloud_server,
client_id=OAUTH2_CLIENT_ID,
access_token=auth_info['access_token'])
if not self._miot_http:
self._miot_http = MIoTHttpClient(
cloud_server=self._cloud_server,
client_id=OAUTH2_CLIENT_ID,
access_token=auth_info['access_token'])
else:
self._miot_http.update_http_header(
cloud_server=self._cloud_server,
client_id=OAUTH2_CLIENT_ID,
access_token=auth_info['access_token'])
self._auth_info = auth_info
# Gen uuid
self._uuid = hashlib.sha256(
@ -449,6 +458,9 @@ class XiaomiMihomeConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
# Auth success, unregister oauth webhook
webhook_async_unregister(self.hass, webhook_id=self._virtual_did)
if self._miot_http:
await self._miot_http.deinit_async()
self._miot_http = None
_LOGGER.info(
'__check_oauth_async, webhook.async_unregister: %s',
self._virtual_did)

View File

@ -46,6 +46,7 @@ off Xiaomi or its affiliates' products.
Event entities for Xiaomi Home.
"""
from __future__ import annotations
from typing import Any
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
@ -84,6 +85,6 @@ class Event(MIoTEventEntity, EventEntity):
# Set device_class
self._attr_device_class = spec.device_class
def on_event_occurred(self, name: str, arguments: list[dict[int, any]]):
def on_event_occurred(self, name: str, arguments: list[dict[int, Any]]):
"""An event is occurred."""
self._trigger_event(event_type=name, event_attributes=arguments)

View File

@ -93,7 +93,7 @@ class Fan(MIoTServiceEntity, FanEntity):
_speed_min: Optional[int]
_speed_max: Optional[int]
_speed_step: Optional[int]
_mode_list: Optional[dict[any, any]]
_mode_list: Optional[dict[Any, Any]]
def __init__(
self, miot_device: MIoTDevice, entity_data: MIoTEntityData

View File

@ -47,7 +47,7 @@ Humidifier entities for Xiaomi Home.
"""
from __future__ import annotations
import logging
from typing import Optional
from typing import Any, Optional
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
@ -97,7 +97,7 @@ class Humidifier(MIoTServiceEntity, HumidifierEntity):
_prop_target_humidity: Optional[MIoTSpecProperty]
_prop_humidity: Optional[MIoTSpecProperty]
_mode_list: dict[any, any]
_mode_list: dict[Any, Any]
def __init__(
self, miot_device: MIoTDevice, entity_data: MIoTEntityData

View File

@ -47,7 +47,7 @@ Light entities for Xiaomi Home.
"""
from __future__ import annotations
import logging
from typing import Optional
from typing import Any, Optional
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
@ -102,7 +102,7 @@ class Light(MIoTServiceEntity, LightEntity):
_prop_mode: Optional[MIoTSpecProperty]
_brightness_scale: Optional[tuple[int, int]]
_mode_list: Optional[dict[any, any]]
_mode_list: Optional[dict[Any, Any]]
def __init__(
self, miot_device: MIoTDevice, entity_data: MIoTEntityData

View File

@ -48,9 +48,9 @@ Common utilities.
import json
from os import path
import random
from typing import Optional
from typing import Any, Optional
import hashlib
from paho.mqtt.client import MQTTMatcher
from paho.mqtt.matcher import MQTTMatcher
import yaml
MIOT_ROOT_PATH: str = path.dirname(path.abspath(__file__))
@ -87,7 +87,7 @@ def randomize_int(value: int, ratio: float) -> int:
class MIoTMatcher(MQTTMatcher):
"""MIoT Pub/Sub topic matcher."""
def iter_all_nodes(self) -> any:
def iter_all_nodes(self) -> Any:
"""Return an iterator on all nodes with their paths and contents."""
def rec(node, path_):
# pylint: disable=protected-access
@ -97,7 +97,7 @@ class MIoTMatcher(MQTTMatcher):
yield from rec(child, path_ + [part])
return rec(self._root, [])
def get(self, topic: str) -> Optional[any]:
def get(self, topic: str) -> Optional[Any]:
try:
return self[topic]
except KeyError:

View File

@ -1,7 +1,52 @@
# -*- coding: utf-8 -*-
"""MIoT client instance."""
"""
Copyright (C) 2024 Xiaomi Corporation.
The ownership and intellectual property rights of Xiaomi Home Assistant
Integration and related Xiaomi cloud service API interface provided under this
license, including source code and object code (collectively, "Licensed Work"),
are owned by Xiaomi. Subject to the terms and conditions of this License, Xiaomi
hereby grants you a personal, limited, non-exclusive, non-transferable,
non-sublicensable, and royalty-free license to reproduce, use, modify, and
distribute the Licensed Work only for your use of Home Assistant for
non-commercial purposes. For the avoidance of doubt, Xiaomi does not authorize
you to use the Licensed Work for any other purpose, including but not limited
to use Licensed Work to develop applications (APP), Web services, and other
forms of software.
You may reproduce and distribute copies of the Licensed Work, with or without
modifications, whether in source or object form, provided that you must give
any other recipients of the Licensed Work a copy of this License and retain all
copyright and disclaimers.
Xiaomi provides the Licensed Work on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied, including, without
limitation, any warranties, undertakes, or conditions of TITLE, NO ERROR OR
OMISSION, CONTINUITY, RELIABILITY, NON-INFRINGEMENT, MERCHANTABILITY, or
FITNESS FOR A PARTICULAR PURPOSE. In any event, you are solely responsible
for any direct, indirect, special, incidental, or consequential damages or
losses arising from the use or inability to use the Licensed Work.
Xiaomi reserves all rights not expressly granted to you in this License.
Except for the rights expressly granted by Xiaomi under this License, Xiaomi
does not authorize you in any form to use the trademarks, copyrights, or other
forms of intellectual property rights of Xiaomi and its affiliates, including,
without limitation, without obtaining other written permission from Xiaomi, you
shall not use "Xiaomi", "Mijia" and other words related to Xiaomi or words that
may make the public associate with Xiaomi in any form to publicize or promote
the software or hardware devices that use the Licensed Work.
Xiaomi has the right to immediately terminate all your authorization under this
License in the event:
1. You assert patent invalidation, litigation, or other claims against patents
or other intellectual property rights of Xiaomi or its affiliates; or,
2. You make, have made, manufacture, sell, or offer to sell products that knock
off Xiaomi or its affiliates' products.
MIoT client instance.
"""
from copy import deepcopy
from typing import Callable, Optional, final
from typing import Any, Callable, Optional, final
import asyncio
import json
import logging
@ -36,9 +81,9 @@ _LOGGER = logging.getLogger(__name__)
@dataclass
class MIoTClientSub:
"""MIoT client subscription."""
topic: str = None
handler: Callable[[dict, any], None] = None
handler_ctx: any = None
topic: Optional[str]
handler: Callable[[dict, Any], None]
handler_ctx: Any = None
def __str__(self) -> str:
return f'{self.topic}, {id(self.handler)}, {id(self.handler_ctx)}'
@ -345,6 +390,8 @@ class MIoTClient:
if self._show_devices_changed_notify_timer:
self._show_devices_changed_notify_timer.cancel()
self._show_devices_changed_notify_timer = None
await self._oauth.deinit_async()
await self._http.deinit_async()
# Remove notify
self._persistence_notify(
self.__gen_notify_key('dev_list_changed'), None, None)
@ -526,7 +573,7 @@ class MIoTClient:
return False
async def set_prop_async(
self, did: str, siid: int, piid: int, value: any
self, did: str, siid: int, piid: int, value: Any
) -> bool:
if did not in self._device_list_cache:
raise MIoTClientError(f'did not exist, {did}')
@ -612,7 +659,7 @@ class MIoTClient:
0.2, lambda: self._main_loop.create_task(
self.__refresh_props_handler()))
async def get_prop_async(self, did: str, siid: int, piid: int) -> any:
async def get_prop_async(self, did: str, siid: int, piid: int) -> Any:
if did not in self._device_list_cache:
raise MIoTClientError(f'did not exist, {did}')
@ -717,8 +764,8 @@ class MIoTClient:
return None
def sub_prop(
self, did: str, handler: Callable[[dict, any], None],
siid: int = None, piid: int = None, handler_ctx: any = None
self, did: str, handler: Callable[[dict, Any], None],
siid: int = None, piid: int = None, handler_ctx: Any = None
) -> bool:
if did not in self._device_list_cache:
raise MIoTClientError(f'did not exist, {did}')
@ -741,8 +788,8 @@ class MIoTClient:
return True
def sub_event(
self, did: str, handler: Callable[[dict, any], None],
siid: int = None, eiid: int = None, handler_ctx: any = None
self, did: str, handler: Callable[[dict, Any], None],
siid: int = None, eiid: int = None, handler_ctx: Any = None
) -> bool:
if did not in self._device_list_cache:
raise MIoTClientError(f'did not exist, {did}')
@ -764,8 +811,8 @@ class MIoTClient:
return True
def sub_device_state(
self, did: str, handler: Callable[[str, MIoTDeviceState, any], None],
handler_ctx: any = None
self, did: str, handler: Callable[[str, MIoTDeviceState, Any], None],
handler_ctx: Any = None
) -> bool:
"""Call callback handler in main loop"""
if did not in self._device_list_cache:
@ -1060,7 +1107,7 @@ class MIoTClient:
@final
def __on_cloud_device_state_changed(
self, did: str, state: MIoTDeviceState, ctx: any
self, did: str, state: MIoTDeviceState, ctx: Any
) -> None:
_LOGGER.info('cloud device state changed, %s, %s', did, state)
cloud_device = self._device_list_cloud.get(did, None)
@ -1110,7 +1157,7 @@ class MIoTClient:
@final
async def __on_lan_device_state_changed(
self, did: str, state: dict, ctx: any
self, did: str, state: dict, ctx: Any
) -> None:
_LOGGER.info('lan device state changed, %s, %s', did, state)
lan_state_new: bool = state.get('online', False)
@ -1146,7 +1193,7 @@ class MIoTClient:
self.__request_show_devices_changed_notify()
@final
def __on_prop_msg(self, params: dict, ctx: any) -> None:
def __on_prop_msg(self, params: dict, ctx: Any) -> None:
"""params MUST contain did, siid, piid, value"""
# BLE device has no online/offline msg
try:
@ -1158,7 +1205,7 @@ class MIoTClient:
_LOGGER.error('on prop msg error, %s, %s', params, err)
@final
def __on_event_msg(self, params: dict, ctx: any) -> None:
def __on_event_msg(self, params: dict, ctx: Any) -> None:
try:
subs: list[MIoTClientSub] = list(self._sub_tree.iter_match(
f'{params["did"]}/e/{params["siid"]}/{params["eiid"]}'))

View File

@ -51,7 +51,7 @@ import json
import logging
import re
import time
from typing import Optional
from typing import Any, Optional
from urllib.parse import urlencode
import aiohttp
@ -94,10 +94,11 @@ class MIoTOauthClient:
self._oauth_host = DEFAULT_OAUTH2_API_HOST
else:
self._oauth_host = f'{cloud_server}.{DEFAULT_OAUTH2_API_HOST}'
self._session = aiohttp.ClientSession()
self._session = aiohttp.ClientSession(loop=self._main_loop)
def __del__(self):
self._session.close()
async def deinit_async(self) -> None:
if self._session and not self._session.closed:
await self._session.close()
def set_redirect_url(self, redirect_url: str) -> None:
if not isinstance(redirect_url, str) or redirect_url.strip() == '':
@ -250,10 +251,11 @@ class MIoTHttpClient:
cloud_server=cloud_server, client_id=client_id,
access_token=access_token)
self._session = aiohttp.ClientSession()
self._session = aiohttp.ClientSession(loop=self._main_loop)
def __del__(self):
self._session.close()
async def deinit_async(self) -> None:
if self._session and not self._session.closed:
await self._session.close()
def update_http_header(
self, cloud_server: Optional[str] = None,
@ -581,7 +583,7 @@ class MIoTHttpClient:
self, home_ids: list[str] = None
) -> dict[str, dict]:
homeinfos = await self.get_homeinfos_async()
homes: dict[str, dict[str, any]] = {}
homes: dict[str, dict[str, Any]] = {}
devices: dict[str, dict] = {}
for device_type in ['home_list', 'share_home_list']:
homes.setdefault(device_type, {})
@ -661,7 +663,7 @@ class MIoTHttpClient:
raise MIoTHttpError('invalid response result')
return res_obj['result']
async def __get_prop_async(self, did: str, siid: int, piid: int) -> any:
async def __get_prop_async(self, did: str, siid: int, piid: int) -> Any:
results = await self.get_props_async(
params=[{'did': did, 'siid': siid, 'piid': piid}])
if not results:
@ -722,7 +724,7 @@ class MIoTHttpClient:
async def get_prop_async(
self, did: str, siid: int, piid: int, immediately: bool = False
) -> any:
) -> Any:
if immediately:
return await self.__get_prop_async(did, siid, piid)
key: str = f'{did}.{siid}.{piid}'

View File

@ -47,7 +47,7 @@ MIoT device instance.
"""
import asyncio
from abc import abstractmethod
from typing import Callable, Optional
from typing import Any, Callable, Optional
import logging
from homeassistant.helpers.entity import Entity
@ -103,7 +103,7 @@ _LOGGER = logging.getLogger(__name__)
class MIoTEntityData:
"""MIoT Entity Data."""
platform: str
device_class: any
device_class: Any
spec: MIoTSpecInstance | MIoTSpecService
props: set[MIoTSpecProperty]
@ -243,8 +243,8 @@ class MIoTDevice:
return True
def sub_property(
self, handler: Callable[[dict, any], None], siid: int = None,
piid: int = None, handler_ctx: any = None
self, handler: Callable[[dict, Any], None], siid: int = None,
piid: int = None, handler_ctx: Any = None
) -> bool:
return self.miot_client.sub_prop(
did=self._did, handler=handler, siid=siid, piid=piid,
@ -254,8 +254,8 @@ class MIoTDevice:
return self.miot_client.unsub_prop(did=self._did, siid=siid, piid=piid)
def sub_event(
self, handler: Callable[[dict, any], None], siid: int = None,
eiid: int = None, handler_ctx: any = None
self, handler: Callable[[dict, Any], None], siid: int = None,
eiid: int = None, handler_ctx: Any = None
) -> bool:
return self.miot_client.sub_event(
did=self._did, handler=handler, siid=siid, eiid=eiid,
@ -688,7 +688,7 @@ class MIoTDevice:
return None
def __on_device_state_changed(
self, did: str, state: MIoTDeviceState, ctx: any
self, did: str, state: MIoTDeviceState, ctx: Any
) -> None:
self._online = state
for key, handler in self._device_state_sub_list.items():
@ -704,11 +704,11 @@ class MIoTServiceEntity(Entity):
entity_data: MIoTEntityData
_main_loop: asyncio.AbstractEventLoop
_prop_value_map: dict[MIoTSpecProperty, any]
_prop_value_map: dict[MIoTSpecProperty, Any]
_event_occurred_handler: Callable[[MIoTSpecEvent, dict], None]
_prop_changed_subs: dict[
MIoTSpecProperty, Callable[[MIoTSpecProperty, any], None]]
MIoTSpecProperty, Callable[[MIoTSpecProperty, Any], None]]
_pending_write_ha_state_timer: Optional[asyncio.TimerHandle]
@ -759,7 +759,7 @@ class MIoTServiceEntity(Entity):
def sub_prop_changed(
self, prop: MIoTSpecProperty,
handler: Callable[[MIoTSpecProperty, any], None]
handler: Callable[[MIoTSpecProperty, Any], None]
) -> None:
if not prop or not handler:
_LOGGER.error(
@ -816,13 +816,13 @@ class MIoTServiceEntity(Entity):
self.miot_device.unsub_event(
siid=event.service.iid, eiid=event.iid)
def get_map_description(self, map_: dict[int, any], key: int) -> any:
def get_map_description(self, map_: dict[int, Any], key: int) -> Any:
if map_ is None:
return None
return map_.get(key, None)
def get_map_value(
self, map_: dict[int, any], description: any
self, map_: dict[int, Any], description: Any
) -> Optional[int]:
if map_ is None:
return None
@ -831,7 +831,7 @@ class MIoTServiceEntity(Entity):
return key
return None
def get_prop_value(self, prop: MIoTSpecProperty) -> any:
def get_prop_value(self, prop: MIoTSpecProperty) -> Any:
if not prop:
_LOGGER.error(
'get_prop_value error, property is None, %s, %s',
@ -839,7 +839,7 @@ class MIoTServiceEntity(Entity):
return None
return self._prop_value_map.get(prop, None)
def set_prop_value(self, prop: MIoTSpecProperty, value: any) -> None:
def set_prop_value(self, prop: MIoTSpecProperty, value: Any) -> None:
if not prop:
_LOGGER.error(
'set_prop_value error, property is None, %s, %s',
@ -848,7 +848,7 @@ class MIoTServiceEntity(Entity):
self._prop_value_map[prop] = value
async def set_property_async(
self, prop: MIoTSpecProperty, value: any, update: bool = True
self, prop: MIoTSpecProperty, value: Any, update: bool = True
) -> bool:
value = prop.value_format(value)
if not prop:
@ -875,7 +875,7 @@ class MIoTServiceEntity(Entity):
self.async_write_ha_state()
return True
async def get_property_async(self, prop: MIoTSpecProperty) -> any:
async def get_property_async(self, prop: MIoTSpecProperty) -> Any:
if not prop:
_LOGGER.error(
'get property failed, property is None, %s, %s',
@ -914,7 +914,7 @@ class MIoTServiceEntity(Entity):
f'{e}, {self.entity_id}, {self.name}, {action.name}') from e
return True
def __on_properties_changed(self, params: dict, ctx: any) -> None:
def __on_properties_changed(self, params: dict, ctx: Any) -> None:
_LOGGER.debug('properties changed, %s', params)
for prop in self.entity_data.props:
if (
@ -922,7 +922,7 @@ class MIoTServiceEntity(Entity):
or prop.service.iid != params['siid']
):
continue
value: any = prop.value_format(params['value'])
value: Any = prop.value_format(params['value'])
self._prop_value_map[prop] = value
if prop in self._prop_changed_subs:
self._prop_changed_subs[prop](prop, value)
@ -930,7 +930,7 @@ class MIoTServiceEntity(Entity):
if not self._pending_write_ha_state_timer:
self.async_write_ha_state()
def __on_event_occurred(self, params: dict, ctx: any) -> None:
def __on_event_occurred(self, params: dict, ctx: Any) -> None:
_LOGGER.debug('event occurred, %s', params)
if self._event_occurred_handler is None:
return
@ -988,9 +988,9 @@ class MIoTPropertyEntity(Entity):
_main_loop: asyncio.AbstractEventLoop
# {'min':int, 'max':int, 'step': int}
_value_range: dict[str, int]
# {any: any}
_value_list: dict[any, any]
_value: any
# {Any: Any}
_value_list: dict[Any, Any]
_value: Any
_pending_write_ha_state_timer: Optional[asyncio.TimerHandle]
@ -1054,12 +1054,12 @@ class MIoTPropertyEntity(Entity):
self.miot_device.unsub_property(
siid=self.service.iid, piid=self.spec.iid)
def get_vlist_description(self, value: any) -> str:
def get_vlist_description(self, value: Any) -> str:
if not self._value_list:
return None
return self._value_list.get(value, None)
def get_vlist_value(self, description: str) -> any:
def get_vlist_value(self, description: str) -> Any:
if not self._value_list:
return None
for key, value in self._value_list.items():
@ -1067,7 +1067,7 @@ class MIoTPropertyEntity(Entity):
return key
return None
async def set_property_async(self, value: any) -> bool:
async def set_property_async(self, value: Any) -> bool:
if not self.spec.writable:
raise RuntimeError(
f'set property failed, not writable, '
@ -1084,7 +1084,7 @@ class MIoTPropertyEntity(Entity):
self.async_write_ha_state()
return True
async def get_property_async(self) -> any:
async def get_property_async(self) -> Any:
if not self.spec.readable:
_LOGGER.error(
'get property failed, not readable, %s, %s',
@ -1095,7 +1095,7 @@ class MIoTPropertyEntity(Entity):
did=self.miot_device.did, siid=self.spec.service.iid,
piid=self.spec.iid))
def __on_value_changed(self, params: dict, ctx: any) -> None:
def __on_value_changed(self, params: dict, ctx: Any) -> None:
_LOGGER.debug('property changed, %s', params)
self._value = self.spec.value_format(params['value'])
if not self._pending_write_ha_state_timer:
@ -1135,7 +1135,7 @@ class MIoTEventEntity(Entity):
service: MIoTSpecService
_main_loop: asyncio.AbstractEventLoop
_value: any
_value: Any
_attr_event_types: list[str]
_arguments_map: dict[int, str]
@ -1192,10 +1192,10 @@ class MIoTEventEntity(Entity):
@abstractmethod
def on_event_occurred(
self, name: str, arguments: list[dict[int, any]]
self, name: str, arguments: list[dict[int, Any]]
): ...
def __on_event_occurred(self, params: dict, ctx: any) -> None:
def __on_event_occurred(self, params: dict, ctx: Any) -> None:
_LOGGER.debug('event occurred, %s', params)
trans_arg = {}
try:

View File

@ -46,6 +46,7 @@ off Xiaomi or its affiliates' products.
MIoT error code and exception.
"""
from enum import Enum
from typing import Any
class MIoTErrorCode(Enum):
@ -78,10 +79,10 @@ class MIoTErrorCode(Enum):
class MIoTError(Exception):
"""MIoT error."""
code: MIoTErrorCode
message: any
message: Any
def __init__(
self, message: any, code: MIoTErrorCode = MIoTErrorCode.CODE_UNKNOWN
self, message: Any, code: MIoTErrorCode = MIoTErrorCode.CODE_UNKNOWN
) -> None:
self.message = message
self.code = code

View File

@ -49,7 +49,7 @@ import selectors
import heapq
import time
import traceback
from typing import Callable, TypeVar
from typing import Any, Callable, TypeVar
import logging
import threading
@ -64,17 +64,17 @@ TimeoutHandle = TypeVar('TimeoutHandle')
class MIoTFdHandler:
"""File descriptor handler."""
fd: int
read_handler: Callable[[any], None]
read_handler_ctx: any
write_handler: Callable[[any], None]
write_handler_ctx: any
read_handler: Callable[[Any], None]
read_handler_ctx: Any
write_handler: Callable[[Any], None]
write_handler_ctx: Any
def __init__(
self, fd: int,
read_handler: Callable[[any], None] = None,
read_handler_ctx: any = None,
write_handler: Callable[[any], None] = None,
write_handler_ctx: any = None
read_handler: Callable[[Any], None] = None,
read_handler_ctx: Any = None,
write_handler: Callable[[Any], None] = None,
write_handler_ctx: Any = None
) -> None:
self.fd = fd
self.read_handler = read_handler
@ -87,13 +87,13 @@ class MIoTTimeout:
"""Timeout handler."""
key: TimeoutHandle
target: int
handler: Callable[[any], None]
handler_ctx: any
handler: Callable[[Any], None]
handler_ctx: Any
def __init__(
self, key: str = None, target: int = None,
handler: Callable[[any], None] = None,
handler_ctx: any = None
handler: Callable[[Any], None] = None,
handler_ctx: Any = None
) -> None:
self.key = key
self.target = target
@ -185,8 +185,8 @@ class MIoTEventLoop:
self._timer_handlers = {}
def set_timeout(
self, timeout_ms: int, handler: Callable[[any], None],
handler_ctx: any = None
self, timeout_ms: int, handler: Callable[[Any], None],
handler_ctx: Any = None
) -> TimeoutHandle:
"""Set a timer."""
if timeout_ms is None or handler is None:
@ -211,7 +211,7 @@ class MIoTEventLoop:
heapq.heapify(self._timer_heap)
def set_read_handler(
self, fd: int, handler: Callable[[any], None], handler_ctx: any = None
self, fd: int, handler: Callable[[Any], None], handler_ctx: Any = None
) -> bool:
"""Set a read handler for a file descriptor.
@ -222,7 +222,7 @@ class MIoTEventLoop:
fd, is_read=True, handler=handler, handler_ctx=handler_ctx)
def set_write_handler(
self, fd: int, handler: Callable[[any], None], handler_ctx: any = None
self, fd: int, handler: Callable[[Any], None], handler_ctx: Any = None
) -> bool:
"""Set a write handler for a file descriptor.
@ -233,8 +233,8 @@ class MIoTEventLoop:
fd, is_read=False, handler=handler, handler_ctx=handler_ctx)
def __set_handler(
self, fd, is_read: bool, handler: Callable[[any], None],
handler_ctx: any = None
self, fd, is_read: bool, handler: Callable[[Any], None],
handler_ctx: Any = None
) -> bool:
"""Set a handler."""
if fd is None:

View File

@ -58,7 +58,7 @@ import threading
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum, auto
from typing import Callable, Optional, final
from typing import Any, Callable, Optional, final
from paho.mqtt.client import (
MQTT_ERR_SUCCESS,
@ -173,9 +173,9 @@ class MipsCmdType(Enum):
class MipsCmd:
"""MIoT Pub/Sub command."""
type_: MipsCmdType
data: any
data: Any
def __init__(self, type_: MipsCmdType, data: any) -> None:
def __init__(self, type_: MipsCmdType, data: Any) -> None:
self.type_ = type_
self.data = data
@ -184,8 +184,8 @@ class MipsCmd:
class MipsRequest:
"""MIoT Pub/Sub request."""
mid: int = None
on_reply: Callable[[str, any], None] = None
on_reply_ctx: any = None
on_reply: Callable[[str, Any], None] = None
on_reply_ctx: Any = None
timer: TimeoutHandle = None
@ -194,8 +194,8 @@ class MipsRequestData:
"""MIoT Pub/Sub request data."""
topic: str = None
payload: str = None
on_reply: Callable[[str, any], None] = None
on_reply_ctx: any = None
on_reply: Callable[[str, Any], None] = None
on_reply_ctx: Any = None
timeout_ms: int = None
@ -223,8 +223,8 @@ class MipsApi:
param2: payload
param3: handler_ctx
"""
handler: Callable[[MipsIncomingApiCall, str, any], None] = None
handler_ctx: any = None
handler: Callable[[MipsIncomingApiCall, str, Any], None] = None
handler_ctx: Any = None
class MipsRegApi(MipsApi):
@ -247,8 +247,8 @@ class MipsBroadcast:
param 2: msg payload
param 3: handle_ctx
"""
handler: Callable[[str, str, any], None] = None
handler_ctx: any = None
handler: Callable[[str, str, Any], None] = None
handler_ctx: Any = None
def __str__(self) -> str:
return f'{self.topic}, {id(self.handler)}, {id(self.handler_ctx)}'
@ -265,7 +265,6 @@ class MipsState:
"""
str: key
bool: mips connect state
any: ctx
"""
handler: Callable[[str, bool], asyncio.Future] = None
@ -288,10 +287,10 @@ class MipsDeviceState:
"""handler
str: did
MIoTDeviceState: online/offline/disable
any: ctx
Any: ctx
"""
handler: Callable[[str, MIoTDeviceState, any], None] = None
handler_ctx: any = None
handler: Callable[[str, MIoTDeviceState, Any], None] = None
handler_ctx: Any = None
class MipsRegDeviceState(MipsDeviceState):
@ -512,8 +511,8 @@ class MipsClient(ABC):
@final
def mev_set_timeout(
self, timeout_ms: int, handler: Callable[[any], None],
handler_ctx: any = None
self, timeout_ms: int, handler: Callable[[Any], None],
handler_ctx: Any = None
) -> Optional[TimeoutHandle]:
"""set timeout.
NOTICE: Internal function, only mips threads are allowed to call
@ -534,7 +533,7 @@ class MipsClient(ABC):
@final
def mev_set_read_handler(
self, fd: int, handler: Callable[[any], None], handler_ctx: any
self, fd: int, handler: Callable[[Any], None], handler_ctx: Any
) -> bool:
"""set read handler.
NOTICE: Internal function, only mips threads are allowed to call
@ -546,7 +545,7 @@ class MipsClient(ABC):
@final
def mev_set_write_handler(
self, fd: int, handler: Callable[[any], None], handler_ctx: any
self, fd: int, handler: Callable[[Any], None], handler_ctx: Any
) -> bool:
"""set write handler.
NOTICE: Internal function, only mips threads are allowed to call
@ -604,8 +603,8 @@ class MipsClient(ABC):
@abstractmethod
def sub_prop(
self, did: str, handler: Callable[[dict, any], None],
siid: int = None, piid: int = None, handler_ctx: any = None
self, did: str, handler: Callable[[dict, Any], None],
siid: int = None, piid: int = None, handler_ctx: Any = None
) -> bool: ...
@abstractmethod
@ -615,8 +614,8 @@ class MipsClient(ABC):
@abstractmethod
def sub_event(
self, did: str, handler: Callable[[dict, any], None],
siid: int = None, eiid: int = None, handler_ctx: any = None
self, did: str, handler: Callable[[dict, Any], None],
siid: int = None, eiid: int = None, handler_ctx: Any = None
) -> bool: ...
@abstractmethod
@ -632,11 +631,11 @@ class MipsClient(ABC):
@abstractmethod
async def get_prop_async(
self, did: str, siid: int, piid: int, timeout_ms: int = 10000
) -> any: ...
) -> Any: ...
@abstractmethod
async def set_prop_async(
self, did: str, siid: int, piid: int, value: any,
self, did: str, siid: int, piid: int, value: Any,
timeout_ms: int = 10000
) -> bool: ...
@ -709,7 +708,7 @@ class MipsClient(ABC):
return False
@final
def _mips_send_cmd(self, type_: MipsCmdType, data: any) -> bool:
def _mips_send_cmd(self, type_: MipsCmdType, data: Any) -> bool:
if self._mips_queue is None or self._cmd_event_fd is None:
raise MIoTMipsError('send mips cmd disable')
# Put data to queue
@ -723,7 +722,7 @@ class MipsClient(ABC):
if threading.current_thread() is not self._mips_thread:
raise MIoTMipsError('illegal call')
def __mips_cmd_read_handler(self, ctx: any) -> None:
def __mips_cmd_read_handler(self, ctx: Any) -> None:
fd_value = os.eventfd_read(self._cmd_event_fd)
if fd_value == 0:
return
@ -763,20 +762,20 @@ class MipsClient(ABC):
if self._on_mips_cmd:
self._on_mips_cmd(mips_cmd=mips_cmd)
def __mqtt_read_handler(self, ctx: any) -> None:
def __mqtt_read_handler(self, ctx: Any) -> None:
self.__mqtt_loop_handler(ctx=ctx)
def __mqtt_write_handler(self, ctx: any) -> None:
def __mqtt_write_handler(self, ctx: Any) -> None:
self.mev_set_write_handler(self._mqtt_fd, None, None)
self.__mqtt_loop_handler(ctx=ctx)
def __mqtt_timer_handler(self, ctx: any) -> None:
def __mqtt_timer_handler(self, ctx: Any) -> None:
self.__mqtt_loop_handler(ctx=ctx)
if self._mqtt:
self._mqtt_timer = self.mev_set_timeout(
self.MQTT_INTERVAL_MS, self.__mqtt_timer_handler, None)
def __mqtt_loop_handler(self, ctx: any) -> None:
def __mqtt_loop_handler(self, ctx: Any) -> None:
try:
if self._mqtt:
self._mqtt.loop_read()
@ -896,7 +895,7 @@ class MipsClient(ABC):
self._mips_reconnect_timer = self.mev_set_timeout(
interval, self.__mips_connect, None)
def __mips_sub_internal_pending_handler(self, ctx: any) -> None:
def __mips_sub_internal_pending_handler(self, ctx: Any) -> None:
subbed_count = 1
for topic in list(self._mips_sub_pending_map.keys()):
if subbed_count > self.MIPS_SUB_PATCH:
@ -923,7 +922,7 @@ class MipsClient(ABC):
else:
self._mips_sub_pending_timer = None
def __mips_connect(self, ctx: any = None) -> None:
def __mips_connect(self, ctx: Any = None) -> None:
result = MQTT_ERR_UNKNOWN
if self._mips_reconnect_timer:
self.mev_clear_timeout(self._mips_reconnect_timer)
@ -1034,8 +1033,8 @@ class MipsCloudClient(MipsClient):
@final
def sub_prop(
self, did: str, handler: Callable[[dict, any], None],
siid: int = None, piid: int = None, handler_ctx: any = None
self, did: str, handler: Callable[[dict, Any], None],
siid: int = None, piid: int = None, handler_ctx: Any = None
) -> bool:
if not isinstance(did, str) or handler is None:
raise MIoTMipsError('invalid params')
@ -1044,7 +1043,7 @@ class MipsCloudClient(MipsClient):
f'device/{did}/up/properties_changed/'
f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}')
def on_prop_msg(topic: str, payload: str, ctx: any) -> bool:
def on_prop_msg(topic: str, payload: str, ctx: Any) -> bool:
try:
msg: dict = json.loads(payload)
except json.JSONDecodeError:
@ -1077,8 +1076,8 @@ class MipsCloudClient(MipsClient):
@final
def sub_event(
self, did: str, handler: Callable[[dict, any], None],
siid: int = None, eiid: int = None, handler_ctx: any = None
self, did: str, handler: Callable[[dict, Any], None],
siid: int = None, eiid: int = None, handler_ctx: Any = None
) -> bool:
if not isinstance(did, str) or handler is None:
raise MIoTMipsError('invalid params')
@ -1087,7 +1086,7 @@ class MipsCloudClient(MipsClient):
f'device/{did}/up/event_occured/'
f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}')
def on_event_msg(topic: str, payload: str, ctx: any) -> bool:
def on_event_msg(topic: str, payload: str, ctx: Any) -> bool:
try:
msg: dict = json.loads(payload)
except json.JSONDecodeError:
@ -1122,15 +1121,15 @@ class MipsCloudClient(MipsClient):
@final
def sub_device_state(
self, did: str, handler: Callable[[str, MIoTDeviceState, any], None],
handler_ctx: any = None
self, did: str, handler: Callable[[str, MIoTDeviceState, Any], None],
handler_ctx: Any = None
) -> bool:
"""subscribe online state."""
if not isinstance(did, str) or handler is None:
raise MIoTMipsError('invalid params')
topic: str = f'device/{did}/state/#'
def on_state_msg(topic: str, payload: str, ctx: any) -> None:
def on_state_msg(topic: str, payload: str, ctx: Any) -> None:
msg: dict = json.loads(payload)
# {"device_id":"xxxx","device_name":"米家智能插座3 ","event":"online",
# "model": "cuco.plug.v3","timestamp":1709001070828,"uid":xxxx}
@ -1163,11 +1162,11 @@ class MipsCloudClient(MipsClient):
async def get_prop_async(
self, did: str, siid: int, piid: int, timeout_ms: int = 10000
) -> any:
) -> Any:
raise NotImplementedError('please call in http client')
async def set_prop_async(
self, did: str, siid: int, piid: int, value: any,
self, did: str, siid: int, piid: int, value: Any,
timeout_ms: int = 10000
) -> bool:
raise NotImplementedError('please call in http client')
@ -1199,8 +1198,8 @@ class MipsCloudClient(MipsClient):
self._mips_unsub_internal(topic=unreg_bc.topic)
def __reg_broadcast(
self, topic: str, handler: Callable[[str, str, any], None],
handler_ctx: any = None
self, topic: str, handler: Callable[[str, str, Any], None],
handler_ctx: Any = None
) -> bool:
return self._mips_send_cmd(
type_=MipsCmdType.REG_BROADCAST,
@ -1259,7 +1258,7 @@ class MipsLocalClient(MipsClient):
_device_state_sub_map: dict[str, MipsDeviceState]
_get_prop_queue: dict[str, list]
_get_prop_timer: asyncio.TimerHandle
_on_dev_list_changed: Callable[[any, list[str]], asyncio.Future]
_on_dev_list_changed: Callable[[Any, list[str]], asyncio.Future]
def __init__(
self, did: str, host: str, group_id: str,
@ -1347,14 +1346,14 @@ class MipsLocalClient(MipsClient):
@final
def sub_prop(
self, did: str, handler: Callable[[dict, any], None],
siid: int = None, piid: int = None, handler_ctx: any = None
self, did: str, handler: Callable[[dict, Any], None],
siid: int = None, piid: int = None, handler_ctx: Any = None
) -> bool:
topic: str = (
f'appMsg/notify/iot/{did}/property/'
f'{"#" if siid is None or piid is None else f"{siid}.{piid}"}')
def on_prop_msg(topic: str, payload: str, ctx: any):
def on_prop_msg(topic: str, payload: str, ctx: Any):
msg: dict = json.loads(payload)
if (
msg is None
@ -1380,14 +1379,14 @@ class MipsLocalClient(MipsClient):
@final
def sub_event(
self, did: str, handler: Callable[[dict, any], None],
siid: int = None, eiid: int = None, handler_ctx: any = None
self, did: str, handler: Callable[[dict, Any], None],
siid: int = None, eiid: int = None, handler_ctx: Any = None
) -> bool:
topic: str = (
f'appMsg/notify/iot/{did}/event/'
f'{"#" if siid is None or eiid is None else f"{siid}.{eiid}"}')
def on_event_msg(topic: str, payload: str, ctx: any):
def on_event_msg(topic: str, payload: str, ctx: Any):
msg: dict = json.loads(payload)
if (
msg is None
@ -1414,7 +1413,7 @@ class MipsLocalClient(MipsClient):
@final
async def get_prop_safe_async(
self, did: str, siid: int, piid: int, timeout_ms: int = 10000
) -> any:
) -> Any:
self._get_prop_queue.setdefault(did, [])
fut: asyncio.Future = self.main_loop.create_future()
self._get_prop_queue[did].append({
@ -1434,7 +1433,7 @@ class MipsLocalClient(MipsClient):
@final
async def get_prop_async(
self, did: str, siid: int, piid: int, timeout_ms: int = 10000
) -> any:
) -> Any:
result_obj = await self.__request_async(
topic='proxy/get',
payload=json.dumps({
@ -1449,7 +1448,7 @@ class MipsLocalClient(MipsClient):
@final
async def set_prop_async(
self, did: str, siid: int, piid: int, value: any,
self, did: str, siid: int, piid: int, value: Any,
timeout_ms: int = 10000
) -> dict:
payload_obj: dict = {
@ -1580,13 +1579,13 @@ class MipsLocalClient(MipsClient):
@final
@property
def on_dev_list_changed(self) -> Callable[[any, list[str]], asyncio.Future]:
def on_dev_list_changed(self) -> Callable[[Any, list[str]], asyncio.Future]:
return self._on_dev_list_changed
@final
@on_dev_list_changed.setter
def on_dev_list_changed(
self, func: Callable[[any, list[str]], asyncio.Future]
self, func: Callable[[Any, list[str]], asyncio.Future]
) -> None:
"""run in main loop."""
self._on_dev_list_changed = func
@ -1731,8 +1730,8 @@ class MipsLocalClient(MipsClient):
def __request(
self, topic: str, payload: str,
on_reply: Callable[[str, any], None],
on_reply_ctx: any = None, timeout_ms: int = 10000
on_reply: Callable[[str, Any], None],
on_reply_ctx: Any = None, timeout_ms: int = 10000
) -> bool:
if topic is None or payload is None or on_reply is None:
raise MIoTMipsError('invalid params')
@ -1745,8 +1744,8 @@ class MipsLocalClient(MipsClient):
return self._mips_send_cmd(type_=MipsCmdType.CALL_API, data=req_data)
def __reg_broadcast(
self, topic: str, handler: Callable[[str, str, any], None],
handler_ctx: any
self, topic: str, handler: Callable[[str, str, Any], None],
handler_ctx: Any
) -> bool:
return self._mips_send_cmd(
type_=MipsCmdType.REG_BROADCAST,
@ -1764,7 +1763,7 @@ class MipsLocalClient(MipsClient):
) -> dict:
fut_handler: asyncio.Future = self.main_loop.create_future()
def on_msg_reply(payload: str, ctx: any):
def on_msg_reply(payload: str, ctx: Any):
fut: asyncio.Future = ctx
if fut:
self.main_loop.call_soon_threadsafe(fut.set_result, payload)

View File

@ -49,7 +49,7 @@ import asyncio
import json
import platform
import time
from typing import Optional
from typing import Any, Optional
from urllib.parse import urlencode
from urllib.request import Request, urlopen
import logging
@ -78,9 +78,9 @@ class MIoTSpecBase:
# External params
platform: str
device_class: any
device_class: Any
icon: str
external_unit: any
external_unit: Any
spec_id: str
@ -166,7 +166,7 @@ class MIoTSpecProperty(MIoTSpecBase):
def notifiable(self):
return self._notifiable
def value_format(self, value: any) -> any:
def value_format(self, value: Any) -> Any:
if value is None:
return None
if self.format_ == 'int':
@ -296,7 +296,7 @@ class MIoTSpecInstance:
# External params
platform: str
device_class: any
device_class: Any
icon: str
def __init__(

View File

@ -56,7 +56,7 @@ import hashlib
from datetime import datetime, timezone
from enum import Enum, auto
from pathlib import Path
from typing import Optional, Union
from typing import Any, Optional, Union
import logging
from urllib.request import Request, urlopen
from cryptography.hazmat.primitives import serialization
@ -419,7 +419,7 @@ class MIoTStorage:
return await fut
def update_user_config(
self, uid: str, cloud_server: str, config: Optional[dict[str, any]],
self, uid: str, cloud_server: str, config: Optional[dict[str, Any]],
replace: bool = False
) -> bool:
if config is not None and len(config) == 0:
@ -443,7 +443,7 @@ class MIoTStorage:
domain=config_domain, name=config_name, data=local_config)
async def update_user_config_async(
self, uid: str, cloud_server: str, config: Optional[dict[str, any]],
self, uid: str, cloud_server: str, config: Optional[dict[str, Any]],
replace: bool = False
) -> bool:
"""Update user configuration.
@ -480,7 +480,7 @@ class MIoTStorage:
def load_user_config(
self, uid: str, cloud_server: str, keys: Optional[list[str]] = None
) -> dict[str, any]:
) -> dict[str, Any]:
if keys is not None and len(keys) == 0:
# Do nothing
return {}
@ -494,7 +494,7 @@ class MIoTStorage:
async def load_user_config_async(
self, uid: str, cloud_server: str, keys: Optional[list[str]] = None
) -> dict[str, any]:
) -> dict[str, Any]:
"""Load user configuration.
Args:
@ -503,7 +503,7 @@ class MIoTStorage:
query key list, return all config item if keys is None
Returns:
dict[str, any]: query result
dict[str, Any]: query result
"""
if keys is not None and len(keys) == 0:
# Do nothing

View File

@ -47,6 +47,7 @@ Sensor entities for Xiaomi Home.
"""
from __future__ import annotations
import logging
from typing import Any
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
@ -107,7 +108,7 @@ class Sensor(MIoTPropertyEntity, SensorEntity):
self._attr_icon = spec.icon
@property
def native_value(self) -> any:
def native_value(self) -> Any:
"""Return the current value of the sensor."""
if self._value_range and isinstance(self._value, (int, float)):
if (

View File

@ -47,7 +47,7 @@ Water heater entities for Xiaomi Home.
"""
from __future__ import annotations
import logging
from typing import Optional
from typing import Any, Optional
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
@ -93,7 +93,7 @@ class WaterHeater(MIoTServiceEntity, WaterHeaterEntity):
_prop_target_temp: Optional[MIoTSpecProperty]
_prop_mode: Optional[MIoTSpecProperty]
_mode_list: Optional[dict[any, any]]
_mode_list: Optional[dict[Any, Any]]
def __init__(
self, miot_device: MIoTDevice, entity_data: MIoTEntityData
@ -164,7 +164,7 @@ class WaterHeater(MIoTServiceEntity, WaterHeaterEntity):
"""Turn the water heater off."""
await self.set_property_async(prop=self._prop_on, value=False)
async def async_set_temperature(self, **kwargs: any) -> None:
async def async_set_temperature(self, **kwargs: Any) -> None:
"""Set the temperature the water heater should heat water to."""
await self.set_property_async(
prop=self._prop_target_temp, value=kwargs[ATTR_TEMPERATURE])

View File

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
"""Unit test for miot_lan.py."""
from typing import Any
import pytest
import asyncio
from zeroconf import IPVersion
@ -79,7 +80,7 @@ async def test_lan_async(test_devices: dict):
evt_push_unavailable = asyncio.Event()
await miot_lan.vote_for_lan_ctrl_async(key='test', vote=True)
async def device_state_change(did: str, state: dict, ctx: any):
async def device_state_change(did: str, state: dict, ctx: Any):
print('device state change, ', did, state)
if did != test_did:
return