Celery实现异步与定时任务
Celery是什么
celery是基于python的分布式任务队列的处理库
Celery的基本架构
- 消息中间件(broker)
- 任务执行单元(worker)
- 执行结果存储(backend)
Celery能做什么
- 异步任务
- 定时任务
异步任务
安装celery
pip install celery
场景模拟
在这里我们模拟一个程序阻塞的场景。新建app.py
# app.py
import time
def foo():
time.sleep(5)
return None
if __name__ == '__main__':
print("starting...")
foo()
print("done.")
这样,当执行app.py时,程序至少会等待5秒才能打印最后一个输出。我们的目的是希望当程序执行到 foo() 时,主线程不必等待,直接执行下一语句。
为此,新建tasks.py
from celery import Celery
import time
# 配置中间件与存储后端
broker = "redis://localhost/6379/0"
backend = "redis://localhost/6379/1"
# 创建celery实例
app = Celery('tasks', broker=broker, backend=backend)
@app.task
def foo():
time.sleep(5)
return None
Celery()的第一个参数表示当前模型。后面的参数表示中间件与存储后端,这里使用的是redis作为存储后端,当然也可以使用RabbitMQ消息中间件:
broker = "amqp://localhost"
通常可以使用RabbitMQ作为中间件、redis作为存储后端的组合方式
此时可以删除app.py中定义的foo()函数,修改为从tasks.py中导入:
from tasks import foo
修改foo()的调用方式
foo.delay()
运行celery
在终端输入:
celery worker -A tasks -l info
注意:确保redis服务已经启动。
运行示例
python app.py
此时可以看到,程序不再阻塞。
注意:如果在没有启动celery服务的情况下运行app.py,那么提交到redis中的任务没有被消费,下一次启动celery服务时,将会先执行redis中存储的任务。
项目结构
以上虽然已经能够正常运行,但是对于tasks.py来说,我们将配置与创建实例等命令都写在了一起,降低了可维护性。官方推荐将配置统一维护到一个配置文件中,通过某种方式加载配置。
当前的项目结构:
mypriject/
app.py
celery_app/
__init__.py
tasks.py
创建celery.py,用于控制celery的实例化:
from celery import Celery
app = Celery("celery_demo")
# 从celery_app/celeryconfig.py中导入配置
app.config_from_object("celery_app.celeryconfig")
注意:也可以在__init__.py中写入
我们将配置剥离出来。创建myproject/celery_app/celeryconfig.py:
from __future__ import absolute_import, unicode_literals
BROKER_URL = "redis://localhost/6379/0" # 中间件
CELERY_RESULT_BACKEND = "redis://localhost/6379/1" # 存储后端
CELERY_TIMEZONE = "Asia/Shanghai" # 时区
# 导入任务
CELERY_IMPORTS = [
"celery_app.tasks"
]
详细配置:https://docs.celeryproject.org/en/stable/getting-started/first-steps-with-celery.html#configuration
此时的tasks.py:
from celery_app import app
import time
@app.task
def foo():
time.sleep(5)
return None
app.py也要修改:
# app.py
from celery_app import tasks
if __name__ == '__main__':
print("starting...")
tasks.foo.delay()
print("done.")
此时的项目结构:
myproject/
app.py
celery_app/
__init__.py
celeryconfig.py
tasks.py
运行celery
celery worker -A celery_app -l info
运行app.py
python app.py
定时任务
我们沿用上一个项目的结构,在此基础上加入定时任务。
首先,在tasks.py中添加需要定时的任务
# tasks.py
...
@app.task
def add(x, y):
return x + y
然后在配置文件中加入定时任务的配置。celery_app/celeryconfig.py
# celeryconfig.py
from datetime import timedelta
...
CELERYBEAT_SCHEDULE = {
"periodic_task": {
"task": "celery_app.tasks.add",
"schedule": timedelta(seconds=10),
"args": (3,4)
}
}
注意:定时任务依赖于timezone的设置,默认设置为”UTC”。
调度的时间设置,可以通过crontab类来设置,设置更为灵活。https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html#crontab-schedules
启动定时任务
celery beat -A celery_app -l info
执行任务
celery worker -A celery_app -l info
在终端输出的信息中,可以看到每十秒输出一个执行信息。
我们还可以将beat嵌入到worker中,一次性启动。但是并不建议在生产环境中使用。
celery worker -A celery_app -B
在Django中使用Celery
在django中使用celery其实仍然是使用和非django相同的API。
异步任务
异步任务有两种方式可以实现
- django-celery
- 直接使用celery
官方推荐直接使用celery,所以这里只记录第二种方法。
环境准备
安装redis
常用的消息中间件包括RabbitMQ、redis等。在这里将redis作为celery的任务队列,同时也能作为存储后端,安装方法略。
安装python的redis库
pip install redis
安装celery
pip install celery
创建项目
django-admin startproject myproject
cd myproject/
django-admin startapp myapp
新建myproject/myproject/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# 为celery设置默认的django settings模型
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
# 载入配置
# namespace表示配置变量需包含"CELERY_"前缀
app.config_from_object('django.conf.settings', namespace='CELERY')
# 从所有已注册的django应用程序中加载任务模块
app.autodiscover_tasks()
在myproject/myproject/__init__.py中导入这个app
from __future__ import absolute_import, unicode_literals
from myproject.celery import app as celery_app
__all__ = ('celery_app', )
为项目中的每一个app新建tasks.py
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def add(x, y):
return x + y
视图中使用该任务
# views.py
from django.http import JSONResponse
from myapp.tasks import add
def handler(request):
result = add.delay(3,4)
return JSONResponse(result)
启动celery
celery worker -A myproject -l info