来源:cnblogs.com/blogtimes/p/14767484.html

一、前言

说实话,写这个玩意儿是我上周刚刚产生的想法,本想写完后把代码挂上来赚点积分也不错。写完后发现这东西值得写一篇文章,授人予鱼不如授人以渔嘛(这句话是这么说的吧),顺便赚点应届学生MM的膜拜那就更妙了。然后再挂一个收款二维码,一个人1块钱,一天10000个人付款,一个月30万,一年360万。。。可了不得了,离一个亿的小目标就差几十年了。

好了,不说梦话了,现在我们回到现实中,这篇博文如果能有>2个评论,我后续会再出一个Netty相关的专栏。否则,就不出了。有人会好奇,为什么把阈值定义成>2呢?不为什么,因为我肯定会先用我媳妇儿的号留个言,然后用自己的号留个言。

好了,废话不多说了,后面还有好多事儿呢,洗菜、做饭、刷碗、跪搓衣。。。好了,言归正传吧。

二、最终效果

为什么先看最终效果?因为此刻代码已经撸完了。更重要的是我们带着感官的目标去进行后续的分析,可以更好地理解。标题中提到了,整个工程包含三个部分:

1、聊天服务器

聊天服务器的职责一句话解释:负责接收所有用户发送的消息,并将消息转发给目标用户。

聊天服务器没有任何界面,但是却是IM中最重要的角色,为表达敬意,必须要给它放个效果图:

2021-05-11 10:41:40.037  INFO 9392 --- [ntLoopGroup-3-1] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700900029,"messageType":"99"}
2021-05-11 10:41:50.049  INFO 9392 --- [ntLoopGroup-3-1] c.e.o.s.n.handler.BussMessageHandler     : 收到消息:{"time":1620700910045,"messageType":"14","sendUserName":"guodegang","recvUserName":"yuqian","sendMessage":"于老师你好"}
2021-05-11 10:41:50.055  INFO 9392 --- [ntLoopGroup-3-2] c.e.o.s.netty.executor.SendMsgExecutor   : 消息转发成功:{"time":1620700910052,"messageType":"14","sendUserName":"guodegang","recvUserName":"yuqian","sendMessage":"于老师你好"}
2021-05-11 10:41:54.068  INFO 9392 --- [ntLoopGroup-3-2] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700914064,"messageType":"99"}
2021-05-11 10:41:57.302  INFO 9392 --- [ntLoopGroup-3-2] c.e.o.s.n.handler.BussMessageHandler     : 收到消息:{"time":1620700917301,"messageType":"14","sendUserName":"yuqian","recvUserName":"guodegang","sendMessage":"郭老师你好"}
2021-05-11 10:41:57.304  INFO 9392 --- [ntLoopGroup-3-1] c.e.o.s.netty.executor.SendMsgExecutor   : 消息转发成功:{"time":1620700917303,"messageType":"14","sendUserName":"yuqian","recvUserName":"guodegang","sendMessage":"郭老师你好"}
2021-05-11 10:42:05.050  INFO 9392 --- [ntLoopGroup-3-1] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700925049,"messageType":"99"}
2021-05-11 10:42:12.309  INFO 9392 --- [ntLoopGroup-3-2] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700932304,"messageType":"99"}
2021-05-11 10:42:20.066  INFO 9392 --- [ntLoopGroup-3-1] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700940050,"messageType":"99"}
2021-05-11 10:42:27.311  INFO 9392 --- [ntLoopGroup-3-2] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700947309,"messageType":"99"}
2021-05-11 10:42:35.070  INFO 9392 --- [ntLoopGroup-3-1] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700955068,"messageType":"99"}
2021-05-11 10:42:42.316  INFO 9392 --- [ntLoopGroup-3-2] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700962312,"messageType":"99"}
2021-05-11 10:42:50.072  INFO 9392 --- [ntLoopGroup-3-1] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700970071,"messageType":"99"}
2021-05-11 10:42:57.316  INFO 9392 --- [ntLoopGroup-3-2] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700977315,"messageType":"99"}

从效果图我们看到了一些内容:收到心跳包、收到消息,转发消息,这些内容后面会详细讲解。

2、聊天客户端

聊天客户端的职责一句话解释:登陆,给别人发聊天内容,收其它人发给自己的聊天内容。

下面为方便演示,我会打开两个客户端,用两个不同用户登陆,然后发消息。

3、Web管理控制台

目前只做了一个账户管理,具体看图吧:

三、需求分析

无(见第二章节)。

四、概要设计

1、技术选型

1)聊天服务端  

聊天服务器与客户端通过TCP协议进行通信,使用长连接、全双工通信模式,基于经典通信框架Netty实现。

那么什么是长连接?顾名思义,客户端和服务器连上后,会在这条连接上面反复收发消息,连接不会断开。与长连接对应的当然就是短连接了,短连接每次发消息之前都需要先建立连接,然后发消息,最后断开连接。显然,即时聊天适合使用长连接。

