文章目录

  • WebSocket协议
  • 服务端开发
  • 客户端
  • 运行
  • 测试全双工--netty服务端主动推送数据
    • 定时器推送消息
    • 维护channel关系
    • 运行

注:更多netty相关文章请访问博主专栏: netty专栏

WebSocket协议

一般web应用都是使用的HTTP协议。HTTP协议有以下特点:

  • 支持客户端-服务端模式
  • 使用简单:只需要知道服务端URL,携带参数发送请求即可
  • 支持多种传输数据类型,由消息头中content-type标识
  • 无状态,使得HTTP服务轻量级

HTTP协议也存在一些缺点:

  • 半双工通信:同一时刻,数据只能往同一方向传输。比如向服务器发送消息时,服务器此时不可以向客户端发送消息。(不过目前HTTP2已经支持了全双工通信)
  • HTTP消息冗长:HTTP消息包含消息头,消息体,换行符等。且大多采用文本传输。所以HTTP消息会有很多冗余消息并且消息占用字节数大,消耗过多的带宽
  • 针对服务器推送的黑客攻击,如长时间轮询

因此在一些场景使用WebSocket来实现更加妥当,比如需要全双工通信的需求(聊天室、消息推送)。WebSocket有以下特点:

  • 底层采用单一TCP链接,全双工通信
  • 对代理、防火墙、路由器透明
  • 无头部消息,cookie,身份验证
  • 无安全开销
  • 通过ping/pong消息保持心跳

WebSocket稳定性很高,它实质是一个TCP链接,且支持全双工。WebSocket是当下必学知识点。

服务端开发

