Merge branch 'XiaoMi:main' into main

This commit is contained in:
ted 2025-01-13 13:58:58 +08:00 committed by GitHub
commit bf33f0c60d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 652 additions and 62 deletions

View File

@ -426,14 +426,12 @@ class XiaomiMihomeConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
cloud_server=self._cloud_server, cloud_server=self._cloud_server,
uuid=self._uuid, uuid=self._uuid,
loop=self._main_loop) loop=self._main_loop)
state = hashlib.sha1(
f'd=ha.{self._uuid}'.encode('utf-8')).hexdigest()
self.hass.data[DOMAIN][self._virtual_did]['oauth_state'] = state
self._cc_oauth_auth_url = miot_oauth.gen_auth_url( self._cc_oauth_auth_url = miot_oauth.gen_auth_url(
redirect_url=self._oauth_redirect_url_full, state=state) redirect_url=self._oauth_redirect_url_full)
self.hass.data[DOMAIN][self._virtual_did]['oauth_state'] = (
miot_oauth.state)
_LOGGER.info( _LOGGER.info(
'async_step_oauth, oauth_url: %s', 'async_step_oauth, oauth_url: %s', self._cc_oauth_auth_url)
self._cc_oauth_auth_url)
webhook_async_unregister( webhook_async_unregister(
self.hass, webhook_id=self._virtual_did) self.hass, webhook_id=self._virtual_did)
webhook_async_register( webhook_async_register(
@ -1150,17 +1148,12 @@ class OptionsFlowHandler(config_entries.OptionsFlow):
async def async_step_oauth(self, user_input=None): async def async_step_oauth(self, user_input=None):
try: try:
if self._cc_task_oauth is None: if self._cc_task_oauth is None:
state = hashlib.sha1(
f'd=ha.{self._entry_data["uuid"]}'.encode('utf-8')
).hexdigest()
self.hass.data[DOMAIN][self._virtual_did]['oauth_state'] = state
self._miot_oauth.set_redirect_url(
redirect_url=self._oauth_redirect_url_full)
self._cc_oauth_auth_url = self._miot_oauth.gen_auth_url( self._cc_oauth_auth_url = self._miot_oauth.gen_auth_url(
redirect_url=self._oauth_redirect_url_full, state=state) redirect_url=self._oauth_redirect_url_full)
self.hass.data[DOMAIN][self._virtual_did]['oauth_state'] = (
self._miot_oauth.state)
_LOGGER.info( _LOGGER.info(
'async_step_oauth, oauth_url: %s', 'async_step_oauth, oauth_url: %s', self._cc_oauth_auth_url)
self._cc_oauth_auth_url)
webhook_async_unregister( webhook_async_unregister(
self.hass, webhook_id=self._virtual_did) self.hass, webhook_id=self._virtual_did)
webhook_async_register( webhook_async_register(

View File

@ -47,6 +47,7 @@ MIoT http client.
""" """
import asyncio import asyncio
import base64 import base64
import hashlib
import json import json
import logging import logging
import re import re
@ -76,6 +77,7 @@ class MIoTOauthClient:
_client_id: int _client_id: int
_redirect_url: str _redirect_url: str
_device_id: str _device_id: str
_state: str
def __init__( def __init__(
self, client_id: str, redirect_url: str, cloud_server: str, self, client_id: str, redirect_url: str, cloud_server: str,
@ -98,8 +100,14 @@ class MIoTOauthClient:
else: else:
self._oauth_host = f'{cloud_server}.{DEFAULT_OAUTH2_API_HOST}' self._oauth_host = f'{cloud_server}.{DEFAULT_OAUTH2_API_HOST}'
self._device_id = f'ha.{uuid}' self._device_id = f'ha.{uuid}'
self._state = hashlib.sha1(
f'd={self._device_id}'.encode('utf-8')).hexdigest()
self._session = aiohttp.ClientSession(loop=self._main_loop) self._session = aiohttp.ClientSession(loop=self._main_loop)
@property
def state(self) -> str:
return self._state
async def deinit_async(self) -> None: async def deinit_async(self) -> None:
if self._session and not self._session.closed: if self._session and not self._session.closed:
await self._session.close() await self._session.close()
@ -136,7 +144,8 @@ class MIoTOauthClient:
'redirect_uri': redirect_url or self._redirect_url, 'redirect_uri': redirect_url or self._redirect_url,
'client_id': self._client_id, 'client_id': self._client_id,
'response_type': 'code', 'response_type': 'code',
'device_id': self._device_id 'device_id': self._device_id,
'state': self._state
} }
if state: if state:
params['state'] = state params['state'] = state

View File

@ -117,7 +117,7 @@ class MipsServiceData:
self.type = service_info.type self.type = service_info.type
self.server = service_info.server or '' self.server = service_info.server or ''
# Parse profile # Parse profile
self.did = str(int.from_bytes(self.profile_bin[1:9])) self.did = str(int.from_bytes(self.profile_bin[1:9], byteorder='big'))
self.group_id = binascii.hexlify( self.group_id = binascii.hexlify(
self.profile_bin[9:17][::-1]).decode('utf-8') self.profile_bin[9:17][::-1]).decode('utf-8')
self.role = int(self.profile_bin[20] >> 4) self.role = int(self.profile_bin[20] >> 4)

View File

@ -1,11 +1,14 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""Test rule format.""" """Test rule format."""
import json import json
import logging
from os import listdir, path from os import listdir, path
from typing import Optional from typing import Optional
import pytest import pytest
import yaml import yaml
_LOGGER = logging.getLogger(__name__)
ROOT_PATH: str = path.dirname(path.abspath(__file__)) ROOT_PATH: str = path.dirname(path.abspath(__file__))
TRANS_RELATIVE_PATH: str = path.join( TRANS_RELATIVE_PATH: str = path.join(
ROOT_PATH, '../custom_components/xiaomi_home/translations') ROOT_PATH, '../custom_components/xiaomi_home/translations')
@ -27,10 +30,10 @@ def load_json_file(file_path: str) -> Optional[dict]:
with open(file_path, 'r', encoding='utf-8') as file: with open(file_path, 'r', encoding='utf-8') as file:
return json.load(file) return json.load(file)
except FileNotFoundError: except FileNotFoundError:
print(file_path, 'is not found.') _LOGGER.info('%s is not found.', file_path,)
return None return None
except json.JSONDecodeError: except json.JSONDecodeError:
print(file_path, 'is not a valid JSON file.') _LOGGER.info('%s is not a valid JSON file.', file_path)
return None return None
@ -44,10 +47,10 @@ def load_yaml_file(file_path: str) -> Optional[dict]:
with open(file_path, 'r', encoding='utf-8') as file: with open(file_path, 'r', encoding='utf-8') as file:
return yaml.safe_load(file) return yaml.safe_load(file)
except FileNotFoundError: except FileNotFoundError:
print(file_path, 'is not found.') _LOGGER.info('%s is not found.', file_path)
return None return None
except yaml.YAMLError: except yaml.YAMLError:
print(file_path, 'is not a valid YAML file.') _LOGGER.info('%s, is not a valid YAML file.', file_path)
return None return None
@ -116,37 +119,43 @@ def bool_trans(d: dict) -> bool:
return False return False
default_trans: dict = d['translate'].pop('default') default_trans: dict = d['translate'].pop('default')
if not default_trans: if not default_trans:
print('default trans is empty') _LOGGER.info('default trans is empty')
return False return False
default_keys: set[str] = set(default_trans.keys()) default_keys: set[str] = set(default_trans.keys())
for key, trans in d['translate'].items(): for key, trans in d['translate'].items():
trans_keys: set[str] = set(trans.keys()) trans_keys: set[str] = set(trans.keys())
if set(trans.keys()) != default_keys: if set(trans.keys()) != default_keys:
print('bool trans inconsistent', key, default_keys, trans_keys) _LOGGER.info(
'bool trans inconsistent, %s, %s, %s',
key, default_keys, trans_keys)
return False return False
return True return True
def compare_dict_structure(dict1: dict, dict2: dict) -> bool: def compare_dict_structure(dict1: dict, dict2: dict) -> bool:
if not isinstance(dict1, dict) or not isinstance(dict2, dict): if not isinstance(dict1, dict) or not isinstance(dict2, dict):
print('invalid type') _LOGGER.info('invalid type')
return False return False
if dict1.keys() != dict2.keys(): if dict1.keys() != dict2.keys():
print('inconsistent key values, ', dict1.keys(), dict2.keys()) _LOGGER.info(
'inconsistent key values, %s, %s', dict1.keys(), dict2.keys())
return False return False
for key in dict1: for key in dict1:
if isinstance(dict1[key], dict) and isinstance(dict2[key], dict): if isinstance(dict1[key], dict) and isinstance(dict2[key], dict):
if not compare_dict_structure(dict1[key], dict2[key]): if not compare_dict_structure(dict1[key], dict2[key]):
print('inconsistent key values, dict, ', key) _LOGGER.info(
'inconsistent key values, dict, %s', key)
return False return False
elif isinstance(dict1[key], list) and isinstance(dict2[key], list): elif isinstance(dict1[key], list) and isinstance(dict2[key], list):
if not all( if not all(
isinstance(i, type(j)) isinstance(i, type(j))
for i, j in zip(dict1[key], dict2[key])): for i, j in zip(dict1[key], dict2[key])):
print('inconsistent key values, list, ', key) _LOGGER.info(
'inconsistent key values, list, %s', key)
return False return False
elif not isinstance(dict1[key], type(dict2[key])): elif not isinstance(dict1[key], type(dict2[key])):
print('inconsistent key values, type, ', key) _LOGGER.info(
'inconsistent key values, type, %s', key)
return False return False
return True return True
@ -239,7 +248,8 @@ def test_miot_lang_integrity():
compare_dict: dict = load_json_file( compare_dict: dict = load_json_file(
path.join(TRANS_RELATIVE_PATH, name)) path.join(TRANS_RELATIVE_PATH, name))
if not compare_dict_structure(default_dict, compare_dict): if not compare_dict_structure(default_dict, compare_dict):
print('compare_dict_structure failed /translations, ', name) _LOGGER.info(
'compare_dict_structure failed /translations, %s', name)
assert False assert False
# Check i18n files structure # Check i18n files structure
default_dict = load_json_file( default_dict = load_json_file(
@ -248,7 +258,8 @@ def test_miot_lang_integrity():
compare_dict: dict = load_json_file( compare_dict: dict = load_json_file(
path.join(MIOT_I18N_RELATIVE_PATH, name)) path.join(MIOT_I18N_RELATIVE_PATH, name))
if not compare_dict_structure(default_dict, compare_dict): if not compare_dict_structure(default_dict, compare_dict):
print('compare_dict_structure failed /miot/i18n, ', name) _LOGGER.info(
'compare_dict_structure failed /miot/i18n, %s', name)
assert False assert False
@ -284,10 +295,10 @@ def test_miot_data_sort():
def test_sort_spec_data(): def test_sort_spec_data():
sort_data: dict = sort_bool_trans(file_path=SPEC_BOOL_TRANS_FILE) sort_data: dict = sort_bool_trans(file_path=SPEC_BOOL_TRANS_FILE)
save_json_file(file_path=SPEC_BOOL_TRANS_FILE, data=sort_data) save_json_file(file_path=SPEC_BOOL_TRANS_FILE, data=sort_data)
print(SPEC_BOOL_TRANS_FILE, 'formatted.') _LOGGER.info('%s formatted.', SPEC_BOOL_TRANS_FILE)
sort_data = sort_multi_lang(file_path=SPEC_MULTI_LANG_FILE) sort_data = sort_multi_lang(file_path=SPEC_MULTI_LANG_FILE)
save_json_file(file_path=SPEC_MULTI_LANG_FILE, data=sort_data) save_json_file(file_path=SPEC_MULTI_LANG_FILE, data=sort_data)
print(SPEC_MULTI_LANG_FILE, 'formatted.') _LOGGER.info('%s formatted.', SPEC_MULTI_LANG_FILE)
sort_data = sort_spec_filter(file_path=SPEC_FILTER_FILE) sort_data = sort_spec_filter(file_path=SPEC_FILTER_FILE)
save_json_file(file_path=SPEC_FILTER_FILE, data=sort_data) save_json_file(file_path=SPEC_FILTER_FILE, data=sort_data)
print(SPEC_FILTER_FILE, 'formatted.') _LOGGER.info('%s formatted.', SPEC_FILTER_FILE)

View File

@ -1,16 +1,38 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""Pytest fixtures.""" """Pytest fixtures."""
import logging
import random
import shutil import shutil
import pytest import pytest
from os import path, makedirs from os import path, makedirs
from uuid import uuid4
TEST_ROOT_PATH: str = path.dirname(path.abspath(__file__)) TEST_ROOT_PATH: str = path.dirname(path.abspath(__file__))
TEST_FILES_PATH: str = path.join(TEST_ROOT_PATH, 'miot') TEST_FILES_PATH: str = path.join(TEST_ROOT_PATH, 'miot')
TEST_CACHE_PATH: str = path.join(TEST_ROOT_PATH, 'test_cache') TEST_CACHE_PATH: str = path.join(TEST_ROOT_PATH, 'test_cache')
TEST_OAUTH2_REDIRECT_URL: str = 'http://homeassistant.local:8123'
TEST_LANG: str = 'zh-Hans' TEST_LANG: str = 'zh-Hans'
TEST_UID: str = '123456789' TEST_UID: str = '123456789'
TEST_CLOUD_SERVER: str = 'cn' TEST_CLOUD_SERVER: str = 'cn'
DOMAIN_OAUTH2: str = 'oauth2_info'
DOMAIN_USER_INFO: str = 'user_info'
_LOGGER = logging.getLogger(__name__)
@pytest.fixture(scope='session', autouse=True)
def set_logger():
logger = logging.getLogger()
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
_LOGGER.info('set logger, %s', logger)
@pytest.fixture(scope='session', autouse=True) @pytest.fixture(scope='session', autouse=True)
def load_py_file(): def load_py_file():
@ -23,6 +45,7 @@ def load_py_file():
'miot_i18n.py', 'miot_i18n.py',
'miot_lan.py', 'miot_lan.py',
'miot_mdns.py', 'miot_mdns.py',
'miot_mips.py',
'miot_network.py', 'miot_network.py',
'miot_spec.py', 'miot_spec.py',
'miot_storage.py'] 'miot_storage.py']
@ -34,31 +57,35 @@ def load_py_file():
TEST_ROOT_PATH, '../custom_components/xiaomi_home/miot', TEST_ROOT_PATH, '../custom_components/xiaomi_home/miot',
file_name), file_name),
path.join(TEST_FILES_PATH, file_name)) path.join(TEST_FILES_PATH, file_name))
print('\nloaded test py files, ', file_list) _LOGGER.info('\nloaded test py files, %s', file_list)
# Copy spec files to test folder # Copy spec files to test folder
shutil.copytree( shutil.copytree(
src=path.join( src=path.join(
TEST_ROOT_PATH, '../custom_components/xiaomi_home/miot/specs'), TEST_ROOT_PATH, '../custom_components/xiaomi_home/miot/specs'),
dst=path.join(TEST_FILES_PATH, 'specs'), dst=path.join(TEST_FILES_PATH, 'specs'),
dirs_exist_ok=True) dirs_exist_ok=True)
print('loaded spec test folder, specs') _LOGGER.info('loaded spec test folder, specs')
# Copy lan files to test folder # Copy lan files to test folder
shutil.copytree( shutil.copytree(
src=path.join( src=path.join(
TEST_ROOT_PATH, '../custom_components/xiaomi_home/miot/lan'), TEST_ROOT_PATH, '../custom_components/xiaomi_home/miot/lan'),
dst=path.join(TEST_FILES_PATH, 'lan'), dst=path.join(TEST_FILES_PATH, 'lan'),
dirs_exist_ok=True) dirs_exist_ok=True)
print('loaded lan test folder, lan') _LOGGER.info('loaded lan test folder, lan')
# Copy i18n files to test folder # Copy i18n files to test folder
shutil.copytree( shutil.copytree(
src=path.join( src=path.join(
TEST_ROOT_PATH, '../custom_components/xiaomi_home/miot/i18n'), TEST_ROOT_PATH, '../custom_components/xiaomi_home/miot/i18n'),
dst=path.join(TEST_FILES_PATH, 'i18n'), dst=path.join(TEST_FILES_PATH, 'i18n'),
dirs_exist_ok=True) dirs_exist_ok=True)
print('loaded i18n test folder, i18n') _LOGGER.info('loaded i18n test folder, i18n')
yield yield
# NOTICE: All test files and data (tokens, device information, etc.) will
# be deleted after the test is completed. For some test cases that
# require caching data, you can comment out the following code.
if path.exists(TEST_FILES_PATH): if path.exists(TEST_FILES_PATH):
shutil.rmtree(TEST_FILES_PATH) shutil.rmtree(TEST_FILES_PATH)
print('\nremoved test files, ', TEST_FILES_PATH) print('\nremoved test files, ', TEST_FILES_PATH)
@ -79,6 +106,11 @@ def test_cache_path() -> str:
return TEST_CACHE_PATH return TEST_CACHE_PATH
@pytest.fixture(scope='session')
def test_oauth2_redirect_url() -> str:
return TEST_OAUTH2_REDIRECT_URL
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def test_lang() -> str: def test_lang() -> str:
return TEST_LANG return TEST_LANG
@ -89,6 +121,33 @@ def test_uid() -> str:
return TEST_UID return TEST_UID
@pytest.fixture(scope='session')
def test_random_did() -> str:
# Gen random did
return str(random.getrandbits(64))
@pytest.fixture(scope='session')
def test_uuid() -> str:
# Gen uuid
return uuid4().hex
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def test_cloud_server() -> str: def test_cloud_server() -> str:
return TEST_CLOUD_SERVER return TEST_CLOUD_SERVER
@pytest.fixture(scope='session')
def test_domain_oauth2() -> str:
return DOMAIN_OAUTH2
@pytest.fixture(scope='session')
def test_name_uuid() -> str:
return f'{TEST_CLOUD_SERVER}_uuid'
@pytest.fixture(scope='session')
def test_domain_user_info() -> str:
return DOMAIN_USER_INFO

