Django+Channels实现websocket通信

前段时间做项目需要实现服务端向客户端实时推送消息的功能,因为服务端是用的Django框架,就考虑用Django + Channels 来实现 websocket通信。

Django自不必多说,本篇文章主要介绍Channels实现websocket通信以及与Django的部分功能的集成

Channels 简介

Channels的主要两个Features:

  1. 一个是能将同步的Django项目转化为异步;
  2. 另一个是允许 Django 项目不仅能处理HTTP请求 , 而且还能处理长连接的协议请求-WebSocketsMQTTchatbotsamateur radio等。

安装及配置

安装

  1. pip安装

    pip install -U channels

  2. 安装包安装

    $ git clone [email protected]:django/channels.git
    $ cd channels
    $ <activate your project’s virtual environment>
    (environment) $ pip install -e .  # the dot specifies the current repo
    

配置

  1. channels加入到INSTALLED_APPS配置中:

    INSTALLED_APPS = (
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.sites',
        ...
        'channels',
    )
    
  2. 创建一个默认路由文件,与Django的路由文件区别,为myproject/routing.py:

    from channels.routing import ProtocolTypeRouter
    
    application = ProtocolTypeRouter({
        # Empty for now (http->django views is added by default)
    })
    
  3. 最后,在配置文件(settings.py)通过ASGI_APPLICATION配置将第二步中设置的ASGI_APPLICATION作为我们的根应用:

    ASGI_APPLICATION = "myproject.routing.application"
    

一旦配置完毕,channel将会被集成在Django中,并且会接管runserve命令。

需求

假设我们现已有一个部分功能开发完成的Django项目,现在要求服务端能够对不同用户打开的web客户端发送实时消息。即建立的websocket链接能够按照已经验证过的user分组,并且能在我们已有的视图文件(views.py)中向建立的websocket发送消息。

接下来开始coding

创建应用

按照模块化的思想,首先创建一个应用来完成我们的功能。这一步和正常的Django创建应用相同

  1. startapp:

    python manage.py startapp websockets

  2. 将应用添加到INSTALLED_APPS

    # myproject/settings.py
    INSTALLED_APPS = [
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        ...
        “channels",
        ‘'websockets'
    ]
    

routing

Djangourls.py文件功能类似,我们同样需要将创建的websocket链接绑定到不同的处理函数上。

ProtocolTypeRouter

因为channels支持HTTPWebSockets等协议,所以我们需要在根应用路由中拥有一个ProtocolTypeRouter来将不同的协议通过协议服务器转到不同的子路由中处理:

### myproject/routing.py
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
import websockets.routing

application = ProtocolTypeRouter({
    #普通的HTTP请求不需要我们手动在这里添加,框架会自动加载过来
    'websocket': AuthMiddlewareStack(
        URLRouter(
            websockets.routing.websocket_urlpatterns
        )
    ),
})

URLRouter

URLRouter通过请求路径路由不同的httpwebsocket链接,是一个Dajngo url列表。

接上一步,需要在websockets应用中创建routing.py文件然后根据路径定义url列表,即websocket_urlpatterns

from django.conf.urls import url

from . import consumers

websocket_urlpatterns = [
    url(r'^ws/$', consumers.WsConsumer),
]

ChannelNameRouter

还有一类路由通过channel keyscope中的值来进行路由,在本例中用不上,可参见官方文档:

# 与上面的例子无关
ChannelNameRouter({
    "thumbnails-generate": some_app,
    "thunbnails-delete": some_other_app,
})

到这一步路由就完成了,接下来就是定义我们的处理类,也就是WsConsumer.

Consumers

websockets目录下新建consumers.py文件:

# websockets.consumers.py
from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer
import json

class WsConsumer(WebsocketConsumer):
    def connect(self):
        self.user = self.scope["user"]
        if self.user.is_anonymous:
            self.close()
            return
        self.group_name = 'task_%s' % str(self.user.id)

        # Join user group
        async_to_sync(self.channel_layer.group_add)(
            self.group_name,
            self.channel_name
        )
        self.accept()

    def disconnect(self, close_code):
        # Leave user group
        if not self.user.is_anonymous:
            async_to_sync(self.channel_layer.group_discard)(
                self.group_name,
                self.channel_name
            )

    def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = text_data_json['message']

        # Send message to user group
        async_to_sync(self.channel_layer.group_send)(
            self.group_name,
            {
                'type': 'task_message',
                'message': message
            }
        )

    # Receive message from user group
    def task_message(self, event):
        message = event['message']

        # Send message to WebSocket
        self.send(text_data=json.dumps({
            'message': message
        }))

Consumers中,我们构造了一系列当事件发生时就会对应调用的函数, 而不是去编写事件循环来进行事件处理。

Generic Consumers

在代码中,相比于继承channels.consumer.AsyncConsumerchannels.consumer.SyncConsumer来创建基本的consumer,我们继承了包装更为完善的Generic ConsumersWebsocketConsumer

