最近根据工作需要研究了下 WebSocket 服务,想使用 Java 搭建一个服务来试试,查了些资料,总结以下四种实现方式。
下面通过代码以及说明如何使用不同的方式简单实现以下几个功能:

  • 可启动 Socket 服务,并等待客户端连接。
  • 可启动 Socket 客户端,并连接服务端,同时监听控制台输入文本,发送到服务端。
  • 服务端接受到消息展示输出,同时返回指定消息给当前客户端。
  • 服务端可广播消息到不同客户端。

1、 基于 JDK io 包做实现

服务端基本流程是:通过 java.net.ServerSocket创建服务对象,监听指定端口,然后通取与客户端连接的 Socket对象,再从 Socket 对象中通过 InputStreamOutputStream来实现接受消息和发送消息。
客户端基本流程是: 通过指定IP和端口初始化 Socket 对象来连接服务器,同时也是通过 Socket 对象中的 InputStreamOutputStram 来实现接受消息和发送消息。

简单的例子代码如下:
服务端

public class Server {public static class MsgServer implements Runnable {private ServerSocket serverSocket;private List<Socket> allClients = new ArrayList<>();public MsgServer() throws IOException {this.serverSocket = new ServerSocket(7777);}@Overridepublic void run() {try {while (true) {Socket socket = serverSocket.accept();System.out.println("有新的链接..");allClients.add(socket);new Thread(new IoSocketHandler(socket, allClients)).start();}} catch (IOException e) {e.printStackTrace();}}}public static class IoSocketHandler implements Runnable {private Socket socket;private List<Socket> allClients;public IoSocketHandler(Socket socket, List<Socket> allClients) {this.socket = socket;this.allClients = allClients;}@Overridepublic void run() {try {BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream(), "utf-8"));PrintWriter out = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), "utf-8"));while (true) {String msg = br.readLine();if (!ObjectUtils.isEmpty(msg)) {System.out.println("收到消息:" + msg);String response = "服务器成功收到消息:" + msg;out.println(response);out.flush();if ("exit".equals(msg)) {System.out.println("客户端断开链接");socket.close();break;}if (msg.startsWith("all:")) {allClients.forEach(s -> {if (!socket.equals(s)) {System.out.println("发送给用户: " + allClients.indexOf(s));try {PrintWriter otherClient = new PrintWriter(new OutputStreamWriter(s.getOutputStream(), "utf-8"));otherClient.println(msg);otherClient.flush();} catch (IOException e) {e.printStackTrace();}}});}}}} catch (IOException e) {e.printStackTrace();}}}public static void main(String[] args) throws IOException {new Thread(new MsgServer()).start();}}

说明:

  • MsgServer 构造方法创建 ServerSocket 对象, run 方法中 Socket socket = serverSocket.accept(); 会阻塞,当有客户端连接时,可以得到 Socket 对象,然后新创建子线程来处理该连接。
  • 子线程循环执行尝试从 SocketInputStream 中读取内容,通过 OutputStream 输出给客户端。
  • 每个客户端连接时将对应的 Socket 维护起来,便于服务端向多个客户端推送消息。
  • 每一个客户端连接时,在服务端都会新启动一个线程来维持连接。

客户端


public class Client {public static class MsgClient {private Socket socket;private String senderName;private MsgReceiver receiver;public MsgClient(String senderName) throws IOException {this.socket = new Socket("127.0.0.1", 7777);this.receiver = new MsgReceiver(socket);this.senderName = senderName;}public void start() {new Thread(receiver).start();}public void sendMsg(String msg) throws IOException {PrintWriter out = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), "utf-8"));// msg += ("--来自" + senderName);out.println(msg);out.flush();}}public static class MsgReceiver implements Runnable {private Socket socket;public MsgReceiver(Socket socket) {this.socket = socket;}@Overridepublic void run() {try {BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream(), "utf-8"));while (true) {String msg = br.readLine();if (!ObjectUtils.isEmpty(msg)) {System.out.println("客户端收到消息:" + msg);}}} catch (IOException e) {e.printStackTrace();}}}public static void main(String[] args) throws IOException {MsgClient client = new MsgClient("张三");client.start();Scanner sc = new Scanner(System.in);while (true) {String str = sc.nextLine();client.sendMsg(str);}}
}

说明:

  • 通过 Socket 对象连接服务端,使用 InputStream 获取服务端推送的数据。

2、 基于 JDK nio 包做实现