502
test/test_cloud.py Executable file
View File

@ -0,0 +1,502 @@
# -*- coding: utf-8 -*-
"""Unit test for miot_cloud.py."""
import asyncio
import logging
import time
import webbrowser
import pytest
# pylint: disable=import-outside-toplevel, unused-argument
_LOGGER = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.dependency()
async def test_miot_oauth_async(
test_cache_path: str,
test_cloud_server: str,
test_oauth2_redirect_url: str,
test_domain_oauth2: str,
test_uuid: str,
test_name_uuid: str
) -> dict:
from miot.const import OAUTH2_CLIENT_ID
from miot.miot_cloud import MIoTOauthClient
from miot.miot_storage import MIoTStorage
miot_storage = MIoTStorage(test_cache_path)
local_uuid = await miot_storage.load_async(
domain=test_domain_oauth2, name=test_name_uuid, type_=str)
uuid = str(local_uuid or test_uuid)
_LOGGER.info('uuid: %s', uuid)
miot_oauth = MIoTOauthClient(
client_id=OAUTH2_CLIENT_ID,
redirect_url=test_oauth2_redirect_url,
cloud_server=test_cloud_server,
uuid=uuid)
oauth_info = None
load_info = await miot_storage.load_async(
domain=test_domain_oauth2, name=test_cloud_server, type_=dict)
if (
isinstance(load_info, dict)
and 'access_token' in load_info
and 'expires_ts' in load_info
and load_info['expires_ts'] > int(time.time())
):
_LOGGER.info('load oauth info, %s', load_info)
oauth_info = load_info
if oauth_info is None:
# gen oauth url
auth_url: str = miot_oauth.gen_auth_url()
assert isinstance(auth_url, str)
_LOGGER.info('auth url: %s', auth_url)
# get code
webbrowser.open(auth_url)
code: str = input('input code: ')
assert code is not None
# get access_token
res_obj = await miot_oauth.get_access_token_async(code=code)
assert res_obj is not None
oauth_info = res_obj
_LOGGER.info('get_access_token result: %s', res_obj)
rc = await miot_storage.save_async(
test_domain_oauth2, test_cloud_server, oauth_info)
assert rc
_LOGGER.info('save oauth info')
rc = await miot_storage.save_async(
test_domain_oauth2, test_name_uuid, uuid)
assert rc
_LOGGER.info('save uuid')
access_token = oauth_info.get('access_token', None)
assert isinstance(access_token, str)
_LOGGER.info('access_token: %s', access_token)
refresh_token = oauth_info.get('refresh_token', None)
assert isinstance(refresh_token, str)
_LOGGER.info('refresh_token: %s', refresh_token)
await miot_oauth.deinit_async()
return oauth_info
@pytest.mark.asyncio
@pytest.mark.dependency(on=['test_miot_oauth_async'])
async def test_miot_oauth_refresh_token(
test_cache_path: str,
test_cloud_server: str,
test_oauth2_redirect_url: str,
test_domain_oauth2: str,
test_name_uuid: str
):
from miot.const import OAUTH2_CLIENT_ID
from miot.miot_cloud import MIoTOauthClient
from miot.miot_storage import MIoTStorage
miot_storage = MIoTStorage(test_cache_path)
uuid = await miot_storage.load_async(
domain=test_domain_oauth2, name=test_name_uuid, type_=str)
assert isinstance(uuid, str)
oauth_info = await miot_storage.load_async(
domain=test_domain_oauth2, name=test_cloud_server, type_=dict)
assert isinstance(oauth_info, dict)
assert 'access_token' in oauth_info
assert 'refresh_token' in oauth_info
assert 'expires_ts' in oauth_info
remaining_time = oauth_info['expires_ts'] - int(time.time())
_LOGGER.info('token remaining valid time: %ss', remaining_time)
# Refresh token
miot_oauth = MIoTOauthClient(
client_id=OAUTH2_CLIENT_ID,
redirect_url=test_oauth2_redirect_url,
cloud_server=test_cloud_server,
uuid=uuid)
refresh_token = oauth_info.get('refresh_token', None)
assert refresh_token
update_info = await miot_oauth.refresh_access_token_async(
refresh_token=refresh_token)
assert update_info
assert 'access_token' in update_info
assert 'refresh_token' in update_info
assert 'expires_ts' in update_info
remaining_time = update_info['expires_ts'] - int(time.time())
assert remaining_time > 0
_LOGGER.info('refresh token, remaining valid time: %ss', remaining_time)
# Save token
rc = await miot_storage.save_async(
test_domain_oauth2, test_cloud_server, update_info)
assert rc
_LOGGER.info('refresh token success, %s', update_info)
await miot_oauth.deinit_async()
@pytest.mark.asyncio
@pytest.mark.dependency()
async def test_miot_cloud_get_nickname_async(
test_cache_path: str,
test_cloud_server: str,
test_domain_oauth2: str
):
from miot.const import OAUTH2_CLIENT_ID
from miot.miot_cloud import MIoTHttpClient
from miot.miot_storage import MIoTStorage
miot_storage = MIoTStorage(test_cache_path)
oauth_info = await miot_storage.load_async(
domain=test_domain_oauth2, name=test_cloud_server, type_=dict)
assert isinstance(oauth_info, dict) and 'access_token' in oauth_info
miot_http = MIoTHttpClient(
cloud_server=test_cloud_server, client_id=OAUTH2_CLIENT_ID,
access_token=oauth_info['access_token'])
# Get nickname
user_info = await miot_http.get_user_info_async()
assert isinstance(user_info, dict) and 'miliaoNick' in user_info
nickname = user_info['miliaoNick']
_LOGGER.info('your nickname: %s', nickname)
await miot_http.deinit_async()
@pytest.mark.asyncio
@pytest.mark.dependency()
async def test_miot_cloud_get_uid_async(
test_cache_path: str,
test_cloud_server: str,
test_domain_oauth2: str,
test_domain_user_info: str
):
from miot.const import OAUTH2_CLIENT_ID
from miot.miot_cloud import MIoTHttpClient
from miot.miot_storage import MIoTStorage
miot_storage = MIoTStorage(test_cache_path)
oauth_info = await miot_storage.load_async(
domain=test_domain_oauth2, name=test_cloud_server, type_=dict)
assert isinstance(oauth_info, dict) and 'access_token' in oauth_info
miot_http = MIoTHttpClient(
cloud_server=test_cloud_server, client_id=OAUTH2_CLIENT_ID,
access_token=oauth_info['access_token'])
uid = await miot_http.get_uid_async()
assert isinstance(uid, str)
_LOGGER.info('your uid: %s', uid)
# Save uid
rc = await miot_storage.save_async(
domain=test_domain_user_info,
name=f'uid_{test_cloud_server}', data=uid)
assert rc
await miot_http.deinit_async()
@pytest.mark.asyncio
@pytest.mark.dependency()
async def test_miot_cloud_get_homeinfos_async(
test_cache_path: str,
test_cloud_server: str,
test_domain_oauth2: str,
test_domain_user_info: str
):
from miot.const import OAUTH2_CLIENT_ID
from miot.miot_cloud import MIoTHttpClient
from miot.miot_storage import MIoTStorage
miot_storage = MIoTStorage(test_cache_path)
oauth_info = await miot_storage.load_async(
domain=test_domain_oauth2, name=test_cloud_server, type_=dict)
assert isinstance(oauth_info, dict) and 'access_token' in oauth_info
miot_http = MIoTHttpClient(
cloud_server=test_cloud_server, client_id=OAUTH2_CLIENT_ID,
access_token=oauth_info['access_token'])
# Get homeinfos
homeinfos = await miot_http.get_homeinfos_async()
assert isinstance(homeinfos, dict)
assert 'uid' in homeinfos and isinstance(homeinfos['uid'], str)
assert 'home_list' in homeinfos and isinstance(
homeinfos['home_list'], dict)
assert 'share_home_list' in homeinfos and isinstance(
homeinfos['share_home_list'], dict)
# Get uid
uid = homeinfos.get('uid', '')
# Compare uid with uid in storage
uid2 = await miot_storage.load_async(
domain=test_domain_user_info,
name=f'uid_{test_cloud_server}', type_=str)
assert uid == uid2
_LOGGER.info('your uid: %s', uid)
# Get homes
home_list = homeinfos.get('home_list', {})
_LOGGER.info('your home_list: ,%s', home_list)
# Get share homes
share_home_list = homeinfos.get('share_home_list', {})
_LOGGER.info('your share_home_list: %s', share_home_list)
await miot_http.deinit_async()
@pytest.mark.asyncio
@pytest.mark.dependency()
async def test_miot_cloud_get_devices_async(
test_cache_path: str,
test_cloud_server: str,
test_domain_oauth2: str,
test_domain_user_info: str
):
from miot.const import OAUTH2_CLIENT_ID
from miot.miot_cloud import MIoTHttpClient
from miot.miot_storage import MIoTStorage
miot_storage = MIoTStorage(test_cache_path)
oauth_info = await miot_storage.load_async(
domain=test_domain_oauth2, name=test_cloud_server, type_=dict)
assert isinstance(oauth_info, dict) and 'access_token' in oauth_info
miot_http = MIoTHttpClient(
cloud_server=test_cloud_server, client_id=OAUTH2_CLIENT_ID,
access_token=oauth_info['access_token'])
# Get devices
devices = await miot_http.get_devices_async()
assert isinstance(devices, dict)
assert 'uid' in devices and isinstance(devices['uid'], str)
assert 'homes' in devices and isinstance(devices['homes'], dict)
assert 'devices' in devices and isinstance(devices['devices'], dict)
# Compare uid with uid in storage
uid = devices.get('uid', '')
uid2 = await miot_storage.load_async(
domain=test_domain_user_info,
name=f'uid_{test_cloud_server}', type_=str)
assert uid == uid2
_LOGGER.info('your uid: %s', uid)
# Get homes
homes = devices['homes']
_LOGGER.info('your homes: %s', homes)
# Get devices
devices = devices['devices']
_LOGGER.info('your devices count: %s', len(devices))
# Storage homes and devices
rc = await miot_storage.save_async(
domain=test_domain_user_info,
name=f'homes_{test_cloud_server}', data=homes)
assert rc
rc = await miot_storage.save_async(
domain=test_domain_user_info,
name=f'devices_{test_cloud_server}', data=devices)
assert rc
await miot_http.deinit_async()
@pytest.mark.asyncio
@pytest.mark.dependency()
async def test_miot_cloud_get_devices_with_dids_async(
test_cache_path: str,
test_cloud_server: str,
test_domain_oauth2: str,
test_domain_user_info: str
):
from miot.const import OAUTH2_CLIENT_ID
from miot.miot_cloud import MIoTHttpClient
from miot.miot_storage import MIoTStorage
miot_storage = MIoTStorage(test_cache_path)
oauth_info = await miot_storage.load_async(
domain=test_domain_oauth2, name=test_cloud_server, type_=dict)
assert isinstance(oauth_info, dict) and 'access_token' in oauth_info
miot_http = MIoTHttpClient(
cloud_server=test_cloud_server, client_id=OAUTH2_CLIENT_ID,
access_token=oauth_info['access_token'])
# Load devices
local_devices = await miot_storage.load_async(
domain=test_domain_user_info,
name=f'devices_{test_cloud_server}', type_=dict)
assert isinstance(local_devices, dict)
did_list = list(local_devices.keys())
assert len(did_list) > 0
# Get device with dids
test_list = did_list[:6]
devices_info = await miot_http.get_devices_with_dids_async(
dids=test_list)
assert isinstance(devices_info, dict)
_LOGGER.info('test did list, %s, %s', len(test_list), test_list)
_LOGGER.info(
'test result: %s, %s', len(devices_info), list(devices_info.keys()))
await miot_http.deinit_async()
@pytest.mark.asyncio
@pytest.mark.dependency()
async def test_miot_cloud_get_prop_async(
test_cache_path: str,
test_cloud_server: str,
test_domain_oauth2: str,
test_domain_user_info: str
):
from miot.const import OAUTH2_CLIENT_ID
from miot.miot_cloud import MIoTHttpClient
from miot.miot_storage import MIoTStorage
miot_storage = MIoTStorage(test_cache_path)
oauth_info = await miot_storage.load_async(
domain=test_domain_oauth2, name=test_cloud_server, type_=dict)
assert isinstance(oauth_info, dict) and 'access_token' in oauth_info
miot_http = MIoTHttpClient(
cloud_server=test_cloud_server, client_id=OAUTH2_CLIENT_ID,
access_token=oauth_info['access_token'])
# Load devices
local_devices = await miot_storage.load_async(
domain=test_domain_user_info,
name=f'devices_{test_cloud_server}', type_=dict)
assert isinstance(local_devices, dict)
did_list = list(local_devices.keys())
assert len(did_list) > 0
# Get prop
test_list = did_list[:6]
for did in test_list:
prop_value = await miot_http.get_prop_async(did=did, siid=2, piid=1)
device_name = local_devices[did]['name']
_LOGGER.info('%s(%s), prop.2.1: %s', device_name, did, prop_value)
await miot_http.deinit_async()
@pytest.mark.asyncio
@pytest.mark.dependency()
async def test_miot_cloud_get_props_async(
test_cache_path: str,
test_cloud_server: str,
test_domain_oauth2: str,
test_domain_user_info: str
):
from miot.const import OAUTH2_CLIENT_ID
from miot.miot_cloud import MIoTHttpClient
from miot.miot_storage import MIoTStorage
miot_storage = MIoTStorage(test_cache_path)
oauth_info = await miot_storage.load_async(
domain=test_domain_oauth2, name=test_cloud_server, type_=dict)
assert isinstance(oauth_info, dict) and 'access_token' in oauth_info
miot_http = MIoTHttpClient(
cloud_server=test_cloud_server, client_id=OAUTH2_CLIENT_ID,
access_token=oauth_info['access_token'])
# Load devices
local_devices = await miot_storage.load_async(
domain=test_domain_user_info,
name=f'devices_{test_cloud_server}', type_=dict)
assert isinstance(local_devices, dict)
did_list = list(local_devices.keys())
assert len(did_list) > 0
# Get props
test_list = did_list[:6]
prop_values = await miot_http.get_props_async(params=[
{'did': did, 'siid': 2, 'piid': 1} for did in test_list])
_LOGGER.info('test did list, %s, %s', len(test_list), test_list)
_LOGGER.info('test result, %s, %s', len(prop_values), prop_values)
await miot_http.deinit_async()
@pytest.mark.skip(reason='skip danger operation')
@pytest.mark.asyncio
@pytest.mark.dependency()
async def test_miot_cloud_set_prop_async(
test_cache_path: str,
test_cloud_server: str,
test_domain_oauth2: str,
test_domain_user_info: str
):
"""
WARNING: This test case will control the actual device and is not enabled
by default. You can uncomment @pytest.mark.skip to enable it.
"""
from miot.const import OAUTH2_CLIENT_ID
from miot.miot_cloud import MIoTHttpClient
from miot.miot_storage import MIoTStorage
miot_storage = MIoTStorage(test_cache_path)
oauth_info = await miot_storage.load_async(
domain=test_domain_oauth2, name=test_cloud_server, type_=dict)
assert isinstance(oauth_info, dict) and 'access_token' in oauth_info
miot_http = MIoTHttpClient(
cloud_server=test_cloud_server, client_id=OAUTH2_CLIENT_ID,
access_token=oauth_info['access_token'])
# Load devices
local_devices = await miot_storage.load_async(
domain=test_domain_user_info,
name=f'devices_{test_cloud_server}', type_=dict)
assert isinstance(local_devices, dict)
assert len(local_devices) > 0
# Set prop
# Find central hub gateway, control its indicator light switch
# You can replace it with the device you want to control.
test_did = ''
for did, dev in local_devices.items():
if dev['model'] == 'xiaomi.gateway.hub1':
test_did = did
break
assert test_did != '', 'no central hub gateway found'
result = await miot_http.set_prop_async(params=[{
'did': test_did, 'siid': 3, 'piid': 1, 'value': False}])
_LOGGER.info('test did, %s, prop.3.1=False -> %s', test_did, result)
await asyncio.sleep(1)
result = await miot_http.set_prop_async(params=[{
'did': test_did, 'siid': 3, 'piid': 1, 'value': True}])
_LOGGER.info('test did, %s, prop.3.1=True -> %s', test_did, result)
await miot_http.deinit_async()
@pytest.mark.skip(reason='skip danger operation')
@pytest.mark.asyncio
@pytest.mark.dependency()
async def test_miot_cloud_action_async(
test_cache_path: str,
test_cloud_server: str,
test_domain_oauth2: str,
test_domain_user_info: str
):
"""
WARNING: This test case will control the actual device and is not enabled
by default. You can uncomment @pytest.mark.skip to enable it.
"""
from miot.const import OAUTH2_CLIENT_ID
from miot.miot_cloud import MIoTHttpClient
from miot.miot_storage import MIoTStorage
miot_storage = MIoTStorage(test_cache_path)
oauth_info = await miot_storage.load_async(
domain=test_domain_oauth2, name=test_cloud_server, type_=dict)
assert isinstance(oauth_info, dict) and 'access_token' in oauth_info
miot_http = MIoTHttpClient(
cloud_server=test_cloud_server, client_id=OAUTH2_CLIENT_ID,
access_token=oauth_info['access_token'])
# Load devices
local_devices = await miot_storage.load_async(
domain=test_domain_user_info,
name=f'devices_{test_cloud_server}', type_=dict)
assert isinstance(local_devices, dict)
assert len(local_devices) > 0
# Action
# Find central hub gateway, trigger its virtual events
# You can replace it with the device you want to control.
test_did = ''
for did, dev in local_devices.items():
if dev['model'] == 'xiaomi.gateway.hub1':
test_did = did
break
assert test_did != '', 'no central hub gateway found'
result = await miot_http.action_async(
did=test_did, siid=4, aiid=1,
in_list=[{'piid': 1, 'value': 'hello world.'}])
_LOGGER.info('test did, %s, action.4.1 -> %s', test_did, result)
await miot_http.deinit_async()

