背景
在Django的开发过程中,难免会遇到一些请求十分地耗时,甚至会导致请求超时,因此决定采用异步任务的方式在后台执行这些任务。了解一番后,决定采用django+celery的方式来实现。
Celery
celery是基于python实现的一个分布式任务框架, 支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。
celery采用典型的生产者-消费者模式,主要由以下部分组成:
task(任务模块): 包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。
broker(消息中间件): Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。celery本身不支持消息队列功能,它需要介质,比如rabbitmq、redis、mysql、mongodb都是可以的。推荐使用rabbitmq,他的速度和可用性都很高。本文不介绍相关消息队列介质工具的安装。
worker(任务处理单元): Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。
backend(存储结果): Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, Redis 和 MongoDB 等。
celery在django中的使用
版本
celery截止到本文发布时已更新到4.2.2版本
在celery 3.2.x 以下的版本和celery 4.x 版本涉及到的配置方式以及使用方式有点不同。文章中的内容是介绍celery 3.2.以下版本 在django中如何配置,以及简单的运用。
关于celery4.x版本的配置可参见 Using Celery with Django
安装
requirements.txt:
# 分布式任务队列
celery==3.1.25
django-celery==3.2.2
flower==0.9.3
django-celery是django适配celery的python包,flower可用于监控任务的执行状态(可以不需要)
配置
基本配置
在django的配置文件(一般是settings.py)中进行celery基本的参数配置。
# settings.py from celery import platforms import djcelery djcelery.setup_loader() BROKER_URL= 'amqp://guest@localhost//' # rabbitmq地址,需要安装rabbitmq CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' CELERY_TRACK_STARTED = True CELERY_SEND_EVENTS = True CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' platforms.C_FORCE_ROOT = True
其中,当djcelery.setup_loader()运行时,Celery便会去查看INSTALLD_APPS下包含的所有app目录中的tasks.py文件,找到标记为task的方法,将它们注册为celery task。
BROKER_URL和CELERY_RESULT_BACKEND分别指代你的Broker的代理地址以及Backend(result store)数据存储地址。这里设置的
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
代表着使用了django配置的数据库来进行存储。当然你也可以指定自定义的数据库来存储。注意,此处backend的设置是通过关键字CELERY_RESULT_BACKEND来配置,与一般的.py文件中实现celery的backend设置方式有所不同。一般的.py中是直接通过设置backend关键字来配置,如下所示:
app = Celery('tasks', backend='amqp://guest@localhost//', broker='amqp://guest@localhost//')
然后其它参数,看你具体想实现什么功能具体配置。比如我之前的异步任务结果能够存储到数据库表中,但定时任务死活没有反应,任务是执行成功了,但结果没有保存到backend中,于是我加了个
CELERY_SEND_EVENTS = True
, 好像就可以了。将djcelery加入到apps中
INSTALLED_APPS = [ ... 'djcelery', ... ]
数据库表创建
python manage.py migrate
上述在django的数据库中创建celery相关的表,包括存储异步任务结果的celery_taskmeta
表和存储定时任务结果的djcelery_taskstate
。
任务使用
任务创建
在需要使用异步任务的APP目录下新建tasks.py。文件名必须为tasks.py,方能被djcelery.setup_loader()
捕获。
# tasks.py
from celery import task
from celery.decorators import periodic_task
from celery.task.schedules import crontab
import time
@task #一般的异步任务
def hello_celery(loop):
for i in range(loop):
print('hello')
time.sleep(2)
@periodic_task(run_every=crontab()) # 定时任务
def periodic_hello()
print('helo')
crontab
crontab()代表每分钟0秒时刻执行一次,用于定时任务
crontab一共有7个参数,常用有5个参数分别为:
minute:分钟,范围0-59;
hour:小时,范围0-23;
day_of_week:星期几,范围0-6。以星期天为开始,即0为星期天。这个星期几还可以使用英文缩写表示,例如“sun”表示星期天;
day_of_month:每月第几号,范围1-31;
month_of_year:月份,范围1-12。
- 具体某个值
crontab(minute=15)
,即每小时的15分时刻执行一次任务。直接指定某个时刻crontab(minute=0, hour=0)
,每天0点0分时刻执行任务crontab(minute='0,30')
,即每小时的0分和30分执行一次任务设置范围
crontab(minute='*', hour='9-12')
, 指定9点到12点每个小时的每分钟执行任务crontab(hour='9-12')
,*
是默认值,可以省略crontab(hour='9-12,20')
,指定9点到12点和20点中每分钟执行任务设置步长
crontab(minute='*/2')
orcrontab(minute='0-59/2')
, 每间隔2分钟就执行一次任务
more examples:
1 #每2个小时中每分钟执行1次任务
2 crontab(hour='*/2')
3
4 #每3个小时的0分时刻执行1次任务
5 #即[0,3,6,9,12,15,18,21]点0分
6 crontab(minute=0, hour='*/3')
7
8 #每3个小时或8点到12点的0分时刻执行1次任务
9 #即[0,3,6,9,12,15,18,21]+[8,9,10,11,12]点0分
10 crontab(minute=0, hour='*/3,8-12')
11
12 #每个季度的第1个月中,每天每分钟执行1次任务
13 #月份范围是1-12,每3个月为[1,4,7,10]
14 crontab(month_of_year='*/3')
15
16 #每月偶数天数的0点0分时刻执行1次任务
17 crontab(minute=0, hour=0, day_of_month='2-31/2')
18
19 #每年5月11号的0点0分时刻执行1次任务
20 crontab(0, 0, day_of_month='11', month_of_year='5')
任务调用
创建完任务后,我们可以在应用程序中使用delay()
或apply_async()
方法来调用任务
>>> from tasks import hello
>>> hello.delay()
<AsyncResult: 2272ddce-8be5-493f-b5ff-35a0d9fe600f>
任务队列启动
python manage.py runserver
python manage.py celery worker -c 4 --loglevel=info # 用于异步任务
python manage.py celery beat -l info # 用于定时任务
python manage.py celerycam #用于监测