那么什么又是全双工?当长连接建立起来后,在这条连接上既有上行的数据,又有下行的数据,这就叫全双工。那么对应的半双工、单工,大家自行百度吧。

2)Web管理控制台

Web管理端使用SpringBoot脚手架,前端使用Layuimini(一个基于Layui前端框架封装的前端框架),后端使用SpringMVC+Jpa+Shiro。

3)聊天客户端

使用SpringBoot+JavaFX,做了一个极其简陋的客户端,JavaFX是一个开发Java桌面程序的框架,本人也是第一次使用,代码中的写法都是网上查的,这并不是本文的重点,有兴趣的仔细百度吧。

4)SpringBoot

以上三个组件,全部以SpringBoot做为脚手架开发。

5)代码构建

Maven。

2、数据库设计

我们只简单用到一张用户表,比较简单直接贴脚本:

CREATE TABLE `sys_user` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',`user_name` varchar(64) DEFAULT NULL COMMENT '用户名:登陆账号',`pass_word` varchar(128) DEFAULT NULL COMMENT '密码',`name` varchar(16) DEFAULT NULL COMMENT '昵称',`sex` char(1) DEFAULT NULL COMMENT '性别:1-男,2女',`status` bit(1) DEFAULT NULL COMMENT '用户状态:1-有效,0-无效',`online` bit(1) DEFAULT NULL COMMENT '在线状态:1-在线,0-离线',`salt` varchar(128) DEFAULT NULL COMMENT '密码盐值',`admin` bit(1) DEFAULT NULL COMMENT '是否管理员(只有管理员才能登录Web端):1-是,0-否',PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

这张表都在什么时候用到?

1)Web管理端登陆的时候;

2)聊天客户端将登陆请求发送到聊天服务端时,聊天服务端进行用户认证;

3)聊天客户端的好友列表加载。

3、通信设计

本节将会是本文的核心内容之一,主要描述通信报文协议格式、以及通信报文的交互场景。

1)报文协议格式

下面这张图应该能说明99%了:

剩下的1%在这里说:

a)粘包问题,TCP长连接中,粘包是第一个需要解决的问题。通俗的讲,粘包的意思是消息接收方往往收到的不是“整个”报文,有时候比“整个”多一点,有时候比“整个”少一点,这样就导致接收方无法解析这个报文。那么上图中的头8个字节就为了解决这个问题,接收方根据头8个字节标识的长度来获取到“整个”报文,从而进行正常的业务处理;

b)2字节报文类型,为了方便解析报文而设计。根据这两个字节将后面的json转成相应的实体以便进行后续处理;

c)变长报文体实际上就是json格式的串,当然,你可以自己设计报文格式,我这里为了方便处理就直接放json了;

d)当然,你可以把报文设计的更复杂、更专业,比如加密、加签名等。

2)报文交互场景

a)登陆

b)发送消息-成功

c)发送消息-目标客户端不在线

d)发送消息-目标客户端在线,但消息转发失败

五、编码实现

前面说了那么多,现在总得说点有用的。

1、先说说Netty

Netty是一个相当优秀的通信框架,大多数的顶级开源框架中都有Netty的身影。具体它有多么优秀,建议大家自行百度,我不如百度说的好。我只从应用方面说说Netty。应用过程中,它最核心的东西叫handler,我们可以简单理解它为消息处理器。收到的消息和出去的消息都会经过一系列的handler加工处理。收到的消息我们叫它入站消息,发出去的消息我们叫它出站消息,因此handler又分为出站handler和入站handler。收到的消息只会被入站handler处理,发出去的消息只会被出站handler处理。

举个例子,我们从网络上收到的消息是二进制的字节码,我们的目标是将消息转换成java bean,这样方便我们程序处理,针对这个场景我设计这么几个入站handler:

1)将字节转换成String的handler;

2)将String转成java bean的handler;

3)对java bean进行业务处理的handler。

发出去的消息呢,我设计这么几个出站handler:

1)java bean 转成String的handler;

2)String转成byte的handler。

以上是关于handler的说明。

接下来再说一下Netty的异步。异步的意思是当你做完一个操作后,不会立马得到操作结果,而是有结果后Netty会通知你。通过下面的一段代码来说明:

channel.writeAndFlush(sendMsgRequest).addListener(new GenericFutureListener<Future<? super Void>>() {@Overridepublic void operationComplete(Future<? super Void> future) throws Exception {if (future.isSuccess()){logger.info("消息发送成功:{}",sendMsgRequest);}else {logger.info("消息发送失败:{}",sendMsgRequest);}}});

上面的writeAndFlush操作无法立即返回结果,如果你关注结果,那么为他添加一个listener,有结果后会在listener中响应。

到这里,百度上搜到的Netty相关的代码你基本就能看懂了。

2、聊天服务端

首先看主入口的代码

