consumers允许使用者更轻松地编写ASGI应用。主要实现如下功能:
将代码结构搭建为针对事件的函数,而不是一整套事件循环;
通过同步或异步的代码处理线程切换。
consumer必须是channels.consumer.AsyncConsumerchannels.consumer.SyncConsumer的子类,用于编写异步或同步的代码。
一个简单的同步consumer示例:

from channels.consumer import SyncConsumerclass EchoConsumer(SyncConsumer):def websocket_connect(self, event):self.send({"type": "websocket.accept",})def websocket_receive(self, event):self.send({"type": "websocket.send","text": event["text"],})

上面定义了一个WebSocket回显服务器——他会接收所有连接进来的WebSocket连接,并回复相同的内容。
consumer围绕着一系列命名的方法构建起来,这些方法与他们要接受的信息的type值相对应,这些方法名称中的’.‘由’_'替换。
ASGI WebSocket规范指导Channels的WebSocket的搭建,并通过路由检查一个websocket的scope的类型,由此确定了接受的event类型以及以及event中含有的键值参数。
self.send(event)接口用于向客户端或协议服务器返回event。
异步consumer的搭建与同步类似,时是内部的方法定义要改为异步协程:

from channels.consumer import AsyncConsumerclass EchoConsumer(AsyncConsumer):async def websocket_connect(self, event):await self.send({"type": "websocket.accept",})async def websocket_receive(self, event):await self.send({"type": "websocket.send","text": event["text"],})

默认推荐使用同步consumer,尤其是调用Django ORM或其他同步程序时,以保持整个consumer在单个线程中并避免ORM查询阻塞整个event。
在异步consumer中调用同步consumer需要asgiref.sync.sync_to_async,该组件用于在线程池中运行同步consumer,将同步consumer作为异步协同程序调用。
在异步consumer中调用Django ORM,可以使用database_sync_to_async

关闭consumer

当连接到consumer的链接关闭时,服务器会收到一个相应的event(比如,http.disconnect或websocket.disconnect),应用接受后需要作相应的处理。处理完成后,应当触发channels.exceptions.StopConsumer以彻底中止ASGI应用。如果不出发并任由应用运行,则服务器会在达到应用关闭时限后(Daphne默认10秒),结束应用并触发警告。
通用型consumer会主动完成以上操作,只有基于AsyncConsumer或``SyncConsumer````自定义consumer时需要注意这些。

通信层

consumer可以通过通信层实现一对一的信息交换或通过groups广播到整个系统。cosumer会调用default通信层,也可以通过channel_layer_alias参数自定义通信层名称:

from channels.consumer import SyncConsumerclass EchoConsumer(SyncConsumer):channel_layer_alias = "echo_alias"

Scope

consumer在初始化时会接受链接的scope,类似于Django中的request对象,可以通过self.scope查看。
scope也是ASGI规范的一部分,有如下常用信息:
scope["path"],请求的路径;
scope["headers"],请求头信息,以键值对的形式返回;
scope["method"],请求的方法(仅HTTP可用)

通用型consumer(GENERIC CONSUMERS)

Channel将常用的函数封装为generic views,类似于Django中的generic.view

WebsocketConsumer

通过channels.generic.websocket.WebsocketConsumer使用,WebsocketConsumer将原始的ASGI消息的收发进行封装,用户仅需处理简单的文本或二进制字符:

from channels.generic.websocket import WebsocketConsumerclass MyConsumer(WebsocketConsumer):groups = ["broadcast"]def connect(self):# 在收到连接时调用# 接受连接:self.accept()# 也可以在接受连接的同时指定子协议# 通过self.scope['subprotocols']访问由客户端指定的子协议列表self.accept("subprotocol")# 拒绝连接则调用:self.close()def receive(self, text_data=None, bytes_data=None):# 调用时需要传入文本或二进制文本# 正常调用:self.send(text_data="Hello world!")# 发送二进制框架:self.send(bytes_data="Hello world!")# 想要强制关闭连接则调用:self.close()# 或添加自定义的WebSocket的错误码:self.close(code=4123)def disconnect(self, close_code):# 套接字关闭时调用

