用户登录

功能分析

  • 用户登陆

    • 验证用户名密码
    • 生成用户令牌
    • 注册用户使用post请求,登录也使用post请求,产生冲突

      • 方案一:rest框架的规则是首要实现功能,然后按照rest规定的风格去编写代码,所以可以自己设置规则完成
      • 方案二:强行使用一个post既做注册,也做登录

        • 在request.query_params中添加action(动词参数)
        • path/?action=login
        • path/?action=register
    • 异常捕获尽量精确
  • 用户认证

    • BaseAuthentication

      • authenticate

        • 认证成功会返回 一个元组

          • 第一个元素是user
          • 第二元素是令牌  token,auth

功能实现

按照方案二实现功能,需要在views中重写post请求

当用户登录成功,不是重定向页面,而是返回一个token,用户带着token自行访问其他页面。

新建常量类

HTTP_ACTION_LOGIN = 'login'
HTTP_ACTION_REGISTER = 'register'

重写post请求

class UsersAPIView(ListCreateAPIView):

    serializer_class = UserSerializer

    queryset = UserModel.objects.all()

    def post(self, request, *args, **kwargs):
        # 获取action
        action = request.query_params.get('action')
        # 如果action为注册,就调用创建方法
        if action == HTTP_ACTION_REGISTER:
            return self.create(request, *args, **kwargs)
        # 如果action为登录,就调用登录方法
        elif action == HTTP_ACTION_LOGIN:
            # 获取用户名密码
            u_name = request.data.get('u_name')
            u_password = request.data.get('u_password')
            # 验证
            try:  # 分为用户名找到和用户名找不到
                user = UserModel.objects.get(u_name=u_name)
                if user.u_password == u_password:
                    # 如果密码匹配,就生成token
                    token = uuid.uuid4().hex

                    data = {
                        'msg': 'login success',
                        'status': 200,
                        'token': token,
                    }
                    return Response(data)
                else:
                    # 如果登录失败
                    raise exceptions.AuthenticationFailed
            except UserModel.DoesNotExist:
                # 抛出异常
                raise exceptions.NotFound
        else:
            raise exceptions.ValidationError

另外还需要在serializers,py中添加u_password字段,否则数据库未存入密码。

测试:

可以看到客户端可以接收到token。

token持久化

现在服务端生成token之后保存到缓存中

配置REDIS缓存

settings.py:

CACHES = {
    'default':{
        'BACKEND':'django_redis.cache.RedisCache',
        'LOCATION':'redis://127.0.0.1:6379/1',
        'OPTIONS':{
            'CLIENT_CLASS':'django_redis.client.DefaultClient',
        },
        'TIMEOUT': 60*60*2
    }
}

启动redis:
C:\redis>redis-server.exe

views

在登录时将token存入redis缓存中
导入from `django.core.cache import cache`
第18行添加存储token代码:

    def post(self, request, *args, **kwargs):
        # 获取action
        action = request.query_params.get('action')
        # 如果action为注册,就调用创建方法
        if action == HTTP_ACTION_REGISTER:
            return self.create(request, *args, **kwargs)
        # 如果action为登录,就调用登录方法
        elif action == HTTP_ACTION_LOGIN:
            # 获取用户名密码
            u_name = request.data.get('u_name')
            u_password = request.data.get('u_password')
            # 验证
            try:  # 分为用户名找到和用户名找不到
                user = UserModel.objects.get(u_name=u_name)
                if user.u_password == u_password:
                    # 如果密码匹配,就生成token
                    token = uuid.uuid4().hex
                    cache.set(token,user.id)

                    data = {
                        'msg': 'login success',
                        'status': 200,
                        'token': token,
                    }
                    return Response(data)
                else:
                    # 如果登录失败
                    raise exceptions.AuthenticationFailed
            except UserModel.DoesNotExist:
                # 抛出异常
                raise exceptions.NotFound

之后用户登录时token就能持久化存储。

至此用户登录功能算实现了。

用户认证

功能分析

只有登录成功,才能get查看用户信息,未登录不能查看。

认证类实现

可以在get之前添加认证类,并且rest自带的token认证类只能使用model模型中的,而这里需要和redis中存储的token进行认证,所以需要重写认证方法。

之前讲过,在APIView中的dispatch中的initial中:

    def initial(self, request, *args, **kwargs):
        """
        Runs anything that needs to occur prior to calling the method handler.
        """
        self.format_kwarg = self.get_format_suffix(**kwargs)

        # Perform content negotiation and store the accepted info on the request
        neg = self.perform_content_negotiation(request)
        request.accepted_renderer, request.accepted_media_type = neg

        # Determine the API version, if versioning is in use.
        version, scheme = self.determine_version(request, *args, **kwargs)
        request.version, request.versioning_scheme = version, scheme

        # Ensure that the incoming request is permitted
        self.perform_authentication(request)
        self.check_permissions(request)
        self.check_throttles(request)

第16行这里进行了认证步骤,查看perform_authentication方法:

    def perform_authentication(self, request):
        """
        Perform authentication on the incoming request.

        Note that if you override this and simply 'pass', then authentication
        will instead be performed lazily, the first time either
        `request.user` or `request.auth` is accessed.
        """
        request.user

这里的request.user 执行了认证步骤,继续查看源代码

    @property
    def user(self):
        """
        Returns the user associated with the current request, as authenticated
        by the authentication classes provided to the request.
        """
        if not hasattr(self, '_user'):
            with wrap_attributeerrors():
                self._authenticate()
        return self._user

这个_authenticate()实现认证,继续查看

    def _authenticate(self):
        """
        Attempt to authenticate the request using each authentication instance
        in turn.
        """
        for authenticator in self.authenticators:
            try:
                user_auth_tuple = authenticator.authenticate(self)
            except exceptions.APIException:
                self._not_authenticated()
                raise

            if user_auth_tuple is not None:
                self._authenticator = authenticator
                self.user, self.auth = user_auth_tuple
                return

第8行调用了authenticator.authenticate(self)方法,所以需要重写这个方法实现token验证

authenticator.authenticate(self)

这个方法登录成功应该返回一个元组第一个是用户,第二个是token

新建Auth

新建Auth.py文件:

from django.core.cache import cache
from rest_framework.authentication import BaseAuthentication

from UserAuthAndPermission.models import UserModel


class UserAuth(BaseAuthentication):
    # 万能建实现重要方法authenticate
    def authenticate(self, request):
        
        if request.method == 'GET':
            
            token = request.query_params.get('token')
    
            try:
                u_id = cache.get(token)
                user = UserModel.objects.get(pk=u_id)
                # 登录成功返回用户和token
                return user, token
            except:
                return

UserAuth导入views中的认证类:

接着在get方法中过滤未登录用户:

    def get(self, request, *args, **kwargs):
        # 判断登录,如果请求中的user是UserModel的实例
        if isinstance(request.user, UserModel):
            return self.list(request, *args, **kwargs)
        else:
            raise exceptions.NotAuthenticated

测试

未携带token直接使用get访问

可以看到无法访问。

现在登录获取token,然后将token作为参数加入get请求中:

可以看到访问成功。

最后修改:2024 年 03 月 13 日
如果觉得我的文章对你有用,请随意赞赏