From 1bb48093ca8ce5708412cf3fb7daa4984f51a618 Mon Sep 17 00:00:00 2001 From: lzybetter Date: Thu, 2 May 2024 00:01:52 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E6=AD=A5=E5=AE=8C=E6=88=90=E6=8F=92?= =?UTF-8?q?=E4=BB=B6=E7=9A=84=E5=BC=82=E6=AD=A5=E8=B0=83=E7=94=A8=E5=92=8C?= =?UTF-8?q?=E5=90=91telegram=20bot=E5=8F=91=E9=80=81=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.py | 18 ++++++-- util/PluginManager.py | 62 ++++++++++++++++++++++--- util/config.py | 13 ++++-- util/scheduler.py | 105 ------------------------------------------ 4 files changed, 80 insertions(+), 118 deletions(-) delete mode 100644 util/scheduler.py diff --git a/main.py b/main.py index 37b32a4..59e5e28 100644 --- a/main.py +++ b/main.py @@ -1,10 +1,20 @@ +from util.config import Config from util.PluginManager import PluginManager - +from flask import Flask if __name__ == "__main__": - manager = PluginManager() + + ## 初始化 + ## flask初始化 + app = Flask(__name__) + ## 配比初始化 + config = Config() + ## 插件管理器 + manager = PluginManager(config) # Load and execute plugins manager.load_plugins() - plugin_names = manager.get_plugin_name_list() - print(plugin_names) \ No newline at end of file + + manager.start_scheduler() + print("初始化完成") + app.run() diff --git a/util/PluginManager.py b/util/PluginManager.py index e9dcfd1..7dfeefc 100644 --- a/util/PluginManager.py +++ b/util/PluginManager.py @@ -1,15 +1,23 @@ +import asyncio import importlib.util import os +import threading from util.base_plugin import BasePlugin +import telegram class PluginManager: PLUGIN_DIR = "custom_plugins" - def __init__(self): + def __init__(self, config): self.plugins = {} self.plugin_dir = PluginManager.PLUGIN_DIR + self.running_tasks = [] + # 插件所在目录 + self.plugins_path = {} + # self.loop = asyncio.new_event_loop() + self.config = config def load_plugins(self): for root, dirs, files in os.walk(self.plugin_dir): @@ -17,10 +25,9 @@ class PluginManager: if file.endswith(".py"): plugin_name = os.path.splitext(file)[0] plugin_path = os.path.join(root, file) - print(plugin_name) - self.load_plugin(plugin_name, plugin_path) + self.load_plugin(plugin_name, plugin_path, root) - def load_plugin(self, plugin_name, plugin_path): + def load_plugin(self, plugin_name, plugin_path, root): spec = importlib.util.spec_from_file_location(plugin_name, plugin_path) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) @@ -31,6 +38,7 @@ class PluginManager: if callable(member) and hasattr(member, "__bases__") and BasePlugin in member.__bases__: print(member) self.plugins[plugin_name] = member() + self.plugins_path[plugin_name] = root print(f"Plugin '{plugin_name}' loaded successfully.") def execute_plugins(self, *args, **kwargs): @@ -41,18 +49,60 @@ class PluginManager: def execute_plugin(self, plugin_name, *args, **kwargs): if plugin_name in self.plugins: print(f"Executing plugin '{plugin_name}':") - self.plugins[plugin_name].run_plugin(self.callback, *args, **kwargs) + return self.plugins[plugin_name].run_plugin(self.callback, *args, **kwargs) + else: print(f"Plugin '{plugin_name}' not found.") + return f"Plugin '{plugin_name}' not found." def get_plugin_name_list(self): return [plugin[0] for plugin in self.plugins.items()] def callback(self, result): + print(f"Plugin '{result['plugin_name']}' returned:") print(f"Success: {result['success']}") if result['success']: print(f"Result: {result['result']}") else: - print(f"Error: {result['error']}") \ No newline at end of file + print(f"Error: {result['error']}") + + def start_scheduler(self): + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + def scheduler(): + asyncio.set_event_loop(self.loop) + for name in self.get_plugin_name_list(): + try: + schedule_time = int( + self.config.get_plugin_config(config_name='SCHEDULER_TIME', config_path=self.plugins_path[name])) + except: + schedule_time = 10 * 60 # 默认十分钟运行一次 + self.loop.create_task(self.execute_plugin_async(name, schedule_time)) + + self.loop.run_forever() + + thread = threading.Thread(target=scheduler) + thread.daemon = True + thread.start() + + + async def execute_plugin_async(self, plugin_name, interval, *args, **kwargs): + + while True: + if plugin_name in self.plugins: + print(f"Executing plugin '{plugin_name}':") + result = self.execute_plugin(plugin_name, *args, **kwargs) + token = self.config.get_config('TELEGRAM')['BOT_TOKEN'] + chat_id = self.config.get_config('TELEGRAM')['CHAT_ID'] + await self.send_message_async(token, chat_id, result) + else: + print(f"Plugin '{plugin_name}' not found.") + print(interval) + await asyncio.sleep(interval) + + async def send_message_async(self, bot_token, chat_id, text): + proxy = telegram.request.HTTPXRequest(proxy_url='http://127.0.0.1:8889') + bot = telegram.Bot(token=bot_token, request=proxy) + await bot.send_message(chat_id=chat_id, text=text) diff --git a/util/config.py b/util/config.py index 431faad..6137676 100644 --- a/util/config.py +++ b/util/config.py @@ -31,7 +31,7 @@ class Config: def get_scheduler_db_file_path(self): try: - return os.path.join(self.__CONFIG_PATH, self.__CONFIG_DICT['SCHEDULER_DB_PATH'], + return os.path.join(self.__BASE_PATH, self.__CONFIG_DICT['SCHEDULER_DB_PATH'], self.__SCHEDULER_DB_FILE_NAME) except: return os.path.join(self.__CONFIG_PATH, 'schedule_db', self.__SCHEDULER_DB_FILE_NAME) @@ -69,7 +69,14 @@ class Config: except: return default - def set_config(self, config_name, value): + def set_config(self, config_name, value, config_path = ''): self.__CONFIG_DICT[config_name] = value with open(os.path.join(self.__CONFIG_PATH, self.__CONFIG_NAME), 'w') as f: - f.write(yaml.safe_dump(self.__CONFIG_DICT, sort_keys=False)) \ No newline at end of file + f.write(yaml.safe_dump(self.__CONFIG_DICT, sort_keys=False)) + + def get_plugin_config(self, config_name, config_path): + print(config_path) + if config_path != '': + with open(os.path.join(config_path, self.__CONFIG_NAME)) as f: + config_d = yaml.safe_load(f) + return config_d[config_name] \ No newline at end of file diff --git a/util/scheduler.py b/util/scheduler.py deleted file mode 100644 index dc5024f..0000000 --- a/util/scheduler.py +++ /dev/null @@ -1,105 +0,0 @@ -import sqlite3 - -from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore -from apscheduler.util import undefined -import logging -import uuid -from flask_apscheduler import APScheduler - -class Scheduler_DB_Config(object): - # SCHEDULER_API_ENABLED = True - SCHEDULER_JOBSTORES = { - 'default': SQLAlchemyJobStore('sqlite:///config/schedule_db/schedule_db.db') - } - -class Scheduler(): - - __logger = None - __scheduler = None - def __init__(self, app, config): - - self.__logger = logging.getLogger("scheduler_logger") - self.__logger.setLevel(logging.INFO) - fh = logging.FileHandler(config.get_log_file_path()) - fh.setLevel(logging.INFO) - formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') - fh.setFormatter(formatter) - self.__logger.addHandler(fh) - self.__scheduler = APScheduler() - scheduler_db_path = config.get_scheduler_db_file_path() - scheduler_db_config = Scheduler_DB_Config() - scheduler_db_config.SCHEDULER_JOBSTORES['default'] = SQLAlchemyJobStore('sqlite:///%s'%scheduler_db_path) - app.config.from_object(scheduler_db_config) - self.__scheduler.init_app(app) - self.__scheduler.start() - - def add_interval_schedule(self, func, **kwargs): - - params = { - 'args': None, - 'kwargs': None, - 'name': None, - 'misfire_grace_time': undefined, - 'coalesce': undefined, - 'max_instances': undefined, - 'next_run_time': undefined, - 'jobstore': 'default', - 'executor': 'default', - 'trigger': 'interval', - 'replace_existing':False} - for k, v in kwargs.items(): - if k not in ('trigger','args', 'kwargs', 'id', 'name', 'misfire_grace_time', 'coalesce' - 'max_instances', 'next_run_time', 'jobstore', 'executor', 'replace_existing', - 'trigger', 'days', 'weeks', 'hours', 'minutes', 'seconds', 'start_date', 'end_date'): - self.__logger.error('%s is not supported'%k) - else: - params[k] = v - - if params['name']: - self.__logger.info('定时器\'%s\'的参数为%s' % (params['name'], params)) - else: - self.__logger.info('定时器\'%s\'的参数为%s'%(func.__name__, params)) - - id = str(uuid.uuid1()) - try: - self.__scheduler.add_job(id=id, func=func, **params) - return "定时器添加成功" - except Exception as e: - self.__logger.error("定时器添加失败,错误信息:%s"%e) - return "定时器添加失败,详细信息请见log文件" - - def del_scheduler(self, id): - if self.query_scheduler(id): - try: - self.__scheduler.remove_job(id) - return "已删除:%s"%id - except Exception as e: - self.__logger.error("定时器删除失败,错误信息:%s" % e) - return "定时器删除失败,详细信息请见log文件" - else: - return "定时器删除失败,无该id的job: %s"%id - logger.info("无该id的job: %s"%id) - - def query_scheduler(self, id=None): - db_path = 'config/schedule_db/jobstores.db' - conn = sqlite3.connect(db_path) - cur = conn.cursor() - cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='apscheduler_jobs';") - is_table_exists = cur.fetchall() - result = [] - if is_table_exists and id: - cur.execute("select * from apscheduler_jobs where id='%s'"%id) - result = cur.fetchall() - else: - cur.execute("select * from apscheduler_jobs") - result = cur.fetchall() - if result: - return True - else: - return False - - def get_scheduler_list(self): - job_infos = [] - for j in self.__scheduler.get_jobs(): - job_infos.append({'id': j.id, 'name': j.name}) - return job_infos \ No newline at end of file