1 minute read

Celery是什么

celery是基于python的分布式任务队列的处理库

Celery的基本架构

  • 消息中间件(broker)
  • 任务执行单元(worker)
  • 执行结果存储(backend)

Celery能做什么

  • 异步任务
  • 定时任务

异步任务

安装celery

pip install celery

场景模拟

在这里我们模拟一个程序阻塞的场景。新建app.py

# app.py

import time

def foo():
    time.sleep(5)
    return None

if __name__ == '__main__':
    print("starting...")
    foo()
    print("done.")

这样,当执行app.py时,程序至少会等待5秒才能打印最后一个输出。我们的目的是希望当程序执行到 foo() 时,主线程不必等待,直接执行下一语句。

为此,新建tasks.py

from celery import Celery
import time

# 配置中间件与存储后端
broker = "redis://localhost/6379/0"
backend = "redis://localhost/6379/1"

# 创建celery实例
app = Celery('tasks', broker=broker, backend=backend)

@app.task
def foo():
    time.sleep(5)
    return None

Celery()的第一个参数表示当前模型。后面的参数表示中间件与存储后端,这里使用的是redis作为存储后端,当然也可以使用RabbitMQ消息中间件:

broker = "amqp://localhost"

通常可以使用RabbitMQ作为中间件、redis作为存储后端的组合方式

此时可以删除app.py中定义的foo()函数,修改为从tasks.py中导入:

from tasks import foo

修改foo()的调用方式

foo.delay()

运行celery

在终端输入:
celery worker -A tasks -l info

注意:确保redis服务已经启动。

运行示例

python app.py

此时可以看到,程序不再阻塞。

注意:如果在没有启动celery服务的情况下运行app.py,那么提交到redis中的任务没有被消费,下一次启动celery服务时,将会先执行redis中存储的任务。

项目结构

以上虽然已经能够正常运行,但是对于tasks.py来说,我们将配置与创建实例等命令都写在了一起,降低了可维护性。官方推荐将配置统一维护到一个配置文件中,通过某种方式加载配置。

当前的项目结构:

mypriject/
    app.py
    celery_app/
        __init__.py
        tasks.py

创建celery.py,用于控制celery的实例化:

from celery import Celery

app = Celery("celery_demo")

# 从celery_app/celeryconfig.py中导入配置
app.config_from_object("celery_app.celeryconfig")

注意:也可以在__init__.py中写入

我们将配置剥离出来。创建myproject/celery_app/celeryconfig.py:

from __future__ import absolute_import, unicode_literals

BROKER_URL = "redis://localhost/6379/0"     # 中间件
CELERY_RESULT_BACKEND = "redis://localhost/6379/1"  # 存储后端
CELERY_TIMEZONE = "Asia/Shanghai"       # 时区

# 导入任务
CELERY_IMPORTS = [
    "celery_app.tasks"
]

详细配置:https://docs.celeryproject.org/en/stable/getting-started/first-steps-with-celery.html#configuration

此时的tasks.py:

from celery_app import app
import time

@app.task
def foo():
    time.sleep(5)
    return None

app.py也要修改:

# app.py

from celery_app import tasks

if __name__ == '__main__':
    print("starting...")
    tasks.foo.delay()
    print("done.")

此时的项目结构:

myproject/
    app.py
    celery_app/
        __init__.py
        celeryconfig.py
        tasks.py

运行celery

celery worker -A celery_app -l info

运行app.py

python app.py

定时任务

我们沿用上一个项目的结构,在此基础上加入定时任务。

首先,在tasks.py中添加需要定时的任务

# tasks.py

...

@app.task
def add(x, y):
    return x + y

然后在配置文件中加入定时任务的配置。celery_app/celeryconfig.py

# celeryconfig.py
from datetime import timedelta

...

CELERYBEAT_SCHEDULE = {
    "periodic_task": {
        "task": "celery_app.tasks.add",
        "schedule": timedelta(seconds=10),
        "args": (3,4)
    }
}

注意:定时任务依赖于timezone的设置,默认设置为”UTC”。

调度的时间设置,可以通过crontab类来设置,设置更为灵活。https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html#crontab-schedules

启动定时任务

celery beat -A celery_app -l info

执行任务

celery worker -A celery_app -l info

在终端输出的信息中,可以看到每十秒输出一个执行信息。

我们还可以将beat嵌入到worker中,一次性启动。但是并不建议在生产环境中使用。

celery worker -A celery_app -B

在Django中使用Celery

在django中使用celery其实仍然是使用和非django相同的API。

异步任务

异步任务有两种方式可以实现

  • django-celery
  • 直接使用celery

官方推荐直接使用celery,所以这里只记录第二种方法。

环境准备

安装redis

常用的消息中间件包括RabbitMQredis等。在这里将redis作为celery的任务队列,同时也能作为存储后端,安装方法略。

安装python的redis库

pip install redis

安装celery

pip install celery

创建项目

django-admin startproject myproject
cd myproject/
django-admin startapp myapp

新建myproject/myproject/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# 为celery设置默认的django settings模型
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

# 载入配置
# namespace表示配置变量需包含"CELERY_"前缀
app.config_from_object('django.conf.settings', namespace='CELERY')

# 从所有已注册的django应用程序中加载任务模块
app.autodiscover_tasks()

myproject/myproject/__init__.py中导入这个app

from __future__ import absolute_import, unicode_literals
from myproject.celery import app as celery_app

__all__ = ('celery_app', )

为项目中的每一个app新建tasks.py

from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

视图中使用该任务

# views.py
from django.http import JSONResponse
from myapp.tasks import add

def handler(request):
    result = add.delay(3,4)
    return JSONResponse(result)

启动celery

celery worker -A myproject -l info