其他的Generic Consumers还有AsyncWebsocketConsumerJsonWebsocketConsumerAsyncJsonWebsocketConsumerAsyncHttpConsumer,正如字面意义,都有同步异步之分,这里不一一介绍,可详见官方文档。

connect

当前端发送一个新建websocket的请求过来的时候,会自动触发connect()函数事件。

<script>
    var webSocket = new WebSocket(
        'ws://' + window.location.host + '/ws/');
    ...
</script>

disconnet

server 端的 WebSocket 调用 self.close() 关闭时,前端的 webSocket.onclose 会自动触发

<script>
    ....
    webSocket.onclose = function(e) {
        console.error('web socket closed unexpectedly');
    };
    ....
</script>

而当链接断开时,无论是主动的前端页面关闭,还是被动的网络故障,都会触发disconnect()函数事件。

当然若因断电或其他故障导致的服务挂掉,自然也就触发不了disconnect()函数。

receive

当前端或后端将信息发送到websocket中后,服务端的receive()事件就会被触发,可在此函数中对收到的消息进行处理。在上面我们定义的文件中,我们将收到的消息通过group(后续介绍)群发到group包含的通道中,然后每个通道通过task_message函数对消息进行处理。

前端发送请求:

<script>
    ....
    // 点击按键,发送信息是很常见的逻辑
    document.querySelector('#task-message-submit').onclick = function(e) {
        var messageInputDom = document.querySelector('#task-message-input');
        var message = messageInputDom.value;
        webSocket.send(JSON.stringify({
            'message': message
        }));

        messageInputDom.value = '';
    };
</script>

task_message

自定义消息处理函数。当通道收到消息调用receive()函数时,若我们只需要此通道对消息进行处理,自然也可以在receive()函数中直接进行处理。但当我们需要通道所属group的其他成员也能对消息进行处理,那么只在receive()函数中处理就满足不了需要了。

通过将消息群发到每个通道的task_message中进行处理。

Scope

consumer 在初始化连接时收到连接的scope, 其中包含大部分我们能在Django视图中的Request对象上找到的信息。在consumer的方法内通过self.scope进行调用。

scopeASGI规范的一部分, 但以下是我们可能要使用的一些常见内容:

  • scope["path"], the path on the request. (HTTP and WebSocket)
  • scope["headers"], raw name/value header pairs from the request (HTTP and WebSocket)
  • scope["method"], the method name used for the request. (HTTP)

在我们的例子中,因为在路由中使用的是AuthMiddlewareStack来处理websocket请求,所以我们可以直接通过scope["user"]取出当前登录的user信息

Channel Layers

试想想,在项目中,我们不仅需要在收到前端消息时能对消息进行处理,而且在Django的其他业务处理视图中也需要向前端发现消息。比如在Django业务处理逻辑中,新增了某个用户或某类用户的待处理任务,我们怎么在Django的业务逻辑中(脱离了consumer)给通道内广播信息呢?

通道层允许我们在应用程序的不同实例之间进行对话。如果您不想将所有消息或事件都传送到数据库中, 它们是制作分布式实时应用程序的一个有用部分。

配置

channel layers 因为是使用 redis 作为消息通道层,这使得一个应用中两个实例之间可以实现通信。使用channel layers需要redis的支持。首先需要在配置文件配置所需要连接的redis服务。

# settings.py
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("redis-server-name", 6379)],
        },
    },
}

获取实例

  1. consumer内部使用self.channel_layer来获取channel layers的实例。

  2. 在脱离了consumer的情况下,Channerls提供了get_channel_layer函数接口来获取它。

    from channels.layers import get_channel_layer
    channel_layer = get_channel_layer()
    

异步

一旦你获取到了实例,你就可以调用实例中的方法。但请记住,channel layers仅支持异步方法,因此你可以从自己的异步上下文中调用它:

for group in groups:
    await channel_layer.group_send(
        group_name,
        {"type": "task.message", "message": "Hello there!"},
    )

或者,如果你的上下文是同步环境,可以使用 async_to_sync 来转换调用:

from asgiref.sync import async_to_sync

async_to_sync(channel_layer.group_send)(
    group_name, 
    {"type": "task.message", "message": "Hello there!"}
)

Single Channels

channel_layer.send

当我们不需要群发,而是向某个通道发送消息时。只需要找到通道的名字,使用channel_layer.send:

from channels.layers import get_channel_layer

channel_layer = get_channel_layer()
await channel_layer.send("channel_name", {
    "type": "task.message",
    "message": "Hello there!",
})

type标识处理消息的函数,也就是这里的消息会被consumer中的task_message函数处理。type后的键值对会形成一个字典作为参数传给type所标识的函数

注意:这里使用"type": "task.message""type": "task_message"效果一致,channels会将.转换为_

channel_name

channel_name是每一个websocket请求过来时候, Channels 自动帮我们生成的一个个性化名称,实现代码就是用随机函数组装:

# channels/layers.py 
async def new_channel(self, prefix="specific."):
    """
    Returns a new channel name that can be used by something in our
    process as a specific channel.
    """
    return "%s.inmemory!%s" % (
        prefix,
        "".join(random.choice(string.ascii_letters) for i in range(12)),
    )