public void start(){EventLoopGroup boss = new NioEventLoopGroup();EventLoopGroup worker = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//心跳ch.pipeline().addLast(new IdleStateHandler(25, 20, 0, TimeUnit.SECONDS));//收整包ch.pipeline().addLast(new StringLengthFieldDecoder());//转字符串ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));//json转对象ch.pipeline().addLast(new JsonDecoder());//心跳ch.pipeline().addLast(new HeartBeatHandler());//实体转jsonch.pipeline().addLast(new JsonEncoder());//消息处理ch.pipeline().addLast(bussMessageHandler);}});try {ChannelFuture f = serverBootstrap.bind(port).sync();f.channel().closeFuture().sync();}catch (InterruptedException e) {logger.error("服务启动失败:{}", ExceptionUtils.getStackTrace(e));}finally {worker.shutdownGracefully();boss.shutdownGracefully();}}

代码中除了initChannel方法中的代码,其他代码都是固定写法。那么什么叫固定写法呢?通俗来讲就是可以Ctrl+c、Ctrl+v。

下面我们着重看initChannel方法里面的代码。这里面就是上面讲到的各种handler,我们下面挨个讲这些handler都是干啥的。

1)IdleStateHandler。这个是Netty内置的一个handler,既是出站handler又是入站handler。它的作用一般是用来实现心跳监测。所谓心跳,就是客户端和服务端建立连接后,服务端要实时监控客户端的健康状态,如果客户端挂了或者hung住了,服务端及时释放相应的资源,以及做出其他处理比如通知运维。所以在我们的场景中,客户端需要定时上报自己的心跳,如果服务端检测到一段时间内没收到客户端上报的心跳,那么及时做出处理,我们这里就是简单的将其连接断开,并修改数据库中相应账户的在线状态。

现在开始说IdleStateHandler,第一个参数叫读超时时间,第二个参数叫写超时时间,第三个参数叫读写超时时间,第四个参数时时间单位秒。这个handler表达的意思是当25秒内没读到客户端的消息,或者20秒内没往客户端发消息,就会产生一个超时事件。那么这个超时事件我们该对他做什么处理呢,请看下一条。

2)HeartBeatHandler。结合a)一起看,当发生超时事件时,HeartBeatHandler会收到这个事件,并对它做出处理:第一将链接断开;第二讲数据库中相应的账户更新为不在线状态。

public class HeartBeatHandler extends ChannelInboundHandlerAdapter {private static Logger logger = LoggerFactory.getLogger(HeartBeatHandler.class);@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent){IdleStateEvent event = (IdleStateEvent)evt;if (event.state() == IdleState.READER_IDLE) {//读超时,应将连接断掉InetSocketAddress socketAddress = (InetSocketAddress)ctx.channel().remoteAddress();String ip = socketAddress.getAddress().getHostAddress();ctx.channel().disconnect();logger.info("【{}】连接超时,断开",ip);String userName = SessionManager.removeSession(ctx.channel());SpringContextUtil.getBean(UserService.class).updateOnlineStatus(userName,Boolean.FALSE);}else {super.userEventTriggered(ctx, evt);}}else {super.userEventTriggered(ctx, evt);}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof HeartBeat){//收到心跳包,不处理logger.info("server收到心跳包:{}",msg);return;}super.channelRead(ctx, msg);}
}

3)StringLengthFieldDecoder。这是个入站handler,他的作用就是解决上面提到的粘包问题:

public class StringLengthFieldDecoder extends LengthFieldBasedFrameDecoder {public StringLengthFieldDecoder() {super(10*1024*1024,0,8,0,8);}@Overrideprotected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {buf = buf.order(order);byte[] lenByte = new byte[length];buf.getBytes(offset, lenByte);String lenStr = new String(lenByte);Long len =  Long.valueOf(lenStr);return len;}
}

只需要集成Netty提供的LengthFieldBasedFrameDecoder 类,并重写getUnadjustedFrameLength方法即可。

首先看构造方法中的5个参数。第一个表示能处理的包的最大长度;第二三个参数应该结合起来理解,表示长度字段从第几位开始,长度的长度是多少,也就是上面报文格式协议中的头8个字节;第四个参数表示长度是否需要校正,举例理解,比如头8个字节解析出来的长度=包体长度+头8个字节的长度,那么这里就需要校正8个字节,我们的协议中长度只包含报文体,因此这个参数填0;最后一个参数,表示接收到的报文是否要跳过一些字节,本例中设置为8,表示跳过头8个字节,因此经过这个handler后,我们收到的数据就只有报文本身了,不再包含8个长度字节了。

再看getUnadjustedFrameLength方法,其实就是将头8个字符串型的长度为转换成long型。重写完这个方法后,Netty就知道如何收一个“完整”的数据包了。

4)StringDecoder。这个是Netty自带的入站handler,会将字节流以指定的编码解析成String。

5)JsonDecoder。是我们自定义的一个入站handler,目的是将json String转换成java bean,以方便后续处理:

