mirror of
https://github.com/XiaoMi/ha_xiaomi_home.git
synced 2025-04-02 07:45:31 +08:00
test: add test case for miot storage
This commit is contained in:
parent
419d76e1ad
commit
2fe47aca61
2
.github/workflows/test.yaml
vendored
2
.github/workflows/test.yaml
vendored
@ -19,7 +19,7 @@ jobs:
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
python -m pip install --upgrade pip
|
||||
pip install pytest pytest-asyncio
|
||||
pip install pytest pytest-asyncio pytest-dependency
|
||||
|
||||
- name: Check rule format with pytest
|
||||
run: |
|
||||
|
@ -5,23 +5,9 @@ import pytest
|
||||
from os import path, makedirs
|
||||
|
||||
TEST_ROOT_PATH: str = path.dirname(path.abspath(__file__))
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def test_lang() -> str:
|
||||
return 'zh-Hans'
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def test_root_path() -> str:
|
||||
return TEST_ROOT_PATH
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def test_cache_path() -> str:
|
||||
cache_path: str = path.join(TEST_ROOT_PATH, 'test_cache')
|
||||
makedirs(cache_path, exist_ok=True)
|
||||
return cache_path
|
||||
TEST_LANG: str = 'zh-Hans'
|
||||
TEST_UID: str = '123456789'
|
||||
TEST_CLOUD_SERVER: str = 'cn'
|
||||
|
||||
|
||||
@pytest.fixture(scope='session', autouse=True)
|
||||
@ -68,3 +54,30 @@ def load_py_file():
|
||||
# print('\nremoved test folder, %s', 'miot/specs')
|
||||
# shutil.rmtree(path.join(TEST_ROOT_PATH, 'miot/i18n'))
|
||||
# print('\nremoved test folder, %s', 'miot/i18n')
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def test_root_path() -> str:
|
||||
return TEST_ROOT_PATH
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def test_cache_path() -> str:
|
||||
cache_path: str = path.join(TEST_ROOT_PATH, 'test_cache')
|
||||
makedirs(cache_path, exist_ok=True)
|
||||
return cache_path
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def test_lang() -> str:
|
||||
return TEST_LANG
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def test_uid() -> str:
|
||||
return TEST_UID
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def test_cloud_server() -> str:
|
||||
return TEST_CLOUD_SERVER
|
||||
|
222
test/test_storage.py
Executable file
222
test/test_storage.py
Executable file
@ -0,0 +1,222 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Unit test for miot_storage.py."""
|
||||
import asyncio
|
||||
from os import path
|
||||
import pytest
|
||||
|
||||
# pylint: disable=import-outside-toplevel, unused-argument
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.github
|
||||
@pytest.mark.dependency()
|
||||
async def test_variable_async(test_cache_path):
|
||||
from miot.miot_storage import MIoTStorage
|
||||
|
||||
storage = MIoTStorage(test_cache_path)
|
||||
test_domain = 'variable'
|
||||
test_count = 50
|
||||
|
||||
for index in range(test_count):
|
||||
# bytes
|
||||
var_name = f'bytes_var{index}'
|
||||
write_value: bytes = b'bytes value->{index}\n\n\n'
|
||||
assert await storage.save_async(test_domain, var_name, write_value)
|
||||
read_value: bytes = await storage.load_async(
|
||||
test_domain, var_name, type_=bytes)
|
||||
assert read_value == write_value
|
||||
if index > test_count/2:
|
||||
assert await storage.remove_async(
|
||||
test_domain, var_name, type_=bytes)
|
||||
# str
|
||||
var_name = f'str_var{index}'
|
||||
write_value: str = f'str value->{index}\n\n\n'
|
||||
assert await storage.save_async(test_domain, var_name, write_value)
|
||||
read_value: str = await storage.load_async(
|
||||
test_domain, var_name, type_=str)
|
||||
assert read_value == write_value
|
||||
if index >= test_count/2:
|
||||
assert await storage.remove_async(test_domain, var_name, type_=str)
|
||||
# list
|
||||
var_name = f'list_var{index}'
|
||||
write_value: list = [1, 2, 3, 4, 5, 'test_list', index]
|
||||
assert await storage.save_async(test_domain, var_name, write_value)
|
||||
read_value: list = await storage.load_async(
|
||||
test_domain, var_name, type_=list)
|
||||
assert read_value == write_value
|
||||
if index >= test_count/2:
|
||||
assert await storage.remove_async(test_domain, var_name, type_=list)
|
||||
# dict
|
||||
var_name = f'dict_var{index}'
|
||||
write_value: dict = {'k1': 'v1', 'k2': 'v2', 'index': f'index-{index}'}
|
||||
assert await storage.save_async(test_domain, var_name, write_value)
|
||||
read_value: dict = await storage.load_async(
|
||||
test_domain, var_name, type_=dict)
|
||||
assert read_value == write_value
|
||||
if index >= test_count/2:
|
||||
assert await storage.remove_async(test_domain, var_name, type_=dict)
|
||||
|
||||
# Delete all bytes
|
||||
names: list[str] = storage.get_names(domain=test_domain, type_=bytes)
|
||||
for name in names:
|
||||
assert await storage.remove_async(
|
||||
domain=test_domain, name=name, type_=bytes)
|
||||
assert len(storage.get_names(domain=test_domain, type_=bytes)) == 0
|
||||
assert len(storage.get_names(
|
||||
domain=test_domain, type_=str)) == test_count/2
|
||||
assert len(storage.get_names(
|
||||
domain=test_domain, type_=list)) == test_count/2
|
||||
assert len(storage.get_names(
|
||||
domain=test_domain, type_=dict)) == test_count/2
|
||||
|
||||
# Delete domain path
|
||||
# assert await storage.remove_domain_async(test_domain)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.github
|
||||
@pytest.mark.dependency(depends=['test_variable_async'])
|
||||
async def test_load_domain_async(test_cache_path):
|
||||
from miot.miot_storage import MIoTStorage
|
||||
|
||||
storage = MIoTStorage(test_cache_path)
|
||||
test_domain = 'variable'
|
||||
names: list[str] = storage.get_names(domain=test_domain, type_=dict)
|
||||
assert len(names) > 0
|
||||
for name in names:
|
||||
r_data = await storage.load_async(test_domain, name=name, type_=dict)
|
||||
assert r_data
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.github
|
||||
@pytest.mark.dependency(depends=['test_variable_async'])
|
||||
async def test_multi_task_load_async(test_cache_path):
|
||||
from miot.miot_storage import MIoTStorage
|
||||
|
||||
storage = MIoTStorage(test_cache_path)
|
||||
test_domain = 'variable'
|
||||
task_count = 50
|
||||
|
||||
names: list[str] = storage.get_names(domain=test_domain, type_=dict)
|
||||
task_list: list = []
|
||||
for name in names:
|
||||
for _ in range(task_count):
|
||||
task_list.append(asyncio.create_task(storage.load_async(
|
||||
domain=test_domain, name=name, type_=dict)))
|
||||
print(f'\ntask count, {len(task_list)}')
|
||||
result: list = await asyncio.gather(*task_list)
|
||||
assert None not in result
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.github
|
||||
@pytest.mark.dependency()
|
||||
async def test_file_save_load_async(test_cache_path):
|
||||
from miot.miot_storage import MIoTStorage
|
||||
|
||||
storage = MIoTStorage(test_cache_path)
|
||||
test_count = 50
|
||||
test_domain = 'file'
|
||||
for index in range(test_count):
|
||||
file_name = f'test-{index}.txt'
|
||||
file_content = f'this is a test file, the index={index}\r\r\r'.encode(
|
||||
'utf-8')
|
||||
assert await storage.save_file_async(
|
||||
test_domain, file_name, file_content)
|
||||
read_content = await storage.load_file_async(test_domain, file_name)
|
||||
assert file_content == read_content
|
||||
# Read the contents of the file directly
|
||||
with open(
|
||||
path.join(test_cache_path, test_domain, file_name), 'rb'
|
||||
) as r_file:
|
||||
data = r_file.read()
|
||||
assert data == file_content
|
||||
if index > test_count/2:
|
||||
assert await storage.remove_file_async(
|
||||
domain=test_domain, name_with_suffix=file_name)
|
||||
# Delete domain path
|
||||
assert await storage.remove_domain_async(test_domain)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.github
|
||||
@pytest.mark.dependency()
|
||||
async def test_user_config_async(
|
||||
test_cache_path, test_uid, test_cloud_server):
|
||||
from miot.miot_storage import MIoTStorage
|
||||
|
||||
storage = MIoTStorage(test_cache_path)
|
||||
config_base = {
|
||||
'str': 'test string',
|
||||
'list': ['test', 'list'],
|
||||
'dict': {
|
||||
'test': 'dict',
|
||||
'key1': 'value1'
|
||||
},
|
||||
'bool': False,
|
||||
'number_int': 123456,
|
||||
'number_float': 123.456
|
||||
}
|
||||
config = config_base.copy()
|
||||
assert await storage.update_user_config_async(
|
||||
uid=test_uid, cloud_server=test_cloud_server, config=config)
|
||||
# test load all
|
||||
assert (await storage.load_user_config_async(
|
||||
uid=test_uid, cloud_server=test_cloud_server)) == config
|
||||
# test update
|
||||
config_update = {
|
||||
'test_str': 'test str',
|
||||
'number_float': 456.123
|
||||
}
|
||||
assert await storage.update_user_config_async(
|
||||
uid=test_uid, cloud_server=test_cloud_server, config=config_update)
|
||||
config.update(config_update)
|
||||
assert (await storage.load_user_config_async(
|
||||
uid=test_uid, cloud_server=test_cloud_server)) == config
|
||||
# test replace
|
||||
config_replace = None
|
||||
assert await storage.update_user_config_async(
|
||||
uid=test_uid, cloud_server=test_cloud_server,
|
||||
config=config_update, replace=True)
|
||||
assert (config_replace := await storage.load_user_config_async(
|
||||
uid=test_uid, cloud_server=test_cloud_server)) == config_update
|
||||
print('\nreplace result, ', config_replace)
|
||||
# test query
|
||||
query_keys = list(config_base.keys())
|
||||
print('\nquery keys, %s', query_keys)
|
||||
query_result = await storage.load_user_config_async(
|
||||
uid=test_uid, cloud_server=test_cloud_server, keys=query_keys)
|
||||
print('\nquery result 1, ', query_result)
|
||||
assert await storage.update_user_config_async(
|
||||
uid=test_uid, cloud_server=test_cloud_server,
|
||||
config=config_base, replace=True)
|
||||
query_result = await storage.load_user_config_async(
|
||||
uid=test_uid, cloud_server=test_cloud_server, keys=query_keys)
|
||||
print('\nquery result 2, ', query_result)
|
||||
query_result = await storage.load_user_config_async(
|
||||
uid=test_uid, cloud_server=test_cloud_server)
|
||||
print('\nquery result all, ', query_result)
|
||||
# remove config
|
||||
assert await storage.update_user_config_async(
|
||||
uid=test_uid, cloud_server=test_cloud_server, config=None)
|
||||
query_result = await storage.load_user_config_async(
|
||||
uid=test_uid, cloud_server=test_cloud_server)
|
||||
print('\nremove result, ', query_result)
|
||||
# remove domain
|
||||
assert await storage.remove_domain_async(domain='miot_config')
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.github
|
||||
@pytest.mark.dependency(depends=[
|
||||
'test_variable_async',
|
||||
'test_load_domain_async',
|
||||
'test_multi_task_load_async',
|
||||
'test_file_save_load_async',
|
||||
'test_user_config_async'])
|
||||
async def test_clear_async(test_cache_path):
|
||||
from miot.miot_storage import MIoTStorage
|
||||
|
||||
storage = MIoTStorage(test_cache_path)
|
||||
assert await storage.clear_async()
|
Loading…
x
Reference in New Issue
Block a user