前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布

Celery

作者头像
HammerZe
发布2022-05-09 19:03:13
4290
发布2022-05-09 19:03:13
举报
文章被收录于专栏:Hammer随笔Hammer随笔

Celery

官网

Celery 官网:http://www.celeryproject.org/

Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html

Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/

Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度

代码语言:javascript
复制
# 官网解释
"""
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
"""

Celery异步任务框架

代码语言:javascript
复制
"""
1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

人是一个独立运行的服务 | 医院也是一个独立运行的服务
	正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
	人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
"""

Celery架构

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

1
1
消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

使用场景

异步执行:解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

延迟执行:解决延迟任务

定时执行:解决周期(周期)任务,比如每天数据统计

Celery的安装配置

安装:pip install celery

消息中间件:RabbitMQ/Redis

app=Celery(‘任务名’, broker=’xxx’, backend=’xxx’)

注意如果是windows平台还需要安装:pip install eventlet

两种celery任务结构:提倡用包管理,结构更清晰

方式一:简单使用

image-20220502204920533
image-20220502204920533
代码语言:javascript
复制
# 第一步:定义一个py文件(名字随意,celery_task) 
"""celery_task.py"""
from celery import Celery
backend = 'redis://127.0.0.1:6379/1'   # 结果存储
broker = 'redis://127.0.0.1:6379/2'    # 消息中间件
app = Celery(__name__,broker=broker,backend=backend)  # __name__区分__main__

# 被它修饰,就变成了celery的任务
@app.task
def add(a,b):
    return a+b

# 第二步:提交任务(新建一个py文件:submit_task)
"""submit_task.py"""
from celery_task import add
# 异步调用
# 只是把任务提交到了redis中,但是没有执行,返回一个唯一标识,后期使用唯一标识去看任务执行结果
res=add.delay(33,41)
print(res)  # 2ddb35df-25f2-4f7c-8405-0bd7b1fa5645


# 第三步:任务执行单元执行,使用命令启动worker   
格式:celery -A 文件名  worker  -l 日志输出级别   (win平台+-P eventlet)
celery -A celery_task worker -l info -P eventlet
'''
celery_task:py文件的名字
-l info:日志输出级别是info 
-P eventlet  在win平台需要下载,pip  install  eventlet
'''
#如果队列里有任务,就会执行,如果没有任务,worker就等在这                                    
                                    
# 第四步:查询结果是否执行完成  get_result.py   
"""get_result.py"""
from celery_task import app

from celery.result import AsyncResult

id = '2ddb35df-25f2-4f7c-8405-0bd7b1fa5645'
if __name__ == '__main__':
    asy = AsyncResult(id=id, app=app)
    if asy.successful():
        result = asy.get()
        print(result)
    elif asy.failed():
        print('任务失败')
    elif asy.status == 'PENDING':
        print('任务等待中被执行')
    elif asy.status == 'RETRY':
        print('任务异常后正在重试')
    elif asy.status == 'STARTED':
        print('任务已经开始被执行')

方法二:包管理结构(推荐)

随便定义包名,但是包内必须要有celery.py

代码语言:javascript
复制
1 包结构
	celery_task
        __init__.py
        celery.py
        course_task.py
        home_task.py
        user_task.py

步骤

  • 创建包,包下写celery.py文件,文件内写celery任务
代码语言:javascript
复制
from celery import Celery
backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/2'
app = Celery(__name__, broker=broker, backend=backend,include=['celery_task.add_task'])
# include内写app管理的任务
  • 任意位置提交任务
代码语言:javascript
复制
from celery_task.add_task import add
from celery_task.celery import app
res=add.delay(100,200)
print(res)
# 提交任务delay在任意位置提交就可以,只需将celery任务导过来即可
  • 启动worker包管理只需去包所在的根路径启动就可以了,不需要切换路径到包内去启动worker,因为包下有celery.py了
代码语言:javascript
复制
scripts> celery -A celery_task worker -l info -P eventlet
  • 查看结果
代码语言:javascript
复制
from celery_task import app

from celery.result import AsyncResult

id = '2ddb35df-25f2-4f7c-8405-0bd7b1fa5645'
if __name__ == '__main__':
    asy = AsyncResult(id=id, app=app)
    if asy.successful():
        result = asy.get()
        print(result)
    elif asy.failed():
        print('任务失败')
    elif asy.status == 'PENDING':
        print('任务等待中被执行')
    elif asy.status == 'RETRY':
        print('任务异常后正在重试')
    elif asy.status == 'STARTED':
        print('任务已经开始被执行')

Celery主要处理三种任务

异步任务,延迟任务,定时任务

delay提交异步任务

上面的示例就是

apply_async提交延迟任务

代码语言:javascript
复制
# 其他不变,提交任务的时候,如下:
from celery_task.user_task import add
from datetime import datetime, timedelta
eta = datetime.utcnow() + timedelta(seconds=10)
# 参数传递需要使用args,传时间要使用时间对象eta,使用的是utc时间
mul.apply_async(args=(20, 50), eta=eta)

beat_schedule提交定时任务

定时任务需要启动beatworker

  • beat负责提交定时任务
  • worker负责提交celery任务
代码语言:javascript
复制
from celery import Celery
backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/2'
app = Celery(__name__, broker=broker, backend=backend,include=['celery_task.add_task'])
# include内写app管理的任务

# 时区
app.conf.timezone='Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc=False



#第一步:在celery.py中配置
# celery任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    '''
    任务名:{
    task:任务
    schedule:时间
    args:参数(函数参数)
    }
    '''
    'task-mul': {
        'task': 'celery_task.user_task.mul',
        'schedule': timedelta(seconds=3),  # 3s后
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': (3, 15),
    },
    'task-add': {
        'task': 'celery_task.home_task.add',
        'schedule': timedelta(seconds=10),  # 10s后
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': (3, 5),
    },

}


#第二步:启动beat(beat负责定时提交任务)
celery -A celery_task beat -l info
# 第三步:启动worker,任务就会被worker执行了
celery -A celery_task worker -l info -P eventlet
本文参与?腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-05-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客?前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体分享计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Celery
    • 官网
      • Celery异步任务框架
        • Celery架构
          • 使用场景
            • Celery的安装配置
              • 两种celery任务结构:提倡用包管理,结构更清晰
                • 方式一:简单使用
                • 方法二:包管理结构(推荐)
            • Celery主要处理三种任务
              • delay提交异步任务
                • apply_async提交延迟任务
                  • beat_schedule提交定时任务
                  相关产品与服务
                  消息队列 TDMQ
                  消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
                  http://www.vxiaotou.com