可以在connect方法中触发channels.exceptions.AcceptConnectionchannels.exceptions.DenyConnection以接受或拒绝一个连接。
如果在类属性groups中定义了群组,那么WebSocket的频道会被自动加入或移出这些群组。group属性必须是可迭代对象,同时必须设置支持群组的通信层(channels.layers.InMemoryChannelLayerchannels_redis.core.RedisChannelLayer)作为channel backend。

AsyncWebsocketConsumer

通过channels.generic.websocket.AsyncWebsocketConsumer,其功能和特征与WebSocketConsume完全一样,只是异步实现:

from channels.generic.websocket import AsyncWebsocketConsumerclass MyConsumer(AsyncWebsocketConsumer):groups = ["broadcast"]async def connect(self):# Called on connection.# To accept the connection call:await self.accept()# Or accept the connection and specify a chosen subprotocol.# A list of subprotocols specified by the connecting client# will be available in self.scope['subprotocols']await self.accept("subprotocol")# To reject the connection, call:await self.close()async def receive(self, text_data=None, bytes_data=None):# Called with either text_data or bytes_data for each frame# You can call:await self.send(text_data="Hello world!")# Or, to send a binary frame:await self.send(bytes_data="Hello world!")# Want to force-close the connection? Call:await self.close()# Or add a custom WebSocket error code!await self.close(code=4123)async def disconnect(self, close_code):# Called when the socket closes

JsonWebsocketConsumer

通过channels.generic.websocket.JsonWebsocketConsumer使用,这个consumer会自动对WebSocket文本框进行JSON编解码。
receive_json方法接受唯一的参数content,该方法会解码JSON对象;
send_json方法同样接受唯一参数content,用于编码JSON对象。
也可以重写encode_jsondecode_json类方法自定义JSON的编解码过程。

AsyncJsonWebsocketConsumer

异步版本的JsonWebsocketConsumer,通过channels.generic.websocket.AsyncJsonWebsocketConsumer使用,其中endoce_jsondecode_json都是异步函数。

AsyncHttpConsumer

通过channels.generic.http.AsyncHttpConsumer使用,提供了基础的HTTP处理工具:

from channels.generic.http import AsyncHttpConsumerclass BasicHttpConsumer(AsyncHttpConsumer):async def handle(self, body):await asyncio.sleep(10)await self.send_response(200, b"Your response bytes", headers=[(b"Content-Type", b"text/plain"),])

handle方法接受二进制形式的完整的请求体,请求头以元组组成的列表或字典形式传递。应答内容同样需要二进制形式。
也可以通过send_headerssend_body方法分别定义应答内容。

import json
from channels.generic.http import AsyncHttpConsumerclass LongPollConsumer(AsyncHttpConsumer):async def handle(self, body):await self.send_headers(headers=[(b"Content-Type", b"application/json"),])# Headers are only sent after the first body event.# Set "more_body" to tell the interface server to not# finish the response yet:await self.send_body(b"", more_body=True)async def chat_message(self, event):# Send JSON and finish the response:await self.send_body(json.dumps(event).encode("utf-8"))

也可以通过这个类定义ServerSendEvents:

from datetime import datetime
from channels.generic.http import AsyncHttpConsumerclass ServerSentEventsConsumer(AsyncHttpConsumer):async def handle(self, body):await self.send_headers(headers=[(b"Cache-Control", b"no-cache"),(b"Content-Type", b"text/event-stream"),(b"Transfer-Encoding", b"chunked"),])while True:payload = "data: %s\n\n" % datetime.now().isoformat()await self.send_body(payload.encode("utf-8"), more_body=True)await asyncio.sleep(1)

