服务端

1.常规的spring工程集成mina时,pom.xml中需要加入如下配置:

    <dependency><groupId>org.slf4j</groupId><artifactId>slf4j-jdk14</artifactId><version>1.7.7</version></dependency><dependency><groupId>org.apache.mina</groupId><artifactId>mina-integration-beans</artifactId><version>2.0.13</version></dependency><dependency><groupId>org.apache.mina</groupId><artifactId>mina-core</artifactId><version>2.0.13</version><type>bundle</type>  <scope>compile</scope></dependency><dependency><groupId>org.apache.mina</groupId><artifactId>mina-integration-spring</artifactId><version>1.1.7</version></dependency>

注意此处mina-core的配置写法。如果工程中引入上述依赖之后报错:Missing artifact xxx bundle,则需要在pom.xml的plugins之间加入如下插件配置:

    <plugin><groupId>org.apache.felix</groupId><artifactId>maven-bundle-plugin</artifactId><extensions>true</extensions></plugin>

2.Filter1:编解码器,实现ProtocolCodecFactory解码工厂

package com.he.server;
import java.nio.charset.Charset;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.mina.filter.codec.textline.LineDelimiter;
import org.apache.mina.filter.codec.textline.TextLineDecoder;
import org.apache.mina.filter.codec.textline.TextLineEncoder;
public class MyCodeFactory implements ProtocolCodecFactory {private final TextLineEncoder encoder;private final TextLineDecoder decoder;public MyCodeFactory() {this(Charset.forName("utf-8"));}public MyCodeFactory(Charset charset) {encoder = new TextLineEncoder(charset, LineDelimiter.UNIX);decoder = new TextLineDecoder(charset, LineDelimiter.AUTO);}public ProtocolDecoder getDecoder(IoSession arg0) throws Exception {// TODO Auto-generated method stubreturn decoder;}public ProtocolEncoder getEncoder(IoSession arg0) throws Exception {// TODO Auto-generated method stubreturn encoder;}public int getEncoderMaxLineLength() {return encoder.getMaxLineLength();}public void setEncoderMaxLineLength(int maxLineLength) {encoder.setMaxLineLength(maxLineLength);}public int getDecoderMaxLineLength() {return decoder.getMaxLineLength();}public void setDecoderMaxLineLength(int maxLineLength) {decoder.setMaxLineLength(maxLineLength);}
}

此处使用了mina自带的TextLineEncoder编解码器,此解码器支持使用固定长度或者固定分隔符来区分上下两条消息。如果要使用自定义协议,则需要自己编写解码器。要使用websocket,也需要重新编写解码器,关于mina结合websocket,jira上有一个开源项目https://issues.apache.org/jira/browse/DIRMINA-907,专门为mina编写了支持websocket的编解码器,亲测可用。。。此部分不是本文重点,略。

3.Filter2:心跳工厂,加入心跳检测功能需要实现KeepAliveMessageFactory:

package com.he.server;
import org.apache.log4j.Logger;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
public class MyKeepAliveMessageFactory implements  KeepAliveMessageFactory{private final Logger LOG = Logger.getLogger(MyKeepAliveMessageFactory.class);/** 心跳包内容 */  private static final String HEARTBEATREQUEST = "1111";  private static final String HEARTBEATRESPONSE = "1112"; public Object getRequest(IoSession session) {LOG.warn("请求预设信息: " + HEARTBEATREQUEST);  return HEARTBEATREQUEST;}public Object getResponse(IoSession session, Object request) {LOG.warn("响应预设信息: " + HEARTBEATRESPONSE);  /** 返回预设语句 */  return HEARTBEATRESPONSE;  }public boolean isRequest(IoSession session, Object message) {LOG.warn("请求心跳包信息: " + message);  if (message.equals(HEARTBEATREQUEST))  return true;  return false;  }public boolean isResponse(IoSession session, Object message) {LOG.warn("响应心跳包信息: " + message);  if(message.equals(HEARTBEATRESPONSE))  return true;return false;}
}

此处我设置服务端发送的心跳包是1111,客户端应该返回1112.

4.实现必不可少的IoHandlerAdapter,得到监听事件处理权:

package com.he.server;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
public class MyHandler extends IoHandlerAdapter {//private final int IDLE = 3000;//(单位s)private final Logger LOG = Logger.getLogger(MyHandler.class);// public static Set<IoSession> sessions = Collections.synchronizedSet(new HashSet<IoSession>());public static ConcurrentHashMap<Long, IoSession> sessionsConcurrentHashMap = new ConcurrentHashMap<Long, IoSession>();@Overridepublic void exceptionCaught(IoSession session, Throwable cause) throws Exception {session.closeOnFlush();LOG.warn("session occured exception, so close it." + cause.getMessage());}@Overridepublic void messageReceived(IoSession session, Object message) throws Exception {String str = message.toString();LOG.warn("客户端" + ((InetSocketAddress) session.getRemoteAddress()).getAddress().getHostAddress() + "连接成功!");session.setAttribute("type", message);String remoteAddress = ((InetSocketAddress) session.getRemoteAddress()).getAddress().getHostAddress();session.setAttribute("ip", remoteAddress);LOG.warn("服务器收到的消息是:" + str);session.write("welcome by he");}@Overridepublic void messageSent(IoSession session, Object message) throws Exception {LOG.warn("messageSent:" + message);}@Overridepublic void sessionCreated(IoSession session) throws Exception {LOG.warn("remote client [" + session.getRemoteAddress().toString() + "] connected.");// myLong time = System.currentTimeMillis();session.setAttribute("id", time);sessionsConcurrentHashMap.put(time, session);}@Overridepublic void sessionClosed(IoSession session) throws Exception {LOG.warn("sessionClosed.");session.closeOnFlush();// mysessionsConcurrentHashMap.remove(session.getAttribute("id"));}@Overridepublic void sessionIdle(IoSession session, IdleStatus status) throws Exception {LOG.warn("session idle, so disconnecting......");session.closeOnFlush();LOG.warn("disconnected.");}@Overridepublic void sessionOpened(IoSession session) throws Exception {LOG.warn("sessionOpened.");//  //session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, IDLE);}
}

此处有几点说明:
第一点:网上教程会在此处(sessionOpened()方法中)设置IDLE,IDLE表示session经过多久判定为空闲的时间,单位s,上述代码中已经注释掉了,因为后面在spring配置中加入心跳检测部分时会进行IDLE的配置,已经不需要在此处进行配置了,而且如果在心跳配置部分和此处都对BOTH_IDLE模式设置了空闲时间,亲测发现此处配置不生效。
第二点:关于存放session的容器,建议使用

public static ConcurrentHashMap<Long, IoSession> sessionsConcurrentHashMap = new ConcurrentHashMap<Long, IoSession>();

而不是用已经注释掉的Collections.synchronizedSet类型的set或者map,至于原因,java5中新增了ConcurrentMap接口和它的一个实现类ConcurrentHashMap,可以保证线程的足够安全。详细的知识你应该搜索SynchronizedMap和ConcurrentHashMap的区别,学习更加多的并发安全知识。
上述代码中,每次在收到客户端的消息时,我会返回一段文本:welcome by he。
有了map,主动推送就不是问题了,想推给谁,在map中找到谁就可以了。

5.完成spring的配置工作

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"><bean class="org.springframework.beans.factory.config.CustomEditorConfigurer"><property name="customEditors"><map><entry key="java.net.SocketAddress"value="org.apache.mina.integration.beans.InetSocketAddressEditor"></entry></map></property></bean><bean id="ioAcceptor" class="org.apache.mina.transport.socket.nio.NioSocketAcceptor"init-method="bind" destroy-method="unbind"><!--端口号--><property name="defaultLocalAddress" value=":8888" /><!--绑定自己实现的handler--><property name="handler" ref="serverHandler" /><!--声明过滤器的集合--><property name="filterChainBuilder" ref="filterChainBuilder" /><property name="reuseAddress" value="true" /></bean><bean id="filterChainBuilder"class="org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder"><property name="filters"><map><!--mina自带的线程池filter--><entry key="executor" value-ref="executorFilter" /><entry key="mdcInjectionFilter" value-ref="mdcInjectionFilter" /><!--自己实现的编解码器filter--><entry key="codecFilter" value-ref="codecFilter" /><!--日志的filter--><entry key="loggingFilter" value-ref="loggingFilter" /><!--心跳filter--><entry key="keepAliveFilter" value-ref="keepAliveFilter" /></map></property></bean><!-- executorFilter多线程处理 -->  <bean id="executorFilter" class="org.apache.mina.filter.executor.ExecutorFilter" /><bean id="mdcInjectionFilter" class="org.apache.mina.filter.logging.MdcInjectionFilter"><constructor-arg value="remoteAddress" /></bean><!--日志--><bean id="loggingFilter" class="org.apache.mina.filter.logging.LoggingFilter" /><!--编解码--><bean id="codecFilter" class="org.apache.mina.filter.codec.ProtocolCodecFilter"><constructor-arg><!--构造函数的参数传入自己实现的对象--><bean class="com.he.server.MyCodeFactory"></bean></constructor-arg></bean><!--心跳检测filter--><bean id="keepAliveFilter" class="org.apache.mina.filter.keepalive.KeepAliveFilter"><!--构造函数的第一个参数传入自己实现的工厂--><constructor-arg><bean class="com.he.server.MyKeepAliveMessageFactory"></bean></constructor-arg><!--第二个参数需要的是IdleStatus对象,value值设置为读写空闲--><constructor-arg type = "org.apache.mina.core.session.IdleStatus" value="BOTH_IDLE" ></constructor-arg><!--心跳频率,不设置则默认60s  --><property name="requestInterval" value="5" /><!--心跳超时时间,不设置则默认30s    --><property name="requestTimeout" value="10" /><!--不设置默认false--><property name="forwardEvent" value="true" /></bean><!--自己实现的handler--><bean id="serverHandler" class="com.he.server.MyHandler" />
</beans>

好了,xml中已经写了足够多的注释了。说明一下关于心跳检测中的最后一个属性:forwardEvent,默认false,比如在心跳频率为5s时,实际上每5s会触发一次KeepAliveFilter中的session_idle事件,该事件中开始发送心跳包。当此参数设置为false时,对于session_idle事件不再传递给其他filter,如果设置为true,则会传递给其他filter,例如handler中的session_idle事件,此时也会被触发。另外IdleStatus一共有三个值,点击进源码就能看到。

6.写main方法启动服务端

package com.he.server;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class MainTest {public static void main(String[] args) {ClassPathXmlApplicationContext ct = new ClassPathXmlApplicationContext("applicationContext.xml");}
}

run之后,端口就已经开始监听了。此处,如果是web工程,使用tomcat之类的容器,只要在web.xml中配置了

