动态

详情 返回 返回

日夕如是寒暑不間,基於Python3+Tornado6+APScheduler/Celery打造併發異步動態定時任務輪詢服務 - 动态 详情

原文轉載自「劉悦的技術博客」https://v3u.cn/a_id_220

定時任務的典型落地場景在各行業中都很普遍,比如支付系統中,支付過程中因為網絡或者其他因素導致出現掉單、卡單的情況,賬單變成了“單邊賬”,這種情況對於支付用户來説,毫無疑問是災難級別的體驗,明明自己付了錢,扣了款,但是訂單狀態卻未發生變化。所以,每一筆訂單的支付任務流程中都需要一個定時輪詢的備選方案,一旦支付中發生問題,定時輪詢服務就可以及時發現和更正訂單狀態。

又比如,之前的一篇以寡治眾各個擊破,超大文件分片上傳之構建基於Vue.js3.0+Ant-desgin+Tornado6純異步IO高效寫入服務,在超大型文件分片傳輸任務過程中,一旦分片上傳或者分片合併環節出了問題,就有可能導致超大型文件無法完整的傳輸到服務器中,從而浪費大量的系統帶寬資源,所以每一個分片傳輸任務執行過程中也需要一個對應的定時輪詢來“盯”着,防止過程中出現問題。

在實際業務場景中,定時服務基本都作為主應用的附屬服務而存在,不同定時任務的調度時間可能不一樣,所以如果能夠配合主服務併發異步調用定時任務,則可以單應用能夠支持上萬,甚至十萬以上的定時任務,並且不同任務能夠有獨立的調度時間,這裏通過Tornado配合APScheduler和Celery,分別展示不同的異步定時任務調用邏輯。

APScheduler

APScheduler(advanceded python scheduler)是一款及其優秀的Python3定時任務框架,它不僅支持併發異步調用定時任務,還可以動態地對定時任務進行管理,同時也支持定時任務的持久化。

首先安裝APScheduler以及Tornado6:

pip3 install apscheduler
pip3 install tornado==6.1

隨後導入基於Tornado的異步APScheduler:

from datetime import datetime    
from tornado.ioloop import IOLoop, PeriodicCallback    
from tornado.web import RequestHandler, Application    
from apscheduler.schedulers.tornado import TornadoScheduler

這裏TornadoScheduler實例就具備了Tornado的事件循環特性,隨後聲明異步定時任務:

async def task():    
    print('[APScheduler][Task]-{}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')))

隨後初始化定時任務對象:

scheduler = None  
  
# 初始化  
def init_scheduler():  
  
    global scheduler  
  
    scheduler = TornadoScheduler()  
  
    scheduler.start()  
  
    scheduler.add_job(task,"interval",seconds=3,id="job1",args=())  
  
    print("定時任務啓動")

這裏啓動後就添加一個定時任務,每隔三秒執行一次。

接着main入口啓動服務:

if __name__ == '__main__':  
  
    init_scheduler()

系統返回:

C:\Users\liuyue\www\tornado6>python test_scheduler.py  
定時任務啓動  
[APScheduler][Task]-2022-07-28 22:13:47.792582  
[APScheduler][Task]-2022-07-28 22:13:50.783016  
[APScheduler][Task]-2022-07-28 22:13:53.783362  
[APScheduler][Task]-2022-07-28 22:13:56.775059  
[APScheduler][Task]-2022-07-28 22:13:59.779563

隨後創建Tornado控制器視圖:

class SchedulerHandler(RequestHandler):  
    def get(self):  
        job_id = self.get_query_argument('job_id', None)  
        action = self.get_query_argument('action', None)  
        if job_id:  
            # 添加任務  
            if 'add' == action:  
                if job_id not in job_ids:  
                    job_ids.append(job_id)  
                    scheduler.add_job(task, 'interval', seconds=3, id=job_id, args=(job_id,))  
                    self.write('[TASK ADDED] - {}'.format(job_id))  
                else:  
                    self.write('[TASK EXISTS] - {}'.format(job_id))  
            # 刪除任務  
            elif 'remove' == action:  
                if job_id in job_ids:  
                    scheduler.remove_job(job_id)  
                    self.write('[TASK REMOVED] - {}'.format(job_id))  
                else:  
                    self.write('[TASK NOT FOUND] - {}'.format(job_id))  
        else:  
            self.write('[INVALID PARAMS] INVALID job_id or action')

這裏通過傳參來動態的刪減異步定時任務,對於完成輪詢任務的定時任務,完全可以物理刪除,從而節約系統資源,隨後添加路由並且啓動Tornado服務:

if __name__ == '__main__':  
      
    routes = [url(r"/scheduler/",SchedulerHandler)]  
  
    init_scheduler()  
  
    # 聲明tornado對象  
    application = Application(routes,debug=True)  
    application.listen(8888)  
    IOLoop.current().start()