View File

@ -18,7 +18,7 @@ def test_miot_matcher():
if not matcher.get(topic=f'test/+/{l2}'): if not matcher.get(topic=f'test/+/{l2}'):
matcher[f'test/+/{l2}'] = f'test/+/{l2}' matcher[f'test/+/{l2}'] = f'test/+/{l2}'
# Match # Match
match_result: list[(str, dict)] = list(matcher.iter_all_nodes()) match_result: list[str] = list(matcher.iter_all_nodes())
assert len(match_result) == 120 assert len(match_result) == 120
match_result: list[str] = list(matcher.iter_match(topic='test/1/1')) match_result: list[str] = list(matcher.iter_match(topic='test/1/1'))
assert len(match_result) == 3 assert len(match_result) == 3

View File

@ -1,11 +1,14 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""Unit test for miot_lan.py.""" """Unit test for miot_lan.py."""
import logging
from typing import Any from typing import Any
import pytest import pytest
import asyncio import asyncio
from zeroconf import IPVersion from zeroconf import IPVersion
from zeroconf.asyncio import AsyncZeroconf from zeroconf.asyncio import AsyncZeroconf
_LOGGER = logging.getLogger(__name__)
# pylint: disable=import-outside-toplevel, unused-argument # pylint: disable=import-outside-toplevel, unused-argument
@ -67,7 +70,7 @@ async def test_lan_async(test_devices: dict):
miot_network = MIoTNetwork() miot_network = MIoTNetwork()
await miot_network.init_async() await miot_network.init_async()
print('miot_network, ', miot_network.network_info) _LOGGER.info('miot_network, %s', miot_network.network_info)
mips_service = MipsService( mips_service = MipsService(
aiozc=AsyncZeroconf(ip_version=IPVersion.V4Only)) aiozc=AsyncZeroconf(ip_version=IPVersion.V4Only))
await mips_service.init_async() await mips_service.init_async()
@ -81,7 +84,7 @@ async def test_lan_async(test_devices: dict):
await miot_lan.vote_for_lan_ctrl_async(key='test', vote=True) await miot_lan.vote_for_lan_ctrl_async(key='test', vote=True)
async def device_state_change(did: str, state: dict, ctx: Any): async def device_state_change(did: str, state: dict, ctx: Any):
print('device state change, ', did, state) _LOGGER.info('device state change, %s, %s', did, state)
if did != test_did: if did != test_did:
return return
if ( if (
@ -91,10 +94,10 @@ async def test_lan_async(test_devices: dict):
# Test sub prop # Test sub prop
miot_lan.sub_prop( miot_lan.sub_prop(
did=did, siid=3, piid=1, handler=lambda msg, ctx: did=did, siid=3, piid=1, handler=lambda msg, ctx:
print(f'sub prop.3.1 msg, {did}={msg}')) _LOGGER.info('sub prop.3.1 msg, %s=%s', did, msg))
miot_lan.sub_prop( miot_lan.sub_prop(
did=did, handler=lambda msg, ctx: did=did, handler=lambda msg, ctx:
print(f'sub all device msg, {did}={msg}')) _LOGGER.info('sub all device msg, %s=%s', did, msg))
evt_push_available.set() evt_push_available.set()
else: else:
# miot_lan.unsub_prop(did=did, siid=3, piid=1) # miot_lan.unsub_prop(did=did, siid=3, piid=1)
@ -102,7 +105,7 @@ async def test_lan_async(test_devices: dict):
evt_push_unavailable.set() evt_push_unavailable.set()
async def lan_state_change(state: bool): async def lan_state_change(state: bool):
print('lan state change, ', state) _LOGGER.info('lan state change, %s', state)
if not state: if not state:
return return
miot_lan.update_devices(devices={ miot_lan.update_devices(devices={

View File

@ -1,9 +1,12 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""Unit test for miot_mdns.py.""" """Unit test for miot_mdns.py."""
import logging
import pytest import pytest
from zeroconf import IPVersion from zeroconf import IPVersion
from zeroconf.asyncio import AsyncZeroconf from zeroconf.asyncio import AsyncZeroconf
_LOGGER = logging.getLogger(__name__)
# pylint: disable=import-outside-toplevel, unused-argument # pylint: disable=import-outside-toplevel, unused-argument
@ -13,7 +16,7 @@ async def test_service_loop_async():
async def on_service_state_change( async def on_service_state_change(
group_id: str, state: MipsServiceState, data: MipsServiceData): group_id: str, state: MipsServiceState, data: MipsServiceData):
print( _LOGGER.info(
'on_service_state_change, %s, %s, %s', group_id, state, data) 'on_service_state_change, %s, %s, %s', group_id, state, data)
async with AsyncZeroconf(ip_version=IPVersion.V4Only) as aiozc: async with AsyncZeroconf(ip_version=IPVersion.V4Only) as aiozc:
@ -21,8 +24,9 @@ async def test_service_loop_async():
mips_service.sub_service_change('test', '*', on_service_state_change) mips_service.sub_service_change('test', '*', on_service_state_change)
await mips_service.init_async() await mips_service.init_async()
services_detail = mips_service.get_services() services_detail = mips_service.get_services()
print('get all service, ', services_detail.keys()) _LOGGER.info('get all service, %s', services_detail.keys())
for name, data in services_detail.items(): for name, data in services_detail.items():
print( _LOGGER.info(
'\tinfo, ', name, data['did'], data['addresses'], data['port']) '\tinfo, %s, %s, %s, %s',
name, data['did'], data['addresses'], data['port'])
await mips_service.deinit_async() await mips_service.deinit_async()

