美文网首页WEB前端
在 Django 项目中使用 Celery

在 Django 项目中使用 Celery

作者: Maslino | 来源:发表于2016-11-20 14:56 被阅读2976次

Celery 先前的版本需要额外安装一个库才能与 Django 集成,但是自3.1版本开始,再也不需要了。现在 Celery 直接支持 Django 了,本文提供一个比较基本的方法将 Celery 集成到 Django 项目中。你将使用与非 Django 用户同样的API,所以在阅读本文之前最好看一下Celery 初步。当你完成一个可以正常运行的例子后,再看看Celery 进阶

为了在 Django 项目中使用 Celery,必须先定义一个 Celery 实例(也叫做 app)。

假如 Django 项目布局是这样的:

- proj/
  - proj/__init__.py
  - proj/settings.py
  - proj/urls.py
- manage.py

那么,推荐的做法是创建一个新的 proj/proj/celery.py 模块,然后在这个模块中定义 Celery 实例。

file: proj/proj/celery.py

from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

接着,需要在 proj/proj/__init__.py 模块中导入这个 Celery 实例(也就是 app)。这样可以确保当 Django 启动时可以加载这个 app,并且 @shared_task 装饰器(后面会提到)也能使用这个 app.

file: proj/proj/__init__.py

from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

需要注意的是,上述项目布局示例适合大型项目。对于简单的项目,你可以在一个模块中同时定义 Celery 实例和任务,就像Celery 初步里面那样。

我们看看在 proj/proj/celery.py 模块中到底做了什么事。首先,从 future 模块导入 absolute_import,这样,celery.py 模块就不会与 Celery 库相冲突:

from __future__ import absolute_import

然后,为 celery 命令行程序设置环境变量 DJANGO_SETTINGS_MODULE 的默认值:

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

设置这个环境变量是为了让 celery 命令能找到 Django 项目。这条语句必须出现在 Celery 实例创建之前,也就是接下来要做的:

app = Celery('proj')

这个 app 就是 Celery 实例。可以有很多 Celery 实例,但是当使用 Django 时,似乎没有必要。

我们也将 Django settings 模块作为 Celery 的配置来源。也就是说,不需要使用多个配置文件,直接在 Django settings 里面配置 Celery.

可以将 settings 对象作为参数传入,但是更好的方式是使用字符串,因为当使用 Windows 系统或者 execv 时 celery worker 不需要序列化 settings 对象:

app.config_from_object('django.conf:settings')

为了重用 Django APP,通常是在单独的 tasks.py 模块中定义所有任务。Celery 会自动发现这些模块:

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

加上上一句后,Celery 会自动发现 Django APP 中定义的任务,前提是遵循如下 tasks.py 约定:

- app1/
    - tasks.py
    - models.py
- app2/
    - tasks.py
    - models.py

这样就不需要手动把一个个模块加到 CELERY_IMPORTS 配置中。传入的 lambda 函数有如下好处:只在需要的时候才自动发现任务,以及当导入模块时不需要立即对 settings 对象求值。

最后,debug_task 是一个打印本身 request 信息的任务。它使用了在 Celery 3.1引入的任务选项 bind=True,使得引用当前任务实例变得很容易。

使用 @shared_task 装饰器

你很可能在可重用的 Django APP 中编写了一些任务,但是 Django APP 不能依赖于具体的 Django 项目,所以你无法直接导入 Celery 实例。

@shared_task 装饰器能让你在没有具体的 Celery 实例时创建任务:

file: demoapp/tasks.py:

from __future__ import absolute_import
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def mul(x, y):
    return x * y

@shared_task
def xsum(numbers):
    return sum(numbers)

另请参阅

你可以在这里找到这个 Django 示例项目的完整源码。

使用 Django ORM/Cache 作为结果存储后端(result backend)

如果想在 Django 数据库中保存任务执行结果,还需要安装 django-celery 库(或者使用 SQLAlchemy 结果存储后端)。

django-celery 库基于 Django ORM和缓存框架实现了结果存储后端.