public class JsonDecoder extends MessageToMessageDecoder<String> {@Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, String o, List<Object> list) throws Exception {Message msg = MessageEnDeCoder.decode(o);list.add(msg);}}

这里会调用我们自定义的一个编解码帮助类进行转换:

public static Message decode(String message){if (StringUtils.isEmpty(message) || message.length() < 2){return null;}String type = message.substring(0,2);message = message.substring(2);if (type.equals(LoginRequest)){return JsonUtil.jsonToObject(message,LoginRequest.class);}else if (type.equals(LoginResponse)){return JsonUtil.jsonToObject(message,LoginResponse.class);}else if (type.equals(LogoutRequest)){return JsonUtil.jsonToObject(message,LogoutRequest.class);}else if (type.equals(LogoutResponse)){return JsonUtil.jsonToObject(message,LogoutResponse.class);}else if (type.equals(SendMsgRequest)){return JsonUtil.jsonToObject(message,SendMsgRequest.class);}else if (type.equals(SendMsgResponse)){return JsonUtil.jsonToObject(message,SendMsgResponse.class);}else if (type.equals(HeartBeat)){return JsonUtil.jsonToObject(message,HeartBeat.class);}return null;}

6)BussMessageHandler。先看这个入站handler,是我们的一个业务处理主入口,他的主要工作就是将消息分发给线程池去处理,另外还负载一个小场景,当客户端主动断开时,需要将相应的账户数据库中状态更新为不在线。

public class BussMessageHandler extends ChannelInboundHandlerAdapter {private static Logger logger = LoggerFactory.getLogger(BussMessageHandler.class);@Autowiredprivate TaskDispatcher taskDispatcher;@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {logger.info("收到消息:{}",msg);if (msg instanceof Message){taskDispatcher.submit(ctx.channel(),(Message)msg);}}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {//客户端连接断开InetSocketAddress socketAddress = (InetSocketAddress)ctx.channel().remoteAddress();String ip = socketAddress.getAddress().getHostAddress();logger.info("客户端断开:{}",ip);String userName = SessionManager.removeSession(ctx.channel());SpringContextUtil.getBean(UserService.class).updateOnlineStatus(userName,Boolean.FALSE);super.channelInactive(ctx);}
}

接下来还差线程池的处理逻辑,也非常简单,就是将任务封装成executor然后交给线程池处理:

public class TaskDispatcher {private ThreadPoolExecutor threadPool;public TaskDispatcher(){int corePoolSize = 15;int maxPoolSize = 50;int keepAliveSeconds = 30;int queueCapacity = 1024;BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(queueCapacity);this.threadPool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveSeconds, TimeUnit.SECONDS,queue);}public void submit(Channel channel, Message msg){ExecutorBase executor = null;String messageType = msg.getMessageType();if (messageType.equals(MessageEnDeCoder.LoginRequest)){executor = new LoginExecutor(channel,msg);}if (messageType.equalsIgnoreCase(MessageEnDeCoder.SendMsgRequest)){executor = new SendMsgExecutor(channel,msg);}if (executor != null){this.threadPool.submit(executor);}}
}

接下来看一下消息转发executor是怎么做的:

public class SendMsgExecutor extends ExecutorBase {private static Logger logger = LoggerFactory.getLogger(SendMsgExecutor.class);public SendMsgExecutor(Channel channel, Message message) {super(channel, message);}@Overridepublic void run() {SendMsgResponse response = new SendMsgResponse();response.setMessageType(MessageEnDeCoder.SendMsgResponse);response.setTime(new Date());SendMsgRequest request = (SendMsgRequest)message;String recvUserName = request.getRecvUserName();String sendContent = request.getSendMessage();Channel recvChannel = SessionManager.getSession(recvUserName);if (recvChannel != null){SendMsgRequest sendMsgRequest = new SendMsgRequest();sendMsgRequest.setTime(new Date());sendMsgRequest.setMessageType(MessageEnDeCoder.SendMsgRequest);sendMsgRequest.setRecvUserName(recvUserName);sendMsgRequest.setSendMessage(sendContent);sendMsgRequest.setSendUserName(request.getSendUserName());recvChannel.writeAndFlush(sendMsgRequest).addListener(new GenericFutureListener<Future<? super Void>>() {@Overridepublic void operationComplete(Future<? super Void> future) throws Exception {if (future.isSuccess()){logger.info("消息转发成功:{}",sendMsgRequest);response.setResultCode("0000");response.setResultMessage(String.format("发给用户[%s]消息成功",recvUserName));channel.writeAndFlush(response);}else {logger.error(ExceptionUtils.getStackTrace(future.cause()));logger.info("消息转发失败:{}",sendMsgRequest);response.setResultCode("9999");response.setResultMessage(String.format("发给用户[%s]消息失败",recvUserName));channel.writeAndFlush(response);}}});}else {logger.info("用户{}不在线,消息转发失败",recvUserName);response.setResultCode("9999");response.setResultMessage(String.format("用户[%s]不在线",recvUserName));channel.writeAndFlush(response);}}
}