Channels中的Counsumer,消费者相关推荐

  1. 彻底理解kafka中partition和消费者对应关系

    1个partition只能被同组的一个consumer消费,同组的consumer则起到均衡效果 消费者多于partition topic: test 只有一个partition 创建一个topic- ...

  2. kafka中生产者和消费者的分区问题

    本文来书说下kafka中生产者和消费者的分区问题 文章目录 概述 主题的分区数设置 分区与生产者 分区与消费者 range roundrobin(轮询) 本文参考 本文小结 概述 我们知道,生产者发送 ...

  3. Disruptor框架中生产者、消费者的各种复杂依赖场景下的使用总结-我见过最好的Disruptor

    更多高并发知识请访问 www.itkc8.com 非常感谢 https://www.cnblogs.com/pku-liuqiang/p/8544700.html Disruptor是一个优秀的并发框 ...

  4. Java中生产者与消费者问题的演变

    想要了解更多关于Java生产者消费者问题的演变吗?那就看看这篇文章吧,我们分别用旧方法和新方法来处理这个问题. 生产者消费者问题是一个典型的多进程同步问题. 对于大多数人来说,这个问题可能是我们在学校 ...

  5. java中的生产者消费者模式详解

    方式 一: Synchronized方式 注:此种方式会造成资源的浪费: 利用锁的notifyAll()方法会将所有的线程都唤醒,会造成资源的浪费 class Resource {private St ...

  6. Java中生产者和消费者总结

    生产者和消费者问题是线程模型中的经典问题,生产者和消费者在同一时间段共用同一个存储空间,这个存储空间是一个缓冲区的仓库,生产者可以将产品放入仓库,消费者可以从仓库中取出产品. 生产者/消费者模型是基于 ...

  7. RabbitMQ中的生产者消费者与订阅发布者两种模式

    RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件).AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进 ...

  8. Java中的生产消费者问题

    1 package day190109; 2 3 import java.util.LinkedList; 4 import java.util.Queue; 5 import java.util.R ...

  9. day20Java-Thread-多线程中生产者和消费者

    博客 Java-(高级) 文章目录 多线程生产者和消费者 多线程生产者消费者代码版本1 多线程生产者消费者代码版本2-同步解决问题 多线程生产者消费者代码版本3-等待唤醒机制解决问题 多线程生产者消费 ...

最新文章

  1. parallel循环java_Java 8 lambda stream forEach parallel 等循环与Java 7 for each 循环耗时测试...
  2. java 计算运算表达式_java字符串运算表达式的计算
  3. 计算机组成原理重要知识,计算机组成原理重要知识点解析
  4. DOS命令行中用MAVEN构建Java和Java Web项目
  5. 一站式快速自助建站-超低价0代码建站套餐助你轻松拥有自己的网站
  6. pb 动态改变DW的WHERE子句
  7. 冬季,拿什么来温暖你的心情
  8. linux kernel 下的hash 和链表 应用
  9. 域名,端口,IP总结
  10. QUIC 将会是 WebRTC 的未来么?
  11. Fiddler中文乱码解决方法
  12. day12摇色子游戏--笔记
  13. jmeter常见问题总结
  14. 七种PDF转Excel的转换方法,分分钟提高你的工作效率
  15. 自动化测试、自动化测试框架和云测试相关论文列表
  16. JavaScript常用的工具函数,不全面大家补充哦
  17. 瑞吉外卖(1)环境搭建
  18. null不可以toString
  19. 5招在不添加内存、显卡、ssd前提下有效提升windows系统pc性能
  20. 我爱自然语言处理网文章汇总

热门文章

  1. 一束花送给MyLove:Fany~
  2. 关于Accessory
  3. Android下USB Accessory的实现分析 (三)--- Android Open AccessoryProtocol
  4. Lingo运输+选址问题
  5. Android平台的一些常用命令
  6. python wraps_Python functools.wraps 深入理解
  7. Ubuntu安装QT5
  8. QT4程序在QT5环境编译运行
  9. python汉明距离检索_汉明距离(Python3)
  10. java计算自然数对数_JavaScript用Math.log()计算一个数的自然对数