APScheduler定時任務持久化

所謂任務持久化,即任務存儲在諸如文件或者數據庫這樣的持久化容器中,如果APScheduler定時任務服務進程中斷,未執行的任務還會保留,當服務再次啓動時,定時任務可以從數據庫中讀取出來再次被裝載調用,這裏以redis數據庫為例子:

from apscheduler.jobstores.redis import RedisJobStore  
  
# 初始化  
def init_scheduler():  
  
    global scheduler  
  
    jobstores = {  
        'default': RedisJobStore(jobs_key='cron.jobs',run_times_key='cron.run_times',  
                     host='localhost', port=6379,)  
    }  
  
    scheduler = TornadoScheduler(jobstores=jobstores)  
  
    scheduler.start()  
  
    scheduler.add_job(task,"interval",seconds=3,id="job1",args=())  
  
    print("定時任務啓動")

這裏通過jobstores參數將redis裝載到定時任務服務中,當創建任務時,數據庫中會以hash的形式來存儲任務明細:

127.0.0.1:6379> keys *  
1) "cron.run_times"  
2) "cron.jobs"  
127.0.0.1:6379> type cron.jobs  
hash  
127.0.0.1:6379> hgetall cron.jobs  
1) "job1"  
2) "\x80\x05\x95\x14\x02\x00\x00\x00\x00\x00\x00}\x94(\x8c\aversion\x94K\x01\x8c\x02id\x94\x8c\x04job1\x94\x8c\x04func\x94\x8c\x0e__main__:task1\x94\x8c\atrigger\x94\x8c\x1dapscheduler.triggers.interval\x94\x8c\x0fIntervalTrigger\x94\x93\x94)\x81\x94}\x94(h\x01K\x02\x8c\btimezone\x94\x8c\x1bpytz_deprecation_shim._impl\x94\x8c\twrap_zone\x94\x93\x94\x8c\bbuiltins\x94\x8c\agetattr\x94\x93\x94\x8c\bzoneinfo\x94\x8c\bZoneInfo\x94\x93\x94\x8c\t_unpickle\x94\x86\x94R\x94\x8c\x0cAsia/Irkutsk\x94K\x01\x86\x94R\x94h\x19\x86\x94R\x94\x8c\nstart_date\x94\x8c\bdatetime\x94\x8c\bdatetime\x94\x93\x94C\n\a\xe6\a\x1c\x16\x1e&\x0b\xc7\x8b\x94h\x1d\x86\x94R\x94\x8c\bend_date\x94N\x8c\binterval\x94h\x1f\x8c\ttimedelta\x94\x93\x94K\x00K\x03K\x00\x87\x94R\x94\x8c\x06jitter\x94Nub\x8c\bexecutor\x94\x8c\adefault\x94\x8c\x04args\x94)\x8c\x06kwargs\x94}\x94\x8c\x04name\x94\x8c\x05task1\x94\x8c\x12misfire_grace_time\x94K\x01\x8c\bcoalesce\x94\x88\x8c\rmax_instances\x94K\x01\x8c\rnext_run_time\x94h!C\n\a\xe6\a\x1c\x16\x1e,\x0b\xc7\x8b\x94h\x1d\x86\x94R\x94u."

而如果刪除任務,redis數據庫中的任務也會同步刪除。

至此,APScheduler配合Tornado就完成了一個簡單的併發異步定時任務服務。

Celery

celery是一款在Python定時任務領域“開風氣之先”的框架,和APScheduler相比,celery略顯臃腫了一點,同時,celery並不具備任何任務持久化的功能,也需要三方的容器進行支持。

首先安裝5.0以上版本:

pip3 install celery==5.2.7

隨後,初始化任務對象:

from celery import Celery  
from datetime import timedelta  
from redisbeat.scheduler import RedisScheduler  
  
app = Celery("tornado")  
  
  
app.conf["imports"] = ["celery_task"]  
  
# 定義broker  
app.conf.broker_url = "redis://localhost:6379"  
  
# 任務結果  
app.conf.result_backend = "redis://localhost:6379"  
  
# 時區  
app.conf.timezone = "Asia/Shanghai"

這裏任務代理(broker)和任務結果(result\_backend)也都存儲在redis中。

緊接着聲明異步任務方法:

from celery import shared_task  
import asyncio  
  
async def consume():  
  
    return 'test'  
  
@shared_task  
def async_job():  
  
    return asyncio.run(consume())

這裏通過asyncio庫間接調用異步方法。

然後添加定時任務的配置:

from datetime import timedelta   
  
# 需要執行任務的配置  
app.conf.beat_schedule = {  
    "task1": {  
        "task": "celery_task.async_consume",  #執行的方法  
        "schedule": timedelta(seconds=3),   
        "args":()  
    },  
}