整体逻辑:一获取要把消息发给那个账号;二获取该账号对应的连接;三在此连接上发送消息;四获取消息发送结果,将结果发给消息“发起者”。

下面是登录处理的executor:

public class LoginExecutor extends ExecutorBase {private static Logger logger = LoggerFactory.getLogger(LoginExecutor.class);public LoginExecutor(Channel channel, Message message) {super(channel, message);}@Overridepublic void run() {LoginRequest request = (LoginRequest)message;String userName = request.getUserName();String password = request.getPassword();UserService userService = SpringContextUtil.getBean(UserService.class);boolean check = userService.checkLogin(userName,password);LoginResponse response = new LoginResponse();response.setUserName(userName);response.setMessageType(MessageEnDeCoder.LoginResponse);response.setTime(new Date());response.setResultCode(check?"0000":"9999");response.setResultMessage(check?"登陆成功":"登陆失败,用户名或密码错");if (check){userService.updateOnlineStatus(userName,Boolean.TRUE);SessionManager.addSession(userName,channel);}channel.writeAndFlush(response).addListener(new GenericFutureListener<Future<? super Void>>() {@Overridepublic void operationComplete(Future<? super Void> future) throws Exception {//登陆失败,断开连接if (!check){logger.info("用户{}登陆失败,断开连接",((LoginRequest) message).getUserName());channel.disconnect();}}});}
}

登陆逻辑也不复杂,登陆成功则更新用户在线状态,并且无论登陆成功还是失败,都会返一个登陆应答。同时,如果登陆校验失败,在返回应答成功后,需要将链接断开。

7)JsonEncoder。最后看这个唯一的出站handler,服务端发出去的消息都会被出站handler处理,他的职责就是将java bean转成我们之前定义的报文协议格式:

public class JsonEncoder extends MessageToByteEncoder<Message> {@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {String msgStr = MessageEnDeCoder.encode(message);int length = msgStr.getBytes(Charset.forName("UTF-8")).length;String str = String.valueOf(length);String lenStr = StringUtils.leftPad(str,8,'0');msgStr = lenStr + msgStr;byteBuf.writeBytes(msgStr.getBytes("UTF-8"));}
}

8)SessionManager。剩下最后一个东西没说,这个是用来保存每个登陆成功账户的链接的,底层是个map,key为用户账户,value为链接:

public class SessionManager {private static ConcurrentHashMap<String,Channel> sessionMap = new ConcurrentHashMap<>();public static void addSession(String userName,Channel channel){sessionMap.put(userName,channel);}public static String removeSession(String userName){sessionMap.remove(userName);return userName;}public static String removeSession(Channel channel){for (String key:sessionMap.keySet()){if (channel.id().asLongText().equalsIgnoreCase(sessionMap.get(key).id().asLongText())){sessionMap.remove(key);return key;}}return null;}public static Channel getSession(String userName){return sessionMap.get(userName);}
}

到这里,整个服务端的逻辑就走完了,是不是,很简单呢!

3、聊天客户端

客户端中界面相关的东西是基于JavaFX框架做的,这个我是第一次用,所以不打算讲这块,怕误导大家。主要还是讲Netty作为客户端是如何跟服务端通信的。

按照惯例,还是先贴出主入口:

public void login(String userName,String password) throws Exception {Bootstrap clientBootstrap = new Bootstrap();EventLoopGroup clientGroup = new NioEventLoopGroup();try {clientBootstrap.group(clientGroup).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS,10000);clientBootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new IdleStateHandler(20, 15, 0, TimeUnit.SECONDS));ch.pipeline().addLast(new StringLengthFieldDecoder());ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));ch.pipeline().addLast(new JsonDecoder());ch.pipeline().addLast(new JsonEncoder());ch.pipeline().addLast(bussMessageHandler);ch.pipeline().addLast(new HeartBeatHandler());}});ChannelFuture future = clientBootstrap.connect(server,port).sync();if (future.isSuccess()){channel = (SocketChannel)future.channel();LoginRequest request = new LoginRequest();request.setTime(new Date());request.setUserName(userName);request.setPassword(password);request.setMessageType(MessageEnDeCoder.LoginRequest);channel.writeAndFlush(request).addListener(new GenericFutureListener<Future<? super Void>>() {@Overridepublic void operationComplete(Future<? super Void> future) throws Exception {if (future.isSuccess()){logger.info("登陆消息发送成功");}else {logger.info("登陆消息发送失败:{}", ExceptionUtils.getStackTrace(future.cause()));Platform.runLater(new Runnable() {@Overridepublic void run() {LoginController.setLoginResult("网络错误,登陆消息发送失败");}});}}});}else {clientGroup.shutdownGracefully();throw new RuntimeException("网络错误");}}catch (Exception e){clientGroup.shutdownGracefully();throw new RuntimeException("网络错误");}}

