tcp client
Mina 自定义硬件通讯协议框架搭建(TCP Client)
2018.03.04 18:49:29字数 1057阅读 2323
Apache MINA 是一个能够帮助用户开发高性能和高伸缩性网络应用程序的框架。它通过Java nio技术基于TCP/IP和UDP/IP协议提供了抽象的、事件驱动的、异步的API。
使用背景
大三读完,出去实习。接触到的第一个框架。我本是一名JAVA黑 微笑, 奈何实习公司仅JAVA开发,好吧,路转粉。
由于是边学边做, 难免在学习过程中遇到了很多坑,不过最终还是解决了,这里就不一一叙述了。
本文将根据自己搭建的框架的过程写此文(参考了很多前辈的代码,总结出来)。
项目背景
项目是根据一套设备(其实是一种超大型LED屏幕)厂商提供的通讯协议去控制设备,该协议是硬件厂家自己定制。
设备数目: 450套
自定义词说明
名称 | 说明 |
---|---|
VMS | 指设备 |
通讯协议
名称 | 说明 |
---|---|
类型 | TCP/IP |
客户端 | MINA |
服务端 | VMS |
数据包格式
包头、类型、数据长度、数据、校验码、包尾
名称 | 偏移位置 | 长度 | 值范围 |
---|---|---|---|
包头 | 0 | 1 | 固定字符‘*’ |
类型 | 1 | 1 | 0-127 |
数据长度 | 2 | 2 | 数据字节数 |
数据 | 4 | n | 协议类型决定 |
校验码 | n+4 | 2 | 类型、数据长度、数据三部分的所有字节的CRC-16码 |
包尾 | n+6 | 1 | 字符‘#’ |
转义问题: 其中字符'\'为转意符,所有除头尾外的字节,如果是'*','#','\'在通讯时换成"\*","\#","\\" 。 大小端问题:凡涉及多字节数据均为低字节在前
创建项目
开发环境: IntelliJ IDEA (学生可以免费申请收费版(hhh)),创建的是一个Maven的工程。
<!-- 日志工具-->
<dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version>
</dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.7</version>
</dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.21</version>
</dependency><!-- MINA -->
<dependency><groupId>org.apache.mina</groupId><artifactId>mina-core</artifactId><version>2.0.7</version>
</dependency>
目录结构
|-- pom.xml
`-- src|-- com| `-- taoroot| `-- mina| |-- protocol // 协议 目录| | |-- IMessageBody.java| | |-- MyBuffer.java // IoBuffer 封装| | |-- VMSMessage.java | | |-- VMSMessageFactory.java // 解析工厂| | |-- VMSMessageHeader.java // 表头部分内容| | |-- VMS_00.java // 心跳数据包| | |-- VMS_14.java // 巡检数据包| |-- client // Mina 目录| | |-- ClientSessionHandler.java| | |-- ConnectListener.java| | |-- MinaClient.java // 程序路口| | |-- ExceptionHandler.java // 心跳包超时处理类| | |-- VMSKeepAliveFilter.java // 心跳包拦截器| | |-- VMSKeepAliveMessageFactory.java // 心跳包工厂| | |-- VMSMessageCodecFactory.java | | |-- VMSMessageDecoder.java| | `-- VMSMessageEncoder.java| `-- util| |-- ByteUtil.java| |-- ClassUtils.java| |-- ConfigUtil.java| |-- CrcCodeUtil.java| |-- MinaUtil.java| |-- RGBUtil.java|-- config.properties // 默认配置`-- log4j.properties // 日志配置
启动服务 MinaClient
vmsMap:
用来接收来自设备列表
key: 设备编号, value: 设备地址
编号是唯一的,后端下发指令时,用此编号区分设备
sessionMap
设备对应的session
key: 设备编号, value: IoSession
只有连接成功的设备才会存入,所以在下一次获取列表后,将重新尝试连接。
初始化Mina以后, 系统将定时从后端获取设备列表存入vmsMap中,与sessionMap中的编号比较。
有几次情况需要考虑:
情况1. vmsMap中有, sessionMap中没有: 尝试创建IoSession,创建成功则加入sessionMap。
情况2. vmsMap中有, sessionMap中有,但设备地址发生改变: 关闭现有session, 然后按情况1处理。
情况3. vmsMap中没有,sessionMap中有, 关闭现有session,从sessionMap移除。
流程图
getList.png
代码实现
// Mina 初始化配置connector = new NioSocketConnector();connector.setConnectTimeoutMillis(MinaUtil.CONNECT_TIMEOUT);// ioBuffer 日志(实际生产环境不需要)connector.getFilterChain().addLast( "logger", new LoggingFilter() );// 解码过滤层 (数据包转对象)connector.getFilterChain().addLast("vms_coder", new ProtocolCodecFilter(new VMSMessageCodecFactory()));// 超时过滤层 (对TCP在线,心跳包超时的设备主动断开连接)KeepAliveFilter heartBeat = new VMSKeepAliveFilter(new VMSKeepAliveMessageFactory());// 设置心跳频率heartBeat.setRequestInterval((int) MinaUtil.HEART_BEAT_RATE);connector.getFilterChain().addLast("heartbeat", heartBeat);// 业务处理类connector.setHandler(new ClientSessionHandler());IoSession session;// MQ 初始化// Mina本身不知道设备的网络地址,是通过订阅形式,从后端获取过来TopicSender.createTopic(VMS_SCREEN_LIST_TOPIC);MQListener.init();for (; ; ) {// 获取列表 (0是设备列表信息)MinaUtil.getDeviceScreen("", 0);// 是否被锁if (!StaticUtil.isIsRefreshDeviceMap()) {// 关锁StaticUtil.setIsRefreshDeviceMap(true);// 遍历设备列表for (String devNo : deviceMap.keySet()) {// 新设备上线, 创建新连接if (!ioSessionMap.containsKey(devNo)) {newSocket(devNo, deviceMap.get(devNo));} else {// 查看设备对应的地址是否变化session = ioSessionMap.get(devNo);String currentAddress = ""; // 当前地址String newAddressPort = deviceMap.get(devNo); // 新地址// 从session中获取出当前连接的地址currentAddress = getAddressPort(session);// 地址发生改变if (!currentAddress.equals(newAddressPort)) {session.close(true); // 关闭目前连接的sessionnewSocket(devNo, newAddressPort); // 用新地址创建连接}}}// 设备列表中已经删除了编号, session如果存在,也需要断开连接Iterator<Map.Entry<String, IoSession>> it = ioSessionMap.entrySet().iterator();while (it.hasNext()) {Map.Entry<String, IoSession> entry = it.next();String devNo = entry.getKey();session = entry.getValue();if (!deviceMap.containsKey(devNo)) {it.remove();// 从session中获取出当前连接的地址String currentAddress = getAddressPort(session);session.close(true);sendOnlineStatus(devNo, currentAddress, VMS_OFFLINE_STATUS);logger.info("设备: " + devNo + " 离线");}}// 开锁StaticUtil.setIsRefreshDeviceMap(false);}// 休眠一段时间在去获取设备列表try {Thread.sleep(REFRESH_DEVICE_LIST_TIME * 1000);}catch (Exception e) {e.toString();}}}
编解码工厂 VMSMessageCodecFactory
public class VMSMessageCodecFactory implements ProtocolCodecFactory {private final VMSMessageDecoder decoder; // 解码器private final VMSMessageEncoder encoder; // 编码器
}
解码器
粘包
两个数据包的部分数据相连接
解码流程图
// todo
解码器源代码
public class VMSMessageDecoder extends CumulativeProtocolDecoder {private static Logger logger = Logger.getLogger(VMSMessageDecoder.class);protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {if (in.remaining() < 1) {return false;}in.mark();byte[] data = new byte[in.remaining()];in.get(data);int pos = 0;in.reset();while (in.remaining() > 0) {in.mark();byte tag = in.get();//搜索包的开始位置if (tag == 0x2A && in.remaining() > 0) {tag = in.get();//寻找包的结束while (tag != 0x23) {if (in.remaining() <= 0) {in.reset(); //没有找到结束包,等待下一次包return false;}tag = in.get();}pos = in.position();int packetLength = pos - in.markValue();if (packetLength > 1) {byte[] tmp = new byte[packetLength];in.reset();in.get(tmp);//解析VMSMessage message = new VMSMessage();message.ReadFromBytes(tmp);out.write(message); //触发接收Message的事件}}}return false;}
}
编码器
编码器相对就简单很多。
编码器源代码
public class VMSMessageEncoder extends ProtocolEncoderAdapter {@Overridepublic void encode(IoSession ioSession, Object message, ProtocolEncoderOutput out) throws Exception {IoBuffer buf = IoBuffer.allocate(500).setAutoExpand(true);VMSMessage vmsMessage = (VMSMessage) message;buf.put(vmsMessage.WriteToBytes());buf.flip();out.write(buf);out.flush();buf.free();}
}
心跳包机制
vms 和 mina 之间需要不能有超过2分钟的空闲状态。 需要定期发送心跳包,否则vms将进入离线状态,停止播放。
发送过程如下: mina 进入空闲状态, 发送心跳包, vms回发心跳包, mina 接收到心跳包。如果未在规定的时间内,接收到心跳包,超过三次,将主动关闭session。
mina 其实自带了一套心跳包拦截器,(上文mina配置代码)。可以将心跳包在IoHandler前处理掉,就不用再业务层去关心了。
MyKeepAliveFilter
public class MyKeepAliveFilter extends KeepAliveFilter {private static final int TIMEOUT = CmdOptionHandler.getTimeout();public MyKeepAliveFilter(KeepAliveMessageFactory messageFactory) {// super(心跳包工厂, 两遍都是空闲状态, 超时处理类, 上发超时时间, 下发超时时间)super(messageFactory, IdleStatus.BOTH_IDLE, new MyKeepAliveRequestTimeoutHandler(), TIMEOUT, TIMEOUT);//此消息不会继续传递,不会被业务层看见this.setForwardEvent(false);}
}
VMSKeepAliveMessageFactory
主要有两个功能:
第一,判断是否是心跳包,
第二,生成一个心跳包数据。
KeepAliveMessageFactory 提供了四个接口,当初看了网上教程说什么半工,双工什么的,一头雾水,最后硬着头皮,源代码了。当时猜测是 sessionIdle 发送了心跳包, messageReceived 接收心跳包。
KeepAliveFilter 解读
// 空闲状态触发
public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) throws Exception {if (status == interestedIdleStatus) {if (!session.containsAttribute(WAITING_FOR_RESPONSE)) {// 看来 getRequest 是用来获取一个发送心跳包数据接口Object pingMessage = messageFactory.getRequest(session);if (pingMessage != null) {nextFilter.filterWrite(session, new DefaultWriteRequest(pingMessage));if (getRequestTimeoutHandler() != KeepAliveRequestTimeoutHandler.DEAF_SPEAKER) {markStatus(session);if (interestedIdleStatus == IdleStatus.BOTH_IDLE) {session.setAttribute(IGNORE_READER_IDLE_ONCE);}} else {resetStatus(session);}}} else {handlePingTimeout(session);}}
}
// 接收到数据触发
public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {try {// 判断数据包是否是心跳包,if (messageFactory.isRequest(session, message)) {// 这里又获取了一个心跳包,所以说这里是用来判断,vms主动发送心跳包过来,然后mina回应一个心跳包// 在我们设备中,设备不主动发送心跳包,也不对设备主动发送心跳包做一个回应,所以,不需要这个逻辑// 所以让 isRequest 放回 false 就行, Object pongMessage = messageFactory.getResponse(session, message);// 保险起见,让 getResponse 也返回nullif (pongMessage != null) {nextFilter.filterWrite(session, new DefaultWriteRequest(pongMessage));}}// 这里也是判断心跳包,if (messageFactory.isResponse(session, message)) {// 里面是清空了 mina在sessionIdl下发送的心跳包标志位// 所以这心跳包 是用来是vms回发的心跳响应包。resetStatus(session);}} finally {if (!isKeepAliveMessage(session, message)) {nextFilter.messageReceived(session, message);}}
}
好了,到这里就知道怎么去写 VMSKeepAliveMessageFactory 了。
当初看教程的死活没看懂,还弄出了死循环,vms、mina一直在对送心跳包。有时候源代码才是最好的教程呀(hhh)。
心跳包计数器(业务逻辑扩展)
心跳包是在空闲状态下会以一定的频率发送,业务层需要有一个定时巡检的数据包,vms会返回自身状态信息,所以我就把这个单做了了一个计数器使用。mina每次发送一个心跳包,就自增一,达到触发值就发送状态包。
巡检间隔 = 心跳包发送间隔 * 触发值
VMSKeepAliveMessageFactory 源代码
public class VMSKeepAliveMessageFactory implements KeepAliveMessageFactory {private final static org.slf4j.Logger logger = LoggerFactory.getLogger(ClientSessionHandler.class);private VMSMessage vmsMessage;// 发送心跳包@Overridepublic Object getRequest(IoSession arg0) {heartCountAdd(arg0);vmsMessage = new VMSMessage();vmsMessage.setMessageContents(new VMS_00());return vmsMessage;}// 拦截心跳包@Overridepublic boolean isResponse(IoSession session, Object message) {return isHeartPage(message);}@Overridepublic Object getResponse(IoSession arg0, Object arg1) {return null;}@Overridepublic boolean isRequest(IoSession session, Object message) {return false;}// 心跳包 计数器private void heartCountAdd(IoSession session) {int heartCounter = (int) session.getAttribute(MinaUtil.VMS_HEARTBEAT_COUNT_ATTRIBUTE) + 1;session.setAttribute(MinaUtil.VMS_HEARTBEAT_COUNT_ATTRIBUTE, heartCounter);if(heartCounter > MinaUtil.VMS_HEARTBEAT_MAX) {session.setAttribute(MinaUtil.VMS_HEARTBEAT_COUNT_ATTRIBUTE, 0);heartCounterHandler(session);}}// 定时查询任务private void heartCounterHandler(IoSession session) {// 定期查询 设备状态VMSMessage message = new VMSMessage();message.setMessageContents(new VMS_0F());session.write(message);}// 判断是不是心跳包private boolean isHeartPage(Object message) {vmsMessage = (VMSMessage) message;if (vmsMessage.getMessageContents() instanceof VMS_00) {return true;}return false;}
}
tcp client相关推荐
- codeblock socket 编译错误_从Linux源码看Socket(TCP)Client端的Connect
从Linux源码看Socket(TCP)Client端的Connect 前言 笔者一直觉得如果能知道从应用到框架再到操作系统的每一处代码,是一件Exciting的事情. 今天笔者就来从Linux源码的 ...
- nodejs TCP server和TCP client如何进行数据交互
使用Jerry之前的文件成功建立服务器端和客户端的TCP连接后,客户端得到一个client实例: async function startClient(ip = configJson.TCP.clie ...
- nodejs TCP server和TCP client如何建立连接
首先客户端和服务器端通信得需要一个消息结构,我用如下的构造函数创建一个消息结构,包含消息正文和消息类型(method): function message(_data,_method = '') {_ ...
- Modbus协议栈开发笔记之四:Modbus TCP Client开发
这一次我们封装Modbus TCP Client应用.同样的我们也不是做具体的应用,而是实现TCP客户端的基本功能.我们将TCP客户端的功能封装为函数,以便在开发具体应用时调用. 对于TCP客户端我们 ...
- 网络通信之TCP Client通信(基于Arduino)
网络通信之TCP Client通信(基于Arduino) 前期准备 实验要求 具体方法 程序流程 程序 实验现象 后续 前期准备 Arduino IDE ESPDuino Arduino ESP826 ...
- swoole深入学习 2. tcp Server和tcp Client
swoole深入学习 2. tcp Server和tcp Client 标签(空格分隔): swoole 这节来学习Swoole最基础的Server和Client.会通过创建一个tcp Server来 ...
- STM32F1 W5500 TCP Client 回环测试
刚刚接触W5500的时候,做TCP Client回环测试的时候,出现很奇怪的问题,查了好多遍代码,死活连接不上PC网络助手的TCPServer.其实代码本身没什么大问题,PC机的防火墙忘记关闭了.总结 ...
- 关于HML要玩物联网这件事 之 CC3200 TCP Client
假装是个引言 去年TI难得发了次大福利,圣诞节派送CC3200开发板.本辣鸡博主抢得早,加上没落C9.edu.cn 邮箱加持,抽奖居然抽中了.TI不论是赞助竞赛还是送板子,目的都很明确,就是想推广自己 ...
- ESP8266开发之旅 网络篇⑦ TCP Server TCP Client
授人以鱼不如授人以渔,目的不是为了教会你具体项目开发,而是学会学习的能力.希望大家分享给你周边需要的朋友或者同学,说不定大神成长之路有博哥的奠基石... 共同学习成长QQ群 622368884,不喜勿 ...
最新文章
- 几个阿里, 美团,腾讯大佬的公众号!超级变态!
- bootstrape实战案例_第二百五十二节,Bootstrap项目实战-首页
- python中的位置怎么看_如何知道项目在Python有序字典中的位置
- 基于MATLAB的turbo码代码,一种基于Simulink的Turbo码仿真实现
- 抽象工厂模式(Absraact Factory)介绍与实现
- java jdk最新版本是多少_Linux下一键安装java-jdk任意版本
- Mono SVN最新代码或者Mono 1.2.5 支持IronPython 2.0
- websocket握手失败_WebSocket握手期间出错:意外的响应代码:500
- iptables的详细介绍及配置方法
- 【CF】304 E. Soldier and Traveling
- 《Kotlin项目实战开发》 第3章 类型系统与可空类型
- python小白教程-面向小白的Python教程:入门篇(六)
- WPF 弹框 并自动关闭
- 华为S5700交换机堆叠
- Fxfactory插件:光雾滤镜插件PHYX Stylist
- NLP︱词向量经验总结(功能作用、高维可视化、R语言实现、大规模语料、延伸拓展)
- hosts文件位置及作用
- 注册表“.REG”文件完全攻略
- 【C++笔试强训】第三天
- esxi安装威联通_威联通TS-453Bmini NAS加装内存,轻松玩转虚拟机安装win10系统