以下文章来源于Python Web与Django开发 ,作者大江狗
Django Web项目中,我们经常需要执行耗时的任务比如发送邮件、调用第三方接口、批量处理文件等等,将这些任务异步化放在后台运行可以有效缩短请求响应时间。另外服务器上经常会有定时任务的需求,比如清除缓存、备份数据库等工作。
Celery是一个高效的异步任务队列,基于分布式消息传递的作业队列,可以轻松帮我们在Django项目中设置执行异步和周期性任务。
本文将详细演示如何在Django项目中集成Celery,设置执行异步和周期性任务,并总结下一些高级使用技巧和注意事项。
目录
Celery的工作原理安装项目依赖文件Celery配置测试Celery是否工作正常编写任务异步调用任务查看任务执行状态及结果设置定时和周期性任务配置文件添加任务Django Admin添加周期性任务通过Crontab设置定时任务启动任务调度器beatFlower监控任务执行状态Celery高级用法与注意事项给任务设置最大重试次数不同任务交由不同Queue处理忽略不想要的结果避免启动同步子任务Django的模型对象不应该作为参数传递使用on_commit函数处理事务小结
Celery的工作原理
Celery是一个高效的基于分布式消息传递的作业队列。它主要通过消息(messages)传递任务,通常使用一个叫Broker(中间人)来协调client(任务的发出者)和worker(任务的处理者)。clients发出消息到队列中,broker将队列中的信息派发给 Celery worker来处理。Celery本身不提供消息服务,它支持的消息服务(Broker)有RabbitMQ和Redis。小编一般推荐Redis,因为其在Django项目中还是首选的缓存后台。
整个工作流程如下所示:
安装项目依赖文件
本项目使用了最新Django(3.2)和Celery版本(Celery 5)。因为本项目使用Redis做消息队列的broker,所以还需要安装redis (Windows下安装和启动redis参见菜鸟教程)。另外如果你要设置定时或周期性任务,还需要安装django-celery-beat。
# pip安装必选
Django==3.2
celery==5.0.5
redis==3.5.3
# 可选,windows下运行celery 4以后版本,还需额外安装eventlet库
eventlet
# 推荐安装, 需要设置定时或周期任务时安装,推荐安装
django-celery-beat==2.2.0
# 视情况需要,需要存储任务结果时安装,视情况需要
django-celery-results==2.0.1
# 视情况需要,需要监控celery运行任务状态时安装
folower==0.9.7
Celery配置
在正式使用celery和django-celery-beat之前,你需要做基础的配置。假如你的Django项目文件夹布局如下所示,你首先需要在myproject/myproject目录下新增celery.py并修改__init__.py。
- myproject/
- manage.py
- project/
- __init__.py # 修改这个文件
- celery.py # 新增这个文件
- asgi.py
- settings.py
- urls.py
- wsgi.py
新建celery.py,添加如下代码:
import os
from celery import Celery
# 设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
# 实例化
app = Celery('myproject')
# namespace='CELERY'作用是允许你在Django配置文件中对Celery进行配置
# 但所有Celery配置项必须以CELERY开头,防止冲突
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动从Django的已注册app中发现任务
app.autodiscover_tasks()
# 一个测试任务
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
修改__init__.py,如下所示:
from .celery import app as celery_app
__all__ = ('celery_app',)
接下来修改Django项目的settings.py,添加Celery有关配置选项,如下所示:
# 最重要的配置,设置消息broker,格式为:db://user:password@host:port/dbname
# 如果redis安装在本机,使用localhost
# 如果docker部署的redis,使用redis://redis:6379
CELERY_BROKER_URL = "redis://127.0.0.1:6379/0"
# celery时区设置,建议与Django settings中TIME_ZONE同样时区,防止时差
# Django设置时区需同时设置USE_TZ=True和TIME_ZONE = 'Asia/Shanghai'
CELERY_TIMEZONE = TIME_ZONE
其它Celery常用配置选项包括:
# 为django_celery_results存储Celery任务执行结果设置后台
# 格式为:db+scheme://user:password@host:port/dbname
# 支持数据库django-db和缓存django-cache存储任务状态及结果
CELERY_RESULT_BACKEND = "django-db"
# celery内容等消息的格式设置,默认json
CELERY_ACCEPT_CONTENT =['application/json', ]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
# 为任务设置超时时间,单位秒。超时即中止,执行下个任务。
CELERY_TASK_TIME_LIMIT = 5
# 为存储结果设置过期日期,默认1天过期。如果beat开启,Celery每天会自动清除。
# 设为0,存储结果永不过期
CELERY_RESULT_EXPIRES = xx
# 任务限流
CELERY_TASK_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}
# Worker并发数量,一般默认CPU核数,可以不设置
CELERY_WORKER_CONCURRENCY = 2
# 每个worker执行了多少任务就会死掉,默认是无限的
CELERY_WORKER_MAX_TASKS_PER_CHILD =200
完整配置选项见:
- https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-result_expires
注意:
- 在Django中正式编写和执行自己的异步任务前,一定要先测试redis和celery是否安装好并配置成功。
- 一个无限期阻塞的任务会使得工作单元无法再做其他事情,建议给任务设置超时时间。
测试Celery是否工作正常
首先你要启动redis服务。windows进入redis所在目录(比如C:\redis),使用redis-server.exe启动redis。Linux下使用./redis-server redis.conf启动,也可修改redis.conf将daemonize设置为yes, 确保守护进程开启。
启动redis服务后,你要先进入项目所在文件夹运行python manage.py runserver命令启动Django服务器(无需创建任何app),然后再打开一个终端terminal窗口输入celery命令,启动worker。
# Linux下测试,启动Celery
Celery -A myproject worker -l info
# Windows下测试,启动Celery
Celery -A myproject worker -l info -P eventlet
# 如果Windows下Celery不工作,输入如下命令
Celery -A myproject worker -l info --pool=solo
如果你能看到[tasks]下所列异步任务清单如debug_task,以及最后一句celery@xxxx ready, 说明你的redis和celery都配置好了,可以开始正式工作了。
-------------- celery@DESKTOP-H3IHAKQ v4.4.2 (cliffs)
--- ***** -----
-- ******* ---- Windows-10-10.0.18362-SP0 2020-04-24 22:02:38
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: myproject:0x456d1f0
- ** ---------- .> transport: redis://127.0.0.1:6379/0
- ** ---------- .> results: redis://localhost:6379/0
- *** --- * --- .> concurrency: 4 (eventlet)
-- ******* ---- .> task events: OFF (enable -Eto monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. myproject.celery.debug_task
[2020-04-24 22:02:38,484: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2020-04-24 22:02:38,500: INFO/MainProcess] mingle: searching forneighbors
[2020-04-24 22:02:39,544: INFO/MainProcess] mingle: all alone
[2020-04-24 22:02:39,572: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/0.
[2020-04-24 22:02:39,578: WARNING/MainProcess] c:\users\missenka\pycharmprojects\django-static-html-generator\venv\lib\site-packages\celery\fixups\django.py:203: UserWarning: Using sett
ings.DEBUG leads to a memory
leak, never use this setting inproduction environments!
leak, never use this setting in production environments!''')
[2020-04-24 22:02:39,579: INFO/MainProcess] celery@DESKTOP-H3IHAKQ ready.
编写任务
Celery配置完成后,我们就可以编写任务了。Django项目中所有需要Celery执行的异步或周期性任务都放在tasks.py文件里,该文件可以位于project目录下,也可以位于各个app的目录下。专属于某个Celery实例化项目的task可以使用@app.task装饰器定义,各个app目录下可以复用的task建议使用@shared_task定义。
两个示例如下所示:
# myproject/tasks.py
# 专属于myproject项目的任务
app = Celery('myproject')
@ app.task
def test():
pass
# app/tasks.py, 可以复用的task
from celery import shared_task
import time
@shared_task
def add(x, y):
time.sleep(2)
return x + y
上面我们定义一个名为add的任务,它接收两个参数,并返回计算结果。为了模拟耗时任务,我们中途让其sleep 2秒。现在已经定义了一个耗时任务,我们希望在Django的视图或其它地方中以异步方式调用执行它,应该怎么做呢? 下面我们将给出答案。
注意:
- 使用celery定义任务时,避免在一个任务中调用另一个异步任务,容易造成阻塞。
- 当我们使用@app.task装饰器定义我们的异步任务时,那么这个任务依赖于根据项目名myproject生成的Celery实例。然而我们在进行Django开发时为了保证每个app的可重用性,我们经常会在每个app文件夹下编写异步任务,这些任务并不依赖于具体的Django项目名。使用@shared_task装饰器能让我们避免对某个项目名对应Celery实例的依赖,使app的可移植性更强。
异步调用任务
Celery提供了2种以异步方式调用任务的方法,delay和apply_async方法,如下所示:
# 方法一:delay方法
task_name.delay(args1, args2, kwargs=value_1, kwargs2=value_2)
# 方法二:apply_async方法,与delay类似,但支持更多参数
task.apply_async(args=[arg1, arg2], kwargs={key:value, key:value})
我们接下来看一个具体的例子。我们编写了一个Django视图函数,使用delay方法调用add任务。
# app/views.py
from .tasks import add
def test_celery(request):
add.delay(3, 5)
return HttpResponse("Celery works")
# app/urls.py
urlpatterns = [
re_path(r'^test/