Mina 自定义硬件通讯协议框架搭建(TCP Client)

2018.03.04 18:49:29字数 1057阅读 2323

Apache MINA 是一个能够帮助用户开发高性能和高伸缩性网络应用程序的框架。它通过Java nio技术基于TCP/IP和UDP/IP协议提供了抽象的、事件驱动的、异步的API。

使用背景

大三读完,出去实习。接触到的第一个框架。我本是一名JAVA黑 微笑, 奈何实习公司仅JAVA开发,好吧,路转粉。
由于是边学边做, 难免在学习过程中遇到了很多坑,不过最终还是解决了,这里就不一一叙述了。
本文将根据自己搭建的框架的过程写此文(参考了很多前辈的代码,总结出来)。

项目背景

项目是根据一套设备(其实是一种超大型LED屏幕)厂商提供的通讯协议去控制设备,该协议是硬件厂家自己定制。
设备数目: 450套

自定义词说明

名称 说明
VMS 指设备

通讯协议

名称 说明
类型 TCP/IP
客户端 MINA
服务端 VMS

数据包格式

包头、类型、数据长度、数据、校验码、包尾
名称 偏移位置 长度 值范围
包头 0 1 固定字符‘*’
类型 1 1 0-127
数据长度 2 2 数据字节数
数据 4 n 协议类型决定
校验码 n+4 2 类型、数据长度、数据三部分的所有字节的CRC-16码
包尾 n+6 1 字符‘#’
转义问题: 其中字符'\'为转意符,所有除头尾外的字节,如果是'*','#','\'在通讯时换成"\*","\#","\\" 。 大小端问题:凡涉及多字节数据均为低字节在前

创建项目

开发环境: IntelliJ IDEA (学生可以免费申请收费版(hhh)),创建的是一个Maven的工程。

<!-- 日志工具-->
<dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version>
</dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.7</version>
</dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.21</version>
</dependency><!-- MINA  -->
<dependency><groupId>org.apache.mina</groupId><artifactId>mina-core</artifactId><version>2.0.7</version>
</dependency>

目录结构

|-- pom.xml
`-- src|-- com|   `-- taoroot|       `-- mina|           |-- protocol                                // 协议 目录|           |   |-- IMessageBody.java|           |   |-- MyBuffer.java                       // IoBuffer 封装|           |   |-- VMSMessage.java                     |           |   |-- VMSMessageFactory.java              // 解析工厂|           |   |-- VMSMessageHeader.java               // 表头部分内容|           |   |-- VMS_00.java                         // 心跳数据包|           |   |-- VMS_14.java                         // 巡检数据包|           |-- client                                  // Mina 目录|           |   |-- ClientSessionHandler.java|           |   |-- ConnectListener.java|           |   |-- MinaClient.java                     // 程序路口|           |   |-- ExceptionHandler.java               // 心跳包超时处理类|           |   |-- VMSKeepAliveFilter.java             // 心跳包拦截器|           |   |-- VMSKeepAliveMessageFactory.java     // 心跳包工厂|           |   |-- VMSMessageCodecFactory.java     |           |   |-- VMSMessageDecoder.java|           |   `-- VMSMessageEncoder.java|           `-- util|               |-- ByteUtil.java|               |-- ClassUtils.java|               |-- ConfigUtil.java|               |-- CrcCodeUtil.java|               |-- MinaUtil.java|               |-- RGBUtil.java|-- config.properties                                   // 默认配置`-- log4j.properties                                    // 日志配置

启动服务 MinaClient

vmsMap:

用来接收来自设备列表
key: 设备编号, value: 设备地址
编号是唯一的,后端下发指令时,用此编号区分设备

sessionMap

设备对应的session
key: 设备编号, value: IoSession
只有连接成功的设备才会存入,所以在下一次获取列表后,将重新尝试连接。

初始化Mina以后, 系统将定时从后端获取设备列表存入vmsMap中,与sessionMap中的编号比较。
有几次情况需要考虑:

