Using Celery with Django

Official documentation: https://docs.celeryproject.org/en/stable/django/first-steps-with-django.html

Download the Celery source project from GitHub

https://github.com/celery/celery
After downloading, unzip it and find the Django example at celery-master\examples\django

Open the example project in PyCharm

When opening the project, PyCharm will prompt you to install its missing dependencies.

One of these is SQLAlchemy, a Python ORM framework (the example's db+sqlite result backend relies on it).

celery.py

_proj/proj/celery.py_:

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings') 

app = Celery('proj')  # create the Celery application object

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.

app.config_from_object('django.conf:settings', namespace='CELERY')  # read configuration from Django settings, filtered by the namespace prefix

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()  # auto-discover tasks.py modules in all registered Django apps


@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')  # print the request with repr formatting

__init__.py

This is the project package's initialization file.
This ensures that the app is loaded when Django starts so that the @shared_task decorator (mentioned later) will use it; the __all__ declaration restricts what the package exposes to the outside.

Add the following to it:

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

settings.py

In addition to Django's own settings, add the Celery configuration parameters:

import os

# ^^^ The above is required if you want to import from the celery
# library.  If you don't have this then `from celery.schedules import`
# becomes `proj.celery.schedules` in Python 2.x since it allows
# for relative imports by default.

# Celery settings

CELERY_BROKER_URL = 'redis://localhost:6379/1'  # message broker (transport) URL

#: Only add pickle to this list if your broker is secured
#: from unwanted access (see userguide/security.html)
CELERY_ACCEPT_CONTENT = ['json']   # content types the worker will accept
CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'  # where task results and execution states are stored
CELERY_TASK_SERIALIZER = 'json'  # serialize task messages as JSON
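The namespace='CELERY' argument in celery.py means each of these keys has its CELERY_ prefix stripped and is lower-cased before Celery uses it. A rough, hypothetical sketch of that renaming (not Celery's actual implementation):

```python
# Hypothetical sketch of how namespace='CELERY' maps Django settings
# keys onto Celery's own lowercase setting names.
def extract_celery_settings(django_settings, namespace='CELERY'):
    prefix = namespace + '_'
    return {
        key[len(prefix):].lower(): value   # CELERY_BROKER_URL -> broker_url
        for key, value in django_settings.items()
        if key.startswith(prefix)
    }

settings = {
    'DEBUG': True,                                    # ignored: no CELERY_ prefix
    'CELERY_BROKER_URL': 'redis://localhost:6379/1',
    'CELERY_TASK_SERIALIZER': 'json',
}
print(extract_celery_settings(settings))
# {'broker_url': 'redis://localhost:6379/1', 'task_serializer': 'json'}
```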

tasks.py

# Create your tasks here
from time import sleep

from celery import shared_task
from demoapp.models import Widget


@shared_task
def add(x, y):
    print('computing...')
    sleep(5)
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)


@shared_task
def count_widgets():
    return Widget.objects.count()


@shared_task
def rename_widget(widget_id, name):
    w = Widget.objects.get(id=widget_id)
    w.name = name
    w.save()

urls.py

from django.conf.urls import handler404, handler500, include, url  # noqa

# Uncomment the next two lines to enable the admin:
# from django.contrib import admin
# admin.autodiscover()
from demoapp import views

urlpatterns = [
    url(r'^index/', views.index),
    url(r'^asyncs/', views.asyncs),
]

views.py

# Create your views here.
from django.http import HttpResponse

from demoapp.tasks import add


def index(request):  # synchronous call
    result = add(3, 2)
    return HttpResponse(result)


def asyncs(request):  # asynchronous task invocation
    result = add.delay(3, 4)
    return HttpResponse(result)
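The difference between the two views: add(3, 2) runs the function in the request process, while add.delay(3, 4) hands the work to a worker and returns a handle immediately. Celery aside, the same pattern can be sketched with a plain thread-pool future (a stand-in for illustration, not Celery's API):

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def add(x, y):
    sleep(0.2)  # stands in for the 5-second sleep in the real task
    return x + y

with ThreadPoolExecutor() as executor:
    future = executor.submit(add, 3, 4)  # returns at once, like add.delay(3, 4)
    # the caller is free to do other work here while add() runs
    print(future.result())               # blocks until the result is ready: 7
```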

Migrate the database

Generate the migration files and apply them:

python manage.py makemigrations
python manage.py migrate

Start Celery

celery -A proj worker --loglevel=info --pool=solo

The --pool=solo flag runs tasks in the worker's main process; Celery's default prefork pool is not supported on Windows.

(venv) E:\Celery\django>celery -A proj worker --loglevel=info --pool=solo

 -------------- celery@DESKTOP-MOK5PHF v5.0.5 (singularity)
--- ***** -----
-- ******* ---- Windows-10-10.0.18362-SP0 2021-02-06 22:37:31
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         proj:0x45a5f30
- ** ---------- .> transport:   redis://localhost:6379/1
- ** ---------- .> results:     sqlite:///results.sqlite
- *** --- * --- .> concurrency: 12 (solo)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . demoapp.tasks.add
  . demoapp.tasks.count_widgets
  . demoapp.tasks.mul
  . demoapp.tasks.rename_widget
  . demoapp.tasks.xsum
  . proj.celery.debug_task

[2021-02-06 22:37:31,158: INFO/MainProcess] Connected to redis://localhost:6379/1
[2021-02-06 22:37:31,166: INFO/MainProcess] mingle: searching for neighbors
[2021-02-06 22:37:32,192: INFO/MainProcess] mingle: all alone
[2021-02-06 22:37:32,204: WARNING/MainProcess] e:\dj\venv\lib\site-packages\celery\fixups\django.py:204: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!

[2021-02-06 22:37:32,205: INFO/MainProcess] celery@DESKTOP-MOK5PHF ready.
  • The [tasks] section lists the tasks currently registered in this project
  • The UserWarning means debug mode must not be used in production; set DEBUG = False in settings.py
  • ALLOWED_HOSTS then needs to be changed to ALLOWED_HOSTS = ['*']

Testing

Start the Django project

Synchronous test

Visit the index page: it first loads for 5 seconds, and only then returns the computed result.

Asynchronous test



Visit the asyncs page; the page immediately displays the unique id of the task.

The corresponding print output also appears in the Run console:

The terminal likewise shows the result of the asynchronous execution:

[2021-02-07 01:04:56,835: INFO/MainProcess] Connected to redis://localhost:6379/1
[2021-02-07 01:04:56,843: INFO/MainProcess] mingle: searching for neighbors
[2021-02-07 01:04:57,869: INFO/MainProcess] mingle: all alone
[2021-02-07 01:04:58,338: INFO/MainProcess] celery@DESKTOP-MOK5PHF ready.
[2021-02-07 01:06:51,033: INFO/MainProcess] Received task: demoapp.tasks.add[c58ae751-9596-4333-90a4-9389016a3681]
[2021-02-07 01:06:51,033: WARNING/MainProcess] computing...
[2021-02-07 01:06:56,527: INFO/MainProcess] Task demoapp.tasks.add[c58ae751-9596-4333-90a4-9389016a3681] succeeded in 5.48499999998603s: 7

Inspect the task-result storage file

settings.py contains CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite', the location of the database storing task results and execution states,
so a SQLite database file is generated. Opening it:

The table holds the task id, execution state, result, and related information. Note that the result is not stored as the raw value but in an encoded (serialized) form.
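Why the result column looks encoded: the backend records a serialized payload rather than the bare value, and some serializations are binary blobs a DB browser cannot render. A rough, hypothetical illustration of the difference:

```python
import json
import pickle

result = 7  # the raw return value of add(3, 4)

# A serialized payload, roughly what a JSON-based result record might contain:
payload = json.dumps({'status': 'SUCCESS', 'result': result})
print(payload)

# A pickled value is a binary blob, unreadable in a database browser:
blob = pickle.dumps(result)
print(blob)
```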

Next, we apply this asynchronous technique to add an asynchronous email service to the RESTend project.

Asynchronous email service

Open the RESTend project

Create a new app

python manage.py startapp sendemail

Integrate Celery into the new Django project

Add celery.py and modify it

First, put celery.py into the RESTend project package:

Then adjust the settings inside celery.py:

  • The DJANGO_SETTINGS_MODULE value and the app name passed to Celery() must be changed to this project's name

    import os
    
    from celery import Celery
    
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'RESTend.settings')
    
    app = Celery('RESTend')
    
    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    # Load task modules from all registered Django app configs.
    app.autodiscover_tasks()
    
    
    @app.task(bind=True)
    def debug_task(self):
        print(f'Request: {self.request!r}')
    

    Modify __init__.py

    In the RESTend project's __init__.py, add the corresponding configuration (pymysql.install_as_MySQLdb() lets PyMySQL act as the MySQLdb driver):

    import pymysql
    
    from .celery import app as celery_app
    
    __all__ = ('celery_app',)
    pymysql.install_as_MySQLdb()

    Install Celery's Django extension

    We want the results and states of asynchronous tasks to be stored through the Django ORM in the project's database (so far the example used a SQLite file), which requires an extra extension library.
    The django-celery-results extension provides result backends using either the Django ORM, or the Django Cache framework.
    In other words, task results can be cached or persisted in the database after execution.

Install it with:

pip install django-celery-results

Modify settings.py

Add the following Celery parameters:

# Celery settings

CELERY_BROKER_URL = 'redis://localhost:6379/1'

#: Only add pickle to this list if your broker is secured
#: from unwanted access (see userguide/security.html)
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'  # note: this will be changed later when configuring the result storage
CELERY_TASK_SERIALIZER = 'json'

Add django_celery_results and the newly created app to INSTALLED_APPS:

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'rest_framework',
    'django.contrib.staticfiles',
    'App',
    'django_celery_results',
    'sendemail',
]

Migrate the django_celery_results tables

python manage.py makemigrations
python manage.py migrate
python manage.py migrate django_celery_results

Configure the result storage

Configure Celery to use the django-celery-results backend.

Assuming you are using Django's settings.py to also configure Celery, add the following setting:

CELERY_RESULT_BACKEND = 'django-db'  # store results in the Django database

For the cache backend you can use:

CELERY_RESULT_BACKEND = 'django-cache'

This walkthrough stores results in django-db.

First, write some simple functionality to check that the project runs correctly.

Create the tasks

Create tasks.py:

from time import sleep

from celery import shared_task


@shared_task  # register the function as a shared Celery task
def add(a, b):
    print('so sleepy...')
    sleep(5)

    return a + b

Create urls.py

from django.urls import path

from sendemail import views

urlpatterns = [
    path(r'index/', views.index),
    path(r'asyncs/', views.asyncs),
]

Project urls.py

Register the newly created URLconf:

from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('app/', include('App.urls')),
    path('send/', include('sendemail.urls')),
]

views.py

from django.http import HttpResponse
from django.shortcuts import render

# Create your views here.
from sendemail.tasks import add

def index(request):  # synchronous call
    result = add(5, 6)
    return HttpResponse(result)

def asyncs(request):  # asynchronous call
    result_id = add.delay(6, 8)
    return HttpResponse(result_id)

Test that the project works

Synchronous test



Asynchronous test




Both succeed, and we can also see the asynchronous results in the MySQL database, displayed correctly (the sqlite backend did not display them properly):

Write the email-sending task

tasks.py

Write the email-sending code in tasks.py:

from time import sleep

from celery import shared_task
from django.core.mail import send_mail


@shared_task
def send_email(receive):
    subject = 'Hello there'            # subject line
    message = 'This is a test email'   # message body
    from_email = 'xxxxxxx@qq.com'      # sender address
    recipient_list = (receive, )       # recipient list
    send_mail(subject, message, from_email, recipient_list)
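Under the hood, send_mail(subject, message, from_email, recipient_list) builds a standard MIME message and hands it to the configured SMTP backend. A stdlib-only sketch of the message that gets constructed (addresses here are placeholders, and build_mail is a hypothetical helper, not a Django function):

```python
from email.message import EmailMessage

def build_mail(subject, message, from_email, recipient_list):
    # Mirrors the positional arguments of django.core.mail.send_mail
    msg = EmailMessage()
    msg['Subject'] = subject
    msg['From'] = from_email
    msg['To'] = ', '.join(recipient_list)
    msg.set_content(message)
    return msg

mail = build_mail('Hello there', 'This is a test email',
                  'sender@example.com', ('receiver@example.com',))
print(mail['Subject'])  # Hello there
print(mail['To'])       # receiver@example.com
```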

Configure email parameters in settings.py

EMAIL_HOST_USER = 'xxxxxxx@qq.com'  # sending account
EMAIL_HOST_PASSWORD = 'xxxxxx'  # SMTP authorization code (not the login password)
EMAIL_HOST = 'smtp.qq.com'
EMAIL_PORT = 587
EMAIL_USE_TLS = True

urls.py

Add to urlpatterns:

path(r'email/', views.email),

views.py

from django.http import HttpResponse
from django.shortcuts import render
from sendemail.tasks import add, send_email

def email(request):
    mail_address = request.GET.get('address')
    send_result = send_email.delay(mail_address)
    return HttpResponse(send_result)

Test the email-sending feature

Restart Celery and the Django project

Visit the page:

The page returns the task id, and the mailbox receives the email: the asynchronous send succeeded.

Last modified: March 13, 2024