package com.example;import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedFile;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;import javax.activation.MimetypesFileTypeMap;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Pattern;/*** netty  http 文件下载 服务器*/
public class MyWebSocketServer {int port;public MyWebSocketServer(int port) {this.port = port;}public void start() {ServerBootstrap bootstrap = new ServerBootstrap();EventLoopGroup boss = new NioEventLoopGroup();EventLoopGroup work = new NioEventLoopGroup();try {bootstrap.group(boss, work).handler(new LoggingHandler(LogLevel.DEBUG)).channel(NioServerSocketChannel.class).childHandler(new WebSocketServerInitializer());ChannelFuture f = bootstrap.bind(new InetSocketAddress(port)).sync();System.out.println("http server started . port : " + port);f.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {boss.shutdownGracefully();work.shutdownGracefully();}}public static void main(String[] args) {MyWebSocketServer server = new MyWebSocketServer(8080);// 8081为启动端口server.start();}
}class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel channel) {ChannelPipeline pipeline = channel.pipeline();pipeline.addLast(new HttpServerCodec())// http 编解器// http 消息聚合器  512*1024为接收的最大contentlength.addLast("httpAggregator", new HttpObjectAggregator(512 * 1024))// 支持异步发送大的码流(大的文件传输),但不占用过多的内存,防止java内存溢出.addLast("http-chunked", new ChunkedWriteHandler()).addLast(new WebSocketRequestHandler());// 请求处理器}
}class WebSocketRequestHandler extends SimpleChannelInboundHandler<Object> {private WebSocketServerHandshaker handshaker;@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("收到消息:" + msg);if (msg instanceof FullHttpRequest) {//以http请求形式接入,但是走的是websockethandleHttpRequest(ctx, (FullHttpRequest) msg);} else if (msg instanceof WebSocketFrame) {//处理websocket客户端的消息handlerWebSocketFrame(ctx, (WebSocketFrame) msg);}}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}/*对WebSocket请求进行处理*/private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {// 判断是否关闭链路的指令if (frame instanceof CloseWebSocketFrame) {handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());return;}// 判断是否ping消息,如果是,则构造pong消息返回。用于心跳检测if (frame instanceof PingWebSocketFrame) {ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));return;}// 本例程仅支持文本消息,不支持二进制消息if (!(frame instanceof TextWebSocketFrame)) {System.out.println("本例程仅支持文本消息,不支持二进制消息");throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));}//处理客户端请求并返回应答消息String request = ((TextWebSocketFrame) frame).text();System.out.println("服务端收到:" + request);SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");JSONObject jsonObject = new JSONObject();jsonObject.put("time",format.format(new Date()));jsonObject.put("channelId",ctx.channel().id().asShortText());jsonObject.put("request",request);TextWebSocketFrame tws = new TextWebSocketFrame(jsonObject.toJSONString());// 返回【谁发的发给谁】ctx.channel().writeAndFlush(tws);}/*** 唯一的一次http请求。* 该方法用于处理websocket握手请求*/private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {//如果HTTP解码失败,返回异常。要求Upgrade为websocket,过滤掉get/Postif (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {//若不是websocket方式,则创建BAD_REQUEST(400)的req,返回给客户端sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));return;}//        构造握手响应返回,本机测试WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8080/websocket", null, false);//通过工厂来创建WebSocketServerHandshaker实例handshaker = wsFactory.newHandshaker(req);if (handshaker == null) {WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());} else {/*通过WebSocketServerHandshaker来构建握手响应消息返回给客户端。同时将WebSocket相关编解码类添加到ChannelPipeline中,该功能需要阅读handshake的源码。*/handshaker.handshake(ctx.channel(), req);}}/*** 拒绝不合法的请求,并返回错误信息*/private static void sendHttpResponse(ChannelHandlerContext ctx,FullHttpRequest req, DefaultFullHttpResponse res) {// 返回应答给客户端if (res.status().code() != 200) {ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);res.content().writeBytes(buf);buf.release();HttpUtil.setContentLength(res, res.content().readableBytes());}ChannelFuture f = ctx.channel().writeAndFlush(res);// 如果是非Keep-Alive,关闭连接if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {f.addListener(ChannelFutureListener.CLOSE);}}
}

注释都已经写好,这里需要注意的几点是:

  • netty的websocket协议是在HTTP协议基础之上完成的,要使用WebSocket协议,需要将HTTP请求头中添加Upgrade:WebSocket
  • WebSocket相关的编解码是在handshaker.handshake(ctx.channel(), req);中添加进去的。handshaker是WebSocketServerHandshaker的对象。handshake方法中建握手响应消息返回给客户端。同时将WebSocket相关编解码类添加到ChannelPipeline中。

handshake源码:

public final ChannelFuture handshake(Channel channel, FullHttpRequest req,HttpHeaders responseHeaders, final ChannelPromise promise) {if (logger.isDebugEnabled()) {logger.debug("{} WebSocket version {} server handshake", channel, version());}//构造握手响应FullHttpResponse response = newHandshakeResponse(req, responseHeaders);
//下面将channelpipeline中的HttpObjectAggregator和HttpContentCompressor移除,并且添加WebSocket编解码器newWebSocketEncoder和newWebsocketDecoderChannelPipeline p = channel.pipeline();if (p.get(HttpObjectAggregator.class) != null) {p.remove(HttpObjectAggregator.class);}if (p.get(HttpContentCompressor.class) != null) {p.remove(HttpContentCompressor.class);}ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);final String encoderName;if (ctx == null) {// this means the user use an HttpServerCodecctx = p.context(HttpServerCodec.class);if (ctx == null) {promise.setFailure(new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));return promise;}p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());encoderName = ctx.name();} else {p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder());encoderName = p.context(HttpResponseEncoder.class).name();p.addBefore(encoderName, "wsencoder", newWebSocketEncoder());}
//将response消息返回给客户端channel.writeAndFlush(response).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (future.isSuccess()) {ChannelPipeline p = future.channel().pipeline();p.remove(encoderName);promise.setSuccess();} else {promise.setFailure(future.cause());}}});return promise;
}

客户端

客户端采用HTML实现,支持WebSocket的浏览器都可以,如果不支持会提示ERROR:您的浏览器不支持WebSocket!!
客户端的功能是想服务器发送请求消息,服务器返回服务器时间以及channelID。

<!DOCTYPE html>
<html>
<head><meta charset="UTF-8"><title>my websocket client</title>
</head>
<body>
<textarea id="msgBoxs"></textarea><br>
待发送消息`:<input type="text" id="msg">
<input type="button" id="sendBtn" onclick="send()" value="发送">
<script type="application/javascript">var socket ;if(!window.WebSocket){window.WebSocket = window.MozWebSocket;}if(window.WebSocket){var msgBoxs = document.getElementById("msgBoxs")var msgBox = document.getElementById("msg")socket = new WebSocket("ws://localhost:8080/websocket")socket.onopen = function (evt) {console.log("Connection open ...");socket.send("Hello WebSocket!");}socket.onmessage = function (evt) {console.log("Received Message: ", evt.data)msgBoxs.value =  msgBoxs.value + "\n" + evt.data}socket.onclose = function (evt) {console.log("Connect closed.");}}else{alert("ERROR:您的浏览器不支持WebSocket!!");}function send() {var msg = msgBox.valuesocket.send(msg)//msgBox.value = ""}</script>
</body>
</html>

运行

启动服务端。然后用浏览器打开上面的HTML。如果你的浏览器支持WebSocket,那么会出现以下界面:


上图中将请求与响应的关键点标识出来了:Upgrade: websocket
再看一下服务端输出:

http server started . port : 8080
收到消息:HttpObjectAggregator$AggregatedFullHttpRequest(decodeResult: success, version: HTTP/1.1, content: CompositeByteBuf(ridx: 0, widx: 0, cap: 0, components=0))
GET /websocket HTTP/1.1
Host: localhost:8080
Connection: Upgrade
Pragma: no-cache
Cache-Control: no-cache
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36
Upgrade: websocket
Origin: file://
Sec-WebSocket-Version: 13
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7
Sec-WebSocket-Key: hoOY0UnvE6Hcf3QIl3nP+A==
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
content-length: 0
收到消息:TextWebSocketFrame(data: PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 16))
服务端收到:Hello WebSocket!
收到消息:TextWebSocketFrame(data: PooledUnsafeDirectByteBuf(ridx: 0, widx: 3, cap: 3))
服务端收到:111

测试全双工–netty服务端主动推送数据

上面的实现还是请求应答模式。接下来让服务器每一秒主动推送一次服务器时间。

这种应用场景非常多,比如手机APP的消息推送,或者聊天室群发消息等。

定时器推送消息

这里定时推送采用的是定时器来操作的。在main方法中添加了定时方法:

public void pushMsg(){//模拟异步发送推送消息ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);scheduledThreadPool.scheduleWithFixedDelay(() -> {TextWebSocketFrame tws = new TextWebSocketFrame("服务器主动推送消息。当前服务器时间:"+System.currentTimeMillis());// 群发ChannelSupervise.send2All(tws);}, 0,1, TimeUnit.SECONDS);}public static void main(String[] args) {MyWebSocketServer server = new MyWebSocketServer(8080);// 8081为启动端口server.pushMsg();server.start();}

维护channel关系

这一步是关键点,服务器端需要维护创建连接的channel的集合,保存客户端链接,用于消息的推送。
当有客户端连接时候会被SimpleChannelInboundHandler中的channelActive监听到,当断开时会被channelInactive监听到,一般在这两个方法中去保存/移除客户端的通道信息。
channel信息保存在ChannelGroup中,ChannelGroup是netty提供用于管理web于服务器建立的通道channel的,其本质是一个高度封装的set集合,在服务器广播消息时,可以直接通过它的writeAndFlush将消息发送给集合中的所有通道中去。
但在查找某一个客户端的通道时候比较坑爹,必须通过channelId对象去查找,而channelId不能人为创建,所有必须通过map将channelId的字符串和channel保存起来。

代码修改如下:

//添加ChannelSupervise类用于保存客户端channel信息
class ChannelSupervise {/*** ChannelGroup是netty提供用于管理web于服务器建立的通道channel的,* 其本质是一个高度封装的set集合,在服务器广播消息时,* 可以直接通过它的writeAndFlush将消息发送给集合中的所有通道中去*/private static ChannelGroup GlobalGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);/*** ChannelMap维护的是channelID和channel的对应关系,用于向指定channel发送消息*/private static ConcurrentMap<String, ChannelId> ChannelMap = new ConcurrentHashMap<>();public static void addChannel(Channel channel) {GlobalGroup.add(channel);ChannelMap.put(channel.id().asShortText(), channel.id());}public static void removeChannel(Channel channel) {GlobalGroup.remove(channel);ChannelMap.remove(channel.id().asShortText());}//找到某个channel来发送消息public static Channel findChannel(String id) {return GlobalGroup.find(ChannelMap.get(id));}public static void send2All(TextWebSocketFrame tws) {GlobalGroup.writeAndFlush(tws);}
}//WebSocketRequestHandler对其进行修改,在监听到链接创建于销毁时,维护channel信息
class WebSocketRequestHandler extends SimpleChannelInboundHandler<Object> {//省略代码//添加下面channelActive和channelInactive方法@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//添加连接System.out.println("客户端加入连接:" + ctx.channel());ChannelSupervise.addChannel(ctx.channel());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {//断开连接System.out.println("客户端断开连接:" + ctx.channel());ChannelSupervise.removeChannel(ctx.channel());}//代码省略

客户端HTML页面无需修改。

运行

启动服务器端。然后重新刷新客户端页面,可以看到服务器端会多打印一行日志:

客户端加入连接:[id: 0x3670424c, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62458]

页面上会每隔1秒收到服务器的推送消息,页面上依旧可以发送消息,然后服务器端返回值会输出到文本框中:

注:更多netty相关文章请访问博主专栏: netty专栏

netty实现WebSocket协议相关推荐

  1. 基于netty的websocket协议实现

    基于netty的websocket协议实现 背景 1.启动服务端 2.测试服务端和客户端效果 背景 项目中使用到了websocket,所以查阅相关资料,完成了一个基于netty的websocket的实 ...

  2. netty 游戏服务器框图_基于Netty和WebSocket协议实现Web端自动打印订单服务方法与流程...

    本发明涉及电子商务技术领域,尤其涉及一种基于netty和websocket协议实现web端自动打印订单服务方法. 背景技术: 电子商务是以信息网络技术为手段,以商品交换为中心的商务活动:也可理解为在互 ...

  3. [源码和文档分享]基于Netty和WebSocket的Web聊天室

    一.背景 伴随着Internet的发展与宽带技术的普及,人们可以通过Internet交换动态数据,展示新产品,与人进行沟通并进行电子商务贸易.作为构成网站的重要组成部分,留言管理系统为人们的交流提供了 ...

  4. 京东到家基于netty与websocket的实践

    作者:李天翼,软件开发工程师,任职于达达京东到家后台研发团队,负责订单流程的开发工作. 背景 在京东到家商家中心系统中,商家提出在 Web 端实现自动打印的需求,不需要人工盯守点击打印,直接打印小票, ...

  5. netty 基于 protobuf 协议 实现 websocket 版本的简易客服系统

    https://segmentfault.com/a/1190000017464313 netty 基于 protobuf 协议 实现 websocket 版本的简易客服系统 结构 netty 作为服 ...

  6. Netty(3)之WebSocket协议开发时间服务器

    WebSocket协议开发 1. Http协议弊端 半双工协议:同一时刻,只有一个方向上的数据传送(客户端 --> 服务端 或者 服务端 --> 客户端) 消息冗长繁琐 针对服务器推送的黑 ...

  7. 基于netty实现一个简单的支持http和webSocket协议的的服务器(含xxl-job通信模块源码分析)

    文章目录 背景 依赖 包结构 实现 WebSocketServer 业务handler WebSocketServerHandler 测试 xxl-job 源码中基于netty实现的http 总结 参 ...

  8. Netty权威指南之Websocket协议开发

    本章主要学习内容如下: 1.HTTP协议弊端 2.WebSocket入门 3.Netty WebSocket协议开发 第一节:HTTP协议弊端 将HTTP协议的主要弊端总结如下: 1.HTTP协议为半 ...

  9. Netty 实现 websocket

    现在网上网站为了实现推送基本都采用轮询的方式,比较新的轮询技术是comet,采用ajax,但是还是得发送请求,为了解决html效率低下的问题,html5定义了websocket协议. 服务端代码: i ...

最新文章

  1. Dev-C++安装和使用教程(手把手傻瓜式教学)
  2. npm run dev 报错echarts
  3. 定时自动启动任务crontab命令用法
  4. 小程序影藏溢出的gif_ScreenToGif:一款小巧实用动图gif制作神器
  5. 【线性dp】【决策优化】CH5E02
  6. MySQL分库分页_MySQL分库分表的分页查询解决方案
  7. xgboost通俗_【通俗易懂】XGBoost从入门到实战,非常详细
  8. 生成三角网算法java,一种低效但逻辑简单清晰的Delaunay三角网生成算法
  9. plsql 常用函数
  10. Android Studio真机调试,数据库sqllite时,Multiple dex files define Landroid/support/v4/R 问题...
  11. 【C语言】如何判断一个数字是否为素数(质数)?
  12. 格式工厂高清视频转换参数设置
  13. Java验证输入邮箱格式是否正确
  14. android耳机检测驱动程序,USB 音频 CTS 验证程序测试
  15. matlab处理各种数据、文件
  16. 背代码可以学好编程吗?下面的回答看的我一脸懵逼!
  17. 第一个小项目——坦克大战
  18. 药店微信小程序的功能
  19. ***没有规则可以创建“XXX”需要的目标“XXX”问题的解决方案
  20. 1109 擅长C – PAT乙级真题

热门文章

  1. 西门子PLC用TIA博途SCL语言写的一个产生随机实数的指令块(学习1)
  2. 在一个200美元的工业供电装置内,我发现了内幕......
  3. 七牛切片视频php,音视频切片
  4. 大牛服务器超时位置模拟失败,大牛模拟定位掉线怎么办 | 手游网游页游攻略大全...
  5. 天猫店群是什么意思,天猫店群是怎么赚钱的?优势都有哪些?
  6. 读《80前的前辈,你们都做了什么?!》一文有感。
  7. vue实现轮播图代码
  8. Qt无故卡死关闭解决办法
  9. Selenium常用操作——关闭页面和浏览器
  10. 关于Unity刷新帧率低的问题