使用netty不是一天两天了,但是使用netty和DTU通讯还是第一次,接下来要做DTU的通讯协议,对接工作还没有正式开始,只收到一个简单的DTU协议文档,里面的内容大概是下面表格中的样子。

位数

内容

取值

0

字头

0XCC

1

数据长度

低八位

2

数据长度

高八位

3

类型

0:心跳,1:登录

4

机器编码

0X00-0XFF

5

校验位

前面字节相加的低八位

这里把通讯协议简化了一下,仅剩下心跳和登录,如果有其他参数可以在此基础上进行扩展;从表格中可以看出来,每个字段数据位数还不一样,有的一位,有的两位,数据长度占两位,其他各占用一位。

只有这么一个文档,只有这么一丁点信息,其他就什么也不知道了,这可如何是好?不知道从哪里下手,这个疑问多多的项目就这样放了一段时间。

但是也不能总是这样放着呀,如果对接的人来了,我这边什么也没有,一下子也建不起一个项目呀?转念又想,曾经使用 **netty + google protobuf ** 开发过IM项目,也有些相似之处。这个DTU可否使用google protobuf呢?

于是,写了一个简单的客户端,一个服务端,来进行收发信息,google protobuf是通过对象编码成二进制进行数据通讯的,但文档中是字节数组,压根没有对象一说呀?写完了demo,测试一遍,但是和字节数组对应不起来,最后还是删掉了。

在网上找了很多资料,找来找去只找到这么两篇Java采用Netty实现基于DTU的TCP服务器 + 多端口 + 多协议

Java 使用Socket 实现基于DTU的TCP服务器 + 数据解析 + 心跳检测可以参考,试着把上面的demo扒拉了好几遍,通过这两篇文章,可以获得一些信息:

第一,可以使用netty和dtu进行通信,选择使用netty框架没有错;

第二,和dtu对接,接收到的是字节数组,不能使用google的protobuf框架,需要另做处理。

把参考文档中带netty的demo也试着在本地拷贝了一份,大概知道了对接收到的字节数组如何处理,但是demo中只有接收,没有发送,这是不够完美的;这也是个问题,单方面的,不好运行呀。

另外在处理字节数组的时候,在流程上还不是太标准,后面有可能会遇到半包、粘包的问题,这些都是需要面对的问题。结合自己曾经开发过IM的经验,把编解码处理和数据处理也分离出来,半包、粘包的问题一并考过进去,这样后面再完善就方便多了。

下面就开始SpringBoot2.1.4 + netty + DTU的客户端编码,难点就在于字节数组的解码和编码,一进一出,搞定了这一步,其他的业务逻辑就好处理了。这里的客户端编码是为了测试DTU的请求,以便和服务端互动起来,可以进行测试。

第一步,pom文件引入netty架包,就这两个架包足够用的了;

org.springframework.boot

spring-boot-starter

io.netty

netty-all

第二步,对字节数组的编解码,包括半包、粘包处理;

字节数据解码类ByteArrayDecoder:

import java.util.List;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.example.instant.ProtoInstant;

import com.example.util.CharacterConvert;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.ByteToMessageDecoder;

/**

* byte 1字节 (8位) -27~27-1 0 Byte 255

* short 2字节 (16位) -215~215-1 0 Short

* int 4字节 (32位) -231~ 231-1 0 Integer

* long 8字节 (64位) -263~263-1 0 Long

* char 2字节 (C语言中是1字节)可以存储一个汉字

* float 4字节 (32位) -3.4e+38 ~ 3.4e+38 0.0f Float

* double 8字节 (64位) -1.7e+308 ~ 1.7e+308 0 Double

* char 2字节(16位) u0000~uFFFF(‘’~‘?’) ‘0’ Character (0~216-1(65535))

* 布尔 boolean 1/8字节(1位) true, false FALSE Boolean

* C语言中,short、int、float、long、double,分别为:1个、2个、4个、8个、16个

* 对字节数组进行解码

* @author 程就人生

* @date 2020年8月3日

* @Description

*

*/