对这段代码,我们主要关注这几点:一所有handler的初始化;二connect服务端。

所有handler中,除了bussMessageHandler是客户端特有的外,其他的handler在服务端章节已经讲过了,不再赘述。

1)先看连接服务端的操作。首先发起连接,连接成功后发送登陆报文。发起连接需要对成功和失败进行处理。发送登陆报文也需要对成功和失败进行处理。注意,这里的成功失败只是代表当前操作的网络层面的成功失败,这时候并不能获取服务端返回的应答中的业务层面的成功失败,如果不理解这句话,可以翻看前面讲过的“异步”相关内容。

2)BussMessageHandler。整体流程还是跟服务端一样,将受到的消息扔给线程池处理,我们直接看处理消息的各个executor。

先看客户端发出登陆请求后,收到登陆应答消息后是怎么处理的(这段代码可以结合1)的内容一起理解):

public class LoginRespExecutor extends ExecutorBase {private static Logger logger = LoggerFactory.getLogger(LoginRespExecutor.class);public LoginRespExecutor(Channel channel, Message message) {super(channel, message);}@Overridepublic void run() {LoginResponse response = (LoginResponse)message;logger.info("登陆结果:{}->{}",response.getResultCode(),response.getResultMessage());if (!response.getResultCode().equals("0000")){Platform.runLater(new Runnable() {@Overridepublic void run() {LoginController.setLoginResult("登陆失败,用户名或密码错误");}});}else {LoginController.setCurUserName(response.getUserName());ClientApplication.getScene().setRoot(SpringContextUtil.getBean(MainView.class).getView());}}
}

接下来看客户端是怎么发聊天信息的:

public void sendMessage(Message message) {channel.writeAndFlush(message).addListener(new GenericFutureListener<Future<? super Void>>() {@Overridepublic void operationComplete(Future<? super Void> future) throws Exception {SendMsgRequest send = (SendMsgRequest)message;if (future.isSuccess()){Platform.runLater(new Runnable() {@Overridepublic void run() {MainController.setMessageHistory(String.format("[我]在[%s]发给[%s]的消息[%s],发送成功",DateFormatUtils.format(send.getTime(),"yyyy-MM-dd HH:mm:ss"),send.getRecvUserName(),send.getSendMessage()));}});}else {Platform.runLater(new Runnable() {@Overridepublic void run() {MainController.setMessageHistory(String.format("[我]在[%s]发给[%s]的消息[%s],发送失败",DateFormatUtils.format(send.getTime(),"yyyy-MM-dd HH:mm:ss"),send.getRecvUserName(),send.getSendMessage()));}});}}});}

实际上,到这里通信相关的代码已经贴完了。剩下的都是界面处理相关的代码,不再贴了。

客户端,是不是,非常简单!

4、Web管理端

Web管理端可以说是更没任何技术含量,就是Shiro登陆认证、列表增删改查。增删改没什么好说的,下面重点说一下Shiro登陆和列表查询。

1)Shiro登陆

首先定义一个Realm,至于这是什么概念,自行百度吧,这里并不是本文重点:

public class UserDbRealm extends AuthorizingRealm {@Overrideprotected AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principalCollection) {return null;}@Overrideprotected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken authenticationToken) throws AuthenticationException {RequestAttributes attributes = RequestContextHolder.getRequestAttributes();UsernamePasswordToken upToken = (UsernamePasswordToken) authenticationToken;String username = upToken.getUsername();String password = "";if (upToken.getPassword() != null){password = new String(upToken.getPassword());}// TODO: 2021/5/13 校验用户名密码,不通过则抛认证异常即可 ShiroUser user = new ShiroUser();SimpleAuthenticationInfo info = new SimpleAuthenticationInfo(user, password, getName());return info;}
}

接下来把这个Realm注册成Spring Bean,同时定义过滤链:

@Beanpublic Realm realm() {UserDbRealm realm = new UserDbRealm();realm.setAuthorizationCachingEnabled(true);realm.setCacheManager(cacheManager());return realm;}@Beanpublic ShiroFilterChainDefinition shiroFilterChainDefinition() {DefaultShiroFilterChainDefinition chainDefinition = new DefaultShiroFilterChainDefinition();chainDefinition.addPathDefinition("/css/**", "anon");chainDefinition.addPathDefinition("/img/**", "anon");chainDefinition.addPathDefinition("/js/**", "anon");chainDefinition.addPathDefinition("/logout", "logout");chainDefinition.addPathDefinition("/login", "anon");chainDefinition.addPathDefinition("/captchaImage", "anon");chainDefinition.addPathDefinition("/**", "authc");return chainDefinition;}

到现在为止,Shiro配置好了,下面看如何调起登陆:

