python 异步执行框架celery实战

发布一下 0 0

介绍

Celery 分布式任务队列

Celery 是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的一系列工具。

Celery 是一款消息队列工具,可用于处理实时数据以及任务调度。

Celery 拥有大量的用户和贡献者社区,您可以通过IRC频道或 邮件列表 加入我们。

Celery 是根据 BSD License 进行开源的

初次使用

选择并且安装一个消息中间件(Broker)

安装 Celery 并且创建第一个任务

运行职程(Worker)以及调用任务

跟踪任务的情况以及返回值

安装中间件【redis】


docker 安装

docker run -d -p 6379:6379 redis

安装celery

pip install celery

简单应用

创建tasks.py 内容如下

from celery import Celeryapp = Celery('tasks',broker='redis://127.0.0.1:6379/1',backend='redis://127.0.0.1:6379/2')@app.taskdef add(x, y):    return x+y

启动后端Worker 守护进程

$ celery -A tasks worker --loglevel=info-------------- celery@it-04-liumeng-Mac-min-2018.local v4.4.7 (cliffs)--- ***** ----- -- ******* ---- macOS-10.14.6-x86_64-i386-64bit 2022-03-08 11:11:40- *** --- * --- - ** ---------- [config]- ** ---------- .> app:         tasks:0x10b95cfd0- ** ---------- .> transport:   redis://127.0.0.1:6379/1- ** ---------- .> results:     redis://127.0.0.1:6379/2- *** --- * --- .> concurrency: 6 (prefork)-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)--- ***** -----  -------------- [queues]                .> celery           exchange=celery(direct) key=celery                [tasks]  . tasks.add[2022-03-08 11:11:40,339: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1[2022-03-08 11:11:40,348: INFO/MainProcess] mingle: searching for neighbors[2022-03-08 11:11:41,375: INFO/MainProcess] mingle: all alone[2022-03-08 11:11:41,403: INFO/MainProcess] celery@it-04-liumeng-Mac-min-2018.local ready.

执行创建的任务

python>>> from tasks import add>>> add.deley(4,6)

回去查看执行的结果

[2022-03-08 11:31:37,539: INFO/ForkPoolWorker-4] Task tasks.add[145a9a49-0eb1-4bad-aa95-d825d3bea3f6] succeeded in 0.0025149129999135766s: 10

高级应用

celery 结合 schedule 实现定时执行

创建结构目录如下

91t_tools--tools----tasks.pyconfig.pymain.py

创建main.py 文件

from celery import Celery# Celery的参数是当前项目的名称app = Celery('91t_tools')# 加载配置文件app.config_from_object('config')

创建config.py文件

from celery.schedules import crontab#配置链接地址broker_url ='redis://127.0.0.1:6379/1'#导入执行的模块的tasks.py文件imports = [    'tools.tasks']#配置schedules参数beat_schedule = {    'add': {        'task': 'tools.tasks.add', #tasks中定义的函数        'schedule': crontab(minute='*/1'),#每分钟执行        'args': (4,4)    }}

tools模块中的tasks.py 文件

from main import app@app.taskdef add(x, y):    return x+y

执行启动命令如果计划任务一起启动必须加上--beat

celery -A main worker  -l info --beat

celery 监控 flower

安装 flower

 pip install flower

启动哨兵监控模式

celery -A main flower

启动成功后如下:

python 异步执行框架celery实战

版权声明:内容来源于互联网和用户投稿 如有侵权请联系删除

本文地址:http://0561fc.cn/71612.html