apscheduler 的使用??我们项目中总是避免不了要使用一些定时任务 , 比如说最近的项目 , 用户点击报名考试以后需要在考试日期临近的时候推送小程序消息提醒到客户微信上 , 翻了翻 fastapi 中的实现 , 虽然方法和包也不少 , 但是要不就是太重了(比如需要再开服务 , 还要依赖 redis , 都不好用) , 虽然也可以使用 time 模块的 time.sleep()机上 fastapi 的后台任务变相实现 , 但是相对简单的功能还行 , 复杂点的代码起来就麻烦了 , 所以还是专人专事找个负责这个额的包吧 。找来找去发现 APScheduler 就挺适合 , 代码简单 , 实现效果也很好 , 这里做个记录!
安装pip install apscheduler主要组成部分【如何让你的简历脱颖而出 如何让你的Python程序,定时定点地去执行任务?】概念性东西 , 混个脸熟 , 代码比这些定义好理解 。
触发器(trigger)包含调度逻辑 , 每一个作业有它自己的触发器 , 用于决定接下来哪一个作业会运行 。除了他们自己初始配置意外 , 触发器完全是无状态的 。说人话就是你指定那种方式触发当前的任务 。
干货主要有:① 200 多本 Python 电子书(和经典的书籍)应该有
② Python标准库资料(最全中文版)
③ 项目源码(四五十个有趣且可靠的练手项目及源码)
④ Python基础入门、爬虫、网络开发、大数据分析方面的视频(适合小白学习)
⑤ Python学习路线图(告别不入流的学习)
Python学习交流Q群101677771
解释
DateTrigger
到期执行(到xxxx年x月x日 x时x分x秒执行) 对应DateTrigger
IntervalTrigger
间隔执行(每5秒执行一次)
CronTrigger
一个crontab类型的条件(这个比较复杂 , 比如周一到周四的4-5点每5秒执行一次)
作业存储(job store)存储被调度的作业 , 默认的作业存储是简单地把作业保存在内存中 , 其他的作业存储是将作业保存在数据库中 。一个作业的数据讲在保存在持久化作业存储时被序列化 , 并在加载时被反序列化 。调度器不能分享同一个作业存储 。
Jobstore在scheduler中初始化 , 另外也可通过scheduler的add_jobstore动态添加Jobstore 。每个jobstore都会绑定一个alias , scheduler在Add Job时 , 根据指定的jobstore在scheduler中找到相应的jobstore , 并将job添加到jobstore中 。Jobstore主要是通过pickle库的loads和dumps【实现核心是通过python的__getstate__和__setstate__重写实现】 , 每次变更时将Job动态保存到存储中 , 使用时再动态的加载出来 , 作为存储的可以是redis , 也可以是数据库【通过sqlarchemy这个库集成多种数据库】 , 也可以是mongodb等目前APScheduler支持的Jobstore:MemoryJobStoreMongoDBJobStoreRedisJobStoreRethinkDBJobStoreSQLAlchemyJobStoreZooKeeperJobStore执行器(executor)处理作业的运行 , 他们通常通过在作业中提交制定的可调用对象到一个线程或者进城池来进行 。当作业完成时 , 执行器将会通知调度器 。- 说人话就是添加任务时候用它来包装的 , executor的种类会根据不同的调度来选择 , 如果选择AsyncIO作为调度的库 , 那么选择AsyncIOExecutor , 如果选择tornado作为调度的库 , 选择TornadoExecutor , 如果选择启动进程作为调度 , 选择ThreadPoolExecutor或者ProcessPoolExecutor都可以Executor的选择需要根据实际的scheduler来选择不同的执行器目前APScheduler支持的Executor:AsyncIOExecutorGeventExecutorThreadPoolExecutorProcessPoolExecutorTornadoExecutorTwistedExecutor调度器(scheduler)是其他的组成部分 。你通常在应用只有一个调度器 , 应用的开发者通常不会直接处理作业存储、调度器和触发器 , 相反 , 调度器提供了处理这些的合适的接口 。配置作业存储和执行器可以在调度器中完成 , 例如添加、修改和移除作业.Scheduler是APScheduler的核心 , 所有相关组件通过其定义 。scheduler启动之后 , 将开始按照配置的任务进行调度 。除了依据所有定义Job的trigger生成的将要调度时间唤醒调度之外 。当发生Job信息变更时也会触发调度 。scheduler可根据自身的需求选择不同的组件 , 如果是使用AsyncIO则选择AsyncIOScheduler , 使用tornado则选择TornadoScheduler 。目前APScheduler支持的Scheduler:AsyncIOSchedulerBackgroundSchedulerBlockingSchedulerGeventSchedulerQtSchedulerTornadoSchedulerTwistedScheduler简单应用import timefrom apscheduler.schedulers.blocking import BlockingScheduler # 引入后台def my_job():print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))sched = BlockingScheduler()sched.add_job(my_job, 'interval', seconds=5)sched.start()完整代码# trigeers 触发器# job stores job 存储# executors 执行器# schedulers 调度器from pytz import utcfrom sqlalchemy import funcfrom apscheduler.schedulers.background import BackgroundScheduler,AsyncIOSchedulerfrom apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStorefrom apscheduler.executors.pool import ProcessPoolExecutorjobstores = {# 可以配置多个存储#'mongo': {'type': 'mongodb'},'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')# SQLAlchemyJobStore指定存储链接}executors = {'default': {'type': 'threadpool', 'max_workers': 20},# 最大工作线程数20'processpool': ProcessPoolExecutor(max_workers=5)# 最大工作进程数为5}job_defaults = {'coalesce': False,# 关闭新job的合并 , 当job延误或者异常原因未执行时'max_instances': 3# 并发运行新job默认最大实例多少}scheduler = BackgroundScheduler()# .. do something else here, maybe add jobs etc.scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc) # utc作为调度程序的时区import osimport timedef print_time(name):print(f'{name} - {time.ctime()}')def add_job(job_id, func, args, seconds):"""添加job"""print(f"添加间隔执行任务job - {job_id}")scheduler.add_job(id=job_id, func=func, args=args, trigger='interval', seconds=seconds)def add_coun_job(job_id, func, args, start_time):"""添加job"""print(f"添加一次执行任务job - {job_id}")scheduler.add_job(id=job_id, func=func, args=args, trigger='date',timezone='Asia/Shanghai', run_date=start_time)# scheduler.add_job(func=print_time, trigger='date',timezone='Asia/Shanghai', run_date=datetime(2022, 2, 19, 17, 57, 0).astimezone(), args=['text2'])def remove_job(job_id):"""移除job"""scheduler.remove_job(job_id)print(f"移除job - {job_id}")def pause_job(job_id):"""停止job"""scheduler.pause_job(job_id)print(f"停止job - {job_id}")def resume_job(job_id):"""恢复job"""scheduler.resume_job(job_id)print(f"恢复job - {job_id}")def get_jobs():"""获取所有job信息,包括已停止的"""res = scheduler.get_jobs()print(f"所有job - {res}")def print_jobs():print(f"详细job信息")scheduler.print_jobs()def start():"""启动调度器"""scheduler.start()def shutdown():"""关闭调度器"""scheduler.shutdown()if __name__ == '__main__':scheduler = BackgroundScheduler()# start()# print('Press Ctrl+{0} to exit \n'.format('Break' if os.name == 'nt' else 'C'))# add_job('job_A', func=print_time, args=("A", ), seconds=1)# add_job('job_B', func=print_time, args=("B", ), seconds=2)# time.sleep(6)# pause_job('job_A') # 停止a# get_jobs()#得到所有job# time.sleep(6)# print_jobs()# resume_job('job_A')# time.sleep(6)# remove_job('job_A')# time.sleep(6)from datetime import datetimeimport pytzstart()date_temp = datetime(2022, 2, 19, 17, 30, 5)# scheduler.add_job(print_time, 'date', run_date=datetime.now(pytz.timezone('America/Manaus')), args=['text'])# scheduler.add_job(print_time, 'date',timezone='Asia/Shanghai', run_date=datetime(2022, 2, 19, 17, 57, 0).astimezone(), args=['text2'])add_coun_job(job_id="job_C",func=print_time,args=('一次性执行任务',),start_time=datetime(2022, 2, 19, 18, 4, 0).astimezone())time.sleep(130)try:shutdown()except RuntimeError:pass
- 春季老年人吃什么养肝?土豆、米饭换着吃
- 三八妇女节节日祝福分享 三八妇女节节日语录
- 老人谨慎!选好你的“第三只脚”
- 校方进行了深刻的反思 青岛一大学生坠亡校方整改校规
- 脸皮厚的人长寿!有这特征的老人最长寿
- 长寿秘诀:记住这10大妙招 100%增寿
- 春季老年人心血管病高发 3条保命要诀
- 眼睛花不花要看四十八 老年人怎样延缓老花眼
- 香槟然能防治老年痴呆症? 一天三杯它人到90不痴呆
- 老人手抖的原因 为什么老人手会抖