@PostMapping("/login")@ResponseBodypublic Result<String> login(String username, String password, Boolean rememberMe){Result<String> ret = new Result<>();UsernamePasswordToken token = new UsernamePasswordToken(username, password);Subject subject = SecurityUtils.getSubject();try{subject.login(token);return ret;}catch (AuthenticationException e){String msg = "用户或密码错误";if (StringUtils.isNotEmpty(e.getMessage())){msg = e.getMessage();}ret.setCode(Result.FAIL);ret.setMessage(msg);return ret;}}

登陆代码就这么愉快的完成了。

2)列表查询

查是个很简单的操作,但是却是所有web系统中使用最频繁的操作。因此,做一个通用性的封装,非常有必要。以下代码不做过多讲解,初级工程师到高级工程师,就差这段代码了(手动捂脸):

a)Controller

@RequestMapping("/query")@ResponseBodypublic Result<Page<User>> query(@RequestParam Map<String,Object> params, String sort, String order, Integer pageIndex, Integer pageSize){Page<User> page = userService.query(params,sort,order,pageIndex,pageSize);Result<Page<User>> ret = new Result<>();ret.setData(page);return ret;}

b)Service

@Autowiredprivate UserDao userDao;@Autowiredprivate QueryService queryService;public Page<User> query(Map<String,Object> params, String sort, String order, Integer pageIndex, Integer pageSize){return queryService.query(userDao,params,sort,order,pageIndex,pageSize);}
public class QueryService {public <T> com.easy.okim.common.model.Page<T> query(JpaSpecificationExecutor<T> dao, Map<String,Object> filters, String sort, String order, Integer pageIndex, Integer pageSize){com.easy.okim.common.model.Page<T> ret = new com.easy.okim.common.model.Page<T>();Map<String,Object> params = new HashMap<>();if (filters != null){filters.remove("sort");filters.remove("order");filters.remove("pageIndex");filters.remove("pageSize");for (String key:filters.keySet()){Object value = filters.get(key);if (value != null && StringUtils.isNotEmpty(value.toString())){params.put(key,value);}}}Pageable pageable = null;pageIndex = pageIndex - 1;if (StringUtils.isEmpty(sort)){pageable = PageRequest.of(pageIndex,pageSize);}else {Sort s = Sort.by(Sort.Direction.ASC,sort);if (StringUtils.isNotEmpty(order) && order.equalsIgnoreCase("desc")){s = Sort.by(Sort.Direction.DESC,sort);}pageable = PageRequest.of(pageIndex,pageSize,s);}Page<T> page = null;if (params.size() ==0){page = dao.findAll(null,pageable);}else {Specification<T> specification = new Specification<T>() {@Overridepublic Predicate toPredicate(Root<T> root, CriteriaQuery<?> criteriaQuery, CriteriaBuilder builder) {List<Predicate> predicates = new ArrayList<>();for (String filter : params.keySet()) {Object value = params.get(filter);if (value == null || StringUtils.isEmpty(value.toString())) {continue;}String field = filter;String operator = "=";String[] arr = filter.split("\\|");if (arr.length == 2) {field = arr[0];operator = arr[1];}if (arr.length == 3) {field = arr[0];operator = arr[1];String type = arr[2];if (type.equalsIgnoreCase("boolean")){value = Boolean.parseBoolean(value.toString());}else if (type.equalsIgnoreCase("integer")){value = Integer.parseInt(value.toString());}else if (type.equalsIgnoreCase("long")){value = Long.parseLong(value.toString());}}String[] names = StringUtils.split(field, ".");Path expression = root.get(names[0]);for (int i = 1; i < names.length; i++) {expression = expression.get(names[i]);}// logic operatorswitch (operator) {case "=":predicates.add(builder.equal(expression, value));break;case "!=":predicates.add(builder.notEqual(expression, value));break;case "like":predicates.add(builder.like(expression, "%" + value + "%"));break;case ">":predicates.add(builder.greaterThan(expression, (Comparable) value));break;case "<":predicates.add(builder.lessThan(expression, (Comparable) value));break;case ">=":predicates.add(builder.greaterThanOrEqualTo(expression, (Comparable) value));break;case "<=":predicates.add(builder.lessThanOrEqualTo(expression, (Comparable) value));break;case "isnull":predicates.add(builder.isNull(expression));break;case "isnotnull":predicates.add(builder.isNotNull(expression));break;case "in":CriteriaBuilder.In in = builder.in(expression);String[] arr1 = StringUtils.split(filter.toString(), ",");for (String e : arr1) {in.value(e);}predicates.add(in);break;}}// 将所有条件用 and 联合起来if (!predicates.isEmpty()) {return builder.and(predicates.toArray(new Predicate[predicates.size()]));}return builder.conjunction();}};page = dao.findAll(specification,pageable);}ret.setTotal(page.getTotalElements());ret.setRows(page.getContent());return ret;}
}

c)Dao

public interface UserDao extends JpaRepository<User,Long>,JpaSpecificationExecutor<User> {//啥都不用写,继承Spring Data Jpa提供的类就行了
}

五、结语

本文确实都是实实在在的干货,希望本文能对大家有一些帮助,源代码工程不打算贴了,希望你能跟着文章自己手敲一遍。

开头说的收款二维码,只是说笑,如果你真想付款,请私信我索取收款二维码,金额不设上限的,哈哈~

Java实现即时聊天:聊天服务器+聊天客户端+Web管理控制台相关推荐

