Files
myAssistant/util/scheduler.py
2023-09-09 10:39:05 +08:00

105 lines
4.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import sqlite3
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.util import undefined
import logging
import uuid
from flask_apscheduler import APScheduler
class Scheduler_DB_Config(object):
# SCHEDULER_API_ENABLED = True
SCHEDULER_JOBSTORES = {
'default': SQLAlchemyJobStore('sqlite:///config/schedule_db/schedule_db.db')
}
class Scheduler():
__logger = None
__scheduler = None
def __init__(self, app, config):
self.__logger = logging.getLogger("scheduler_logger")
self.__logger.setLevel(logging.INFO)
fh = logging.FileHandler(config.get_log_file_path())
fh.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
self.__logger.addHandler(fh)
self.__scheduler = APScheduler()
scheduler_db_path = config.get_scheduler_db_file_path()
scheduler_db_config = Scheduler_DB_Config()
scheduler_db_config.SCHEDULER_JOBSTORES['default'] = SQLAlchemyJobStore('sqlite:///%s'%scheduler_db_path)
app.config.from_object(scheduler_db_config)
self.__scheduler.init_app(app)
self.__scheduler.start()
def add_interval_schedule(self, func, **kwargs):
params = {
'args': None,
'kwargs': None,
'name': None,
'misfire_grace_time': undefined,
'coalesce': undefined,
'max_instances': undefined,
'next_run_time': undefined,
'jobstore': 'default',
'executor': 'default',
'trigger': 'interval',
'replace_existing':False}
for k, v in kwargs.items():
if k not in ('trigger','args', 'kwargs', 'id', 'name', 'misfire_grace_time', 'coalesce'
'max_instances', 'next_run_time', 'jobstore', 'executor', 'replace_existing',
'trigger', 'days', 'weeks', 'hours', 'minutes', 'seconds', 'start_date', 'end_date'):
self.__logger.error('%s is not supported'%k)
else:
params[k] = v
if params['name']:
self.__logger.info('定时器\'%s\'的参数为%s' % (params['name'], params))
else:
self.__logger.info('定时器\'%s\'的参数为%s'%(func.__name__, params))
id = str(uuid.uuid1())
try:
self.__scheduler.add_job(id=id, func=func, **params)
return "定时器添加成功"
except Exception as e:
self.__logger.error("定时器添加失败,错误信息:%s"%e)
return "定时器添加失败详细信息请见log文件"
def del_scheduler(self, id):
if self.query_scheduler(id):
try:
self.__scheduler.remove_job(id)
return "已删除:%s"%id
except Exception as e:
self.__logger.error("定时器删除失败,错误信息:%s" % e)
return "定时器删除失败详细信息请见log文件"
else:
return "定时器删除失败无该id的job: %s"%id
logger.info("无该id的job: %s"%id)
def query_scheduler(self, id=None):
db_path = 'config/schedule_db/jobstores.db'
conn = sqlite3.connect(db_path)
cur = conn.cursor()
cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='apscheduler_jobs';")
is_table_exists = cur.fetchall()
result = []
if is_table_exists and id:
cur.execute("select * from apscheduler_jobs where id='%s'"%id)
result = cur.fetchall()
else:
cur.execute("select * from apscheduler_jobs")
result = cur.fetchall()
if result:
return True
else:
return False
def get_scheduler_list(self):
job_infos = []
for j in self.__scheduler.get_jobs():
job_infos.append({'id': j.id, 'name': j.name})
return job_infos