View File

@ -1,8 +1,11 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""Unit test for miot_network.py.""" """Unit test for miot_network.py."""
import logging
import pytest import pytest
import asyncio import asyncio
_LOGGER = logging.getLogger(__name__)
# pylint: disable=import-outside-toplevel, unused-argument # pylint: disable=import-outside-toplevel, unused-argument
@ -12,16 +15,16 @@ async def test_network_monitor_loop_async():
miot_net = MIoTNetwork() miot_net = MIoTNetwork()
async def on_network_status_changed(status: bool): async def on_network_status_changed(status: bool):
print(f'on_network_status_changed, {status}') _LOGGER.info('on_network_status_changed, %s', status)
miot_net.sub_network_status(key='test', handler=on_network_status_changed) miot_net.sub_network_status(key='test', handler=on_network_status_changed)
async def on_network_info_changed( async def on_network_info_changed(
status: InterfaceStatus, info: NetworkInfo): status: InterfaceStatus, info: NetworkInfo):
print(f'on_network_info_changed, {status}, {info}') _LOGGER.info('on_network_info_changed, %s, %s', status, info)
miot_net.sub_network_info(key='test', handler=on_network_info_changed) miot_net.sub_network_info(key='test', handler=on_network_info_changed)
await miot_net.init_async(3) await miot_net.init_async()
await asyncio.sleep(3) await asyncio.sleep(3)
print(f'net status: {miot_net.network_status}') _LOGGER.info('net status: %s', miot_net.network_status)
print(f'net info: {miot_net.network_info}') _LOGGER.info('net info: %s', miot_net.network_info)
await miot_net.deinit_async() await miot_net.deinit_async()

View File

@ -1,11 +1,14 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""Unit test for miot_spec.py.""" """Unit test for miot_spec.py."""
import json import json
import logging
import random import random
import time import time
from urllib.request import Request, urlopen from urllib.request import Request, urlopen
import pytest import pytest
_LOGGER = logging.getLogger(__name__)
# pylint: disable=import-outside-toplevel, unused-argument # pylint: disable=import-outside-toplevel, unused-argument
@ -79,10 +82,10 @@ async def test_spec_random_parse_async(test_cache_path, test_lang):
storage = MIoTStorage(test_cache_path) storage = MIoTStorage(test_cache_path)
spec_parser = MIoTSpecParser(lang=test_lang, storage=storage) spec_parser = MIoTSpecParser(lang=test_lang, storage=storage)
await spec_parser.init_async() await spec_parser.init_async()
start_ts: int = time.time()*1000 start_ts = time.time()*1000
for index in test_urn_index: for index in test_urn_index:
urn: str = test_urns[int(index)] urn: str = test_urns[int(index)]
result = await spec_parser.parse(urn=urn, skip_cache=True) result = await spec_parser.parse(urn=urn, skip_cache=True)
assert result is not None assert result is not None
end_ts: int = time.time()*1000 end_ts = time.time()*1000
print(f'takes time, {test_count}, {end_ts-start_ts}') _LOGGER.info('takes time, %s, %s', test_count, end_ts-start_ts)