public class ByteArrayDecoder extends ByteToMessageDecoder{

private static Logger log = LoggerFactory.getLogger(ByteArrayDecoder.class);

@Override

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {

// 标记一下当前的readIndex的位置

in.markReaderIndex();

//判断获取到的数据是否够字头,不沟通字头继续往下读

//字头:1位,数据串总长度:2位

if(in.readableBytes() < ProtoInstant.FILED_LEN){

log.info("不够包头,继续读!");

return;

}

//读取字头1位

int fieldHead = CharacterConvert.byteToInt(in.readByte());

if(fieldHead != ProtoInstant.FIELD_HEAD){

String error = "字头不对:" + ctx.channel().remoteAddress();

log.info(error);

ctx.close();

return;

}

//长度2位,读取传送过来的消息的长度。

int length = CharacterConvert.shortToInt(in.readShort());

// 长度如果小于0

if (length < 0) {// 非法数据,关闭连接

log.info("数据长度为0,非法数据,关闭连接!");

ctx.close();

return;

}

// 读到的消息体长度如果小于传送过来的消息长度,减去字头1位,数据长度2位

int dataLength = length - ProtoInstant.FILED_LEN;

if (dataLength > in.readableBytes()) {

// 重置读取位置

in.resetReaderIndex();

return;

}

byte[] array;

if (in.hasArray()) {

log.info("堆缓冲");

// 堆缓冲

ByteBuf slice = in.slice();

array = slice.array();

} else {

log.info("直接缓冲");

// 直接缓冲

array = new byte[dataLength];

in.readBytes(array, 0, dataLength);

}

if(array.length > 0){

in.retain();

out.add(array);

}

}

}

字节数组编码类ByteArrayEncoder:

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.example.instant.ProtoInstant;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.MessageToByteEncoder;

/**

* 对字节数组进行编码

* @author 程就人生

* @date 2020年8月3日

* @Description

*

*/

public class ByteArrayEncoder extends MessageToByteEncoder{

private static Logger log = LoggerFactory.getLogger(ByteArrayEncoder.class);

@Override

protected void encode(ChannelHandlerContext ctx, byte[] msg, ByteBuf out) throws Exception {

log.info(".....经过ByteArrayEncoder编码.....");

//字头(1位)

out.writeByte(ProtoInstant.FIELD_HEAD);

//数据长度(2位),字头1位+数据长度2位+数据位(包含校验1位)

out.writeShort(ProtoInstant.FILED_LEN + msg.length);

//消息体,包含我们要发送的数据

out.writeBytes(msg);

}

}

在编解码的时候,考虑到DTU那边对接的有可能是C语音,C语言和Java的数据类型不一样,所占用的位数也不一样,这个需要保持一致。

第三步,客户端启动类;

客户端启动类NettyClient,这里在头部加了@Component,只要项目一启动就去建立与服务端的链接,建立链接后登录,登录后保持心跳;

import java.util.Date;

import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.stereotype.Component;

import com.example.im.codec.ByteArrayDecoder;

import com.example.im.codec.ByteArrayEncoder;

import com.example.im.handler.ExceptionHandler;

import com.example.im.handler.LoginResponseHandler;

import com.example.instant.ProtoInstant;

import com.example.util.CharacterConvert;

import io.netty.bootstrap.Bootstrap;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.PooledByteBufAllocator;

import io.netty.buffer.Unpooled;

import io.netty.channel.Channel;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoop;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;

import io.netty.util.concurrent.Future;

import io.netty.util.concurrent.GenericFutureListener;

/**

* netty客户端连接类

* @author 程就人生

* @date 2020年8月6日

* @Description

*

*/

@Component