情况1. vmsMap中有, sessionMap中没有: 尝试创建IoSession,创建成功则加入sessionMap。
情况2. vmsMap中有, sessionMap中有,但设备地址发生改变: 关闭现有session, 然后按情况1处理。
情况3. vmsMap中没有,sessionMap中有, 关闭现有session,从sessionMap移除。

流程图

getList.png

代码实现

    // Mina 初始化配置connector = new NioSocketConnector();connector.setConnectTimeoutMillis(MinaUtil.CONNECT_TIMEOUT);// ioBuffer 日志(实际生产环境不需要)connector.getFilterChain().addLast( "logger", new LoggingFilter() );// 解码过滤层 (数据包转对象)connector.getFilterChain().addLast("vms_coder", new ProtocolCodecFilter(new VMSMessageCodecFactory()));// 超时过滤层 (对TCP在线,心跳包超时的设备主动断开连接)KeepAliveFilter heartBeat = new VMSKeepAliveFilter(new VMSKeepAliveMessageFactory());// 设置心跳频率heartBeat.setRequestInterval((int) MinaUtil.HEART_BEAT_RATE);connector.getFilterChain().addLast("heartbeat", heartBeat);// 业务处理类connector.setHandler(new ClientSessionHandler());IoSession session;// MQ 初始化// Mina本身不知道设备的网络地址,是通过订阅形式,从后端获取过来TopicSender.createTopic(VMS_SCREEN_LIST_TOPIC);MQListener.init();for (; ; ) {// 获取列表 (0是设备列表信息)MinaUtil.getDeviceScreen("", 0);// 是否被锁if (!StaticUtil.isIsRefreshDeviceMap()) {// 关锁StaticUtil.setIsRefreshDeviceMap(true);// 遍历设备列表for (String devNo : deviceMap.keySet()) {// 新设备上线, 创建新连接if (!ioSessionMap.containsKey(devNo)) {newSocket(devNo, deviceMap.get(devNo));} else {// 查看设备对应的地址是否变化session = ioSessionMap.get(devNo);String currentAddress = "";  // 当前地址String newAddressPort = deviceMap.get(devNo);  // 新地址// 从session中获取出当前连接的地址currentAddress = getAddressPort(session);// 地址发生改变if (!currentAddress.equals(newAddressPort)) {session.close(true);    // 关闭目前连接的sessionnewSocket(devNo, newAddressPort);    // 用新地址创建连接}}}// 设备列表中已经删除了编号, session如果存在,也需要断开连接Iterator<Map.Entry<String, IoSession>> it = ioSessionMap.entrySet().iterator();while (it.hasNext()) {Map.Entry<String, IoSession> entry = it.next();String devNo = entry.getKey();session = entry.getValue();if (!deviceMap.containsKey(devNo)) {it.remove();// 从session中获取出当前连接的地址String currentAddress = getAddressPort(session);session.close(true);sendOnlineStatus(devNo, currentAddress, VMS_OFFLINE_STATUS);logger.info("设备: " + devNo + " 离线");}}// 开锁StaticUtil.setIsRefreshDeviceMap(false);}// 休眠一段时间在去获取设备列表try {Thread.sleep(REFRESH_DEVICE_LIST_TIME * 1000);}catch (Exception e) {e.toString();}}}

编解码工厂 VMSMessageCodecFactory

public class VMSMessageCodecFactory implements ProtocolCodecFactory {private final VMSMessageDecoder decoder;       // 解码器private final VMSMessageEncoder encoder;       // 编码器
}

解码器

粘包

两个数据包的部分数据相连接

解码流程图

// todo

解码器源代码