  1. 实战即时聊天,一文说明白:聊天服务器+聊天客户端+Web管理控制台。

    目录 一.前言 二.最终效果 1.聊天服务器 2.聊天客户端 3.Web管理控制台 三.需求分析 四.概要设计 1.技术选型 1)聊天服务端 2)Web管理控制台 3)聊天客户端 4)SpringBo ...

  2. java c s聊天程序_Java建立C/S 模式聊天室服务器和客户端

    时间:2018-12-01 概述:聊天室 服务器 客户端 在网络上经常进各种聊天室,本例通过编程实现了C/S 模式的聊天室服务器和客户端.实现方法:ChatServer 类实现了一个聊天室服务器端, ...

  3. 关于项目 java版本QQ (含服务器和客户端)

    下面概要讲述一下我在设计完成服务器模块和设计客户端后台中遇到的问题及解决方案. 服务器: 1.服务器使用什么机制,是线程还是进程? 2.数据库如何设计能使服务器访问的效率提高? 3.如何处理大量用户同 ...

  4. 【Java 网络编程】UDP 服务器 与 客户端持续交互 案例

    文章目录 I UDP 交互原理 II UDP 服务器端代码示例 III UDP 客户端代码示例 IV 服务器 客户端 运行结果 I UDP 交互原理 1. UDP 单播传输流程 : A 给 B 发送数 ...

  5. java上传音频到服务器_Java 客户端向服务端上传mp3文件数据的实例代码

    客户端: package cn.itcast.uploadpicture.demo; import java.io.BufferedInputStream; import java.io.FileIn ...

  6. Python -- 服务器与客户端,发送邮箱与短信

    主要内容:建立简易聊天室服务器及客户端.采用UDP协议发送及接受图片,采用smtplib发送邮件,采用urllib发送短信 建立简易聊天室 服务器 from socket import socket ...

  7. JAVA之socket编程服务器与客户端通信--实现简易聊天室

    本文将介绍TCP和UDP两种服务器与客户端之间的通信协议 1.首先介绍TCP和UDP分别是什么:TCP(Transfer Control Protocol) 是传输控制协议的缩写,被称 TCP / I ...

  8. java客户端服务器聊天程序流程图_基于java的socket简单聊天编程

    socket编程: 一:什么是socket:socket是BSD UNIX的通信机制,通常称为"套接字",其英文原意是"孔"或"插座".有些 ...

  9. Java丨即时聊天程序的实现

    继续昨天的代码丨多客户端通信实现 想要建立多客户端即时通信,必须具有这几个条件(类): 1.客户端: 2.服务器: 3.消息类(数据封包): 4.消息类型. Demo; package Task.de ...

最新文章

  1. 概念艺术绘画学习教程 Schoolism – Foolproof Concept Painting with Airi Pan
  2. Java Socket编程 - 基于TCP方式的二进制文件传输
  3. zentao双机(数据库备份)
  4. pycharm 调试错误 Connection to Python debugger failed: Socket operation on nonsocket: configureBlocking
  5. 用命令行连接到远程计算机
  6. swagger restful api form映射实体对象和body映射实体对象配置
  7. 物流链云平台云ROS——看得见的成本节约
  8. oracle对大对象类型操作:blob,clob,nclob,bfile
  9. eclipse 中 构建路径下的 order and export 是干什么用
  10. axure rp 使用心得
  11. 《Xcode实战开发》——1.1节下载
  12. 【SQL精彩语句】按某一字段分组取最大(小)值所在行的数据
  13. Unix/Linux/BSD命令大全|实用指南
  14. 【总结】学堂云慕课-如何写好科研论文
  15. pci和pcie的区别
  16. kvm连接服务器显示不全有重影,KVM多电脑切换器常见故障排查及处理方法
  17. Navigator对象,获取浏览器类型userAgent,机器类型platform
  18. 中山LED芯片IC方案!JLC1041, JLK105系列两款超实用
  19. 小程序picker三级联动
  20. ADAS需要用到的技术

热门文章

  1. 移植jerryscript到单片机(bes2600yp)
  2. 日紫白飞星算法_陈春林|紫白飞星排布技巧
  3. pyinstall 打包报错
  4. 关于Mask R-CNN 画PR曲线
  5. iOS 闪退与内存管理
  6. Android HCE开发
  7. layui表格列动态显示或隐藏
  8. 编写C语言函数求字符串长度,用C语言编写函数,实现strlen计算字符串长度的功能...
  9. 360浏览器怎么截图?
  10. dellg3计算机rom,戴尔G3笔记本内存升级 | 笔记本内存升级方法_什么值得买