public class NettyClient {

private static Logger log = LoggerFactory.getLogger(NettyClient.class);

// 服务器ip地址

@Value("${netty.communication.host}")

private String host;

// 服务器端口

@Value("${netty.communication.port}")

private int port;

private Channel channel;

@Autowired

private LoginResponseHandler loginResponseHandler;

@Autowired

private ExceptionHandler exceptionHandler;

private Bootstrap bootstrap;

private EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

@PostConstruct

public void start() throws Exception {

//启动客户端

doConnect();

}

/**

* 连接操作

*/

private void doConnect() {

try {

bootstrap = new Bootstrap();

bootstrap.group(eventLoopGroup);

bootstrap.channel(NioSocketChannel.class);

bootstrap.option(ChannelOption.SO_KEEPALIVE, true);

bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

bootstrap.remoteAddress(host, port);

// 设置通道初始化

bootstrap.handler(new ChannelInitializer() {

public void initChannel(SocketChannel ch) {

//编解码处理

ch.pipeline().addLast("decoder", new ByteArrayDecoder());

ch.pipeline().addLast("encoder", new ByteArrayEncoder());

//登录返回处理

ch.pipeline().addLast("loginHandler", loginResponseHandler);

//异常处理

ch.pipeline().addLast("exception", exceptionHandler);

}

}

);

log.info("客户端开始连接");

ChannelFuture f = bootstrap.connect();

f.addListener(connectedListener);

} catch (Exception e) {

e.printStackTrace();

log.info("客户端连接失败!" + e.getMessage());

}

}

//连接关闭监听

GenericFutureListener closeListener = (ChannelFuture f) -> {

log.info(new Date() + ": 连接已经断开……");

channel = f.channel();

};

//连接监听

GenericFutureListener connectedListener = (ChannelFuture f) -> {

final EventLoop eventLoop = f.channel().eventLoop();

if (!f.isSuccess()) {

log.info("连接失败!在10s之后准备尝试重连!");

eventLoop.schedule(() -> doConnect(), 10, TimeUnit.SECONDS);

} else {

log.info("服务器 连接成功!" + f.channel().remoteAddress() + ":" + f.channel().localAddress());

channel = f.channel();

login();

}

};

/**

* 登录操作

*/

private void login(){

//构建登录请求

ByteBuf buf = Unpooled.buffer(3);

//登录

buf.writeByte(ProtoInstant.LOGIN);

buf.writeByte(ProtoInstant.DEVICE_ID);

//校验位

int sum = CharacterConvert.sum(ProtoInstant.FIELD_HEAD,6,ProtoInstant.LOGIN,ProtoInstant.DEVICE_ID);

int verify = CharacterConvert.getLow8(sum);

buf.writeByte(verify);

writeAndFlush(buf.array());

}

/**

* 发送消息

* @param msg

*/

public void writeAndFlush(Object msg){

this.channel.writeAndFlush(msg).addListener(new GenericFutureListener>() {

@Override

public void operationComplete(Future super Void> future)

throws Exception {

// 回调

if (future.isSuccess()) {

log.info("请求netty服务器,消息发送成功!");

} else {

log.info("请求netty服务器,消息发送失败!");

}

}

});

}

/**

* 重新建立连接

* @throws Exception

*/

public void reconnect() throws Exception {

if (channel != null && channel.isActive()) {

return;

}

log.info("reconnect....");

start();

log.info("reconnect success");

}

/**

* 关闭连接

*/

public void close() {

eventLoopGroup.shutdownGracefully();

}

}

别忘了application.properties文件中的配置:

netty.communication.host=127.0.0.1

netty.communication.port=8500

第四步,handler处理类;

异常处理类ExceptionHandler:

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

import com.example.im.NettyClient;

import io.netty.channel.ChannelHandler;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

/**

* 客户端异常处理handler

* @author 程就人生

* @date 2020年8月3日

* @Description

*

*/

@ChannelHandler.Sharable

@Service("exceptionHandler")

public class ExceptionHandler extends ChannelInboundHandlerAdapter {

private static Logger log = LoggerFactory.getLogger(ExceptionHandler.class);

@Autowired

private NettyClient nettyClient;

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

if (cause instanceof Exception) {

//捕捉异常信息

cause.printStackTrace();

log.error(cause.getMessage());

ctx.close();

} else {

//捕捉异常信息

cause.printStackTrace();

log.error(cause.getMessage());

ctx.close();

}

//出现异常时,定时重连;比如上位机服务器重启服务器

nettyClient.reconnect();

}

/**

* 通道 Read 读取 Complete 完成

* 做刷新操作 ctx.flush()

*/

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

ctx.flush();

}

}

登录处理LoginResponseHandler:

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.stereotype.Service;

import com.example.instant.ProtoInstant;

import com.example.util.CharacterConvert;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelHandler;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

import io.netty.channel.ChannelPipeline;

/**

* netty客户端登录处理

* @author 程就人生

* @date 2020年8月3日

* @Description

*

*/

@Service("loginResponseHandler")

@ChannelHandler.Sharable