public class VMSMessageDecoder extends CumulativeProtocolDecoder {private static Logger logger = Logger.getLogger(VMSMessageDecoder.class);protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {if (in.remaining() < 1) {return false;}in.mark();byte[] data = new byte[in.remaining()];in.get(data);int pos = 0;in.reset();while (in.remaining() > 0) {in.mark();byte tag = in.get();//搜索包的开始位置if (tag == 0x2A && in.remaining() > 0) {tag = in.get();//寻找包的结束while (tag != 0x23) {if (in.remaining() <= 0) {in.reset(); //没有找到结束包,等待下一次包return false;}tag = in.get();}pos = in.position();int packetLength = pos - in.markValue();if (packetLength > 1) {byte[] tmp = new byte[packetLength];in.reset();in.get(tmp);//解析VMSMessage message = new VMSMessage();message.ReadFromBytes(tmp);out.write(message); //触发接收Message的事件}}}return false;}
}

编码器

编码器相对就简单很多。

编码器源代码

public class VMSMessageEncoder extends ProtocolEncoderAdapter {@Overridepublic void encode(IoSession ioSession, Object message, ProtocolEncoderOutput out) throws Exception {IoBuffer buf = IoBuffer.allocate(500).setAutoExpand(true);VMSMessage vmsMessage = (VMSMessage) message;buf.put(vmsMessage.WriteToBytes());buf.flip();out.write(buf);out.flush();buf.free();}
}

心跳包机制

vms 和 mina 之间需要不能有超过2分钟的空闲状态。 需要定期发送心跳包,否则vms将进入离线状态,停止播放。
发送过程如下: mina 进入空闲状态, 发送心跳包, vms回发心跳包, mina 接收到心跳包。如果未在规定的时间内,接收到心跳包,超过三次,将主动关闭session。
mina 其实自带了一套心跳包拦截器,(上文mina配置代码)。可以将心跳包在IoHandler前处理掉,就不用再业务层去关心了。

MyKeepAliveFilter

public class MyKeepAliveFilter extends KeepAliveFilter {private static final int TIMEOUT = CmdOptionHandler.getTimeout();public MyKeepAliveFilter(KeepAliveMessageFactory messageFactory) {// super(心跳包工厂, 两遍都是空闲状态, 超时处理类, 上发超时时间, 下发超时时间)super(messageFactory, IdleStatus.BOTH_IDLE, new MyKeepAliveRequestTimeoutHandler(), TIMEOUT, TIMEOUT);//此消息不会继续传递,不会被业务层看见this.setForwardEvent(false);}
}

VMSKeepAliveMessageFactory

主要有两个功能:
第一,判断是否是心跳包,
第二,生成一个心跳包数据。
KeepAliveMessageFactory 提供了四个接口,当初看了网上教程说什么半工,双工什么的,一头雾水,最后硬着头皮,源代码了。当时猜测是 sessionIdle 发送了心跳包, messageReceived 接收心跳包。

KeepAliveFilter 解读

// 空闲状态触发
public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) throws Exception {if (status == interestedIdleStatus) {if (!session.containsAttribute(WAITING_FOR_RESPONSE)) {// 看来 getRequest 是用来获取一个发送心跳包数据接口Object pingMessage = messageFactory.getRequest(session);if (pingMessage != null) {nextFilter.filterWrite(session, new DefaultWriteRequest(pingMessage));if (getRequestTimeoutHandler() != KeepAliveRequestTimeoutHandler.DEAF_SPEAKER) {markStatus(session);if (interestedIdleStatus == IdleStatus.BOTH_IDLE) {session.setAttribute(IGNORE_READER_IDLE_ONCE);}} else {resetStatus(session);}}} else {handlePingTimeout(session);}}
}
// 接收到数据触发
public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {try {// 判断数据包是否是心跳包,if (messageFactory.isRequest(session, message)) {// 这里又获取了一个心跳包,所以说这里是用来判断,vms主动发送心跳包过来,然后mina回应一个心跳包// 在我们设备中,设备不主动发送心跳包,也不对设备主动发送心跳包做一个回应,所以,不需要这个逻辑// 所以让 isRequest 放回 false 就行, Object pongMessage = messageFactory.getResponse(session, message);// 保险起见,让 getResponse 也返回nullif (pongMessage != null) {nextFilter.filterWrite(session, new DefaultWriteRequest(pongMessage));}}// 这里也是判断心跳包,if (messageFactory.isResponse(session, message)) {// 里面是清空了 mina在sessionIdl下发送的心跳包标志位// 所以这心跳包 是用来是vms回发的心跳响应包。resetStatus(session);}} finally {if (!isKeepAliveMessage(session, message)) {nextFilter.messageReceived(session, message);}}
}

好了,到这里就知道怎么去写 VMSKeepAliveMessageFactory 了。
当初看教程的死活没看懂,还弄出了死循环,vms、mina一直在对送心跳包。有时候源代码才是最好的教程呀(hhh)。

心跳包计数器(业务逻辑扩展)

心跳包是在空闲状态下会以一定的频率发送,业务层需要有一个定时巡检的数据包,vms会返回自身状态信息,所以我就把这个单做了了一个计数器使用。mina每次发送一个心跳包,就自增一,达到触发值就发送状态包。
巡检间隔 = 心跳包发送间隔 * 触发值

VMSKeepAliveMessageFactory 源代码

public class VMSKeepAliveMessageFactory implements KeepAliveMessageFactory {private final static org.slf4j.Logger logger = LoggerFactory.getLogger(ClientSessionHandler.class);private VMSMessage vmsMessage;// 发送心跳包@Overridepublic Object getRequest(IoSession arg0) {heartCountAdd(arg0);vmsMessage = new VMSMessage();vmsMessage.setMessageContents(new VMS_00());return vmsMessage;}// 拦截心跳包@Overridepublic boolean isResponse(IoSession session, Object message) {return isHeartPage(message);}@Overridepublic Object getResponse(IoSession arg0, Object arg1) {return null;}@Overridepublic boolean isRequest(IoSession session, Object message) {return false;}//  心跳包 计数器private void heartCountAdd(IoSession session) {int heartCounter = (int) session.getAttribute(MinaUtil.VMS_HEARTBEAT_COUNT_ATTRIBUTE) + 1;session.setAttribute(MinaUtil.VMS_HEARTBEAT_COUNT_ATTRIBUTE, heartCounter);if(heartCounter > MinaUtil.VMS_HEARTBEAT_MAX) {session.setAttribute(MinaUtil.VMS_HEARTBEAT_COUNT_ATTRIBUTE, 0);heartCounterHandler(session);}}// 定时查询任务private void heartCounterHandler(IoSession session) {// 定期查询 设备状态VMSMessage message = new VMSMessage();message.setMessageContents(new VMS_0F());session.write(message);}// 判断是不是心跳包private boolean isHeartPage(Object message) {vmsMessage = (VMSMessage) message;if (vmsMessage.getMessageContents() instanceof VMS_00) {return true;}return false;}
}

tcp client相关推荐

  1. codeblock socket 编译错误_从Linux源码看Socket(TCP)Client端的Connect

    从Linux源码看Socket(TCP)Client端的Connect 前言 笔者一直觉得如果能知道从应用到框架再到操作系统的每一处代码,是一件Exciting的事情. 今天笔者就来从Linux源码的 ...

  2. nodejs TCP server和TCP client如何进行数据交互

    使用Jerry之前的文件成功建立服务器端和客户端的TCP连接后,客户端得到一个client实例: async function startClient(ip = configJson.TCP.clie ...

  3. nodejs TCP server和TCP client如何建立连接

    首先客户端和服务器端通信得需要一个消息结构,我用如下的构造函数创建一个消息结构,包含消息正文和消息类型(method): function message(_data,_method = '') {_ ...

  4. Modbus协议栈开发笔记之四:Modbus TCP Client开发

    这一次我们封装Modbus TCP Client应用.同样的我们也不是做具体的应用,而是实现TCP客户端的基本功能.我们将TCP客户端的功能封装为函数,以便在开发具体应用时调用. 对于TCP客户端我们 ...

  5. 网络通信之TCP Client通信(基于Arduino)

    网络通信之TCP Client通信(基于Arduino) 前期准备 实验要求 具体方法 程序流程 程序 实验现象 后续 前期准备 Arduino IDE ESPDuino Arduino ESP826 ...

  6. swoole深入学习 2. tcp Server和tcp Client

    swoole深入学习 2. tcp Server和tcp Client 标签(空格分隔): swoole 这节来学习Swoole最基础的Server和Client.会通过创建一个tcp Server来 ...

  7. STM32F1 W5500 TCP Client 回环测试

    刚刚接触W5500的时候,做TCP Client回环测试的时候,出现很奇怪的问题,查了好多遍代码,死活连接不上PC网络助手的TCPServer.其实代码本身没什么大问题,PC机的防火墙忘记关闭了.总结 ...

  8. 关于HML要玩物联网这件事 之 CC3200 TCP Client

    假装是个引言 去年TI难得发了次大福利,圣诞节派送CC3200开发板.本辣鸡博主抢得早,加上没落C9.edu.cn 邮箱加持,抽奖居然抽中了.TI不论是赞助竞赛还是送板子,目的都很明确,就是想推广自己 ...

  9. ESP8266开发之旅 网络篇⑦ TCP Server TCP Client

    授人以鱼不如授人以渔,目的不是为了教会你具体项目开发,而是学会学习的能力.希望大家分享给你周边需要的朋友或者同学,说不定大神成长之路有博哥的奠基石... 共同学习成长QQ群 622368884,不喜勿 ...

最新文章

  1. 几个阿里, 美团,腾讯大佬的公众号!超级变态!
  2. bootstrape实战案例_第二百五十二节,Bootstrap项目实战-首页
  3. python中的位置怎么看_如何知道项目在Python有序字典中的位置
  4. 基于MATLAB的turbo码代码,一种基于Simulink的Turbo码仿真实现
  5. 抽象工厂模式(Absraact Factory)介绍与实现
  6. java jdk最新版本是多少_Linux下一键安装java-jdk任意版本
  7. Mono SVN最新代码或者Mono 1.2.5 支持IronPython 2.0
  8. websocket握手失败_WebSocket握手期间出错:意外的响应代码:500
  9. iptables的详细介绍及配置方法
  10. 【CF】304 E. Soldier and Traveling
  11. 《Kotlin项目实战开发》 第3章 类型系统与可空类型
  12. python小白教程-面向小白的Python教程:入门篇(六)
  13. WPF 弹框 并自动关闭
  14. 华为S5700交换机堆叠
  15. Fxfactory插件:光雾滤镜插件PHYX Stylist
  16. NLP︱词向量经验总结(功能作用、高维可视化、R语言实现、大规模语料、延伸拓展)
  17. hosts文件位置及作用
  18. 注册表“.REG”文件完全攻略
  19. 【C++笔试强训】第三天
  20. esxi安装威联通_威联通TS-453Bmini NAS加装内存,轻松玩转虚拟机安装win10系统

热门文章

  1. 《被讨厌的勇气》---希望大家都能勇敢前行
  2. 2021 年你需要知道的 CSS 工程化技术
  3. WIZnet芯片和支持IP分片的设备通讯的时候怎么办?
  4. 互融云软件系统提供商|一款成熟的数字资产交易系统
  5. c语言等号运算符先计算右边,C语言运算符和表达式.ppt
  6. 携手上海融资租赁基于区块链共建融资租赁行业可信金融服务平台
  7. 创业团队成员的挑战与成长(转)
  8. PCI总线的工作原理?
  9. SAP License:RISE with SAP
  10. Linux下Ourlink WU708E网卡驱动安装