mirror of
https://github.com/XiaoMi/ha_xiaomi_home.git
synced 2025-06-21 14:20:17 +08:00
refactor: refactor miot mips & fix type errors (#365)
* remove use of tev & fix type errors * lint fix * make private classes private * simplify inheritance * fix thread naming * fix the deleted public data class * remove tev * fix access violation * style: format code * style: param init * fix: fix event async set * fix: fix mips re-connect error --------- Co-authored-by: topsworld <sworldtop@gmail.com>
This commit is contained in:
@ -20,7 +20,6 @@ def load_py_file():
|
||||
'const.py',
|
||||
'miot_cloud.py',
|
||||
'miot_error.py',
|
||||
'miot_ev.py',
|
||||
'miot_i18n.py',
|
||||
'miot_lan.py',
|
||||
'miot_mdns.py',
|
||||
|
@ -1,55 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Unit test for miot_ev.py."""
|
||||
import os
|
||||
import pytest
|
||||
|
||||
# pylint: disable=import-outside-toplevel, disable=unused-argument
|
||||
|
||||
|
||||
@pytest.mark.github
|
||||
def test_mev_timer_and_fd():
|
||||
from miot.miot_ev import MIoTEventLoop, TimeoutHandle
|
||||
|
||||
mev = MIoTEventLoop()
|
||||
assert mev
|
||||
event_fd: os.eventfd = os.eventfd(0, os.O_NONBLOCK)
|
||||
assert event_fd
|
||||
timer4: TimeoutHandle = None
|
||||
|
||||
def event_handler(event_fd):
|
||||
value: int = os.eventfd_read(event_fd)
|
||||
if value == 1:
|
||||
mev.clear_timeout(timer4)
|
||||
print('cancel timer4')
|
||||
elif value == 2:
|
||||
print('event write twice in a row')
|
||||
elif value == 3:
|
||||
mev.set_read_handler(event_fd, None, None)
|
||||
os.close(event_fd)
|
||||
event_fd = None
|
||||
print('close event fd')
|
||||
|
||||
def timer1_handler(event_fd):
|
||||
os.eventfd_write(event_fd, 1)
|
||||
|
||||
def timer2_handler(event_fd):
|
||||
os.eventfd_write(event_fd, 1)
|
||||
os.eventfd_write(event_fd, 1)
|
||||
|
||||
def timer3_handler(event_fd):
|
||||
os.eventfd_write(event_fd, 3)
|
||||
|
||||
def timer4_handler(event_fd):
|
||||
raise ValueError('unreachable code')
|
||||
|
||||
mev.set_read_handler(
|
||||
event_fd, event_handler, event_fd)
|
||||
|
||||
mev.set_timeout(500, timer1_handler, event_fd)
|
||||
mev.set_timeout(1000, timer2_handler, event_fd)
|
||||
mev.set_timeout(1500, timer3_handler, event_fd)
|
||||
timer4 = mev.set_timeout(2000, timer4_handler, event_fd)
|
||||
|
||||
mev.loop_forever()
|
||||
# Loop will exit when there are no timers or fd handlers.
|
||||
mev.loop_stop()
|
Reference in New Issue
Block a user