spring集成mina:

在学习mina这块时,在网上找了很多资料,只有一些demo,只能实现客户端向服务端发送消息、建立长连接之类。但是实际上在项目中,并不简单实现这些,还有业务逻辑之类的处理以及消息的推送之类的。于是就单独建立了一个工程项目,能够实现客户端和服务端相互之间发送消息、建立长连接、实现心跳检测等功能。
例如:可以实现客户端A向服务端发送消息,服务端将消息转发给客户端B。

效果实现图:
服务端启动成功后, 客户端A绑定服务端。

客户端B向服务端发送信息,请求服务端向客户端A推送消息

客户端A受到服务端转发的客户端B的消息

服务端心跳检测的实现

代码的目录结构:

那么开始实现代码的编写。(可以直接跳到底部,通过链接下载工程代码)
首先在官网上下载mina以及spring相关架包,这里相关架包已准备好:http://download.csdn.net/detail/qazwsxpcm/9870787

服务端:

1. 首先实现数据传输对象、消息常量的代码编写。

我使用的两个传输对象,接受和发送,代码如下。(传输对象可以自行定义)。

package com.pcm.mina.service.model;import java.io.Serializable;
import java.util.HashMap;/*** @author ZERO* @Description 服务端接收消息对象*/
public class SentBody implements Serializable {private static final long serialVersionUID = 1L;private String key;private HashMap<String, String> data;private long timestamp;public SentBody() {data = new HashMap<String, String>();timestamp = System.currentTimeMillis();}public String getKey() {return key;}public String get(String k) {return data.get(k);}public void put(String k, String v) {data.put(k, v);}public long getTimestamp() {return timestamp;}public void setTimestamp(long timestamp) {this.timestamp = timestamp;}public void setKey(String key) {this.key = key;}public void remove(String k) {data.remove(k);}public HashMap<String, String> getData() {return data;}@Overridepublic String toString() {StringBuffer buffer = new StringBuffer();buffer.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");buffer.append("<sent>");buffer.append("<key>").append(key).append("</key>");buffer.append("<timestamp>").append(timestamp).append("</timestamp>");buffer.append("<data>");for (String key : data.keySet()) {buffer.append("<" + key + ">").append(data.get(key)).append("</" + key + ">");}buffer.append("</data>");buffer.append("</sent>");return buffer.toString();}public String toXmlString() {return toString();}
}
package com.pcm.mina.service.model;import java.io.Serializable;
import java.util.HashMap;
/*** @author ZERO* @Description 服务端发送消息对象*/
public class ReplyBody implements Serializable {private static final long serialVersionUID = 1L;/*** 请求key*/private String key;/*** 返回码*/private String code;/*** 返回说明*/private String message;/*** 返回数据集合*/private HashMap<String, String> data;private long timestamp;public ReplyBody(){data = new HashMap<String, String>();timestamp = System.currentTimeMillis();}public long getTimestamp() {return timestamp;}public void setTimestamp(long timestamp) {this.timestamp = timestamp;}public String getKey() {return key;}public void setKey(String key) {this.key = key;}public void put(String k, String v) {data.put(k, v);}public String get(String k) {return data.get(k);}public void remove(String k) {data.remove(k);}public String getMessage() {return message;}public void setMessage(String message) {this.message = message;}public HashMap<String, String> getData() {return data;}public void setData(HashMap<String, String> data) {this.data = data;}public String getCode() {return code;}public void setCode(String code) {this.code = code;}public String toString(){StringBuffer buffer = new StringBuffer();buffer.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");buffer.append("<reply>");buffer.append("<key>").append(this.getKey()).append("</key>");buffer.append("<timestamp>").append(timestamp).append("</timestamp>");buffer.append("<code>").append(code).append("</code>");buffer.append("<message>").append(message).append("</message>");buffer.append("<data>");for(String key:this.getData().keySet()){buffer.append("<"+key+">").append(this.get(key)).append("</"+key+">");}buffer.append("</data>");buffer.append("</reply>");return buffer.toString();}public String toXmlString(){return toString();}
}
package com.pcm.mina.service.model;/*** @author ZERO* @Description 消息常量*/
public class Message {public static class ReturnCode {public static String CODE_404 = "404"; public static String CODE_403 = "403";  //该账号未绑定public static String CODE_405 = "405"; //事物未定义public static String CODE_200 = "200"; //成功public static String CODE_500 = "500"; //未知错误}public static final String SESSION_KEY = "account";/*** 服务端心跳请求命令*/public static final String CMD_HEARTBEAT_REQUEST = "hb_request";/*** 客户端心跳响应命令*/public static final String CMD_HEARTBEAT_RESPONSE = "hb_response";public static class MessageType {// 用户会 踢出下线消息类型public static String TYPE_999 = "999";}}

2,实现心跳检测功能。

服务端发送的是hb_request,那么客户端就应该返回hb_response,以此来实现心跳检测。

/*** @author ZERO* @Description  心跳协议的实现类*/
public class KeepAliveMessageFactoryImpl implements KeepAliveMessageFactory{private final Logger LOG=Logger.getLogger(KeepAliveMessageFactoryImpl.class);/*** 客户端心跳响应命令*/private static  String HEARRESPONSE=Message.CMD_HEARTBEAT_RESPONSE; /*** 服务端心跳请求命令*/private static  String HEARREQUEST=Message.CMD_HEARTBEAT_REQUEST;public Object getRequest(IoSession session) {LOG.warn("请求预设信息:"+HEARREQUEST);return HEARREQUEST;}public Object getResponse(IoSession session, Object message) {LOG.warn("响应预设信息: " + message);  /** 返回预设语句 */  return HEARRESPONSE;  }public boolean isRequest(IoSession session, Object message) {LOG.warn("请求心跳包信息: " + message);  return message.equals(HEARREQUEST); }public boolean isResponse(IoSession session, Object message) {LOG.warn("响应心跳包信息: " + message);  return message.equals(HEARRESPONSE);}
}

3, 实现服务端代码编写

服务端代码这块,因为注释写的已经够详细了,所以这里就不细说了。

package com.pcm.mina.service;import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.filter.keepalive.KeepAliveFilter;
import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import com.pcm.mina.service.filter.KeepAliveMessageFactoryImpl;/*** @author ZERO* @Description  mina服务端*/
public class SerNioSociketAcceptor {IoAcceptor acceptor;IoHandler ioHandler;int port;//记录日志public static Logger logger=Logger.getLogger(SerNioSociketAcceptor.class);//创建bind()方法接收连接public void bind() throws IOException{   //创建 协议编码解码过滤器ProtocolCodecFilter//设置序列化Object  可以自行设置自定义解码器ProtocolCodecFilter pf=new ProtocolCodecFilter(new ObjectSerializationCodecFactory());//getFilterChain() 获取 I/O 过滤器链,可以对 I/O 过滤器进行管理,包括添加和删除 I/O 过滤器。      acceptor = new NioSocketAcceptor();  //设置缓存大小acceptor.getSessionConfig().setReadBufferSize(1024);  // 设置过滤器acceptor.getFilterChain().addLast("executor",new ExecutorFilter()); acceptor.getFilterChain().addLast("logger",new LoggingFilter());  acceptor.getFilterChain().addLast("codec",pf);KeepAliveMessageFactory kamf=new KeepAliveMessageFactoryImpl();KeepAliveFilter kaf = new KeepAliveFilter(kamf, IdleStatus.BOTH_IDLE);kaf.setForwardEvent(true);kaf.setRequestInterval(30);  //本服务器为被定型心跳  即需要每30秒接受一个心跳请求  否则该连接进入空闲状态 并且发出idled方法回调acceptor.getFilterChain().addLast("heart", kaf); //读写通道60秒内无操作进入空闲状态acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 60);//绑定逻辑处理器acceptor.setHandler(ioHandler);  //绑定端口acceptor.bind(new InetSocketAddress(port));logger.info("Mina服务端启动成功...端口号为:"+port); //测试使用}//创建unbind()方法停止监听public void unbind(){acceptor.unbind();logger.info("服务端停止成功");}public void setAcceptor(IoAcceptor acceptor) {this.acceptor = acceptor;}//  设置 I/O 处理器。该 I/O 处理器会负责处理该 I/O 服务所管理的所有 I/O 会话产生的 I/O 事件。public void setIoHandler(IoHandler ioHandler) {this.ioHandler = ioHandler;}//设置端口public void setPort(int port) {this.port = port;}
//  获取该 I/O 服务所管理的 I/O 会话。public  Map<Long, IoSession> getManagedSessions(){return acceptor.getManagedSessions();}
}

4,实现session容器

如果需要保证线程安全,可以使用 ConcurrentHashMap,作为session容器。

package com.pcm.mina.service.session;import java.io.Serializable;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import org.apache.mina.core.session.IoSession;/*** @author ZERO* @Description  IoSession包装类  */
public class PcmSession implements Serializable{private static final long serialVersionUID = 1L;private transient IoSession session;private String gid;             //session全局IDprivate Long nid;               //session在本台服务器上的IDprivate String host;            //session绑定的服务器IPprivate String account;         //session绑定的账号private String message;         //session绑定账号的消息private Long bindTime;          //登录时间private Long heartbeat;         //心跳时间public PcmSession(){}public PcmSession(IoSession session) {this.session = session;this.nid = session.getId();}public String getGid() {return gid;}public void setGid(String gid) {this.gid = gid;}public Long getBindTime() {return bindTime;}public void setBindTime(Long bindTime) {this.bindTime = bindTime;}public Long getHeartbeat() {return heartbeat;}public void setHeartbeat(Long heartbeat) {this.heartbeat = heartbeat;}public String getMessage() {return message;}public void setMessage(String message) {this.message = message;}public String getAccount() {return account;}public void setAccount(String account) {this.account = account;}public String getHost() {return host;}public void setHost(String host) {this.host = host;}public void setIoSession(IoSession session) {this.session = session;}public IoSession getIoSession() {return session;}//  将键为 key,值为 value的用户自定义的属性存储到 I/O 会话中。public void setAttribute(String key, Object value) {if(session!=null){session.setAttribute(key, value);}}public boolean containsAttribute(String key) {if(session!=null){return session.containsAttribute(key);}return false;}//  从 I/O 会话中获取键为 key的用户自定义的属性。public Object getAttribute(String key) {if(session!=null){return session.getAttribute(key);}return null;}//从 I/O 会话中删除键为 key的用户自定义的属性。public void removeAttribute(String key) {if(session!=null){session.removeAttribute(key);}}public SocketAddress getRemoteAddress() {if(session!=null){return session.getRemoteAddress();}       return null;}/*   将消息对象 message发送到当前连接的对等体。该方法是异步的,当消息被真正发送到对等体的时候,IoHandler.messageSent(IoSession,Object)会被调用。如果需要的话,也可以等消息真正发送出去之后再继续执行后续操作。*/public void write(Object msg) {if(session!=null){session.write(msg).isWritten(); }}public boolean isConnected() {if(session!=null){return session.isConnected();}return false;}public boolean  isLocalhost(){try {String ip = InetAddress.getLocalHost().getHostAddress();return ip.equals(host);} catch (UnknownHostException e) {e.printStackTrace();}return false;}/* 关闭当前连接。如果参数 immediately为 true的话,* 连接会等到队列中所有的数据发送请求都完成之后才关闭;否则的话就立即关闭。 */ public void close(boolean immediately) {if(session!=null){session.close(immediately);}}public boolean equals(Object message) {if (message instanceof PcmSession) {PcmSession t = (PcmSession) message;if( t.nid!=null && nid!=null){return  t.nid.longValue()==nid.longValue() && t.host.equals(host);} }  return false;}public String  toString(){StringBuffer buffer = new   StringBuffer();buffer.append("{");buffer.append("\"").append("gid").append("\":").append("\"").append(gid).append("\"").append(",");buffer.append("\"").append("nid").append("\":").append(nid).append(",");buffer.append("\"").append("host").append("\":").append("\"").append(host).append("\"").append(",");buffer.append("\"").append("account").append("\":").append("\"").append(account).append("\"").append(",");buffer.append("\"").append("bindTime").append("\":").append(bindTime).append(",");buffer.append("\"").append("heartbeat").append("\":").append(heartbeat);buffer.append("}");return buffer.toString();}
}
package com.pcm.mina.service.session;/*** @author ZERO* @Description  客户端的session管理接口*/
public interface SessionManager {/*** 添加新的session*/public void addSession(String account,PcmSession session);/*** * @param account 客户端session的 key 一般可用 用户账号来对应session* @return*/PcmSession getSession(String account);/*** 删除session* @param session*/public void  removeSession(PcmSession session);/*** 删除session* @param account*/public void  removeSession(String account);}
package com.pcm.mina.service.session;import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import com.pcm.mina.service.model.Message;/*** @author ZERO* @Description  自带默认 session管理实现*/
public class DefaultSessionManager implements SessionManager{private static HashMap<String,PcmSession> sessions =new  HashMap<String,PcmSession>();private static final AtomicInteger connectionsCounter = new AtomicInteger(0);public void addSession(String account, PcmSession session) {if(session !=null){sessions.put(account, session);connectionsCounter.incrementAndGet();}}public PcmSession getSession(String account) {return sessions.get(account);}public void removeSession(PcmSession session) {sessions.remove(session.getAttribute(Message.SESSION_KEY));}public void removeSession(String account) {sessions.remove(account);}}

5, 实现业务逻辑处理器。

因为注释写的已经够详细了,所以这里就不细说了。
做了简单业务逻辑处理,如有需要可以自行更改。

package com.pcm.mina.service.handler;import java.net.InetSocketAddress;
import java.util.HashMap;
import org.apache.log4j.Logger;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.springframework.stereotype.Component;
import com.pcm.mina.service.RequestHandler;
import com.pcm.mina.service.model.Message;
import com.pcm.mina.service.model.ReplyBody;
import com.pcm.mina.service.model.SentBody;
import com.pcm.mina.service.session.PcmSession;/*** @author ZERO* @Description  I/O 处理器 客户端请求的入口,所有请求都首先经过它分发处理 业务逻辑实现*/ @Component("sercixeMainHandler")
public class ServiceMainHandler extends IoHandlerAdapter{protected final Logger logger = Logger.getLogger(ServiceMainHandler.class);//本地handler请求private HashMap<String, RequestHandler> handlers = new HashMap<String, RequestHandler>();//出错时@Overridepublic void exceptionCaught(IoSession session, Throwable cause){logger.error("exceptionCaught()... from "+session.getRemoteAddress());logger.error(cause);cause.printStackTrace();}//接收到消息时@Overridepublic void messageReceived(IoSession iosession,Object message){        logger.info("服务端接收到的消息..."+message.toString());if(message instanceof SentBody){SentBody sent=(SentBody) message;ReplyBody rb=new ReplyBody();PcmSession session=new PcmSession(iosession);String key=sent.getKey();if("quit".equals(sent.get("message"))){ //服务器断开的条件try {sessionClosed(iosession);} catch (Exception e) {rb.setCode(Message.ReturnCode.CODE_500);e.printStackTrace();}}else{//根据key的不同调用不同的handlerRequestHandler rhandler=handlers.get(key);if(rhandler==null){//如果没有这个handlerrb.setCode(Message.ReturnCode.CODE_405);rb.setMessage("服务端未定义!");}else{//有的话rb=rhandler.process(session, sent);}}if(rb !=null){rb.setKey(key);session.write(rb);logger.info("服务端发送的消息: " + rb.toString());}}}//发送消息@Overridepublic void messageSent(IoSession session, Object message) throws Exception {//   session.close(); //发送成功后主动断开与客户端的连接 实现短连接logger.info("服务端发送信息成功...");}//建立连接时@Overridepublic void sessionCreated(IoSession session) throws Exception {InetSocketAddress sa=(InetSocketAddress)session.getRemoteAddress();String address=sa.getAddress().getHostAddress(); //访问的ipsession.setAttribute("address", address);//将连接的客户端ip保存到map集合中SentBody body=new SentBody();body.put("address", address);logger.info("访问的ip:"+address);}//关闭连接时   @Overridepublic void sessionClosed(IoSession iosession) throws Exception {PcmSession session=new PcmSession(iosession);logger.debug("sessionClosed()... from "+session.getRemoteAddress());try {RequestHandler hand=handlers.get("client_closs");if(hand !=null && session.containsAttribute(Message.SESSION_KEY)){hand.process(session, null);}} catch (Exception e) {e.printStackTrace();}session.close(true);  logger.info("连接关闭");}//空闲时@Overridepublic void sessionIdle(IoSession session, IdleStatus status) throws Exception {logger.debug("sessionIdle()... from "+session.getRemoteAddress());}//打开连接时   @Overridepublic void sessionOpened(IoSession session) throws Exception {logger.info("开启连接...");}public HashMap<String, RequestHandler> getHandlers() {return handlers;}public void setHandlers(HashMap<String, RequestHandler> handlers) {this.handlers = handlers;}
}

6, 实现业务逻辑代码。

目前实现了绑定,推送以及关闭逻辑代码。如有需要,可自行增加。

package com.pcm.mina.service;import com.pcm.mina.service.model.ReplyBody;
import com.pcm.mina.service.model.SentBody;
import com.pcm.mina.service.session.PcmSession;
/*** @author ZERO* @Description  请求处理接口,所有的请求必须实现此接口*/
public interface RequestHandler {public abstract ReplyBody process(PcmSession session,SentBody message);
}
package com.pcm.mina.service.handler;import java.net.InetAddress;
import java.util.UUID;
import org.apache.log4j.Logger;
import com.pcm.mina.service.RequestHandler;
import com.pcm.mina.service.model.Message;
import com.pcm.mina.service.model.ReplyBody;
import com.pcm.mina.service.model.SentBody;
import com.pcm.mina.service.session.DefaultSessionManager;
import com.pcm.mina.service.session.PcmSession;
import com.pcm.util.ContextHolder;/*** @author ZERO* @Description  账号绑定实现*/
public class BindHandler implements RequestHandler {protected final Logger logger = Logger.getLogger(BindHandler.class);public ReplyBody process(PcmSession newSession, SentBody message) {ReplyBody reply = new ReplyBody();DefaultSessionManager sessionManager= ((DefaultSessionManager) ContextHolder.getBean("PcmSessionManager"));try { String account = message.get(Message.SESSION_KEY);newSession.setAccount(account);newSession.setMessage(message.get("message"));newSession.setGid(UUID.randomUUID().toString());newSession.setHost(InetAddress.getLocalHost().getHostAddress());//第一次设置心跳时间为登录时间newSession.setBindTime(System.currentTimeMillis());newSession.setHeartbeat(System.currentTimeMillis());/*** 由于客户端断线服务端可能会无法获知的情况,客户端重连时,需要关闭旧的连接*/PcmSession oldSession  = sessionManager.getSession(account);//如果是账号已经在另一台终端登录。则让另一个终端下线if(oldSession!=null&&!oldSession.equals(newSession)){oldSession.removeAttribute(Message.SESSION_KEY);ReplyBody rb = new ReplyBody();rb.setCode(Message.MessageType.TYPE_999);//强行下线消息类型rb.put(Message.SESSION_KEY, account);if(!oldSession.isLocalhost()){/*判断当前session是否连接于本台服务器,如不是发往目标服务器处理MessageDispatcher.execute(rb, oldSession.getHost());*/}else{oldSession.write(rb);oldSession.close(true);oldSession = null;}oldSession = null;}if(oldSession==null){sessionManager.addSession(account, newSession);}reply.setCode(Message.ReturnCode.CODE_200);} catch (Exception e) {reply.setCode(Message.ReturnCode.CODE_500);e.printStackTrace();}logger.debug("绑定账号:" +message.get(Message.SESSION_KEY)+"-----------------------------" +reply.getCode());return reply;}}
package com.pcm.mina.service.handler;import org.apache.log4j.Logger;
import com.pcm.mina.service.RequestHandler;
import com.pcm.mina.service.model.Message;
import com.pcm.mina.service.model.ReplyBody;
import com.pcm.mina.service.model.SentBody;
import com.pcm.mina.service.session.DefaultSessionManager;
import com.pcm.mina.service.session.PcmSession;
import com.pcm.util.ContextHolder;/*** @author ZERO* @Description  推送消息*/
public class PushMessageHandler implements RequestHandler {protected final Logger logger = Logger.getLogger(PushMessageHandler.class);public ReplyBody process(PcmSession ios, SentBody sent) {ReplyBody reply = new ReplyBody();String account=(String) sent.getData().get(Message.SESSION_KEY);DefaultSessionManager sessionManager=(DefaultSessionManager) ContextHolder.getBean("PcmSessionManager");PcmSession session=sessionManager.getSession(account);if(session !=null){sent.remove(Message.SESSION_KEY);reply.setKey(sent.getKey());reply.setMessage("推送的消息");reply.setData(sent.getData());reply.setCode(Message.ReturnCode.CODE_200); session.write(reply); //转发获取的消息logger.info("推送的消息是:"+reply.toString());}else{reply.setCode(Message.ReturnCode.CODE_403);reply.setMessage("推送失败");}return reply;}
}
package com.pcm.mina.service.handler;import org.apache.log4j.Logger;
import com.pcm.mina.service.RequestHandler;
import com.pcm.mina.service.model.Message;
import com.pcm.mina.service.model.ReplyBody;
import com.pcm.mina.service.model.SentBody;
import com.pcm.mina.service.session.DefaultSessionManager;
import com.pcm.mina.service.session.PcmSession;
import com.pcm.util.ContextHolder;/*** @author ZERO* @Description  断开连接,清除session*/
public class SessionClosedHandler implements RequestHandler {protected final Logger logger = Logger.getLogger(SessionClosedHandler.class);public ReplyBody process(PcmSession ios, SentBody message) {DefaultSessionManager sessionManager  =  ((DefaultSessionManager) ContextHolder.getBean("PcmSessionManager"));if(ios.getAttribute(Message.SESSION_KEY)==null){return null;}String account = ios.getAttribute(Message.SESSION_KEY).toString();sessionManager.removeSession(account);return null;}
}

7,spring配置

可以将过滤器添加到spring这块,包括心跳设置。

<!-- spring集成mina --><!-- 设置 I/O 接受器,并指定接收到请求后交给 myHandler 进行处理 --> <bean id="customEditorConfigurer" class="org.springframework.beans.factory.config.CustomEditorConfigurer"><property name="customEditors" ><map><entry key="java.net.SocketAddress"  value="org.apache.mina.integration.beans.InetSocketAddressEditor"/></map></property></bean><!-- handlers事件 --><bean id="IoHandler" class="com.pcm.mina.service.handler.ServiceMainHandler"><property name="handlers"><map><entry key="client_bind">  <!-- 创建连接 --><bean class="com.pcm.mina.service.handler.BindHandler"></bean></entry><entry key="client_closs">  <!--断开清除会话  --><bean class="com.pcm.mina.service.handler.SessionClosedHandler"></bean></entry><entry key="client_push">  <!--在线推送消息  --><bean class="com.pcm.mina.service.handler.PushMessageHandler"></bean></entry></map></property></bean><!-- IoAccepter,绑定到1255端口 --><!-- 通过 init-method指明了当 I/O 接受器创建成功之后,调用其 bind方法来接受连接;通过 destroy-method声明了当其被销毁的时候,调用其 unbind来停止监听 --><bean id="SerNioSociketAcceptor"  class="com.pcm.mina.service.SerNioSociketAcceptor" init-method="bind" destroy-method="unbind">  <property name="port" value="1255" /> <property name="ioHandler" ref="IoHandler" /> </bean><!--spring动态获取bean实现  --><bean id="ContextHolder" class="com.pcm.util.ContextHolder"></bean><bean id="PcmSessionManager" class="com.pcm.mina.service.session.DefaultSessionManager"/> 

客户端

1,编写业务逻辑处理器

几乎和服务端一样,这里因为测试,所以就从简了。

package com.pcm.mina.client.MinaDemo;import org.apache.log4j.Logger;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;/*** @author ZERO* @Description 客户端handle*/
public class MinaClientHandler extends IoHandlerAdapter {private static Logger logger = Logger.getLogger(MinaClientHandler.class);@Overridepublic void messageReceived(IoSession session, Object message)throws Exception {String msg = message.toString();//    logger.info("客户端A接收的数据:" + msg);System.out.println("客户端A接收的数据:" + msg);if(msg.equals("hb_request")){logger.warn("客户端A成功收到心跳包:hb_request");session.write("hb_response");logger.warn("客户端A成功发送心跳包:hb_response");}}@Overridepublic void exceptionCaught(IoSession session, Throwable cause)throws Exception {logger.error("发生错误...", cause);}
}

2,编写客户端程序。

也几乎和服务端一致,为了简单使用,编写main方法。
注:客户端和服务端的过滤器要一致。

package com.pcm.mina.client.MinaDemo;import java.net.InetSocketAddress;
import org.apache.log4j.Logger;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import com.pcm.mina.service.model.SentBody;/*** @author ZERO* @Description mina 客户端*/public class MinaClient {private static Logger logger = Logger.getLogger(MinaClient.class);private static String HOST = "127.0.0.1";private static int PORT = 1255;private static  IoConnector connector=new NioSocketConnector();private static   IoSession session;public static IoConnector getConnector() {return connector;}public static void setConnector(IoConnector connector) {MinaClient.connector = connector;}/* * 测试服务端与客户端程序!a. 启动服务端,然后再启动客户端b. 服务端接收消息并处理成功;*/@SuppressWarnings("deprecation")public static void main(String[] args) {// 设置链接超时时间connector.setConnectTimeout(30000);// 添加过滤器  可序列话的对象 connector.getFilterChain().addLast("codec",new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));// 添加业务逻辑处理器类connector.setHandler(new MinaClientHandler());ConnectFuture future = connector.connect(new InetSocketAddress(HOST, PORT));// 创建连接future.awaitUninterruptibly();// 等待连接创建完成session = future.getSession();// 获得sessionbindstart();//   pushstart();}public static void bindstart(){logger.info("客户端A绑定服务端");try {SentBody sy=new SentBody();sy.put("message", "这是个测试账号");sy.put("account", "123456");sy.setKey("client_bind");session.write(sy);// 发送消息System.out.println("客户端A与服务端建立连接成功...发送的消息为:"+sy);//      logger.info("客户端A与服务端建立连接成功...发送的消息为:"+sy);} catch (Exception e) {e.printStackTrace();logger.error("客户A端链接异常...", e);}session.getCloseFuture().awaitUninterruptibly();// 等待连接断开connector.dispose();}
}       

注意事项

1,客户端和服务端的过滤器要保持一致,不然容易出现异常 java.nio.charset.MalformedInputException。

2,使用对象进行传输的时候需要实现接口java.io.Serializable接口。

3,如果使用对象传输,所在的包。类名也要一致,不然会出现 org.apache.mina.core.buffer.BufferDataException: java.io.InvalidClassException: failed to read class descriptor (Hexdump: 00 00 00 3C AC ED 00 05 73 72 01 00 1C 63 6F 6D 2E 65 78 61 6D 70 6C 65 2E 63 这种错误(被困扰过很久)。

代码就先告一段落。客户端也可以通过socket和mina进行数据传输,这里就不贴代码了。
spring整合mina,暂时就到这了。项目我放到了github上,地址:https://github.com/xuwujing/springMina/tree/master
如果感觉不错,希望可以给个star。

spring集成mina 实现消息推送以及转发相关推荐

  1. spring boot 集成socketIo 做消息推送

    spring boot 集成socketIo 做消息推送 项目需求 代码展示 客户端代码 服务端代码 项目需求 后台管理系统用户小铃铛,消息推送功能并展示有多少条消息或者小红点 代码展示 客户端代码 ...

  2. Springboot集成websocket实现消息推送和在线用户统计

    一.HTTP 说到websocket首先要说Http,Http大家都知道是一个网络通信协议,每当客户端浏览器需要访问后台时都会发一个请求,服务器给出响应后该连接就会关闭,请求只能有客户端发起,服务端是 ...

  3. Android集成阿里云消息推送的方法步骤

    一 创建App应用 1.1 在控制台发(https://mhub.console.aliyun.com)的App列表页,点击页面产品列表中"添加产品"的图标即可创建一个新的产品(产 ...

  4. 友盟小米收不到推送消息_一个轻量级、可插拔的Android消息推送框架。一键集成推送(极光推送、友盟推送、华为、小米推送等)...

    XPush 一个轻量级.可插拔的Android消息推送框架.一键集成推送(极光推送.友盟推送.华为.小米推送等),提供有效的保活机制,支持推送的拓展,充分解耦推送和业务逻辑,解放你的双手! 在提iss ...

  5. WebSocket消息推送和聊天功能实现

    WebSocket消息推送 SpringBoot集成WebSocket实现消息推送和聊天Demo gradle引入依赖 测试用的Controller 两个测试页面 WebSocket的Endpoint ...

  6. Spring Cloud Stream消息中间件通过RabbitMQ实现消息推送

    一.Spring Cloud Stream是什么 Spring Cloud Stream 是一个构建消息驱动微服务的框架. Spring Cloud Stream解决了开发人员无感知的使用消息中间件的 ...

  7. 使用spring boot +WebSocket实现(后台主动)消息推送

    前言:使用此webscoket务必确保生产环境能兼容/支持!使用此webscoket务必确保生产环境能兼容/支持!使用此webscoket务必确保生产环境能兼容/支持!主要是tomcat的兼容与支持. ...

  8. java服务端集成极光消息推送--详细开发步骤

    1.极光推送账号准备 要使用极光消息推送必须先在官方网站上注册账号,并添加应用. 产品介绍:https://docs.jiguang.cn/jpush/guideline/intro/ 注册开发者账号 ...

  9. springboot集成钉钉_Java(SpringBoot)实现钉钉机器人消息推送

    零.前言 上一次做消息推送,是微信公众号的定时消息通知. 由于自己当时的水平不够,加上企鹅家的开发文档普遍不太友好,导致根本看不懂文档在写什么,不得不去看第三方博客来学习公众号的开发. 这次就不一样了 ...

最新文章

  1. RocketMQ原理解析-producer 4.发送分布式事物消息
  2. jsp 选择时分秒控件_【最全】9月计算机考试报名通知(6.19更新,仅差4省市)附:分析如何选择科目...
  3. 微型计算机有多少进制,微型计算机原理二进制十进制十六进制.doc
  4. 解决Android studio 加载不出网络图片的步骤
  5. node.js 安装 测试
  6. 创建控制文件后的疑难解答
  7. 在java中excel格式变为zip什么原因_Excel工作表中最常见的8类问题,你一定遇到过,附解决方法!...
  8. 如何在面试中发现优秀程序员
  9. 【RLchina第二讲】汪军老师推荐的强化学习理论学习资料
  10. python numpy 数据类型为python对象-关于Numpy数据类型对象(dtype)使用详解
  11. 查看iOS App的bundleId
  12. 模式识别算法:SVM支持向量机
  13. python从入门到精通 清华大学出版社-清华大学出版社 python
  14. C++ WinHTTP实现文件下载
  15. ue4蓝图碰撞检测的类型_UE4蓝图碰撞检测解析
  16. c# 接管系统鼠标_4个阶段的方法来接管大型,混乱的IT系统
  17. Excel催化剂开源第35波-图片压缩及自动旋转等处理
  18. 如何看待快码编程这一款中文多平台编程工具
  19. 论文写作Word技巧
  20. 超全Python 量化金融库汇总,必看

热门文章

  1. 通过Camtasia来添加各种各样的光标效果
  2. 关于 Goby 红队专版获取指南!
  3. Android字体渐变效果
  4. 20秋C语言在线作业1,地大20秋《C语言程序设计(新)》在线作业一资料
  5. NB卡开卡注意事项【转】
  6. 协整检验该如何分析?
  7. unity用方向键来控制角色上楼梯
  8. 知道创宇总监姚昌林:敏捷开发-如何打破研发交付过程中的“墙”
  9. IEEE论文Latex 参考文献插入说明
  10. java web 上传附件_JAVA WEB文件上传步骤