Python定时任务工具Flask-APScheduler基本功能:job信息持久化

浏览: 10253

上一篇<Flask-APScheduler基本功能:作业的新增、起、停介绍>, 介绍了Flask-APScheduler的基本的操作,但是默认情况下job信息是存到内存里面,服务重启后会消失。

我需要的场景job是需要反复运行的,这就需要将job信息持久化。

我使用了mongodb作为存储,也可以是用关系数据库,具体看大家习惯了,具体可以看APScheduler文档

1.配置持久化存储

#Config增加如下内容即可

class Config(object):
    JOBS = []
    SCHEDULER_JOBSTORES = {
        'default': MongoDBJobStore(host='mongoserver', port=27017)
    }
    SCHEDULER_EXECUTORS = {
        'default': {'type': 'threadpool', 'max_workers': 20}
    }
    SCHEDULER_JOB_DEFAULTS = {
        'coalesce': False,
        'max_instances': 3
    }
    SCHEDULER_API_ENABLED = True

2.动态增加job注意replace_existing参数

 由于我的场景需要动态添加job在调用 add_job的时候注意使用replace_existing=True

 job = scheduler.add_job(func=func,id=id, args=args,trigger=trigger,seconds=seconds,replace_existing=True)

3.使用curl传递字符串的转义

argu如果使用字符串,注意在shell中使用\"转义,并且一个参数会报错,这点有些奇怪,不过注意绕过去就好了。

curl -i -X POST -H "'Content-type':'appon/x-www-form-urlencoded', 'charset':'utf-8', 'Accept': 'text/plain'" -d '{"id":"job5","func": "test:job1","args":"(\"aaa\",\"aaadd\")","trigger":"interval","seconds":60}' http://127.0.0.1:5000/addjob

4.mongo中job信息查询

每一个job具体就3个字段:'_id、next_run_time、job_state

id:  job5 next_run_time:  2017-05-02 12:06:19
job detail:  {'args': ('aaa', 'aaadd'), 'id': 'job5', 'func': 'test:job1', 'coalesce': False, 'kwargs': {}, 'max_instances': 3, 'version': 1, 'trigger': <IntervalTrigger (interval=datetime.timedelta(0, 60), start_date='2017-05-02 11:31:19 CST', timezone='Asia/Shanghai')>, 'next_run_time': datetime.datetime(2017, 5, 2, 12, 6, 19, 141192, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), 'name': 'job5', 'executor': 'default', 'misfire_grace_time': 1}

读取的代码如下:

from pymongo import MongoClient
import pickle
import time
client = MongoClient('mongoserver', 27017)
collection = client.apscheduler.jobs
for post in collection.find({}).limit(12):
    print('id: ',post['_id'],'next_run_time: ',time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(post['next_run_time'])))
    print('job detail: ',pickle.loads(post['job_state']))
client.close()

示例:

2020-04-15增加 由于附件不能访问

#
#test script
#curl -i -X POST -H "'Content-type':'appon/x-www-form-urlencoded', 'charset':'utf-8', 'Accept': 'text/plain'" -d '{"id":"job5","func": "test:job1","args":"(\"aaa\",\"aaadd\")","trigger":"interval","seconds":60}' http://127.0.0.1:5000/addjob
#
from flask import Flask
from flask_apscheduler import APScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from flask import request
import os
import time
app = Flask(__name__)
scheduler = APScheduler()
class Config(object):
    JOBS = []
    SCHEDULER_JOBSTORES = {
        'default': MongoDBJobStore(host='mongoserver', port=27017)
    }
    SCHEDULER_EXECUTORS = {
        'default': {'type': 'threadpool', 'max_workers': 20}
    }
    SCHEDULER_JOB_DEFAULTS = {
        'coalesce': False,
        'max_instances': 3
    }
    SCHEDULER_API_ENABLED = True
def job1(a, b):
    print(str(a) + ' ' + str(b)+'   '+time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time())))
def job2(a):
    py='python wx_post_test.py '+a
    os.system(py)
def jobfromparm(**jobargs):
    id = jobargs['id']
    func = jobargs['func']
    args = eval(jobargs['args'])
    trigger = jobargs['trigger']
    seconds = jobargs['seconds']
    print('add job: ',id)
    job = scheduler.add_job(func=func,id=id, args=args,trigger=trigger,seconds=seconds,replace_existing=True)
    return 'sucess'
@app.route('/pause')
def pausejob():
    scheduler.pause_job('job1')
    return "Success!"
@app.route('/resume')
def resumejob():
    scheduler.resume_job('job1')
    return "Success!"
@app.route('/addjob', methods=['GET', 'POST'])
def addjob():
    data = request.get_json(force=True)
    print(data)
    job = jobfromparm(**data)
    return 'sucess'
if __name__ == '__main__':
    #app = Flask(__name__)
    app.config.from_object(Config())
    # it is also possible to enable the API directly
    # scheduler.api_enabled = True
    scheduler = APScheduler()
    scheduler.init_app(app)
    scheduler.start()
    app.run(host='0.0.0.0')
    #app.run()
推荐 4
本文由 seng 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

5 个评论

太给了了 这两个脚本解决了我头痛好多天的问题
谢谢博主
优秀
附件已经不能查看了,请问可以再上传一份吗
附件加不了, 我贴在最后了

要回复文章请先登录注册