    <context-param><param-name>contextConfigLocation</param-name><param-value>/WEB-INF/classes/applicationContext.xml</param-value></context-param><listener><listener-class>org.springframework.web.context.ContextLoaderListener</listener-class></listener>

则容器在启动时就会加载spring的配置文件,端口的监听就开始了,这样就不需要main方法来启动。

客户端,本文采用两种方式来实现客户端

方式一:用mina结构来实现客户端,引入mina相关jar包即可,Android也可以使用

1.先实现IoHandlerAdater得到监听事件,类似于服务端:
package com.he.client.minaclient;
import org.apache.log4j.Logger;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
public class ClientHandler extends IoHandlerAdapter{private final Logger LOG = Logger.getLogger(ClientHandler.class);  @Overridepublic void messageReceived(IoSession session, Object message) throws Exception {// TODO Auto-generated method stubLOG.warn("客户端收到消息:" + message);if (message.toString().equals("1111")) {//收到心跳包LOG.warn("收到心跳包");session.write("1112");}}@Overridepublic void messageSent(IoSession session, Object message) throws Exception {// TODO Auto-generated method stubsuper.messageSent(session, message);}@Overridepublic void sessionClosed(IoSession session) throws Exception {// TODO Auto-generated method stubsuper.sessionClosed(session);}@Overridepublic void sessionCreated(IoSession session) throws Exception {// TODO Auto-generated method stubsuper.sessionCreated(session);}@Overridepublic void sessionIdle(IoSession session, IdleStatus status) throws Exception {// TODO Auto-generated method stubsuper.sessionIdle(session, status);}@Overridepublic void sessionOpened(IoSession session) throws Exception {// TODO Auto-generated method stubsuper.sessionOpened(session);}
}
2.写main方法启动客户端,连接服务端:
package com.he.client.minaclient;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
public class ClientTest {public static void main(String[] args) throws InterruptedException {//创建客户端连接器. NioSocketConnector connector = new NioSocketConnector();connector.getFilterChain().addLast("logger", new LoggingFilter());connector.getFilterChain().addLast("codec",new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("utf-8")))); //设置编码过滤器 connector.setHandler(new ClientHandler());//设置事件处理器 ConnectFuture cf = connector.connect(new InetSocketAddress("127.0.0.1", 8888));//建立连接 cf.awaitUninterruptibly();//等待连接创建完成 cf.getSession().write("hello,测试!");//发送消息,中英文都有 //cf.getSession().closeOnFlush();//cf.getSession().getCloseFuture().awaitUninterruptibly();//等待连接断开 //connector.dispose();}
}

过程也是一样的,加各种filter,绑定handler。上述代码运行之后向服务器发送了:“hello,测试”,并且收到返回值:welcome by he。然后每隔5s,就会收到服务端的心跳包:1111。在handler的messageReceived中,确认收到心跳包之后返回1112,实现心跳应答。以上过程,每5s重复一次。
main方法中最后三行注释掉的代码如果打开,客户端在发送完消息之后会主动断开。

方式二:客户端不借助于mina,换用java的普通socket来实现,这样就可以换成其他任何语言:

package com.he.client;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
/***@function:java的简单socket连接,长连接,尝试连续从服务器获取消息*@parameter:*@return:*@date:2016-6-22 下午03:43:18*@author:he*@notice:*/
public class SocketTestTwo {public static final String IP_ADDR = "127.0.0.1";// 服务器地址public static final int PORT = 8888;// 服务器端口号static String text = null;public static void main(String[] args) throws IOException {System.out.println("客户端启动...");Socket socket = null;socket = new Socket(IP_ADDR, PORT);PrintWriter os = new PrintWriter(socket.getOutputStream());os.println("al");os.println("two");os.flush();while (true) {try {// 创建一个流套接字并将其连接到指定主机上的指定端口号DataInputStream input = new DataInputStream(socket.getInputStream());// 读取服务器端数据byte[] buffer;buffer = new byte[input.available()];if (buffer.length != 0) {System.out.println("length=" + buffer.length);// 读取缓冲区input.read(buffer);// 转换字符串String three = new String(buffer);System.out.println("内容=" + three);if (three.equals("1111\n")) {System.out.println("发送返回心跳包");os = new PrintWriter(socket.getOutputStream());os.println("1112");os.flush();}}} catch (Exception e) {System.out.println("客户端异常:" + e.getMessage());os.close();}}}
}

以上代码运行效果和前一种方式完全一样。
但是注意此种方法和使用mina结构的客户端中有一处不同:对于心跳包的判断。本教程中服务端选用了mina自带的编解码器,通过换行符来区分上下两条消息,也就是每一条消息后面会带上一个换行符,所以在使用java普通的socket来连接时,判断心跳包不再是判断是否为“1111”,而是“1111\n”。对比mina结构的客户端中并不需要加上换行符是因为客户端中绑定了相同的编解码器。

程序运行结果截图:
服务端:

客户端:

红色的打印是mina自带的打印信息,黑色的是本工程中使用的log4j打印,所以你们的工程应该配置有如下log4j的配置文件才能看到一样的打印结果:

log4j.rootLogger=WARN,stdoutlog4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.Threshold=WARN
log4j.appender.stdout.layout.ConversionPattern = [%-5p] [%d{yyyy-MM-dd HH\:mm\:ss,SSS}] [%x] %c %l - %m%n 

备注:要完整代码留邮箱哦,看到就回,或者发邮件到smile326@qq.com。

好了,应大家需求,工程代码终于抽空放到github了!
https://github.com/smile326/minaSpring
大家自己去看吧,顺便可以一起讨论下。

spring集成mina,包含心跳检测,实现服务端主动推送相关推荐

