上一篇<Flask-APScheduler基本功能:作业的新增、起、停介绍>, 介绍了Flask-APScheduler的基本的操作,但是默认情况下job信息是存到内存里面,服务重启后会消失。
我需要的场景job是需要反复运行的,这就需要将job信息持久化。
我使用了MongoDB作为存储,也可以使用关系数据库,具体看大家的习惯,详情可以参考APScheduler文档。
1.配置持久化存储
#Config增加如下内容即可
# Add the following to your Config class to enable persistent job storage.
class Config(object):
    """Flask-APScheduler settings: keep job state in MongoDB so jobs survive restarts."""

    # No statically defined jobs; everything is registered at runtime.
    JOBS = []

    # Persistent job store: MongoDB instead of the default in-memory store.
    SCHEDULER_JOBSTORES = {'default': MongoDBJobStore(host='mongoserver', port=27017)}

    # Thread-pool executor, at most 20 jobs running concurrently.
    SCHEDULER_EXECUTORS = {'default': {'type': 'threadpool', 'max_workers': 20}}

    # Do not merge missed runs; allow up to 3 concurrent instances per job.
    SCHEDULER_JOB_DEFAULTS = {'coalesce': False, 'max_instances': 3}

    # Expose Flask-APScheduler's built-in REST API.
    SCHEDULER_API_ENABLED = True
2.动态增加job注意replace_existing参数
由于我的场景需要动态添加job,在调用add_job的时候注意使用replace_existing=True。
job = scheduler.add_job(func=func,id=id, args=args,trigger=trigger,seconds=seconds,replace_existing=True)
3.使用curl传递字符串的转义
args如果使用字符串,注意在shell中使用\"转义;另外只传一个参数时会报错,这点有些奇怪,不过注意绕过去就好了。
curl -i -X POST -H "'Content-type':'appon/x-www-form-urlencoded', 'charset':'utf-8', 'Accept': 'text/plain'" -d '{"id":"job5","func": "test:job1","args":"(\"aaa\",\"aaadd\")","trigger":"interval","seconds":60}' http://127.0.0.1:5000/addjob
4.mongo中job信息查询
每一个job具体就3个字段:_id、next_run_time、job_state
id: job5 next_run_time: 2017-05-02 12:06:19
job detail: {'args': ('aaa', 'aaadd'), 'id': 'job5', 'func': 'test:job1', 'coalesce': False, 'kwargs': {}, 'max_instances': 3, 'version': 1, 'trigger': <IntervalTrigger (interval=datetime.timedelta(0, 60), start_date='2017-05-02 11:31:19 CST', timezone='Asia/Shanghai')>, 'next_run_time': datetime.datetime(2017, 5, 2, 12, 6, 19, 141192, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>), 'name': 'job5', 'executor': 'default', 'misfire_grace_time': 1}
读取的代码如下:
# Read APScheduler job documents straight out of MongoDB.
from pymongo import MongoClient
import pickle
import time

client = MongoClient('mongoserver', 27017)
try:
    # APScheduler's MongoDBJobStore keeps jobs in db "apscheduler", collection "jobs".
    collection = client.apscheduler.jobs
    for post in collection.find({}).limit(12):
        # next_run_time is stored as a POSIX timestamp (seconds since epoch).
        print('id: ', post['_id'], 'next_run_time: ',
              time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(post['next_run_time'])))
        # NOTE(security): pickle.loads executes arbitrary code from the stored
        # bytes — only safe because this database is trusted (written by
        # APScheduler itself). Never unpickle data from an untrusted source.
        print('job detail: ', pickle.loads(post['job_state']))
finally:
    # Always release the connection, even if a document is malformed
    # (the original version leaked it when the loop raised).
    client.close()