隨後啓動worker服務:

celery -A module_name worker --pool=solo -l info

接着啓動beat服務:

celery -A module_name beat -l info

異步定時任務會被裝載執行,系統返回:

C:\Users\liuyue\www\tornado6>celery -A test_celery worker --pool=solo -l info  
  
 -------------- celery@LIUYUE354D v5.2.7 (dawn-chorus)  
--- ***** -----  
-- ******* ---- Windows-10-10.0.22000-SP0 2022-07-28 22:55:00  
- *** --- * ---  
- ** ---------- [config]  
- ** ---------- .> app:         tornado:0x23769b40430  
- ** ---------- .> transport:   redis://localhost:6379//  
- ** ---------- .> results:     redis://localhost:6379/  
- *** --- * --- .> concurrency: 4 (solo)  
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)  
--- ***** -----  
 -------------- [queues]  
                .> celery           exchange=celery(direct) key=celery  
  
  
[tasks]  
  . celery_task.async_job  
  . celery_task.job  
  . test_celery.sub  
  
[2022-07-28 22:55:02,234: INFO/MainProcess] Connected to redis://localhost:6379//  
[2022-07-28 22:55:04,267: INFO/MainProcess] mingle: searching for neighbors  
[2022-07-28 22:55:11,552: INFO/MainProcess] mingle: all alone  
[2022-07-28 22:55:21,837: INFO/MainProcess] celery@LIUYUE354D ready.  
[2022-07-28 22:58:26,032: INFO/MainProcess] Task celery_task.job[b0337808-c90b-450b-98bc-fd577f7039d0] received  
[2022-07-28 22:58:28,086: INFO/MainProcess] Task celery_task.job[b0337808-c90b-450b-98bc-fd577f7039d0] succeeded in 2.062999999994645s: 'test'  
[2022-07-28 22:58:28,099: INFO/MainProcess] Task celery_task.job[f4aa4304-02c3-48ee-8625-fa1fe27b8e98] received  
[2022-07-28 22:58:28,099: INFO/MainProcess] Task celery_task.job[f4aa4304-02c3-48ee-8625-fa1fe27b8e98] succeeded in 0.0s: 'test'  
[2022-07-28 22:58:28,975: INFO/MainProcess] Task celery_task.job[bb33981d-0629-4173-8375-128ba84d1f0f] received  
[2022-07-28 22:58:28,975: INFO/MainProcess] Task celery_task.job[bb33981d-0629-4173-8375-128ba84d1f0f] succeeded in 0.0s: 'test'

同時,在redis數據庫中會以列表和字符串的形式存儲任務明細和結果:

127.0.0.1:6379> keys *  
1) "celery-task-meta-f4aa4304-02c3-48ee-8625-fa1fe27b8e98"  
2) "celery-task-meta-bb33981d-0629-4173-8375-128ba84d1f0f"  
3) "_kombu.binding.celery"  
4) "celery-task-meta-b0337808-c90b-450b-98bc-fd577f7039d0"  
5) "cron.run_times"  
6) "cron.jobs"  
7) "celery"

從調度層面上講,celery和APScheduler並無太大的不同,但從使用成本上看,celery比APScheduler多維護一個服務,worker和beat雙服務的形式無形中也增加了系統監控資源的開銷。

動態維護異步定時任務

從任務管理層面上看,celery毫無疑問輸的很徹底,因為原生celery壓根就不支持動態地修改定時任務。但我們可以通過三方庫的形式來曲線救國:

pip3 install redisbeat

這裏通過redis的定時任務服務來取代celery原生的beat服務。

建立redisbeat實例:

from celery import Celery  
from datetime import timedelta  
from redisbeat.scheduler import RedisScheduler  
  
app = Celery("tornado")  
  
  
app.conf["imports"] = ["celery_task"]  
  
# 定義broker  
app.conf.broker_url = "redis://localhost:6379"  
  
# 任務結果  
app.conf.result_backend = "redis://localhost:6379"  
  
# 時區  
app.conf.timezone = "Asia/Shanghai"  
  
@app.task  
def sub():  
  
    return "test"  
  
  
schduler = RedisScheduler(app=app)  
schduler.add(**{  
        'name': 'job1',  
        'task': 'test_celery.sub',  
        'schedule': timedelta(seconds=3),  
        'args': ()  
})

通過schduler.add方法就可以動態地添加定時任務,隨後以redisbeat的形式啓動celery服務:

celery -A test_celery beat -S redisbeat.RedisScheduler -l INFO

此時經過改造的系統接受動態任務調用而執行:

C:\Users\liuyue\www\tornado6>celery -A test_celery worker --pool=solo -l info  
  
 -------------- celery@LIUYUE354D v5.2.7 (dawn-chorus)  
