介绍
Celery 分布式任务队列
Celery 是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的一系列工具。
Celery 是一款消息队列工具,可用于处理实时数据以及任务调度。
Celery 拥有大量的用户和贡献者社区,您可以通过IRC频道或 邮件列表 加入我们。
Celery 是根据 BSD License 进行开源的
初次使用
选择并且安装一个消息中间件(Broker)
安装 Celery 并且创建第一个任务
运行职程(Worker)以及调用任务
跟踪任务的情况以及返回值
安装中间件【redis】
docker 安装
docker run -d -p 6379:6379 redis
安装celery
pip install celery
简单应用
创建tasks.py 内容如下
from celery import Celeryapp = Celery('tasks',broker='redis://127.0.0.1:6379/1',backend='redis://127.0.0.1:6379/2')@app.taskdef add(x, y): return x+y
启动后端Worker 守护进程
$ celery -A tasks worker --loglevel=info-------------- celery@it-04-liumeng-Mac-min-2018.local v4.4.7 (cliffs)--- ***** ----- -- ******* ---- macOS-10.14.6-x86_64-i386-64bit 2022-03-08 11:11:40- *** --- * --- - ** ---------- [config]- ** ---------- .> app: tasks:0x10b95cfd0- ** ---------- .> transport: redis://127.0.0.1:6379/1- ** ---------- .> results: redis://127.0.0.1:6379/2- *** --- * --- .> concurrency: 6 (prefork)-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)--- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . tasks.add[2022-03-08 11:11:40,339: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1[2022-03-08 11:11:40,348: INFO/MainProcess] mingle: searching for neighbors[2022-03-08 11:11:41,375: INFO/MainProcess] mingle: all alone[2022-03-08 11:11:41,403: INFO/MainProcess] celery@it-04-liumeng-Mac-min-2018.local ready.
执行创建的任务
python>>> from tasks import add>>> add.deley(4,6)
回去查看执行的结果
[2022-03-08 11:31:37,539: INFO/ForkPoolWorker-4] Task tasks.add[145a9a49-0eb1-4bad-aa95-d825d3bea3f6] succeeded in 0.0025149129999135766s: 10
高级应用
celery 结合 schedule 实现定时执行
创建结构目录如下
91t_tools--tools----tasks.pyconfig.pymain.py
创建main.py 文件
from celery import Celery# Celery的参数是当前项目的名称app = Celery('91t_tools')# 加载配置文件app.config_from_object('config')
创建config.py文件
from celery.schedules import crontab#配置链接地址broker_url ='redis://127.0.0.1:6379/1'#导入执行的模块的tasks.py文件imports = [ 'tools.tasks']#配置schedules参数beat_schedule = { 'add': { 'task': 'tools.tasks.add', #tasks中定义的函数 'schedule': crontab(minute='*/1'),#每分钟执行 'args': (4,4) }}
tools模块中的tasks.py 文件
from main import app@app.taskdef add(x, y): return x+y
执行启动命令如果计划任务一起启动必须加上--beat
celery -A main worker -l info --beat
celery 监控 flower
安装 flower
pip install flower
启动哨兵监控模式
celery -A main flower
启动成功后如下:
版权声明:内容来源于互联网和用户投稿 如有侵权请联系删除