示例:
2020-04-15增加:由于附件不能访问,以下直接附上完整脚本。
#
#test script
#curl -i -X POST -H "'Content-type':'appon/x-www-form-urlencoded', 'charset':'utf-8', 'Accept': 'text/plain'" -d '{"id":"job5","func": "test:job1","args":"(\"aaa\",\"aaadd\")","trigger":"interval","seconds":60}' http://127.0.0.1:5000/addjob
#
from flask import Flask
from flask_apscheduler import APScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from flask import request
import os
import time
# Module-level singletons: the route handlers and jobfromparm below refer to
# these globals directly.
app = Flask(__name__)
scheduler = APScheduler()
class Config(object):
    """Flask-APScheduler configuration: MongoDB-backed persistent job store."""

    # No jobs defined up front; they are added dynamically via /addjob.
    JOBS = []

    # Store job state in MongoDB so scheduled jobs survive a service restart.
    SCHEDULER_JOBSTORES = {'default': MongoDBJobStore(host='mongoserver', port=27017)}

    # Run jobs on a thread pool with up to 20 workers.
    SCHEDULER_EXECUTORS = {'default': {'type': 'threadpool', 'max_workers': 20}}

    # coalesce=False: run every missed execution; max 3 overlapping instances.
    SCHEDULER_JOB_DEFAULTS = {'coalesce': False, 'max_instances': 3}

    # Enable the scheduler's HTTP API.
    SCHEDULER_API_ENABLED = True
def job1(a, b):
    """Demo job: print both arguments followed by the current local time."""
    stamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    print('{} {} {}'.format(a, b, stamp))
def job2(a):
    """Demo job: run wx_post_test.py with *a* as its single argument.

    The argument can originate from the HTTP /addjob endpoint, so the
    command is passed as an argument list (shell=False) instead of the
    original `os.system('python wx_post_test.py ' + a)`, which allowed
    shell injection via a crafted value of *a*.
    """
    import subprocess
    subprocess.run(['python', 'wx_post_test.py', str(a)])
def jobfromparm(**jobargs):
    """Create (or replace) a scheduled job from request parameters.

    Expected keys: id, func ("module:callable" string), args (a Python
    tuple literal as a string, e.g. '("a", "b")'), trigger (e.g.
    "interval"), seconds (interval length).
    """
    import ast
    job_id = jobargs['id']  # renamed local: don't shadow the builtin id()
    func = jobargs['func']
    # literal_eval only accepts Python literals; the original eval() would
    # execute arbitrary expressions arriving from the HTTP request.
    args = ast.literal_eval(jobargs['args'])
    trigger = jobargs['trigger']
    # Coerce so '"seconds": "60"' in the JSON body also works.
    seconds = int(jobargs['seconds'])
    print('add job: ', job_id)
    # replace_existing=True lets a restarted service re-register the same
    # job id against the persistent MongoDB store without raising.
    scheduler.add_job(func=func, id=job_id, args=args, trigger=trigger,
                      seconds=seconds, replace_existing=True)
    return 'success'
@app.route('/pause')
def pausejob():
    """Pause the hard-coded demo job 'job1'."""
    scheduler.pause_job('job1')
    return "Success!"
@app.route('/resume')
def resumejob():
    """Resume the hard-coded demo job 'job1' after a pause."""
    scheduler.resume_job('job1')
    return "Success!"
@app.route('/addjob', methods=['GET', 'POST'])
def addjob():
    """HTTP endpoint: add (or replace) a scheduled job from a JSON body.

    force=True parses the body as JSON regardless of the Content-Type
    header, which is why the unusual curl header in the article still works.
    """
    data = request.get_json(force=True)
    print(data)
    jobfromparm(**data)
    # Fixed typo in the response body ('sucess' -> 'success').
    return 'success'
if __name__ == '__main__':
    app.config.from_object(Config())
    # Reuse the module-level scheduler. The original code created a second
    # APScheduler() here, silently discarding the instance defined at the
    # top of the module; one instance is all that is needed.
    scheduler.init_app(app)
    scheduler.start()
    # Listen on all interfaces so the curl examples work from other hosts.
    app.run(host='0.0.0.0')