--- ***** -----  
-- ******* ---- Windows-10-10.0.22000-SP0 2022-07-28 23:09:50  
- *** --- * ---  
- ** ---------- [config]  
- ** ---------- .> app:         tornado:0x19c1a1f0040  
- ** ---------- .> transport:   redis://localhost:6379//  
- ** ---------- .> results:     redis://localhost:6379/  
- *** --- * --- .> concurrency: 4 (solo)  
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)  
--- ***** -----  
 -------------- [queues]  
                .> celery           exchange=celery(direct) key=celery  
  
  
[tasks]  
  . celery_task.async_job  
  . celery_task.job  
  . test_celery.sub  
  
[2022-07-28 23:09:52,916: INFO/MainProcess] Connected to redis://localhost:6379//  
[2022-07-28 23:09:54,971: INFO/MainProcess] mingle: searching for neighbors  
[2022-07-28 23:10:02,140: INFO/MainProcess] mingle: all alone  
[2022-07-28 23:10:12,427: INFO/MainProcess] celery@LIUYUE354D ready.  
[2022-07-28 23:10:12,440: INFO/MainProcess] Task test_celery.sub[ade9c5ad-d551-44f2-84e7-a2824b2d022d] received  
[2022-07-28 23:10:14,518: INFO/MainProcess] Task test_celery.sub[ade9c5ad-d551-44f2-84e7-a2824b2d022d] succeeded in 2.0780000000013388s: 'test'  
[2022-07-28 23:10:14,518: INFO/MainProcess] Task test_celery.sub[11927889-8385-4c88-aff1-42179b559db0] received  
[2022-07-28 23:10:14,518: INFO/MainProcess] Task test_celery.sub[11927889-8385-4c88-aff1-42179b559db0] succeeded in 0.0s: 'test'  
[2022-07-28 23:10:14,533: INFO/MainProcess] Task test_celery.sub[442cd168-5a68-4ade-b4e7-6ae4a92a53ae] received  
[2022-07-28 23:10:14,533: INFO/MainProcess] Task test_celery.sub[442cd168-5a68-4ade-b4e7-6ae4a92a53ae] succeeded in 0.0s: 'test'  
[2022-07-28 23:10:17,087: INFO/MainProcess] Task test_celery.sub[e4850b5d-28e9-47c8-88e6-d9086e93db88] received  
[2022-07-28 23:10:17,087: INFO/MainProcess] Task test_celery.sub[e4850b5d-28e9-47c8-88e6-d9086e93db88] succeeded in 0.0s: 'test'

響應的,也可以通過remove方法和任務id進行刪除操作:

schduler.remove('job1')

任務明細的存儲形式上,也由列表升級成為了有序集合,提高了效率:

127.0.0.1:6379> type celery:beat:order_tasks  
zset  
127.0.0.1:6379> zrange celery:beat:order_tasks 0 -1  
1) "{\"py/reduce\": [{\"py/type\": \"celery.beat.ScheduleEntry\"}, {\"py/tuple\": [\"job1\", \"test_celery.sub\", {\"__reduce__\": [{\"py/type\": \"datetime.datetime\"}, [\"B+YHHBcMDgfyGg==\", {\"py/reduce\": [{\"py/function\": \"pytz._p\"}, {\"py/tuple\": [\"Asia/Shanghai\", 28800, 0, \"CST\"]}]}]], \"py/object\": \"datetime.datetime\"}, 43, {\"py/reduce\": [{\"py/type\": \"celery.schedules.schedule\"}, {\"py/tuple\": [{\"py/reduce\": [{\"py/type\": \"datetime.timedelta\"}, {\"py/tuple\": [0, 3, 0]}]}, false, null]}]}, {\"py/tuple\": []}, {}, {}]}]}"

至此,celery配合tornado打造異步定時任務就完成了。

結語

APScheduler長於靈活機動並可以依附於Tornado事件循環體系中,Celery則嫺於調度和分佈式的支持並相對獨立,二者不分軒輊,各擅勝場,適合不同的業務應用場景,當然,在異步定時任務執行異常時的處理策略也有很多方面需要完善,比如由於實例夯死導致的過時觸發問題、任務追趕和任務堆積問題、工作流場景下任務異常後是整體重試還是斷點續傳重試等,都需要具體問題具體分析。

原文轉載自「劉悦的技術博客」 https://v3u.cn/a_id_220

user avatar cyzf 头像 u_17400586 头像 openeuler 头像 seact 头像 woniuseo 头像 guixiangyyds 头像 yixiyidong 头像 weidewei 头像 romanticcrystal 头像 youyoufei 头像 yejianfeixue 头像 junxiudetuoba 头像
点赞 46 用户, 点赞了这篇动态!
点赞

Add a new 评论

Some HTML is okay.