public class LoginResponseHandler extends ChannelInboundHandlerAdapter {

private static Logger log = LoggerFactory.getLogger(LoginResponseHandler.class);

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

if (null == msg || !(msg instanceof byte[])) {

super.channelRead(ctx, msg);

return;

}

//对接收到的数据进行处理

byte[] data = (byte[]) msg;

int dataLength = data.length;

ByteBuf buf = Unpooled.buffer(dataLength);

buf.writeBytes(data);

int type = CharacterConvert.byteToInt(buf.readByte());

//机器编码

int deviceId = CharacterConvert.byteToInt(buf.readByte());

//校验位

int verify = CharacterConvert.byteToInt(buf.readByte());

//如果是登录操作时

if(type == ProtoInstant.LOGIN){

//计算字头 + 数据长度 + 类型 + 参数的总和

int sum = CharacterConvert.sum(ProtoInstant.FIELD_HEAD, dataLength + ProtoInstant.FILED_LEN, type, deviceId);

if(verify != CharacterConvert.getLow8(sum)){

log.error("登录返回,校验位错误!");

}else{

ChannelPipeline channelPipeline = ctx.pipeline();

channelPipeline.addAfter("encoder", "heartbeat", new HeartBeatHandler());

// 移除登录响应处理器

channelPipeline.remove(this);

log.info("服务器机登录返回了!");

}

return;

}else{

super.channelRead(ctx, msg);

return;

}

}

}

心跳处理HeartBeatHandler:

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.stereotype.Service;

import com.example.instant.ProtoInstant;

import com.example.util.CharacterConvert;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelHandler;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

/**

* netty客户端心跳处理

* @author 程就人生

* @date 2020年8月1日

* @Description

*

*/

@ChannelHandler.Sharable

@Service("heartHandler")

public class HeartBeatHandler extends ChannelInboundHandlerAdapter {

private static Logger log = LoggerFactory.getLogger(HeartBeatHandler.class);

// 心跳的时间间隔,单位为s

private static final int HEARTBEAT_INTERVAL = 100;

// 在Handler被加入到Pipeline时,开始发送心跳

@Override

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

ByteBuf buf = Unpooled.buffer(3);

//心跳

buf.writeByte(ProtoInstant.HEART_BEAT);

//机器编码

buf.writeByte(ProtoInstant.DEVICE_ID);

//校验位

int sum = CharacterConvert.sum(ProtoInstant.FIELD_HEAD, 6, ProtoInstant.HEART_BEAT, ProtoInstant.DEVICE_ID);

int verify = CharacterConvert.getLow8(sum);

buf.writeByte(verify);

// 发送心跳

heartBeat(ctx, buf.array());

}

// 使用定时器,发送心跳报文

public void heartBeat(ChannelHandlerContext ctx, byte[] heartbeatMsg) {

ctx.executor().schedule(() -> {

if (ctx.channel().isActive()) {

log.info(" 发送心跳 消息 to netty服务器系统");

ctx.writeAndFlush(heartbeatMsg);

// 递归调用,发送下一次的心跳

heartBeat(ctx, heartbeatMsg);

}

}, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);

}

/**

* 接受到服务器的心跳回写

*/

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

// 判断消息实例

if (null == msg || !(msg instanceof byte[])) {

super.channelRead(ctx, msg);

return;

}

//对接收到的数据进行处理

byte[] data = (byte[]) msg;

int dataLength = data.length;

ByteBuf buf = Unpooled.buffer(dataLength);

buf.writeBytes(data);

int type = CharacterConvert.byteToInt(buf.readByte());

int deviceId = CharacterConvert.byteToInt(buf.readByte());

//如果是心跳信息时

if(type == ProtoInstant.HEART_BEAT){

int verify = CharacterConvert.byteToInt(buf.readByte());

//计算字头 + 数据长度 + 类型 + 参数的总和

int sum = CharacterConvert.sum(ProtoInstant.FIELD_HEAD, dataLength + ProtoInstant.FILED_LEN, type, deviceId);

if(verify != CharacterConvert.getLow8(sum)){

log.error("心跳包,校验位错误!");

}else{

log.info("收到回写的心跳 消息 from netty服务器系统");

}

return;

}else{

super.channelRead(ctx, msg);

}

}

}

客户端的核心编码大抵就是这些,看完这些编码是不是有些期待服务端的编码呢,服务端的编码敬请期待下一篇文章。