为了在项目中使用该扩展,遵循如下四步:

  1. 安装 django-celery 库

    <pre class=”brush: bash; gutter: false;”>
    $ pip install django-celery
    </pre>

  2. 把 djcelery 加到 INSTALLED_APPS 中

  3. 创建 celery 用到的数据库表

    当使用数据库作为结果存储后端时,这一步会创建用来保存任务结果的相关数据库表,以及周期任务调度器需要使用的数据库表。如果不使用周期任务和任务结果,可以跳过这一步。

    如果使用 south 来做模式迁移,执行:

    <pre class=”brush: bash; gutter: false;”>
    $ python manage.py migrate djcelery
    </pre>

    如果不使用 south,执行:

    <pre class=”brush: bash; gutter: false;”>
    $ python manage.py syncdb
    </pre>

  4. 配置 celery 使用 django-celery 结果存储后端

    对于数据库后端,使用:

    <pre class=”brush: python; gutter: false;”>
    app.conf.update(
    CELERY_RESULT_BACKEND='djcelery.backends.database:DatabaseBackend',
    )
    </pre>

    对于缓存后端,使用:

    <pre class=”brush: python; gutter: false;”>
    app.conf.update(
    CELERY_RESULT_BACKEND='djcelery.backends.cache:CacheBackend',
    )
    </pre>

    如果你将 Django settings 作为 Celery 的配置来源,可以直接在 settings 模块中加上 CELERY_RESULT_BACKEND 配置项,不需要 app.conf.update

相对导入

在导入任务模块时,必须保持一致性。也就是说,如果在 INSTALLED_APPS 中添加的是 project.app,那么需要以 from project.app 的方式导入任务,否则任务的名称将会不一样。
参见自动命名与相对导入

启动 worker 进程

在生产环境中,你希望在后台以守护进程的方式运行 worker(参见以守护进程运行 worker),但是在测试开发环境中,像 Django 的 runserver 那样使用 celery worker 管理命令来启动一个 worker 实例,就有用多了:

$ celery worker -A proj -l info

要查看 celery 的完整命令行选项,使用 help 命令:

$ celery help

接下来怎么办

如果你想学习更多东西,请参考Celery 进阶教程,然后你可以学习用户指南

相关文章

网友评论

  • 14b12e89657b:博主,怎么设置task_id啊,PeriodicTask这个model并无设置task_id的字段。
    class PeriodicTask(models.Model):
    name = models.CharField(
    _('name'), max_length=200, unique=True,
    help_text=_('Useful description'),
    )
    task = models.CharField(_('task name'), max_length=200)
    interval = models.ForeignKey(
    IntervalSchedule,
    null=True, blank=True, verbose_name=_('interval'),
    on_delete=models.CASCADE,
    )
    crontab = models.ForeignKey(
    CrontabSchedule, null=True, blank=True, verbose_name=_('crontab'),
    on_delete=models.CASCADE,
    help_text=_('Use one of interval/crontab'),
    )
    args = models.TextField(
    _('Arguments'), blank=True, default='[]',
    help_text=_('JSON encoded positional arguments'),
    )
    kwargs = models.TextField(
    _('Keyword arguments'), blank=True, default='{}',
    help_text=_('JSON encoded keyword arguments'),
    )
    queue = models.CharField(
    _('queue'), max_length=200, blank=True, null=True, default=None,
    help_text=_('Queue defined in CELERY_QUEUES'),
    )
    exchange = models.CharField(
    _('exchange'), max_length=200, blank=True, null=True, default=None,
    )
    routing_key = models.CharField(
    _('routing key'), max_length=200, blank=True, null=True, default=None,
    )
    expires = models.DateTimeField(
    _('expires'), blank=True, null=True,
    )
    enabled = models.BooleanField(
    _('enabled'), default=True,
    )
    last_run_at = models.DateTimeField(
    auto_now=False, auto_now_add=False,
    editable=False, blank=True, null=True,
    )
    total_run_count = models.PositiveIntegerField(
    default=0, editable=False,
    )
    date_changed = models.DateTimeField(auto_now=True)
    description = models.TextField(_('description'), blank=True)

    objects = managers.PeriodicTaskManager()
    no_changes = False
  • 大僧鸠摩智:官网的翻译一遍有意思吗
    8a2acd1fd414:兄弟,博主翻译也是付出,要不你写一遍自己总结的呗再来怼

本文标题:在 Django 项目中使用 Celery

本文链接:https://www.haomeiwen.com/subject/pwcypttx.html