python channels 简介
目录
1.Channels概念
2.安装
3.开始使用Channels
Groups
Running with Channels
Persisting Data
Authenticantion
Routing
Models
Enforcing Ording
1.Channels概念
channel是一个队列,每个task最多一个consumer接收
用name字符串辨别channels,可由不同的机器向同一个channel通信
def my_consumer(message):passchannel_routing = {"some-channel": "myapp.consumers.my_consumer",
}
对channel的每个message,Django会用一个message对象调用consumer函数。message对象含一个content属性,是数据的字典,一个channel属性,是channel。
区别于Django的传统request-response模式,channel将Django改编为worker模式。传统Django只工作在与WSGI服务器关联的单进程,而Channel中的Django在三个不同的层运行:
1.服务层:包括WSGI适配器和WebSocket服务器。
2.channel backend
3.workers
# 这部分是伪代码
# Listens on http.request
def my_consumer(message):# Decode the request from message format to aRequest objectdjango_request = AsgiRequest(message)# Run viewdjango_response = view(django_request)# Encode the response into message formatfor chunk in AsgiHandler.encode_response(django_response):message.reply_channel.send(chunk)
事实上channels有两种作用。
- 将work给consumers。
- 用于responses,只有interface server在监听它,每个response channel命名不同,并且客户端终止时会被退回interface server。
两种channel差别不大,不同在于message可以由任意channel传递,但response channel只会将message传到它所监听的channel server。所以将它们视为两种不同的channel,并用!标记response channel,normal channel没有标记,但它只能包含字母和-_
因为channels只分配给一个监听器,所以他们不能广播。若希望给一组client发送消息,需要记录这些response channels。
redis_conn = redis.Redis("localhost", 6379)@receiver(post_save, sender=BlogUpdate)
def send_update(sender, instance, **kwargs):# Loop through all response channels and sendthe updatefor reply_channel in redis_conn.smembers("readers"):Channel(reply_channel).send({"id": instance.id,"content": instance.content,})# Connected towebsocket.connect
def ws_connect(message):# Add to reader setredis_conn.sadd("readers", message.reply_channel.name)
可以通过监听websocket.disconnect将不需要的从readers中移出,对于在调用该方法之前就终止的特殊情况,用Group。Group通常只用于response channel
@receiver(post_save, sender=BlogUpdate)
def send_update(sender, instance, **kwargs):Group("liveblog").send({"id": instance.id,"content": instance.content,})# Connected towebsocket.connect
def ws_connect(message):# Add to reader groupGroup("liveblog").add(message.reply_channel)# Connected towebsocket.disconnect
def ws_disconnect(message):# Remove from reader group on clean disconnectGroup("liveblog").discard(message.reply_channel)
但是channel不保证发送task。如果需要确保可以用专门的系统如celery。
2.安装
pip install -U channels# 然后把 channels 添加到 settings.py文件 的INSTALLED_APPS 里
3.开始使用Channels
下面的例子可能不常用,但是可以很好地说明channels如何在Django的下层发挥作用。
创建过程,将下面代码放入consumer.py
from django.http import HttpResponse
from channels.handler import AsgiHandlerdef http_consumer(message):# Make standard HTTP response -access ASGI path attribute directlyresponse = HttpResponse("Hello world! You asked for %s" %message.content['path'])# Encode that response into messageformat (ASGI)for chunk in AsgiHandler.encode_response(response):message.reply_channel.send(chunk)
最重要的是,message都是可JSON序列化的,所以request和response都是键值对形式。对于ASGI,你只需要知道有一个AsgiRequest类完成从ASGI到Django的request对象翻译, 并且AsgiHandler类负责HttpResponse到ASGI消息的翻译. 通常传统Django的内建代码会帮你完成这些工作.
还有一件事, 需要告诉Django, 这个consumer应该与http.request channel关联. 需要在settings文件中默认的channel layer以及它的路由规则.
# In settings.py
CHANNEL_LAYERS = {"default": {"BACKEND": "asgiref.inmemory.ChannelLayer","ROUTING": "myproject.routing.channel_routing",},
}# In routing.py
from channels.routing import route
channel_routing = [route("http.request", "myapp.consumers.http_consumer"),]
特别注意, 此例及之后大部分样例中都用"in memory"channel layer. 它适合初学者, 但是没有跨进程的channel沟通, 也只能用于"runserver". 实际产品中需要选择其他的backend. 建议将这个文件命名为routing.py,并且放在urls.py的同一目录下.
再来一个简单的聊天服务器,如果不为指定http.request指定consumer,则它会由Django的views处理。
# In consumers.pydef ws_message(message):# ASGI WebSocket packet-received andsend-packet message types# both have a "text" key for their textual data.message.reply_channel.send({"text":message.content['text'],})# In routing.py
from channels.routing import route
from myapp.consumers import ws_messagechannel_routing= [route("websocket.receive",ws_message),
]// Note that the path doesn't matter for routing; any WebSocket
// connection gets bumped over to WebSocket consumers
socket = new WebSocket("ws://" + window.location.host +"/chat/");
socket.onmessage = function(e) {alert(e.data);
}
socket.onopen = function() {socket.send("hello world");
}
Groups
接下来是一个真正实现聊天的服务器。注意,channels 的设计默认允许有一小部分消息不会被正常发送,这样保证当错误出现时不会影响整个系统。
# In consumers.py
from channels import Group# Connected to websocket.connect
def ws_add(message):Group("chat").add(message.reply_channel)# Connected to websocket.receive
def ws_message(message):Group("chat").send({"text": "[user]%s" % message.content['text'],})# Connected to websocket.disconnect
def ws_disconnect(message):Group("chat").discard(message.reply_channel)from channels.routing import route
from myapp.consumers import ws_add, ws_message, ws_disconnectchannel_routing= [route("websocket.connect",ws_add),route("websocket.receive",ws_message),route("websocket.disconnect", ws_disconnect),
]// Note that the path doesn't matter right now; any WebSocket
// connection gets bumped over to WebSocket consumers
socket = new WebSocket("ws://" + window.location.host +"/chat/");
socket.onmessage = function(e) {alert(e.data);
}
socket.onopen = function() {socket.send("helloworld");
}
Running with Channels
因为Channels将Django置入多进程模型,所以不再依靠一个WSGI服务器工作在单进程。而是运行通过channel layer关联起来的多个interface server和多个worker servers。
有许多种interface server,每个都负责一种request。它们与worker server解耦,而由channellayer传输channel的内容。从产品的角度看,你通常会将worker server当作与interfaceservers不同的集群运行,虽然你也可以将他们作为不同的进程运行在同一台机器上。
Django默认没有channel layer。上个例子我们用的in-memory channel layer,它将channel数据存储在内存中的一个字典里,所以不能跨进程。当部署时还是要换成Redis后端asgi_redis等。
第二件事,当我们设置好了channel后端,要确保我们的interface layer可以用于WebSockets。Channels用daphne解决这个问题。它是个可以同时处理HTTP和WebSockets的interfaceserver,并且当你runserver时运行。(其实就是,runserver运行Daphne在一个线程,一个worker在另一个线程,但都在同一个进程)
我们来试试Redis后端。先安装asgi_redis包。然后设置channel layer:
# In settings.py
CHANNEL_LAYERS = {"default": {"BACKEND": "asgi_redis.RedisChannelLayer","CONFIG": {"hosts": [("localhost", 6379)],},"ROUTING": "myproject.routing.channel_routing",},
}
运行runserver,它会和前面的一样运行,你也可以试试跨进程类型。将下面两个命令在两个终端运行:
manage.py runserver --noworker
manage.py runworker
你可能猜到了,它在runserver中禁用了worker并且在另一个进程中处理。如果你想看运行consumers的logging,也可以向runworker传递-v 2参数。
如果Django运行在debug模式,runworker像runserver一样会用于静态文件。就像通常的Django设置,你需要设置Debug模式关掉后你自己的静态文件。
Persisting Data
我们考虑一个基本的聊天网站,基于初始连接。
记住,Channels是网络透明的,并且可以在多个worker运行,所以你不能仅把东西存储在本地的全局变量。正如Django的session框架用cookie作为key,Channels提供channel_session装饰器。它提供message.channel_session属性,就像Django session。
# In consumers.py
from channels import Group
from channels.sessions import channel_session# Connected towebsocket.connect
@channel_session
def ws_connect(message):# Work out room name from path (ignore slashes)room = message.content['path'].strip("/")# Save room in session and add us to the groupmessage.channel_session['room'] = roomGroup("chat-%s" % room).add(message.reply_channel)# Connected towebsocket.receive
@channel_session
def ws_message(message):Group("chat-%s" % message.channel_session['room']).send({"text": message['text'],})# Connected towebsocket.disconnect
@channel_session
def ws_disconnect(message):Group("chat-%s" %message.channel_session['room']).discard(message.reply_channel)# in routing.py
from channels.routing import route
from myapp.consumers import ws_connect, ws_message,ws_disconnectchannel_routing = [route("websocket.connect", ws_connect),route("websocket.receive", ws_message),route("websocket.disconnect", ws_disconnect),
]
Authenticantion
目前的WebSocket无法与网站的其余成员通信。幸运的是,由于Channels在WebSocket和ASGI的底层,它自带了认证和Django session的装饰器。Channels可以通过cookies或session_key属性得到Django sessions。你用http_session装饰器获得Django session(message.http_session属性,就像request.session)。你也可以用http_session_user装饰器获得message.user属性。
注意这些只是WebSocket的HTTP消息的详细信息,并没有消耗多余的带宽。
也意味着我们需要从connection handler中获取user并将它保存在session中,。Channels自带了channel_session_user装饰器,就像http_session_user装饰器,不同的是前者从channel session获取user。还有一个函数transfer_user从一个session复制user到另一个。更棒的是,它把上面两个功能都组合进channnel_session_user_from_http装饰器。
我们实现一个服务器,只能与名字第一个字母相同的人对话。
# In consumers.py
from channels import Channel, Group
from channels.sessions import channel_session
from channels.auth import http_session_user,channel_session_user, channel_session_user_from_http# Connected towebsocket.connect
@channel_session_user_from_http
def ws_add(message):# Add them to the right groupGroup("chat-%s" %message.user.username[0]).add(message.reply_channel)# Connected towebsocket.receive
@channel_session_user
def ws_message(message):Group("chat-%s" % message.user.username[0]).send({"text": message['text'],})# Connected towebsocket.disconnect
@channel_session_user
def ws_disconnect(message):Group("chat-%s" %message.user.username[0]).discard(message.reply_channel)
如果你只是runserver(Daphne),你就可以正常连接并且你的cookies也应该传递你的auth。如果你将WebSockets运行在不同的端口,你必须在URL中提供Django session ID。
socket = newWebSocket("ws://127.0.0.1:9000/?session_key=abcdefg");
你可以在模板中得到session key{{request.session.session_key }}。注意它对于signed cookie sessions无效。
Routing
routing.py文件很像Django的urls.py。也可以用正则。
http_routing = [route("http.request",poll_consumer, path=r"^/poll/$", method=r"^POST$"),
]chat_routing = [route("websocket.connect",chat_connect, path=r"^/(?P<room>[a-zA-Z0-9_]+)/$),route("websocket.disconnect", chat_disconnect),
]routing = [# You can use a string import path asthe first argument as well.include(chat_routing,path=r"^/chat"),include(http_routing),
]
routing按顺序执行,有短路的可能。也可以不以^起始,而用python的re.match函数,但这需要经验。
Models
可以用Django的ORM处理信息的持久化。考虑性能:我们可以为聊天消息定制channel,并且加入保存和发送的步骤,这样发送进程和consumer就可以快速结束而不用等待。
# In consumers.py
from channels import Channel
from channels.sessions import channel_session
from .models import ChatMessage# Connected tochat-messages
def msg_consumer(message):# Save to modelroom = message.content['room']ChatMessage.objects.create(room=room,message=message.content['message'],)# Broadcast to listening socketsGroup("chat-%s" % room).send({"text": message.content['message'],})# Connected towebsocket.connect
@channel_session
def ws_connect(message):# Work out room name from path (ignore slashes)room = message.content['path'].strip("/")# Save room in session and add us to the groupmessage.channel_session['room'] = roomGroup("chat-%s" % room).add(message.reply_channel)# Connected towebsocket.receive
@channel_session
def ws_message(message):# Stick the message onto the processing queueChannel("chat-messages").send({"room": channel_session['room'],"message": message['text'],})# Connected towebsocket.disconnect
@channel_session
def ws_disconnect(message):Group("chat-%s" %message.channel_session['room']).discard(message.reply_channel)
注意我们可以从任何地方将message加入chat-messages channel。
Enforcing Ording
因为Channles是分布式系统,默认它按workers从队列中获得的顺序处理消息。很可能interface server发出非常近的connect和receive消息,connect还未被处理完,receive就被另一个worker处理。
Channels的解决方法是enforce_ordering装饰器。所有websocket消息都包含一个order键,这个装饰器用这个键确保message按顺序处理。有两个模式:
Slightordering:connect先处理,其他无序。
Strictordering:所有都按序。
# In consumers.py
from channels import Channel, Group
from channels.sessions import channel_session, enforce_ordering
from channels.auth import http_session_user,channel_session_user, channel_session_user_from_http# Connected towebsocket.connect
@enforce_ordering(slight=True)
@channel_session_user_from_http
def ws_add(message):# Add them to the right groupGroup("chat-%s" %message.user.username[0]).add(message.reply_channel)# Connected towebsocket.receive
@enforce_ordering(slight=True)
@channel_session_user
def ws_message(message):Group("chat-%s" % message.user.username[0]).send({"text": message['text'],})# Connected towebsocket.disconnect
@enforce_ordering(slight=True)
@channel_session_user
def ws_disconnect(message):Group("chat-%s" %message.user.username[0]).discard(message.reply_channel)python channels笔记-
python channels 简介相关推荐
- 《从问题到程序:用Python学编程和计算》——1.2 Python语言简介
本节书摘来自华章计算机<从问题到程序:用Python学编程和计算>一书中的第1章,第1.2节,作者 裘宗燕,更多章节内容可以访问云栖社区"华章计算机"公众号查看. 1. ...
- Python列表简介
Python列表简介 什么是列表 #普通的变量定义形式 tom ='Tom' jack ='Jack' john ='John'pet1 ='cat' pet2 ='dog' pet3 ='bird' ...
- Python编程简介
Python编程简介 2011年06月23日 NOTE: The following is a short tutorial about python program, for Chinese rea ...
- 大数据教程【05.01】--Python 数据分析简介
更多信息请关注WX搜索GZH:XiaoBaiGPT Python数据分析简介 本教程将介绍如何使用Python进行大数据分析.Python是一种功能强大且易于使用的编程语言,具备丰富的数据分析库和工具 ...
- Python基础知识(Python的简介、Python环境的安装、集成开发环境Pycharm的安装)
1.Python的简介 python是跨平台的计算机语言.解释型语言.交互式语言.面向对象语言.初学者最好学的语言 什么是跨平台:意思就是说可以在很多操作系统中执行.比如:可以在windows操作系统 ...
- Python Notebook简介
windows下面安装和使用Python, IPython NoteBook (详细步骤) Python Notebook简介1 IPython notebook目前已经成为用Python做教学.计算 ...
- python画图简介
python画图简介 1.seaborn学习资料 2.matplotlib学习资料 3.扩展学习资料 4.实际练习 5.常用命令(待补充) 6.常用知识点备查 6.1 plt的默认属性rc参数 6.2 ...
- Python库简介之pylab
Python库简介之pylab 转载于 链接: https://blog.csdn.net/qq_34519492/article/details/96437901 https://blog.csdn ...
- python笔记-简介
一Python的简介 一历史简介 Python诞生于1991年,目前有27年了,比1995年的JAVA语言都早了4年,为何大器晚成? 其一,在1990那个年代,计算机性能相比现在差很多,程序执行速度和 ...
最新文章
- rtsp中的rtp发送和head理解
- java面试题(开发框架)
- Git系列之(七) 常用指令 git reset
- VS2019中配置opencv4.3.0(亲测有效)
- Java 8的6个问题
- ubuntu下面的背光键盘的使用
- 函数使用了堆栈的字节超过_Go语言复习笔记——基本语法三之堆栈与字符串应用...
- 14种模式解决面试算法编程题(PART II)
- C#之Xml去掉前面的空格
- 动产抵押物监控系统/金融抵押监控系统设计与实现
- 演示:扩展ACL的配置与应用技巧
- nodejs后台系列--第二篇--使用Navicat来创建数据库
- 疯狂Java讲义笔记汇总
- dos批处理脚本自动添加网络共享打印机-简单版且亲测可用
- u盘在电脑上读不出来,修复u盘插入电脑无法读取
- 【物联网】物联网开发从入门到精通
- Android城市列表
- 遇到oracle错误1445,sql附加数据库失败
- python腐蚀膨胀代码_Python图像处理--膨胀与腐蚀
- linux 怎么卸载gnome-screenshot,史上最全的使用 gnome-screenshot 获取屏幕快照指南