dtu tcp java_SpringBoot 2 整合 Netty 实现基于 DTU 的 TCP 服务器 之 客户端相关推荐

  1. c#基于socket的UDP服务器和客户端实例

    基于Udp协议是无连接模式通讯,占用资源少,响应速度快,延时低.至于可靠性,可通过应用层的控制来满足.(不可靠连接) 使用Udp协议通讯需要具备以下几个条件: (1).建立一个套接字(Socket) ...

  2. Netty入门(二)时间服务器及客户端

    在这个例子中,我在服务器和客户端连接被创立时发送一个消息,然后在客户端解析收到的消息并输出.并且,在这个项目中我使用 POJO 代替 ByteBuf 来作为传输对象. 一.服务器实现 1.  首先我们 ...

  3. 基于c语言客户端的步骤,基于c语言tftp服务器与客户端实现

    开发环境:ubuntu 所用知识点:c,socket, tcp/ip协议 A)本实验主要实现tftp协议的服务器与客户端. 服务器实现功能有: 1)接收处理客户端请求,上传下下载文件 2)进行用户验证 ...

  4. Springboot整合Netty,实现Socket通信

    文章目录 *Springboot整合Netty,实现Socket通信* 1.模拟单客户端 2.模拟单服务端 总结 Springboot整合Netty,实现Socket通信 1.模拟单客户端 引入Net ...

  5. springboot整合netty实现tcp通信

    1.依赖 <!-- netty依赖--> <dependency><groupId>io.netty</groupId><artifactId&g ...

  6. Springboot+Netty实现基于天翼物联网平台CTWing(AIOT)终端TCP协议(透传模式)-应用订阅端(北向应用)

    之前实现了使用Springboot+Netty基于天翼物联网平台CTWing(AIOT)终端TCP协议(透传模式)-设备终端(南向设备),模拟设备发送的数据给到了AIOT平台,那么在第三方应用也需要去 ...

  7. Springboot+Netty实现基于天翼物联网平台CTWing(AIOT)终端TCP协议(透传模式)-设备终端(南向设备)

    电信的天翼物联网平台CTWing(AIOT)原先是我们俗称的aep,主要用于接入nb-iot设备,当然也可以接入其他的设备,在熟悉AIOT平台后,做后端的我有时候急需终端样品(智能门禁,支付识别终端, ...

  8. 关于SpringBoot整合Netty客户端和服务端实现JT808协议

    关于SpringBoot整合Netty客户端和服务端实现JT808协议 最近做了一个使用netty实现交通部JT808协议的项目,对比了mina和netty两种框架的使用,先整理一下netty的实现过 ...

  9. 三分钟构建高性能 WebSocket 服务 | 超优雅的 SpringBoot 整合 Netty 方案

    前言 每当使用SpringBoot进行Weboscket开发时,最容易想到的就是spring-boot-starter-websocket(或spring-websocket).它可以让我们使用注解, ...

最新文章

  1. python爬虫之cookie方式自动登录巴比特网
  2. 多图:多样化实现App多渠道统计
  3. Docker 安装redis(四)
  4. STM32时钟源时钟系统配置
  5. backbone.js全栈开发
  6. Java RMI 框架(远程方法调用)
  7. BM模式匹配算法原理(图解)
  8. VMware虚拟桌面,后台更改用户密码后,掉域的问题
  9. C++代码审查工具Cppcheck和TscanCode
  10. 笛卡尔心形函数表达式_笛卡尔爱心函数表达式 笛卡尔形式
  11. 易生活(二)-APP—安卓中评论功能的实现
  12. Multisim基础 电容遇到交流+直流电路时,容抗与隔直通交的特性
  13. 第6章 项目整体管理
  14. DNS劫持、流量劫持,HTTP/HTTPS劫持
  15. Windows下安装VMware
  16. 手游各个平台开接入发者中心网址汇总
  17. Charles 使用总结
  18. dw中css鼠标经过的时候,Dreamweaver鼠标经过出现效果 怎么操作
  19. Directsound开发指南(2)
  20. Android自定义View 多边形能力分析控件,雷达图(蛛网)动态实现

热门文章

  1. PL/SQL_高级编程
  2. 微信 3.9 版本,Sandboxie 沙盒双开报错
  3. 自动驾驶“稳打地基”,小鹏汽车基于阿里云建自动驾驶AI智算中心算力可达600PFLOPS
  4. 141. 环形链表(java实现)--2种解法(双指针,hahs)LeetCode
  5. 小米3 android8,小米8 Miui10.3 最新稳定版(10.3.6.0 Android 9.0)稳定版 快过闪电 AI加持 高级工具箱 黑域 精简 流畅 省电 实用...
  6. idea上传代码到gitee出现的问题及解决办法
  7. 百度地图 | 定位到大西洋的几内亚湾的解决方法
  8. 常用激活函数(relu,glu,gelu,swish等)
  9. div包video在某些电脑或者浏览器上出现黑边
  10. oracle段的集合称为,oracle 集合和成员函数 (plsql表也被称为索引表)