  1. flux服务器推消息,服务端主动推送数据,除了 WebSocket 你还能想到啥?

    原标题:服务端主动推送数据,除了 WebSocket 你还能想到啥? 来自公众号: 江南一点雨 在 上篇文章 中,松哥和大家分享了 WebFlux 的基本用法,小伙伴们已经了解到使用 WebFlux ...

  2. 使用SignalR从服务端主动推送警报日志到各种终端(桌面、移动、网页)

    工作上有个业务,.Net Core WebAPI作为服务端,需要将运行过程中产生的日志分类,并实时推送到各种终端进行报警,终端有桌面(WPF).移动(Xamarin.Forms).网站(Angular ...

  3. Netty-Server-Hander自定义解码器-服务端主动推送

    netty server端使用自定义解码器,通过存储client连接实现主动推送消息,并发送自定义心跳包 Server端 依赖 <dependency><groupId>io. ...

  4. 服务端主动推送数据,除了 WebSocket 你还能想到啥?

    在上篇文章中,松哥和大家分享了 WebFlux 的基本用法,小伙伴们已经了解到使用 WebFlux 我们的返回值可以是 Mono 也可以是 Flux,如果是 Flux,由于 Flux 中包含多个元素, ...

  5. 利用mochiweb让服务端主动推送数据至前端页面

    对于智能化监控软件,从wincc等国外的有相当积累的系统,以及国内一些小型的智能化集成软件,通常其监控数据通过前端绑定控件的方式,做到了实时的通讯,通过控件直接和后端服务交互.这种方式可以灵活的组态, ...

  6. 服务端如何推送消息给客户端?

    大家好,我是前端西瓜哥,今天带大家了解一下服务端如何推送消息给客户端. 有时候,我们希望服务端能够主动推送一些信息给客户端.但 HTTP 协议只能让客户端发起请求然后服务端响应,而无法让服务端主动去发 ...

  7. Asp.net SignalR 实现服务端消息推送到Web端

    参考博客https://www.cnblogs.com/wintersun/p/4148223.html ASP .NET SignalR是一个ASP .NET 下的类库,可以在ASP .NET 的W ...

  8. pushlet实现单机-集群服务端消息推送

    一.什么是pushlet? 1.pushlet推送是一种将java后台数据推送到web页面的框架技术,实现了comet. 2.comet是一个用于描述客户端和服务器之间交互的术语,即使用长期保持的ht ...

  9. SSE:使用HTTP做服务端数据推送的技术及其他通信技术

    文章目录 一.SSE 使用场景 服务端响应示例 浏览器处理服务器返回数据 SSE使用注意事项 使用示例 二.轮询 三.WebSocket 什么是Socket?什么是WebSocket? 那么他是如何建 ...

最新文章

  1. a*算法matlab代码_蚁群算法(含MATLAB代码)
  2. IntelliJ 发布 2020 RoadMap,中文版终于要来了?
  3. mysql 按照指定字段拼接_mysql 根据某个字段将多条记录的某个字段拼接成一个字段...
  4. C++高斯赛德迭代法,求线性方程组的解(version1.0)
  5. leetcode第72题:编辑距离
  6. 2007年浙江大学计算机及软件工程研究生机试真题
  7. java10下编译lombok注解的代码
  8. Procrustes Analysis(普氏分析)
  9. 变量命名规则_Java变量与常量
  10. 为什么你必须了解云原生?!
  11. 【Luogu1484】种树(贪心,堆)
  12. 使用JS读取本地文件
  13. matlab 几何布朗运动 股价,某股票价格遵循几何布朗运动
  14. 搭建文件服务器或nas免费版,nas文件服务器搭建
  15. hutool的BeanUtil
  16. java模拟手机号码发短信_java实现发送手机短信
  17. 李帅燕山大学计算机,燕山大学第二十三届学生会复试结果
  18. 软件项目管理课程复习题
  19. inux中,如何把一个目录及其子目录中的读写权限交给一个普通用户!
  20. TE Edit Control控件介绍

热门文章

  1. 问卷与量表数据分析(SPSS+AMOS)学习笔记(三) : 数据分析工具,三线表的制作
  2. OpenCV-图像旋转Rotate
  3. 新手向强化学习入门:机器如何理解世界以及RL基本概念
  4. Deep Learning(深度学习)学习系列之(七)
  5. (AM3517)修改u-boot与Linux调试串口以及文件系统显示终端串口(瑞泰ICETEK-AM3517)
  6. 【Windows】程序界面不显示在屏幕上
  7. 学计算机人都很自私,心理学:总有一天你会明白,那些选择丁克的人,都是自私的...
  8. 旗舰版ndows7bios设置,旗舰版windows7怎么设置u盘启动
  9. 【甄选靶场】Vulnhub百个项目渗透——项目十六:FristiLeaks_1.3(文件上传,py脚本改写,sudo提权,脏牛提权,源码获取)
  10. 64qam带宽计算_传输带宽计算