使用 nio 实现,与 io 的区别,主要是操作对象是 ChannelSelector 而不再是 Socket 。服务端需要创建 ServerSocketChannel, 并将其与Selector 绑定,该 Selector 会管理服务端与客户端连接的 Channel 。客户端则使用 SocketChannel 来与服务端连接。

服务端

public class Server {public static class NioServer implements Runnable {List<SocketChannel> allChannel = new ArrayList<>();@Overridepublic void run() {try {ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.socket().bind(new InetSocketAddress(7777));//设置为非阻塞模式serverSocketChannel.configureBlocking(false);//为serverChannel注册selectorSelector selector = Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("Nio Socket 启动完成, 等待连接...");while (true) {selector.select();//获取selectionKeys并处理Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();while (keyIterator.hasNext()) {SelectionKey key = keyIterator.next();try {//连接请求if (key.isAcceptable()) {handleAccept(key);}//读请求if (key.isReadable()) {handleRead(key, selector);}} catch (IOException e) {e.printStackTrace();}// 处理完后移除当前使用的keykeyIterator.remove();}}} catch (IOException e) {e.printStackTrace();}}private void handleAccept(SelectionKey key) throws IOException {//获取channelSocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();//非阻塞socketChannel.configureBlocking(false);//注册selectorsocketChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(10240));allChannel.add(socketChannel);System.out.println("有新的连接...");}private String handleRead(SelectionKey key, Selector selector) throws IOException {SocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();String receivedStr = "";if (socketChannel.read(buffer) == -1) {// 没有内容// System.out.println("暂时没有内容");} else {//将channel改为读取状态buffer.flip();//按照编码读取数据receivedStr = Charset.forName("UTF-8").newDecoder().decode(buffer).toString();buffer.clear();System.out.println("收到消息:" + receivedStr);if ("exit".equals(receivedStr)) {socketChannel.shutdownOutput();socketChannel.shutdownInput();socketChannel.close();System.out.println("客户端断开链接");} else {//返回数据给客户端buffer = buffer.put(("服务器返回,收到消息内容 : " + receivedStr).getBytes("UTF-8"));//读取模式buffer.flip();socketChannel.write(buffer);buffer.clear();//注册selector 继续读取数据socketChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(10240));// 群发if (receivedStr.startsWith("all:")) {for (SocketChannel c : allChannel) {if (c.keyFor(selector).equals(key)) {continue;}buffer = buffer.put(receivedStr.getBytes("UTF-8"));//读取模式buffer.flip();c.write(buffer);buffer.clear();}}}}return receivedStr;}}public static void main(String[] args) {new Thread(new NioServer()).start();}
}

说明:

  • 服务端创建 ServerSocketChannel 需要指定为非阻塞模式,serverSocketChannel.configureBlocking(false);
  • 服务端获取与客户端连接的方式是通过 Selector 循环获取准备好的 SelectionKey 对应的 Channel , 然后根据其状态是 连接请求 还是 读请求 来做对应的处理。因此服务端只需要一个线程。(这就是对应 nio 线程模型中,Selector 的作用,所有 Channel 的 IO 由 Selector 来通知处理,不知道我理解是否准确。
  • SocketChannel 统一都是通过 ByteBuffer 来传输数据,因此要注意 buffer.clear() 的调用,如果不及时请掉 buffer 中的数据,就可能造成数组越界错误。

客户端

public class Client {public static class NioClient {private String senderName;private SocketChannel socketChannel;private MsgReceiver receiver;private static ByteBuffer buffer = ByteBuffer.allocate(1024);public NioClient(String senderName) throws IOException {this.senderName = senderName;this.socketChannel = SocketChannel.open();//连接服务端socketSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 7777);this.socketChannel.connect(socketAddress);this.receiver = new MsgReceiver(socketChannel);}public void start() {new Thread(receiver).start();}public void sendMsg(String msg) {try {//  msg += ("--来自" + senderName);buffer.clear();//向服务端发送消息buffer.put(msg.getBytes());//读取模式buffer.flip();socketChannel.write(buffer);buffer.clear();} catch (IOException e) {e.printStackTrace();}}}public static class MsgReceiver implements Runnable {private SocketChannel socketChannel;public MsgReceiver(SocketChannel socketChannel) {this.socketChannel = socketChannel;}@Overridepublic void run() {ByteBuffer buffer = ByteBuffer.allocate(10240);while (true) {try {int readLenth = socketChannel.read(buffer);//读取模式buffer.flip();byte[] bytes = new byte[readLenth];buffer.get(bytes);System.out.println(new String(bytes, "UTF-8"));buffer.clear();} catch (IOException e) {e.printStackTrace();}}}}public static void main(String[] args) throws IOException {NioClient client = new NioClient("张三");client.start();// 监听输入Scanner sc = new Scanner(System.in);while (true) {String str = sc.nextLine();client.sendMsg(str);}}
}

说明:

  • 客户端使用 SocketChannel 来连接,消息传输同样使用 ByteBuffer .

3、 基于 Netty 做实现

Netty 本身就是基于 nio 的服务器框架,使用它做实现时,主要是 API 的使用以及代码结构有区别。

服务端

public class Server {public static class NettyServer implements Runnable {@Overridepublic void run() {//监听线程组,主要是监听客户端请求EventLoopGroup parentGroup = new NioEventLoopGroup();//工作线程组,主要是处理与客户端的数据通讯。EventLoopGroup childGroup = new NioEventLoopGroup();try {//初始化netty服务器,并且开始监听端口的socket请求ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(parentGroup, childGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitHandler()).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);ChannelFuture channelFuture = serverBootstrap.bind(7777).sync();channelFuture.addListener((ChannelFutureListener) channelFuture1 -> {if (channelFuture1.isSuccess()) {System.out.println("Server bound");} else {System.err.println("Bound attempt failed");channelFuture1.cause().printStackTrace();}});channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {parentGroup.shutdownGracefully();childGroup.shutdownGracefully();}}}public static class ChannelInitHandler extends ChannelInitializer<SocketChannel> {protected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();//pipeline.addLast(new HttpServerCodec());// pipeline.addLast(new ChunkedWriteHandler());pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));//对 String 对象自动编码,属于出站站处理器pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));//把网络字节流自动解码为 String 对象,属于入站处理器//  pipeline.addLast(new HttpObjectAggregator(1024 * 64));  // 64 k// websocket 服务器处理协议 用于指定给客户端连接访问的路由//  pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));// 消息处理 handlerpipeline.addLast(new ServerMessageHandler());}}public static class ServerMessageHandler extends SimpleChannelInboundHandler<String> {// 用于记录和管理channelprivate static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {// 获取客户端传来的信息String text = msg;// 获取客户端IDString shortId = ctx.channel().id().asShortText();System.out.println("服务端接收到客户端消息,客户端ID:" + shortId + ",消息内容:" + text);// 返回消息给当前客户端ctx.writeAndFlush("服务器成功接收到消息:" + text);if (text.startsWith("all:")) {// 发给群组中其他用户clients.stream().forEach(s -> {if (!shortId.equals(s.id().asShortText())) {s.writeAndFlush("群发:" + text);}});}}/**客户端打开连接* */@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();clients.add(channel);System.out.println("客户端进入,客户端ID:" + channel.id().asShortText());}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {clients.remove(ctx.channel());System.out.println("客户端离开,客户端ID:" + ctx.channel().id().asShortText());}}public static void main(String[] args) {NettyServer server = new NettyServer();new Thread(server).start();}}

说明:

  • 服务端使用 ServerBootstrap 启动服务,同样是能通过 SocketChannel 来维持连接。
  • SocketChannel 中有 ChannelPipeline 对象,他是接受处理消息的管道,可自定义消息处理 handler, 如 ServerMessageHandler , 同时 Netty 本身也提供了很多 handler 供实际使用选择,如 StringEncoder 和StringDecoder 。

客户端

public class Client {public static class NettyClient {private String senderName;private Bootstrap bootstrap;private ChannelFuture channelFuture;private SocketChannel socketChannel;public NettyClient(String senderName) {this.senderName = senderName;}public void start(Consumer<Scanner> consumer) throws InterruptedException {EventLoopGroup workerGroup = new NioEventLoopGroup();try {bootstrap = new Bootstrap();bootstrap.group(workerGroup);bootstrap.channel(NioSocketChannel.class);bootstrap.option(ChannelOption.SO_KEEPALIVE, true);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 添加一个http的编解码器// pipeline.addLast(new HttpClientCodec());// 添加一个用于支持大数据流的支持//pipeline.addLast(new ChunkedWriteHandler());pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));//对 String 对象自动编码,属于出站站处理器pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));//把网络字节流自动解码为 String 对象,属于入站处理器// 添加一个聚合器,这个聚合器主要是将HttpMessage聚合成FullHttpRequest/Response// pipeline.addLast(new HttpObjectAggregator(1024 * 64));pipeline.addLast(new ClientMessageHandler());}});// Start the client.channelFuture = bootstrap.connect("127.0.0.1", 7777).sync();if (channelFuture.isSuccess()) {socketChannel = (SocketChannel) channelFuture.channel();System.out.println("connect server success");}Scanner sc = new Scanner(System.in);consumer.accept(sc);// Wait until the connection is closed.channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {workerGroup.shutdownGracefully();}}public void sendMsg(String msg) {System.out.println("即将发送:" + msg);this.socketChannel.writeAndFlush(msg);}}public static class ClientMessageHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {// 读取消息String text = msg;// 获取客户端IDString shortId = ctx.channel().id().asShortText();System.out.println("客户端收到消息:" + shortId + ",消息内容:" + text);}}public static void main(String[] args) throws InterruptedException {NettyClient client = new NettyClient("张三");client.start((sc)->{while (true) {String str = sc.nextLine();client.sendMsg(str);}});}
}

说明:

  • 客户端使用 Bootstrap 来连接服务器端,连接之后依旧是得到 SocketChannel.
  • 客户端的 SocketChannel 对应的 pipeline 中也要定义客户端接受消息的 handler, 例子中是 ClientMessageHandler .
  • 客户端 channelFuture.channel().closeFuture().sync(); 代码会阻塞后续的执行,因此例子中的 Scanner 只能通过 Consumer 的方式传递执行。

4、 SpringBoot + webSocket stater

前面的三种方式的实现,服务端都可以直接 main 方法执行,或者启动一个线程来构造 Socket 服务,可以整合在现在常见的 SpringBoot 服务中使用,只不过如果服务只需要实现 Socket 连接的话,服务端会要额外使用一个端口。而 SpringBoot 本身已经有对应整合的 websocket 的依赖,所有的对象管理也都交予 Spring 容器,直接使用会更简单方便。

服务端

  • 添加依赖
      compile 'org.springframework.boot:spring-boot-starter-websocket:2.5.14'
  • 配置 webSocket 的path 以及消息处理handler,同时也可以在registerWebSocketHandlers 方法中指定对应的权限拦截器(这里没有做实现,可查询 HttpSessionHandshakeInterceptor 类的相关内容
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {@Autowiredprivate WebSocketHandler msgHandler;@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(msgHandler, "/socket").setAllowedOrigins("*");}@Beanpublic WebSocketHandler myHandler() {return new WebSocketMessageHandler();}
}
  • 实现对应的消息 handler
@SuppressWarnings("unchecked")
public class WebSocketMessageHandler extends TextWebSocketHandler {private List<WebSocketSession> clients = new ArrayList<>();@Overridepublic void afterConnectionEstablished(WebSocketSession session) {clients.add(session);System.out.println("建立连接: " + session.getId());}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) {clients.remove(session);System.out.println("断开连接: " + session.getId());}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) {String payload = message.getPayload();System.out.println("接受到的数据" + payload);clients.forEach(s -> {try {System.out.println("发送消息给: " + session.getId());s.sendMessage(new TextMessage("服务器返回收到的信息," + payload));} catch (Exception e) {e.printStackTrace();}});}
}
  • 启动类开启 @EnableWebSocket 注解
@EnableWebSocket
@SpringBootApplication
public class WebsocketServer {public static void main(String[] args) {SpringApplication.run(WebsocketServer.class, args);}
}

客户端
客户端可借助 @ClientEndpoint 注解来做相关实现。偷懒没做…

总结

在 io 与 nio 的实现过程中,还是对两种不同的 I/O 模型有了更直观的理解。Netty 作为一个高效的框架,其特性也很明显,功能扩展性也很好,pipeline对于消息的处理有灵活的操作。不过目前新项目都是基于 SpringBoot 开发,从使用层面上来说,最简单还是直接用 websocket stater 就行。

上述代码有借鉴其他人文章的部分内容,如有版权异议,请联系本人。

“茴”字有几种写法? Java 实现 WebSocket 的方式相关推荐

  1. 茴字的四种写法——浅谈移动前端适配

    1. 什么是前端适配 从UI展现层面上: 我们期望不同尺寸的设备,页面可以自适应的展示或者进行等比缩放,从而在不同的尺寸的设备下看起来协调或者差不多. 从代码实现层面上: 我们希望前端适配可以用用尽可 ...

  2. 茴字的四种写法—移动适配方案的进化

    话说我刚工作的时候,就开始用rem了,过了没多久,接触到了flexible,系统化且支持iOS的retina屏迅速征服了我,最近又看到了大漠大神的vw.所以本文想完成一篇一站式的文章,可以系统的了解前 ...

  3. 编程代码风格之茴字有四种写法

    (养成中午写blog的习惯) 一直以来注意代码风格. 最早的时候使用Visual Basic时,没有花括号{},也不需要;,靠换行和缩进来控制逻辑,遇到复杂的事情时候很着急,及至后来开始深入到C, J ...

  4. 茴字的N种写法 (读《把脉VC++》笔记)

    在Windows编程中有许多方法实现文件写入和读取,下面是<把脉VC++>中介绍的几种. 1.使用Windows API Windows API提供了CreateFile, WriteFi ...

  5. “茴”字有四种写法,this也是一样

    说到这个地方又想起以前高中还是初中学的<孔乙己>这个梗,但是这里的this显然实用性比那个要大很多,哈哈. 简单来说,this有四种应用场景,分别是在构造函数上.对象属性中.普通函数中.c ...

  6. 回字有四种写法,阶乘verilog实现有几种方法?

    回字有四种写法,阶乘verilog实现有几种方法? 方式一:普通方式实现阶乘计算: verilog代码: module tryfact; function[31:0]factorial; input[ ...

  7. 请问“回”字有几种写法?

    我们一行10人在讨论如何提高我们学校毕业生就业竞争力的问题,突然孔乙己用一种挑衅加高傲的眼神问我:"在论坛里看你说话感觉你才高八斗,满腹经纶.现在我有一个问题想请教一下:你知道'回'字有几种 ...

  8. HelloWorld! 程序猿同志,茴香豆的茴字有几种字法?

    挖煤的矿工,久处深矿,不见天日. 一新矿工某日第一次出井, 就要重见天日.正欲抬头睁眼之时, 守在井口的人对他大声吼到: "别睁大眼,慢点出来,千万别睁大眼, 小心太阳光,亮瞎你狗日的眼睛& ...

  9. 回字有四种写法,那你知道单例有五种写法吗

    点击上方 好好学java ,选择 星标 公众号 重磅资讯.干货,第一时间送达今日推荐:2020年7月程序员工资统计,平均14357元,又跌了,扎心个人原创100W+访问量博客:点击前往,查看更多 转自 ...

  10. 【转】回字有四种写法,那你知道单例有五种写法吗

    目录导航 基本介绍 写法介绍 饿汉式 懒汉式 双重检测 内部类 枚举 总结 基本介绍 单例模式(Singleton)应该是大家接触的第一个设计模式,其写法相较于其他的设计模式来说并不复杂,核心理念也非 ...

最新文章

  1. 知识图谱中传统关系抽取方法
  2. 水域大小 Java_水域大小
  3. Vue.js组件化开发实践
  4. datax的工具配置oracle,完全小白级DataX安装配置过程详解
  5. C++使用socket实现进程通信
  6. “智慧城市”建设以前是传说,现在能体验
  7. 【Python3】复制、移动、删除文件及文件夹
  8. Akka网络编程基本介绍
  9. poj 1651区间dp
  10. 解决Eclipse自动补全变量名的问题
  11. indesign软件教程,如何将文本格式保存为样式?
  12. 记录:pycharm的强大之处之两个文件代码的比对
  13. 零基础学SQL(三、MYSQL环境变量配置及启动)
  14. 机器视觉入门 Visual Studio 2015 配置 Opencv3.2
  15. 各种常见3D建模软件比较
  16. Idm在B站没有显示下载按钮
  17. python花瓣飘零_【动态网页】python3爬取花瓣网图片
  18. cloudflare免费证书_新Cloudflare:免费CDN+免费SSL证书轻松搞定https
  19. 台灯有必要买AA级的吗?精选专业护眼的国AA级台灯
  20. MySQL廖雪峰的官方网站

热门文章

  1. Pillow(PIL)入门教程(非常详细)
  2. Excel 如何合并工作簿中多个工作表
  3. 【BZOJ5316】【JSOI2018】绝地反击
  4. 亲爱的,别把上帝缩小了 ---- 读书笔记3
  5. 路径规划-Minimum snap轨迹优化
  6. hive修复分区或修复表 以及msck命令的使用
  7. 一分钟教你搞定chrome的安装---redhat7.6如何安装谷歌浏览器
  8. windows文件权限管理dos命令
  9. windows 内网域电脑无法ntp时间同步
  10. Ubuntu上,使用shell脚本实现鼠标自动点击,打开并设置桌面软件