# -*- coding:utf-8 -*- """ cron: 15 2 * * * new Env('重复任务优化'); """ import json import logging import os import sys,re import time import traceback import requests logger = logging.getLogger(name=None) # 创建一个日志对象 logging.Formatter("%(message)s") # 日志内容格式化 logger.setLevel(logging.INFO) # 设置日志等级 logger.addHandler(logging.StreamHandler()) # 添加控制台日志 # logger.addHandler(logging.FileHandler(filename="text.log", mode="w")) # 添加文件日志 ipport = os.getenv("IPPORT") if not ipport: logger.info( "如果报错请在环境变量中添加你的真实 IP:端口\n名称:IPPORT\t值:\n或在 config.sh 中添加 export IPPORT=''" ) ipport = "localhost:5700" else: ipport = ipport.lstrip("http://").rstrip("/") sub_str = os.getenv("RES_SUB", "shufflewzc_faker3") sub_list = sub_str.split("&") res_only = os.getenv("RES_ONLY", True) headers = { "Accept": "application/json", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.131 Safari/537.36", } def load_send() -> None: logger.info("加载推送功能中...") global send send = None cur_path = os.path.abspath(os.path.dirname(__file__)) sys.path.append(cur_path) if os.path.exists(cur_path + "/notify.py"): try: from notify import send except Exception: send = None logger.info(f"❌加载通知服务失败!!!\n{traceback.format_exc()}") def get_tasklist() -> list: tasklist = [] t = round(time.time() * 1000) url = f"http://{ipport}/api/crons?searchValue=&t={t}" response = requests.get(url=url, headers=headers) datas = json.loads(response.content.decode("utf-8")) if datas.get("code") == 200: try: tasklist = datas.get("data").get("data") except Exception: tasklist = datas.get("data") return tasklist def filter_res_sub(tasklist: list) -> tuple: filter_list = [] res_list = [] for task in tasklist: for sub in sub_list: if task.get("command").find(sub) == -1: flag = False else: flag = True break if flag: res_list.append(task) else: filter_list.append(task) return filter_list, res_list def get_index(lst: list, item: str) -> list: return [index for (index, value) in enumerate(lst) if value == item] def get_duplicate_list(tasklist: list) -> tuple: logger.info("\n=== 第一轮初筛开始 ===") ids = [] names = [] cmds = [] for task in tasklist: ids.append(task.get("_id",task.get("id"))) names.append(task.get("name")) cmds.append(task.get("command")) name_list = [] for i, name in enumerate(names): if name not in name_list: name_list.append(name) tem_tasks = [] tem_ids = [] dup_ids = [] for name2 in name_list: name_index = get_index(names, name2) for i in range(len(name_index)): if i == 0: logger.info(f"【✅保留】{cmds[name_index[0]]}") tem_tasks.append(tasklist[name_index[0]]) tem_ids.append(ids[name_index[0]]) else: logger.info(f"【🚫禁用】{cmds[name_index[i]]}") dup_ids.append(ids[name_index[i]]) logger.info("") logger.info("=== 第一轮初筛结束 ===") return tem_ids, tem_tasks, dup_ids def reserve_task_only( tem_ids: list, tem_tasks: list, dup_ids: list, res_list: list ) -> list: if len(tem_ids) == 0: return tem_ids logger.info("\n=== 最终筛选开始 ===") task3 = None for task1 in tem_tasks: for task2 in res_list: if task1.get("name") == task2.get("name"): dup_ids.append(task1.get("_id",task1.get("id"))) logger.info(f"【✅保留】{task2.get('command')}") task3 = task1 if task3: logger.info(f"【🚫禁用】{task3.get('command')}\n") task3 = None logger.info("=== 最终筛选结束 ===") return dup_ids def disable_duplicate_tasks(ids: list) -> None: t = round(time.time() * 1000) url = f"http://{ipport}/api/crons/disable?t={t}" data = json.dumps(ids) headers["Content-Type"] = "application/json;charset=UTF-8" response = requests.put(url=url, headers=headers, data=data) datas = json.loads(response.content.decode("utf-8")) if datas.get("code") != 200: logger.info(f"❌出错!!!错误信息为:{datas}") else: logger.info("🎉成功禁用重复任务~") def get_token() -> str or None: if os.path.exists("/ql/data/db/keyv.sqlite"): path="/ql/data/db/keyv.sqlite" elif os.path.exists("/ql/config/auth.json"): path="/ql/config/auth.json" elif os.path.exists("/ql/data/config/auth.json"): path="/ql/data/config/auth.json" try: if 'keyv' in path: with open(path, "r", encoding="latin1") as file: auth = file.read() matches = re.search(r'token":"([^"]+)"', auth) token = matches.group(1) else: with open(path, "r") as file: auth = file.read() auth = json.loads(auth) token = auth["token"] except Exception: logger.info(f"❌无法获取 token!!!\n{traceback.format_exc()}") send("禁用重复任务失败", "无法获取 token!!!") exit(1) return token if __name__ == "__main__": logger.info("===> 禁用重复任务开始 <===") load_send() token = get_token() headers["Authorization"] = f"Bearer {token}" # 获取过滤后的任务列表 sub_str = "\n".join(sub_list) logger.info(f"\n=== 你选择过滤的任务前缀为 ===\n{sub_str}") tasklist = get_tasklist() if len(tasklist) == 0: logger.info("❌无法获取 tasklist!!!") exit(1) filter_list, res_list = filter_res_sub(tasklist) tem_ids, tem_tasks, dup_ids = get_duplicate_list(filter_list) # 是否在重复任务中只保留设置的前缀 if res_only: ids = reserve_task_only(tem_ids, tem_tasks, dup_ids, res_list) else: ids = dup_ids logger.info("你选择保留除了设置的前缀以外的其他任务") sum = f"所有任务数量为:{len(tasklist)}" filter = f"过滤的任务数量为:{len(res_list)}" disable = f"禁用的任务数量为:{len(ids)}" logging.info("\n=== 禁用数量统计 ===\n" + sum + "\n" + filter + "\n" + disable) if len(ids) == 0: logger.info("😁没有重复任务~") else: disable_duplicate_tasks(ids) #if send: #send("💖禁用重复任务成功", f"\n{sum}\n{filter}\n{disable}")