Group

显然, 发送到单个频道并不是特别有用--在大多数情况下, 我们希望作为广播同时发送给多个频道consumer

channel_layer.group_add

将频道加入到某个group中,如我们的例子中:

async_to_sync(self.channel_layer.group_add)(
    self.group_name,
    self.channel_name
)

第一个参数为group的名字,第二个参数为channel_name.

channel_layer.group_discard

将频道从group中移除,如我们的例子中:

async_to_sync(self.channel_layer.group_discard)(
    self.group_name,
    self.channel_name
)

channel_layer.group_send

group中的频道群发消息:

async_to_sync(self.channel_layer.group_send)(
    self.group_name,
    {
        'type': 'task_message',
        'message': message
    }
)

至此,我们的consumer.py里所牵涉的内容基本都介绍完毕了。

Authentication

Channels中的AuthMiddleware支持标准的 Django身份验证, 其中用户详细信息存储在session中。它允许对scope中的用户对象进行只读访问。

若要使用中间件, 请将其包装在路由中适当级别的consumer周围:

### myproject/routing.py
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
import websockets.routing

application = ProtocolTypeRouter({
    #普通的HTTP请求不需要我们手动在这里添加,框架会自动加载过来
    'websocket': AuthMiddlewareStack(
        URLRouter(
            websockets.routing.websocket_urlpatterns
        )
    ),
})

使用self.scope["user"]来获取登录的用户:

class WsConsumer(WebsocketConsumer):
    def connect(self):
        self.user = self.scope["user"]

注意,因为用户详细信息存储在session中。若我们使用了token的认证方式,前端通过token来验证用户,而不是用session,那么我们需要自定义我们的认证文件。

jwt认证方式来举例,新建ws_authentication.py文件:

from functools import wraps
import jwt
import json
import logging

from django.utils.translation import ugettext_lazy as _
from django.contrib.auth import get_user_model

from rest_framework import exceptions
from rest_framework_jwt.settings import api_settings

import logging

logger = logging.getLogger(__name__)  # 为loggers中定义的名称

User = get_user_model()
jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
jwt_decode_handler = api_settings.JWT_DECODE_HANDLER
jwt_get_username_from_payload = api_settings.JWT_PAYLOAD_GET_USERNAME_HANDLER


def token_authenticate(token, message):
    """
    Tries to authenticate user based on the supplied token. It also checks
    the token structure and validity.
    """

    payload = check_payload(token=token, message=message)
    user = check_user(payload=payload, message=message)

    """Other authenticate operation"""
    return user, token


# 检查负载
def check_payload(token, message):
    payload = None
    try:
        payload = jwt_decode_handler(token)
    except jwt.ExpiredSignature:
        msg = _('Signature has expired.')
        logger.warn(msg)
        # raise ValueError(msg)
        #_close_reply_channel(message)
    except jwt.DecodeError:
        msg = _('Error decoding signature.')
        logger.warn(msg)
        #_close_reply_channel(message)
    return payload


# 检查用户
def check_user(payload, message):
    username = None
    user = None
    try:
        username = payload.get('username')
    except Exception:
        msg = _('Invalid payload.')
        logger.warn(msg)
        _close_reply_channel(message)
    if not username:
        msg = _('Invalid payload.')
        logger.warn(msg)
        _close_reply_channel(message)
        return
        # Make sure user exists
    try:
        user = User.objects.get_by_natural_key(username)
    except User.DoesNotExist:
        msg = _("User doesn't exist.")
        logger.warn(msg)
        return user
        raise exceptions.AuthenticationFailed(msg)

    if not user.is_active:
        msg = _('User account is disabled.')
        logger.warn(msg)
        raise exceptions.AuthenticationFailed(msg)
    return user

# 关闭websocket
def _close_reply_channel(message):
    #message.send({"close": True})
    return

# 验证request中的token
def ws_auth_request_token(func):
    """
    Checks the presence of a "token" request parameter and tries to
    authenticate the user based on its content.
    The request url must include token.
    eg: /v1/channel/1/?token=abcdefghijklmn
    """

    @wraps(func)
    def inner(message, *args, **kwargs):
        scope = message.scope
        #header= json.loads(scope.get("headers"))
        token = scope.get("query_string").decode().split("=", 1)[1]

        if token is None:
            _close_reply_channel(message)
            raise ValueError("Missing token request parameter. Closing channel.")

        user, token = token_authenticate(token, message)

        message.token = token
        message.user = user
        return func(message, *args, **kwargs)

    return inner

然后在我们需要进行验证的函数前,使用该装饰器:

class WsConsumer(WebsocketConsumer):
    @ws_auth_request_token
    def connect(self):

该装饰器会将前端对前端新建链接url中的token参数进行jwt验证,将验证通过的user赋值给scope['user']

结语

到这,基本就完成了我们需求中的要求。更多细节就不再介绍。

有关更多channel细节,可参见官方文档

Copyright © itrunner.cn 2020 all right reserved,powered by Gitbook该文章修订时间: 2024-08-01 14:07:45

results matching ""

    No results matching ""