Django使用Celery
官方文档:https://docs.celeryproject.org/en/stable/django/first-steps-with-django.html
下载github上Celery源码项目
https://github.com/celery/celery
下载下来之后解压,找到Django的案例:celery-master\examples\django
pycharm打开案例项目
pycharm打开该项目
会提示
sqlalchemy是python的ORM框架。
celery.py
_proj/proj/celery.py_:
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
app = Celery('proj') # 构建celery对象
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY') # 从settings中根据namespace获取配置信息
# Load task modules from all registered Django app configs.
app.autodiscover_tasks() # 自动加载任务
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}') # 格式化打印
__init__.py
是项目的初始化文件
This ensures that the app is loaded when Django starts so that the @shared_task
decorator (mentioned later) will use it,目的是只限制celery被外界访问。
在其中添加:
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)
settings.py
除了Django原有的配置参数,还有配置Celery的参数:
import os
# ^^^ The above is required if you want to import from the celery
# library. If you don't have this then `from celery.schedules import`
# becomes `proj.celery.schedules` in Python 2.x since it allows
# for relative imports by default.
# Celery settings
CELERY_BROKER_URL = 'redis://localhost:6379/1' # 连接消息容器
#: Only add pickle to this list if your broker is secured
#: from unwanted access (see userguide/security.html)
CELERY_ACCEPT_CONTENT = ['json'] # 接受的数据类型
CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite' # 存储任务的结果,执行状态的数据库位置
CELERY_TASK_SERIALIZER = 'json' # 序列化格式是Json
task.py
# Create your tasks here
from time import sleep
from celery import shared_task
from demoapp.models import Widget
@shared_task
def add(x, y):
print('计算中。。。')
sleep(5)
return x + y
@shared_task
def mul(x, y):
return x * y
@shared_task
def xsum(numbers):
return sum(numbers)
@shared_task
def count_widgets():
return Widget.objects.count()
@shared_task
def rename_widget(widget_id, name):
w = Widget.objects.get(id=widget_id)
w.name = name
w.save()
urls.py
from django.conf.urls import handler404, handler500, include, url # noqa
# Uncomment the next two lines to enable the admin:
# from django.contrib import admin
# admin.autodiscover()
from demoapp import views
urlpatterns = [
url(r'^index/', views.index),
url(r'^asyncs/', views.asyncs),
]
views.py
# Create your views here.
from django.http import HttpResponse
from demoapp.tasks import add
def index(request): # 非异步
result = add(3,2)
return HttpResponse(result)
def asyncs(request): # 异步调用任务
result = add.delay(3, 4)
return HttpResponse(result)
迁移数据库
启动celery
celery -A proj worker --loglevel=info --pool=solo
(venv) E:\Celery\django>celery -A proj worker --loglevel=info --pool=solo
-------------- celery@DESKTOP-MOK5PHF v5.0.5 (singularity)
--- ***** -----
-- ******* ---- Windows-10-10.0.18362-SP0 2021-02-06 22:37:31
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: proj:0x45a5f30
- ** ---------- .> transport: redis://localhost:6379/1
- ** ---------- .> results: sqlite:///results.sqlite
- *** --- * --- .> concurrency: 12 (solo)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. demoapp.tasks.add
. demoapp.tasks.count_widgets
. demoapp.tasks.mul
. demoapp.tasks.rename_widget
. demoapp.tasks.xsum
. proj.celery.debug_task
[2021-02-06 22:37:31,158: INFO/MainProcess] Connected to redis://localhost:6379/1
[2021-02-06 22:37:31,166: INFO/MainProcess] mingle: searching for neighbors
[2021-02-06 22:37:32,192: INFO/MainProcess] mingle: all alone
[2021-02-06 22:37:32,204: WARNING/MainProcess] e:\dj\venv\lib\site-packages\celery\fixups\django.py:204: UserWarning: Usin
g settings.DEBUG leads to a memory
leak, never use this setting in production environments!
leak, never use this setting in production environments!''')
[2021-02-06 22:37:32,205: INFO/MainProcess] celery@DESKTOP-MOK5PHF ready.
- 18-24行显示当前项目有如下任务
- 29行警告:提示不要在生产环境中使用调试状态,需要在settings.py中将
DEBUG
改为False ALLOWED_HOSTS
需要修改成ALLOWED_HOSTS = [
`'*'`]
测试
非异步测试
异步测试
访问asyncs页面,页面直接显示任务对应的唯一id
在run中也可以看到对应的打印字样:
终端中也能看到异步执行的结果:
[2021-02-07 01:04:56,835: INFO/MainProcess] Connected to redis://localhost:6379/1
[2021-02-07 01:04:56,843: INFO/MainProcess] mingle: searching for neighbors
[2021-02-07 01:04:57,869: INFO/MainProcess] mingle: all alone
[2021-02-07 01:04:58,338: INFO/MainProcess] celery@DESKTOP-MOK5PHF ready.
[2021-02-07 01:06:51,033: INFO/MainProcess] Received task: demoapp.tasks.add[c58ae751-9596-4333-90a4-9389016a3681]
[2021-02-07 01:06:51,033: WARNING/MainProcess] 计算中。。。
[2021-02-07 01:06:56,527: INFO/MainProcess] Task demoapp.tasks.add[c58ae751-9596-4333-90a4-9389016a3681] succeeded in 5.48
499999998603s: 7
查看异步任务存储文件
在settings.py中:CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite' # 存储任务的结果,执行状态的数据库位置
所以会生成一个数据库,查看:
可以看到这张表,存储了任务id,执行状态,执行结果等信息,并且执行结果不是以直接结果进行保存的,而是经过了某种编码。
下面研究使用异步技术,在RESTEND项目中添加异步邮件服务。
异步邮件服务
创建新app
python manage.py startapp sendemail
将celery导入进新建django应用中
添加celery.py并修改
首先将celery.py放进RESTend的项目目录中:
接着修改celery.py中的设置:
6,8两行需要修改成对应项目名
import os from celery import Celery # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'RESTend.settings') app = Celery('RESTend') # Using a string here means the worker doesn't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object('django.conf:settings', namespace='CELERY') # Load task modules from all registered Django app configs. app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print(f'Request: {self.request!r}')
修改__init__.py
修改RESTend项目下的__init__.py,添加对应的配置:
import pymysql from .celery import app as celery_app __all__ = ('celery_app',) pymysql.install_as_MySQLdb()
安装Celery的Django扩展库
我们需要将异步执行的结果状态等信息通过ORM存储到redis中(默认是sqlite),需要安装额外的扩展库。
The django-celery-results extension provides result backends using either the Django ORM, or the Django Cache framework.
可以将异步任务执行后的结果加入缓存,或者以其他形式保存下来。
安装命令:
pip install django-celery-results
修改setting.py
添加以下关于celery的参数:
# Celery settings
CELERY_BROKER_URL = 'redis://localhost:6379/1'
#: Only add pickle to this list if your broker is secured
#: from unwanted access (see userguide/security.html)
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite' # 这里在后面配置存储形式时会修改,注意。
CELERY_TASK_SERIALIZER = 'json'
在INSTALLED_APPS中添加django-celery-results和刚创建的新app:
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'rest_framework',
'django.contrib.staticfiles',
'App',
'django_celery_results',
'sendemail',
]
迁移django_celery_results数据库
python manage.py makemigrations
python manage.py migrate
python manage.py migrate django_celery_results
配置存储形式
Configure Celery to use the django-celery-results backend.
Assuming you are using Django’s settings.py
to also configure Celery, add the following settings:
CELERY_RESULT_BACKEND = 'django-db' # 存储在django-db中
For the cache backend you can use:
CELERY_RESULT_BACKEND = 'django-cache'
这里实验使用存储在django-db中。
新建tasks任务
新建tasks.py:
from time import sleep
from celery import shared_task
@shared_task # 装饰器
def add(a, b):
print('好困啊')
sleep(5)
return a + b
新建urls.py
from django.urls import path
from sendemail import views
urlpatterns = [
path(r'index/', views.index),
path(r'asyncs/', views.asyncs),
]
总urls.py
注册刚刚创建的urls
from django.contrib import admin
from django.urls import path, include
urlpatterns = [
path('admin/', admin.site.urls),
path('app/', include('App.urls')),
path('send/', include('sendemail.urls')),
]
views
from django.http import HttpResponse
from django.shortcuts import render
# Create your views here.
from sendemail.tasks import add
def index(request):
result = add(5, 6)
return HttpResponse(result)
def asyncs(request):
result_id = add.delay(6, 8)
return HttpResponse(result_id)
测试项目可用性
非异步测试
异步测试
都成功了,并且在mysql数据库中我们也能看到异步执行的结果,并且显示正常(sqlite显示不正常):
编写发送邮件任务
task.py
在tasks.py中写入发送邮件的代码:
from time import sleep
from celery import shared_task
from django.core.mail import send_mail
@shared_task
def send_email(receive):
subject = "老铁你好" # 主题
message = '老铁双击666' # 消息
from_email = 'xxxxxxx@qq.com' # 发件人
recepient_list = (receive, ) # 收件人列表
send_mail(subject, message, from_email)
settings.py配置邮件参数
EMAIL_HOST_USER = 'xxxxxxx@qq.com' # 发送邮箱账户
EMAIL_HOST_PASSWORD = 'xxxxxx' # 授权码
EMAIL_HOST = 'smtp.qq.com'
EMAIL_PORT = 587
EMAIL_USE_TLS = True
urls
urls添加:
path(r'email/', views.email),
views
from django.http import HttpResponse
from django.shortcuts import render
from sendemail.tasks import add, send_email
def email(request):
mail_address = request.GET.get('address')
send_result = send_email.delay(mail_address)
return HttpResponse(send_result)
测试发送邮件功能
重启celery和项目
访问页面:
页面返回id,邮箱也接收到了邮件,异步发送成功。
此处评论已关闭