开发者

Python定时库APScheduler的原理以及用法示例

开发者 https://www.devze.com 2022-12-07 10:06 出处:网络 作者: 战渣渣
目录1. APScheduler简介2. APScheduler组件2.1. APScheduler中几个重要的概念2.1.1. Job 作业2.1.2. Trigger 触发器2.1.3. Executor 执行器2.1.4. Jobstore 作业存储2.1.5. Event 事件2.1.6. Listener 监听事件2.1.7
目录
  • 1. APScheduler简介
  • 2. APScheduler组件
    • 2.1. APScheduler中几个重要的概念
      • 2.1.1. Job 作业
      • 2.1.2. Trigger 触发器
      • 2.1.3. Executor 执行器
      • 2.1.4. Jobstore 作业存储
      • 2.1.5. Event 事件
      • 2.1.6. Listener 监听事件
      • 2.1.7. Scheduler 调度器
    • 2.2. Scheduler工作流程图
      • 2.2.1. Scheduler添加job流程
      • 2.2.2 Scheduler调度流程
  • 3. APScheduler使www.cppcns.com用示例
    • 总结

      1. APScheduler简介

      APscheduler全称Advanced python Scheduler

      作用为在指定的时间规则执行指定的作业。

      • 指定时间规则的方式可以是间隔多久执行,可以是指定日期时间的执行,也可以类似linux系统中Crontab中的方式执行任务。
      • 指定的任务就是一个Python函数。

      2. APScheduler组件

      APScheduler版本 3.6.3

      2.1. APScheduler中几个重要的概念

      2.1.1. Job 作业

      作用

      Job作为APScheduler最小执行单位。

      创建Job时指定执行的函数,函数中所需参数,Job执行时的一些设置信息。

      构建说明

      id:指定作业的唯一ID

      name:指定作业的名字

      trigger:apscheduler定义的触发器,用于确定Job的执行时间,根据设置的trigger规则,计算得到下次执行此job的时间, 满足时将会执行

      executor:apscheduler定义的执行器,job创建时设置执行器的名字,根据字符串你名字到scheduler获取到执行此job的 执行器,执行job指定的函数

      max_instances:执行此job的最大实例数,executor执行job时,根据job的id来计算执行次数,根据设置的最大实例数来确定是否可执行

      next_run_time:Job下次的执行时间,创建Job时可以指定一个时间[datetime],不指定的话则默认根据trigger获取触发时间

      misfire_grace_time:Job的延迟执行时间,例如Job的计划执行时间是21:00:00,但因服务重启或其他原因导致21:00:31才执行,如果设置此key为40,则该job会继续执行,否则将会丢弃此job

      coalesce:Job是否合并执行,是一个bool值。例如scheduler停止20s后重启启动,而job的触发器设置为5s执行一次,因此此job错过了4个执行时间,如果设置为是,则会合并到一次执行,否则会逐个执行

      func:Job执行的函数

      args:Job执行函数需要的位置参数

      kwargs:Job执行函数需要的关键字参数

      2.1.2. Trigger 触发器

      Trigger绑定到Job,在scheduler调度筛选Job时,根据触发器的规则计算出Job的触发时间,然后与当前时间比较

      确定此Job是否会被执行,总之就是根据trigger规则计算出下一个执行时间。

      Trigger有多种种类,指定时间的DateTrigger,指定间隔时间的IntervalTrigger,像Linux的crontab一样的CronTrigger

      目前APScheduler支持触发器:

      DateTrigger

      IntervalTrigger

      CronTrigger

      2.1.3. Executor 执行器

      Executor在scheduler中初始化,另外也可通过scheduler的add_executor动态添加Executor。

      每个executor都会绑定一个alias,这个作为唯一标识绑定到Job,在实际执行时会根据Job绑定的executor找到实际的执行器对象,然后根据执行器对象执行Job

      Executor的种类会根据不同的调度来选择,如果选择AsyncIO作为调度的库,那么选择AsyncIOExecutor,如果选择tornado作为调度的库,选择T编程客栈ornadoExecutor,如果选择启动进程作为调度,选择ThreadPoolExecutor或者ProcessPoolExecutor都可以

      Executor的选择需要根据实际的scheduler来选择不同的执行器

      目前APScheduler支持的Executor:

      AsyncIOExecutor

      GeventExecutor

      ThreadPoolExecutor

      ProcessPoolExecutor

      TornadoExecutor

      TwistedExecutor

      2.1.4. Jobstore 作业存储

      Jobstore在scheduler中初始化,另外也可通过scheduler的add_jobstore动态添加Jobstore。每个jobstore都会绑定一个alias,scheduler在Add Job时,根据指定的jobstore在scheduler中找到相应的jobstore,并将job添加到jobstore中。

      Jobstore主要是通过pickle库的loads和dumps【实现核心是通过python的__getstate__和__setstate__重写实现】,每次变更时将Job动态保存到存储中,使用时再动态的加载出来,作为存储的可以是redis,也可以是数据库【通过sqlarchemy这个库集成多种数据库】,也可以是mongodb等

      目前APScheduler支持的Jobstore:

      MemoryJobStore

      MongoDBJobStore

      RedisJobStore

      RethinkDBJobStore

      SQLAlchemyJobStore

      ZooKeeperJobStore

      2.1.5. Event 事件

      Event是APScheduler在进行某些操作时触发相应的事件,用户可以自定义一些函数来监听这些事件,当触发某些Event时,做一些具体的操作

      常见的比如。Job执行异常事件 EVENT_JOB_ERROR。Job执行时间错过事件 EVENT_JOB_MISSED。

      目前APScheduler定义的Event

      EVENT_SCHEDULER_STARTED

      EVENT_SCHEDULER_START

      EVENT_SCHEDULER_SHUTDOWN

      EVENT_SCHEDULER_PAUSED

      EVENT_SCHEDULER_RESUMED

      EVENT_EXECUTOR_ADDED

      EVENT_EXECUTOR_REMOVED

      EVENT_JOBSTORE_ADDED

      EVENT_JOBSTORE_REMOVED

      EVENT_ALL_JOBS_REMOVED

      EVENT_JOB_ADDED

      EVENT_JOB_REMOVED

      EVENT_JOB_MODIFIED

      EVENT_JOB_EXECUTED

      EVENT_JOB_ERROR

      EVENT_JOB_MISSED

      EVENT_JOB_SUBMITTED

      EVENT_JOB_MAX_INSTANCES

      2.1.6. Listener 监听事件

      Listener表示用户自定义监听的一些Event,当Job触发了EVENT_JOB_MISSED事件时

      可以根据需求做一些其他处理。

      2.1.7. Scheduler 调度器

      Scheduler是APScheduler的核心,所有相关组件通过其定义。scheduler启动之后,将开始按照配置的任务进行调度。

      除了依据所有定义Job的trigger生成的将要调度时间唤醒调度之外。当发生Job信息变更时也会触发调度。

      scheduler可根据自身的需求选择不同的组件,如果是使用AsyncIO则选择AsyncIOScheduler,使用tornado则

      选择TornadoScheduler。

      目前APScheduler支持的Scheduler:

      AsyncIOScheduler

      BackgroundScheduler

      BlockingScheduler

      GeventScheduler

      QtScheduler

      TornadoScheduler

      TwistedScheduler

      2.2. Scheduler工作流程图

      这里重点挑选两个重要的流程画一个简陋的流程图,来看一下scheduler的工作原理。其一个是添加add job,另一是scheduler每次唤醒调度时的执行过程

      2.2.1. Scheduler添加job流程

      Python定时库APScheduler的原理以及用法示例

      2.2.2 Scheduler调度流程

      Python定时库APScheduler的原理以及用法示例

      3. APScheduler使用示例

      AsyncIO调度示例

      import asyncio
      
      import datetime
      
      from apscheduler.events import EVENT_JOB_EXECUTED
      
      from apscheduler.executors.asyncio import AsyncIOExecutor
      
      from apscheduler.jobstores.redis import RedisJobStore  # 需要安装redis
      
      from apscheduler.schedulers.asyncio import AsyncIOScheduler
      
      from apscheduler.triggers.interval import IntervalTrigger
      
      from apscheduler.triggers.cron import CronTrigger
      
      # 定义jobstore  使用redis 存储job信息
      
      default_redis_jobstore = RedisJobStore(
      
          db=2,
      
          jobs_key="apschedulers.default_jobs",
      
          run_times_key="apschedulers.default_run_times",
      
          host="127.0.0.1",
      
          port=6379,
      
          password="tes编程客栈t"
      
      )
      
      # 定义executor 使用asyncio是的调度执行规则
      
      first_executor = AsyncIOExecutor()
      
      # 初始化scheduler时,可以直接指定jobstore和executor
      
      init_scheduler_options = {
      
          "jobstores": {
      
              # first 为 jobstore的名字,在创建Job时直接直接此名字即可
      
              "default": default_redis_jobstore
      
          },
      
          "executors": {
      
              # first 为 executor 的名字,在创建Job时直接直接此名字,执行时则会使用此executor执行
      
              "first": first_executor
      
          },
      
          # 创建job时的默认参数
      
          "job_defaults": {
      
              'coalesce': False,  # 是否合并执行
      
              'max_instances': 1  # 最大实例数
      
          }
      
      }
      
      # 创建scheduler
      
      scheduler = AsyncIOScheduler(**init_scheduler_options)
      
      # 启动调度
      
      scheduler.start()
      
      second_redis_jobstore = RedisJobStore(
      
          db=2,
      
          jobs_key="apschedulers.second_jobs",
      
          run_times_key="apschedulers.second_run_times",
      
          host="127.0.0.1",
      
          port=6379,
      
          password="test"
      
      )
      
      scheduler.add_jobstore(second_redis_jobstore, 'second')
      
      # 定义executor 使用asyncio是的调度执行规则
      
      second_executor = AsyncIOExecutor()
      
      scheduler.add_executor(second_executor, "second")
      
      # ***********               关于 APScheduler中有关Event相关使用示例               *************
      
      # 定义函数监听事件
      
      def job_execute(event):
      
          """
      
          监听事件处理
      
          :param event:
      
          :return:
      
          """
      
          print(
      
              "job执行job:\ncode => {}\njob.id => {}\njobstore=>{}".format(
      
                  event.code,
      
                  event.job_id,
      
                  event.jobstore
      
              ))
      
      # 给EVENT_JOB_EXECUTED[执行完成job事件]添加回调,这里就是每次Job执行完成了我们就输出一些信息
      
      scheduler.add_listener(job_execute, EVENT_JOB_EXECUTED)
      
      # ***********               关于 APScheduler中有关Job使用示例               *************
      
      # 使用的是asyncio,所以job执行的函数可以是一个协程,也可以是一个普通函数,AsyncIOExecutor会根据配置的函数来进行调度,
      
      # 如果是协程则会直接丢入到loop中,如果是普通函数则会启用线程处理
      
      # 我们定义两个函数来看看执行的结果
      
      def interval_func(message):
      
          print("现在时间: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
      
          print("我是普通函数")
      
          print(message)
      
      async def async_func(message):
      
          print("现在时间: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
      
          print("我是协程")
      
          print(message)
      
      # 将上述的两个函数按照不同的方式创造触发器来执行
      
      # ***********               关于 APScheduler中有关Trigger使用示例               *************
      
      # 使用Trigger有两种方式,一种是用类创建使用,另一个是使用字符串的方式
      
      # 使用字符串指定别名, scheduler初始化时已将其定义的trigger加载,所以指定字符串可以直接使用
      
      if scheduler.get_job("interval_func_test", "default"):
      
          # 存在的话,先删除
      
          scheduler.remove_job("interval_func_test", "default")
      
      # 立马开始 2分钟后结束, 每10s执行一次 存储到first jobstore  second执行
      
      scheduler.add_job(interval_func, "interval",
      
                        args=["我是10s执行一次,存放在jobstore default, executor default"],
      
                        seconds=10,
      
                        id="interval_func_test",
      
                        jobstore="default",
      
                        executor="default",
      
                        start_date=datetime.datetime.now(),
      
                        end_date=datetime.datetime.now() + datetime.timedelta(seconds=240))
      
      # 先创建tigger
      
      trigger = IntervalTrigger(seconds=5)
      
      if scheduler.get_job("interval_func_test_2", "second"):
      
          # 存在的话,先删除
      
          scheduler.remove_job("interval_func_test_2", "second")
      
      # 每隔5s执行一次
      
      scheduler.add编程客栈_job(async_func, trigger, args=["我是每隔5s执行一次,存放在jobstore second, executor = second"],
      
                        id="interval_func_test_2",
      
                        jobstore="second",
      
                        executor="second")
      
      # 使用协程的函数执行,且使用cron的方式配置触发器
      
      if scheduler.get_job("cron_func_test", "default"):
      
          # 存在的话,先删除
      
          scheduler.remove_job("cron_func_test", "default")
      
      # 立马开始 每10s执行一次
      
      scheduler.add_job(async_func, "cron",
      
                        args=["我是 每分钟 30s  时执行一次,存放在jobstore default, executor default"],
      
                        second='30',
      
                        id="cron_func_test",
      
                        jobstore="default",
      
                        executor="default")
      
      # 先创建tigger
      
      trigger = CronTrigger(second='20,40')
      
      if scheduler.get_job("cron_func_test_2", "second"):
      
          # 存在的话,先删除
      
          scheduler.remove_job("cron_func_test_2", "second")
      
      # 每隔5s执行一次
      
      scheduler.add_job(async_func, trigger, args=["我是每分钟 20s  40s时各执行一次,存放在jobstore second, executor = second"],
      
                        id="cron_func_test_2",
      
                        jobstore="second",
      
                        executor="second")
      
      # 使用创建trigger对象直接创建
      
      print("启动: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
      
      asyncio.get_event_loop().run_forever()

      输出结果部分截取

      启动之后,每隔5s运行一次的JOB

      启动: 2019-12-05 14:13:11

      【这部分是定义的协程函数输出的内容】

      现在时间: 2019-12-05 14:13:16

      我是协程

      我是每隔5s执行一次,存放在jobstore second, executor = second

      【这部分是监听job执行完成之后的回调输出】

      job执行job: code => 4096

      job.id => interval_func_test_2

      jobstore=>second

      在20s和40s时各执行一次的Job

      现在时间: 2019-12-05 14:13:20

      我是协程

      我是每分钟 20s 40s时各执行一次,存放在jobstore second, executor = second

      job执行

      job: code => 4096

      job.id => cron_func_test_2

      jobstore=>second

      每隔10s执行一次的job

      现在时间: 2019-12-05 14:13:21

      我是普通函数

      我是10s执行一次,存放在jobstore default, executor default

      现在时间: 2019-12-05 14:13:21

      我是协程

      我是每隔5s执行一次,存放在jobstore second, executor = second

      job执行job: code => 4096

      job.id => interval_fhttp://www.cppcns.comunc_test

      jobstore=>default

      job执行

      job: code => 4096

      job.id => interval_func_test_2

      jobstore=>second

      每隔5s执行一次的Job

      现在时间: 2019-12-05 14:13:26

      我是协程

      我是每隔5s执行一次,存放在jobstore second, executor = second

      job执行

      job: code => 4096

      job.id => interval_func_test_2

      jobstore=>second

      每分钟30s时执行一次

      现在时间: 2019-12-05 14:13:30

      我是协程

      我是 每分钟 30s 时执行一次,存放在jobstore default, executor default

      job执行

      job: code => 4096

      job.id => cron_func_test

      jobstore=>default

      总结

      apscheduler的工作原理及用法基本这样。

      apscheduler强大的地方是可以集成到tornado,django,flask等框架,也可以单独运行。比如CronTrigger还有更强大的用法,可以参照官网的cron用法

      上面例子只列举了一些常规用法,其实还有一些更切合实际的用法,利用APSchedulder的特性,动态的添加Job,暂停Job,删除Job,重启Job等。先按照功能性质定义好不同的函数,然后开发一个web服务。在web服务中动态操作各种Job,可以想象在监控系统中根据需求添加一些任务,岂不美哉。

      有时间将这部分做一个例子再来分享。

      到此这篇关于Python定时库APScheduler的原理以及用法的文章就介绍到这了,更多相关Python定时库APScheduler用法内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

      0

      精彩评论

      暂无评论...
      验证码 换一张
      取 消

      关注公众号