View File

@ -1,9 +1,12 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""Unit test for miot_storage.py.""" """Unit test for miot_storage.py."""
import asyncio import asyncio
import logging
from os import path from os import path
import pytest import pytest
_LOGGER = logging.getLogger(__name__)
# pylint: disable=import-outside-toplevel, unused-argument # pylint: disable=import-outside-toplevel, unused-argument
@ -101,7 +104,7 @@ async def test_multi_task_load_async(test_cache_path):
for _ in range(task_count): for _ in range(task_count):
task_list.append(asyncio.create_task(storage.load_async( task_list.append(asyncio.create_task(storage.load_async(
domain=test_domain, name=name, type_=dict))) domain=test_domain, name=name, type_=dict)))
print(f'\ntask count, {len(task_list)}') _LOGGER.info('task count, %s', len(task_list))
result: list = await asyncio.gather(*task_list) result: list = await asyncio.gather(*task_list)
assert None not in result assert None not in result
@ -178,28 +181,28 @@ async def test_user_config_async(
config=config_update, replace=True) config=config_update, replace=True)
assert (config_replace := await storage.load_user_config_async( assert (config_replace := await storage.load_user_config_async(
uid=test_uid, cloud_server=test_cloud_server)) == config_update uid=test_uid, cloud_server=test_cloud_server)) == config_update
print('replace result, ', config_replace) _LOGGER.info('replace result, %s', config_replace)
# Test query # Test query
query_keys = list(config_base.keys()) query_keys = list(config_base.keys())
print('query keys, ', query_keys) _LOGGER.info('query keys, %s', query_keys)
query_result = await storage.load_user_config_async( query_result = await storage.load_user_config_async(
uid=test_uid, cloud_server=test_cloud_server, keys=query_keys) uid=test_uid, cloud_server=test_cloud_server, keys=query_keys)
print('query result 1, ', query_result) _LOGGER.info('query result 1, %s', query_result)
assert await storage.update_user_config_async( assert await storage.update_user_config_async(
uid=test_uid, cloud_server=test_cloud_server, uid=test_uid, cloud_server=test_cloud_server,
config=config_base, replace=True) config=config_base, replace=True)
query_result = await storage.load_user_config_async( query_result = await storage.load_user_config_async(
uid=test_uid, cloud_server=test_cloud_server, keys=query_keys) uid=test_uid, cloud_server=test_cloud_server, keys=query_keys)
print('query result 2, ', query_result) _LOGGER.info('query result 2, %s', query_result)
query_result = await storage.load_user_config_async( query_result = await storage.load_user_config_async(
uid=test_uid, cloud_server=test_cloud_server) uid=test_uid, cloud_server=test_cloud_server)
print('query result all, ', query_result) _LOGGER.info('query result all, %s', query_result)
# Remove config # Remove config
assert await storage.update_user_config_async( assert await storage.update_user_config_async(
uid=test_uid, cloud_server=test_cloud_server, config=None) uid=test_uid, cloud_server=test_cloud_server, config=None)
query_result = await storage.load_user_config_async( query_result = await storage.load_user_config_async(
uid=test_uid, cloud_server=test_cloud_server) uid=test_uid, cloud_server=test_cloud_server)
print('remove result, ', query_result) _LOGGER.info('remove result, %s', query_result)
# Remove domain # Remove domain
assert await storage.remove_domain_async(domain='miot_config') assert await storage.remove_domain_async(domain='miot_config')