PythonTornado集成Apscheduler定时任务

职场菊菊子 2024-03-09 00:21:30
熟悉Python的人可能都知道,Apscheduler是python里面一款非常优秀的任务调度框架,这个框架是从鼎鼎大名的Quartz移植而来。 之前有用过Flask版本的Apscheduler做定时任务。刚好前不久接触了Tornado,顺便玩玩Tornado版本的Apscheduler。 本篇做了一个简单的Wdb页面,用于添加和删除定时任务,小伙伴们可以基于这个做一些扩展,比如把定时定时任务写入数据库,改变cron规则等等。 主要功能点如下: #新增任务(需要动态改变job_id的值) http://localhost:8888/scheduler?job_id=1&action=add #删除任务(需要动态改变job_id的值) http://localhost:8888/scheduler?job_id=1&action=remov 执行结果可以在console看到 from datetime import datetimefrom tornado.ioloop import IOLoop, PeriodicCallbackfrom tornado.web import RequestHandler, Applicationfrom apscheduler.schedulers.tornado import TornadoSchedulerscheduler = Nonejob_ids = []# 初始化def init_scheduler(): global scheduler scheduler = TornadoScheduler() scheduler.start() print('[Scheduler Init]APScheduler has been started')# 要执行的定时任务在这里def task1(options): print('{} [APScheduler][Task]-{}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'), options))class MainHandler(RequestHandler): def get(self): self.write('')class SchedulerHandler(RequestHandler): def get(self): global job_ids job_id = self.get_query_argument('job_id', None) action = self.get_query_argument('action', None) if job_id: # add if 'add' == action: if job_id not in job_ids: job_ids.append(job_id) scheduler.add_job(task1, 'interval', seconds=3, id=job_id, args=(job_id,)) self.write('[TASK ADDED] - {}'.format(job_id)) else: self.write('[TASK EXISTS] - {}'.format(job_id)) # remove elif 'remove' == action: if job_id in job_ids: scheduler.remove_job(job_id) job_ids.remove(job_id) self.write('[TASK REMOVED] - {}'.format(job_id)) else: self.write('[TASK NOT FOUND] - {}'.format(job_id)) else: self.write('[INVALID PARAMS] INVALID job_id or action')if __name__ == "__main__": routes = [ (r"/", MainHandler), (r"/scheduler/?", SchedulerHandler), ] init_scheduler() app = Application(routes, debug=True) app.listen(8888) IOLoop.current().start()
0 阅读:0

职场菊菊子

简介:感谢大家的关注