Django+Channels实现websocket通信
前段时间做项目需要实现服务端向客户端实时推送消息的功能,因为服务端是用的Django框架,就考虑用Django
+ Channels
来实现 websocket
通信。
Django
自不必多说,本篇文章主要介绍Channels
实现websocket
通信以及与Django
的部分功能的集成
Channels 简介
Channels
的主要两个Features
:
- 一个是能将同步的
Django
项目转化为异步; - 另一个是允许
Django
项目不仅能处理HTTP
请求 , 而且还能处理长连接的协议请求-WebSockets
、MQTT
、chatbots
、amateur radio
等。
安装及配置
安装
pip
安装pip install -U channels
安装包安装
$ git clone [email protected]:django/channels.git $ cd channels $ <activate your project’s virtual environment> (environment) $ pip install -e . # the dot specifies the current repo
配置
将
channels
加入到INSTALLED_APPS
配置中:INSTALLED_APPS = ( 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.sites', ... 'channels', )
创建一个默认路由文件,与Django的路由文件区别,为
myproject/routing.py
:from channels.routing import ProtocolTypeRouter application = ProtocolTypeRouter({ # Empty for now (http->django views is added by default) })
最后,在配置文件(
settings.py
)通过ASGI_APPLICATION
配置将第二步中设置的ASGI_APPLICATION
作为我们的根应用:ASGI_APPLICATION = "myproject.routing.application"
一旦配置完毕,channel
将会被集成在Django中,并且会接管runserve
命令。
需求
假设我们现已有一个部分功能开发完成的Django项目,现在要求服务端能够对不同用户打开的web客户端发送实时消息。即建立的websocket
链接能够按照已经验证过的user
分组,并且能在我们已有的视图文件(views.py)中向建立的websocket
发送消息。
接下来开始coding
。
创建应用
按照模块化的思想,首先创建一个应用来完成我们的功能。这一步和正常的Django
创建应用相同
startapp:
python manage.py startapp websockets
将应用添加到
INSTALLED_APPS
:# myproject/settings.py INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', ... “channels", ‘'websockets' ]
routing
与Django
的urls.py
文件功能类似,我们同样需要将创建的websocket
链接绑定到不同的处理函数上。
ProtocolTypeRouter
因为channels
支持HTTP
、WebSockets
等协议,所以我们需要在根应用路由中拥有一个ProtocolTypeRouter
来将不同的协议通过协议服务器转到不同的子路由中处理:
### myproject/routing.py
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
import websockets.routing
application = ProtocolTypeRouter({
#普通的HTTP请求不需要我们手动在这里添加,框架会自动加载过来
'websocket': AuthMiddlewareStack(
URLRouter(
websockets.routing.websocket_urlpatterns
)
),
})
URLRouter
URLRouter
通过请求路径路由不同的http
或websocket
链接,是一个Dajngo url
列表。
接上一步,需要在websockets
应用中创建routing.py
文件然后根据路径定义url
列表,即websocket_urlpatterns
:
from django.conf.urls import url
from . import consumers
websocket_urlpatterns = [
url(r'^ws/$', consumers.WsConsumer),
]
ChannelNameRouter
还有一类路由通过channel key
在scope
中的值来进行路由,在本例中用不上,可参见官方文档:
# 与上面的例子无关
ChannelNameRouter({
"thumbnails-generate": some_app,
"thunbnails-delete": some_other_app,
})
到这一步路由就完成了,接下来就是定义我们的处理类,也就是WsConsumer
.
Consumers
在websockets
目录下新建consumers.py
文件:
# websockets.consumers.py
from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer
import json
class WsConsumer(WebsocketConsumer):
def connect(self):
self.user = self.scope["user"]
if self.user.is_anonymous:
self.close()
return
self.group_name = 'task_%s' % str(self.user.id)
# Join user group
async_to_sync(self.channel_layer.group_add)(
self.group_name,
self.channel_name
)
self.accept()
def disconnect(self, close_code):
# Leave user group
if not self.user.is_anonymous:
async_to_sync(self.channel_layer.group_discard)(
self.group_name,
self.channel_name
)
def receive(self, text_data):
text_data_json = json.loads(text_data)
message = text_data_json['message']
# Send message to user group
async_to_sync(self.channel_layer.group_send)(
self.group_name,
{
'type': 'task_message',
'message': message
}
)
# Receive message from user group
def task_message(self, event):
message = event['message']
# Send message to WebSocket
self.send(text_data=json.dumps({
'message': message
}))
在Consumers
中,我们构造了一系列当事件发生时就会对应调用的函数, 而不是去编写事件循环来进行事件处理。
Generic Consumers
在代码中,相比于继承channels.consumer.AsyncConsumer
或channels.consumer.SyncConsumer
来创建基本的consumer
,我们继承了包装更为完善的Generic Consumers
:WebsocketConsumer
。
其他的Generic Consumers
还有AsyncWebsocketConsumer
、JsonWebsocketConsumer
、 AsyncJsonWebsocketConsumer
和AsyncHttpConsumer
,正如字面意义,都有同步异步之分,这里不一一介绍,可详见官方文档。
connect
当前端发送一个新建websocket
的请求过来的时候,会自动触发connect()
函数事件。
<script>
var webSocket = new WebSocket(
'ws://' + window.location.host + '/ws/');
...
</script>
disconnet
当 server
端的 WebSocket
调用 self.close()
关闭时,前端的 webSocket.onclose
会自动触发
<script>
....
webSocket.onclose = function(e) {
console.error('web socket closed unexpectedly');
};
....
</script>
而当链接断开时,无论是主动的前端页面关闭,还是被动的网络故障,都会触发disconnect()
函数事件。
当然若因断电或其他故障导致的服务挂掉,自然也就触发不了disconnect()
函数。
receive
当前端或后端将信息发送到websocket
中后,服务端的receive()
事件就会被触发,可在此函数中对收到的消息进行处理。在上面我们定义的文件中,我们将收到的消息通过group
(后续介绍)群发到group
包含的通道中,然后每个通道通过task_message
函数对消息进行处理。
前端发送请求:
<script>
....
// 点击按键,发送信息是很常见的逻辑
document.querySelector('#task-message-submit').onclick = function(e) {
var messageInputDom = document.querySelector('#task-message-input');
var message = messageInputDom.value;
webSocket.send(JSON.stringify({
'message': message
}));
messageInputDom.value = '';
};
</script>
task_message
自定义消息处理函数。当通道收到消息调用receive()
函数时,若我们只需要此通道对消息进行处理,自然也可以在receive()
函数中直接进行处理。但当我们需要通道所属group
的其他成员也能对消息进行处理,那么只在receive()
函数中处理就满足不了需要了。
通过将消息群发到每个通道的task_message
中进行处理。
Scope
consumer
在初始化连接时收到连接的scope
, 其中包含大部分我们能在Django
视图中的Request
对象上找到的信息。在consumer
的方法内通过self.scope
进行调用。
scope
是ASGI
规范的一部分, 但以下是我们可能要使用的一些常见内容:
scope["path"]
, the path on the request. (HTTP and WebSocket)scope["headers"]
, raw name/value header pairs from the request (HTTP and WebSocket)scope["method"]
, the method name used for the request. (HTTP)
在我们的例子中,因为在路由中使用的是AuthMiddlewareStack
来处理websocket
请求,所以我们可以直接通过scope["user"]
取出当前登录的user
信息
Channel Layers
试想想,在项目中,我们不仅需要在收到前端消息时能对消息进行处理,而且在Django
的其他业务处理视图中也需要向前端发现消息。比如在Django
业务处理逻辑中,新增了某个用户或某类用户的待处理任务,我们怎么在Django
的业务逻辑中(脱离了consumer
)给通道内广播信息呢?
通道层允许我们在应用程序的不同实例之间进行对话。如果您不想将所有消息或事件都传送到数据库中, 它们是制作分布式实时应用程序的一个有用部分。
配置
channel layers
因为是使用 redis
作为消息通道层,这使得一个应用中两个实例之间可以实现通信。使用channel layers
需要redis
的支持。首先需要在配置文件配置所需要连接的redis
服务。
# settings.py
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": [("redis-server-name", 6379)],
},
},
}
获取实例
在
consumer
内部使用self.channel_layer
来获取channel layers
的实例。在脱离了
consumer
的情况下,Channerls
提供了get_channel_layer
函数接口来获取它。from channels.layers import get_channel_layer channel_layer = get_channel_layer()
异步
一旦你获取到了实例,你就可以调用实例中的方法。但请记住,channel layers
仅支持异步方法,因此你可以从自己的异步上下文中调用它:
for group in groups:
await channel_layer.group_send(
group_name,
{"type": "task.message", "message": "Hello there!"},
)
或者,如果你的上下文是同步环境,可以使用 async_to_sync
来转换调用:
from asgiref.sync import async_to_sync
async_to_sync(channel_layer.group_send)(
group_name,
{"type": "task.message", "message": "Hello there!"}
)
Single Channels
channel_layer.send
当我们不需要群发,而是向某个通道发送消息时。只需要找到通道的名字,使用channel_layer.send
:
from channels.layers import get_channel_layer
channel_layer = get_channel_layer()
await channel_layer.send("channel_name", {
"type": "task.message",
"message": "Hello there!",
})
type
标识处理消息的函数,也就是这里的消息会被consumer
中的task_message
函数处理。type
后的键值对会形成一个字典作为参数传给type
所标识的函数
注意:这里使用"type": "task.message"
与"type": "task_message"
效果一致,channels
会将.
转换为_
。
channel_name
channel_name
是每一个websocket请求过来时候, Channels
自动帮我们生成的一个个性化名称,实现代码就是用随机函数组装:
# channels/layers.py
async def new_channel(self, prefix="specific."):
"""
Returns a new channel name that can be used by something in our
process as a specific channel.
"""
return "%s.inmemory!%s" % (
prefix,
"".join(random.choice(string.ascii_letters) for i in range(12)),
)
Group
显然, 发送到单个频道并不是特别有用--在大多数情况下, 我们希望作为广播同时发送给多个频道consumer
。
channel_layer.group_add
将频道加入到某个group
中,如我们的例子中:
async_to_sync(self.channel_layer.group_add)(
self.group_name,
self.channel_name
)
第一个参数为group
的名字,第二个参数为channel_name
.
channel_layer.group_discard
将频道从group
中移除,如我们的例子中:
async_to_sync(self.channel_layer.group_discard)(
self.group_name,
self.channel_name
)
channel_layer.group_send
向group
中的频道群发消息:
async_to_sync(self.channel_layer.group_send)(
self.group_name,
{
'type': 'task_message',
'message': message
}
)
至此,我们的consumer.py
里所牵涉的内容基本都介绍完毕了。
Authentication
Channels
中的AuthMiddleware
支持标准的 Django
身份验证, 其中用户详细信息存储在session
中。它允许对scope
中的用户对象进行只读访问。
若要使用中间件, 请将其包装在路由中适当级别的consumer
周围:
### myproject/routing.py
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
import websockets.routing
application = ProtocolTypeRouter({
#普通的HTTP请求不需要我们手动在这里添加,框架会自动加载过来
'websocket': AuthMiddlewareStack(
URLRouter(
websockets.routing.websocket_urlpatterns
)
),
})
使用self.scope["user"]
来获取登录的用户:
class WsConsumer(WebsocketConsumer):
def connect(self):
self.user = self.scope["user"]
注意,因为用户详细信息存储在session
中。若我们使用了token
的认证方式,前端通过token
来验证用户,而不是用session
,那么我们需要自定义我们的认证文件。
以jwt
认证方式来举例,新建ws_authentication.py
文件:
from functools import wraps
import jwt
import json
import logging
from django.utils.translation import ugettext_lazy as _
from django.contrib.auth import get_user_model
from rest_framework import exceptions
from rest_framework_jwt.settings import api_settings
import logging
logger = logging.getLogger(__name__) # 为loggers中定义的名称
User = get_user_model()
jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
jwt_decode_handler = api_settings.JWT_DECODE_HANDLER
jwt_get_username_from_payload = api_settings.JWT_PAYLOAD_GET_USERNAME_HANDLER
def token_authenticate(token, message):
"""
Tries to authenticate user based on the supplied token. It also checks
the token structure and validity.
"""
payload = check_payload(token=token, message=message)
user = check_user(payload=payload, message=message)
"""Other authenticate operation"""
return user, token
# 检查负载
def check_payload(token, message):
payload = None
try:
payload = jwt_decode_handler(token)
except jwt.ExpiredSignature:
msg = _('Signature has expired.')
logger.warn(msg)
# raise ValueError(msg)
#_close_reply_channel(message)
except jwt.DecodeError:
msg = _('Error decoding signature.')
logger.warn(msg)
#_close_reply_channel(message)
return payload
# 检查用户
def check_user(payload, message):
username = None
user = None
try:
username = payload.get('username')
except Exception:
msg = _('Invalid payload.')
logger.warn(msg)
_close_reply_channel(message)
if not username:
msg = _('Invalid payload.')
logger.warn(msg)
_close_reply_channel(message)
return
# Make sure user exists
try:
user = User.objects.get_by_natural_key(username)
except User.DoesNotExist:
msg = _("User doesn't exist.")
logger.warn(msg)
return user
raise exceptions.AuthenticationFailed(msg)
if not user.is_active:
msg = _('User account is disabled.')
logger.warn(msg)
raise exceptions.AuthenticationFailed(msg)
return user
# 关闭websocket
def _close_reply_channel(message):
#message.send({"close": True})
return
# 验证request中的token
def ws_auth_request_token(func):
"""
Checks the presence of a "token" request parameter and tries to
authenticate the user based on its content.
The request url must include token.
eg: /v1/channel/1/?token=abcdefghijklmn
"""
@wraps(func)
def inner(message, *args, **kwargs):
scope = message.scope
#header= json.loads(scope.get("headers"))
token = scope.get("query_string").decode().split("=", 1)[1]
if token is None:
_close_reply_channel(message)
raise ValueError("Missing token request parameter. Closing channel.")
user, token = token_authenticate(token, message)
message.token = token
message.user = user
return func(message, *args, **kwargs)
return inner
然后在我们需要进行验证的函数前,使用该装饰器:
class WsConsumer(WebsocketConsumer):
@ws_auth_request_token
def connect(self):
该装饰器会将前端对前端新建链接url中的token
参数进行jwt
验证,将验证通过的user
赋值给scope['user']
结语
到这,基本就完成了我们需求中的要求。更多细节就不再介绍。
有关更多channel
细节,可参见官方文档