netty客户端连接多个服务端
还是上一个项目,又变了一次方案,网安说服务器暴露端口太不安全,要求用服务器上部署TCP客户端程序,主动连接下属的各个终端,终端上面跑TCP服务端程序。
那就用Netty实现客户端,去连接多个服务端。
POM文件中引入netty:
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.56.Final</version>
</dependency>
netty起始类,定时任务,从数据库中读取待连接的设备列表,设备列表中有一列是ip地址,
@Slf4j
@Component
public class NettyConfig {@AutowiredNettyClientHandler nettyClientHandler;@Value("${terminal.port}")private Integer port;@PostConstructpublic void init() {NettyClient.setHandler(nettyClientHandler);connectDevice();}/*** 初始化连接* 查询数据库表中所有host和port不为空的设备* netty客户端尝试连接并保存至channels内存中*/@Scheduled(cron = "30/60 * * * * ?")public void connectDevice() {log.info("剔除长时间不活动的终端");long curt = System.currentTimeMillis();for(Map.Entry<String, NettyConnectInfo> entry : NettyClient.channels.entrySet()) {String ip = entry.getKey();NettyConnectInfo info = entry.getValue();long actt = info.getActiveTime().getTime();//log.info("{},{}", curt, actt);if(curt-actt > 130000) {log.info("--{}-连接2分钟无活动,剔除!", ip);info.getChannel().close();NettyClient.channels.remove(ip);}}log.info("连接终端");new Thread() {@Overridepublic void run() {try {Map<String, NettyConnectInfo> mapParam = new HashMap<String, NettyConnectInfo>();// device from dbList<DeviceEntity> list = deviceRepository.getIps();for(DeviceEntity it : list) {NettyConnectInfo info = new NettyConnectInfo();info.setPort(port);info.setChannel(null);info.setConnected(false);info.setDevId(it.getIdentifierIn());info.setDeviceInfo(it);mapParam.put(it.getIp()+":"+String.valueOf(port), info);}// connectNettyClient.getChannel(mapParam);}catch (Exception e) {log.error("Netty client start fail : {}", e.getMessage());}}}.start();}
}
NettyClient类,建立socket连接:
@Slf4j
public class NettyClient {public static Map<String, NettyConnectInfo> channels = new HashMap<>();static EventLoopGroup eventLoopGroup = new NioEventLoopGroup();static Bootstrap bootstrap = null;static NettyClientHandler handler;public static void setHandler(NettyClientHandler h) {handler = h;}/*** 初始化Bootstrap*/public static final Bootstrap getBootstrap(EventLoopGroup group) {if(bootstrap == null) {bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(20480, 20480, 65536)).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ByteArrayEncoder());socketChannel.pipeline().addLast(handler);}});}return bootstrap;}// 获取所有连接public static void getChannel(Map<String, NettyConnectInfo> map) {//Map<String , ChannelFuture> result = new HashMap<>();Bootstrap bootstrap = getBootstrap(null);for(Map.Entry<String, NettyConnectInfo> entry : map.entrySet()) {String ip = entry.getKey();// 如果IP没有连接,则尝试连接if(channels.get(ip) == null) {String[] arr = ip.split(":");NettyConnectInfo info = entry.getValue();//bootstrap.remoteAddress(ip, info.getPort());bootstrap.remoteAddress(arr[0], Integer.valueOf(arr[1]));//log.info("...尝试连接{}:{}", arr[0], Integer.valueOf(arr[1]));ChannelFuture future = bootstrap.connect().addListener((ChannelFuture futureListener) -> {//final EventLoop eventLoop = futureListener.channel().eventLoop();if (!futureListener.isSuccess()) {log.info("与"+ip+"连接失败!");}else {info.setChannel(futureListener.channel());info.setActiveTime(new Timestamp(System.currentTimeMillis()));channels.put(ip, info);}});}}}
}
NettyClientHandler类,处理消息,业务处理:
@Slf4j
@ChannelHandler.Sharable
@Component
public class NettyClientHandler extends ChannelInboundHandlerAdapter {/*** 建立连接时*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {try {InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();int port = ipSocket.getPort();String host = ipSocket.getHostString();log.info("与设备"+host+":"+port+"连接成功!");ctx.fireChannelActive();}catch (Exception e) {log.info("channelActive: " + e.getMessage());}}/*** 关闭连接时*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {try {InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();int port = ipSocket.getPort();String host = ipSocket.getHostString();log.error("与设备" + host + ":" + port + "连接断开!");NettyClient.channels.remove(host+":"+String.valueOf(port));final EventLoop eventLoop = ctx.channel().eventLoop();}catch (Exception e) {log.info("channelInactive: " + e.getMessage());}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info("服务端连接异常:" + ctx.channel().id().asShortText());}/*** 收到报文,业务逻辑处理*/@Override@Transactionalpublic void channelRead(ChannelHandlerContext ctx, Object msg) {InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();int port = ipSocket.getPort();String host = ipSocket.getHostString();ByteBuf byteBuf = (ByteBuf) msg;byte[] buffers = new byte[byteBuf.readableBytes()];byteBuf.readBytes(buffers);byteBuf.release();//log.info("接收到 " + host + ":" + port + "的数据");int rplen = OpsProtocol.parseHead(buffers);// 断帧处理if(rplen == -1) {byte[] prerp = NettyClient.channels.get(host + ":" + port).getBuffer();if(prerp == null) {log.info("错误的报文");return;}else {byte[] buffers2 = new byte[prerp.length+buffers.length];System.arraycopy(prerp, 0, buffers2, 0, prerp.length);System.arraycopy(buffers, 0, buffers2, prerp.length, buffers.length);buffers = buffers2;rplen = OpsProtocol.parseHead(buffers2);}}if(rplen > buffers.length) {NettyClient.channels.get(host + ":" + port).setBuffer(buffers);}int cur = 0;while (cur < buffers.length) {// 粘桢处理byte[] bytes = new byte[buffers.length-cur];System.arraycopy(buffers, cur, bytes, 0, buffers.length-cur);StrcApduHead apduHead = new StrcApduHead();Head head = new Head();EnumFunName en = OpsProtocol.parseHead(bytes, head, apduHead);//……// 解析报文,发送报文ctx.channel().writeAndFlush(send2);//……cur += apduHead.getLength();}NettyClient.channels.get(host + ":" + port).setActiveTime(new Timestamp(System.currentTimeMillis()));}
}
bean,记录了每个连接的信息:
@Data
public class NettyConnectInfo {int port;Channel channel;boolean connected;Timestamp activeTime;DeviceEntity deviceInfo;byte[] buffer;// ……
}@Data
@Entity
@Table(name="device")
public class DeviceEntity { @Column(name = "ip" )private String ip;// ……
}
netty基于Reactor模式的处理,一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。
netty客户端连接多个服务端相关推荐
- docker客户端连接远程docker服务端(export方式)
将下面的localhost替换成服务端的ip即可,同时服务端得开启2375端口 export DOCKER_HOST=tcp://localhost:2375
- 客户端连接不上服务端(Redis)
描述:在虚拟机安装了redis服务 本机使用客户端工具RDM连接 连接不上 一.版本信息 Redis 二.错误示例 三.解决 1. 第一种情况是redis设置了密码 然后连接的时候 忘记输入密码会导致 ...
- linux环境 Oracle客户端连接远程Oracle服务端
前提:su - oracle #格式: sqlplus 用户名/密码@ip地址:端口 sqlplus system/123456@192.168.0.128:1521 #格式2: sqlplus 用户 ...
- netty客户端连接后无限发送数据,连接不上时无限重试,断线重连
在之前的netty文章里,刚开始学,利用netty实现websocket写了一个聊天程序. 纯netty实现http,websocket协议,头像上传,搭建实时聊天室,群聊,私聊,文字,图片消息 本文 ...
- Netty 源码解析系列-服务端启动流程解析
netty源码解析系列 Netty 源码解析系列-服务端启动流程解析 Netty 源码解析系列-客户端连接接入及读I/O解析 五分钟就能看懂pipeline模型 -Netty 源码解析 1.服务端启动 ...
- netty源码学习之服务端客户端初始化
文章目录 1. AbstractBootstrap类简介 1.1. 核心方法 2. netty服务端创建 2.1. 服务端启动入口 2.2. doBind()方法 2.3. netty服务初始化 2. ...
- Netty简单实现客户端与服务端收发消息
Netty简单实现客户端与服务端收发消息 这个小案例主要是实现netty收发消息,分为客户端,及服务端,以及包含了相关状态处理,主要的代码会放在最后 gitHub 地址上,有需要可以看一下 首先来简单 ...
- 采用netty开发智能手表tcp服务端还是非常不错的
采用netty开发智能手表tcp服务端还是非常不错的,经过单服务部署测试并发能达到10w,可以用于开发开发马蹄锁,儿童智能手表,其他智能设备,物联网等等,有啥有趣好玩的物联网可以进行交流一下
- 远程客户端连接linux,远程控制服务(SSH)之Linux环境下客户端与服务端的远程连接...
本篇blog将讲述sshd服务提供的两种安全验证的方法,并且通过这两种方法进行两台linux虚拟机之间的远程登陆. 准备工作: (1)准备两台安装有linux系统的虚拟机,虚拟机软件采用vmware: ...
最新文章
- ASP.NET MVC 中将FormCollection与实体间转换方法
- Java后台 自动 翻页查询
- shell-1.shell注释
- 2 Docker安装及使用
- linux系统 硬链接和软链接
- 发送请求_发送soap请求调用WSDL
- 《面向对象的思考过程(原书第4版)》一 导读
- 高德地图我的队伍查岗_详细测试高德地图的家人地图后 我学会了画地为牢
- mysql表无法获取_CentOS下无法正常获取MySQL数据库表数据的问题
- java 工具类库 Apache Commons
- php laravel手册,laravel5.6手册下载|Laravel5.6中文手册pdf最新版下载(附使用方法)_星星软件园...
- 网课答案公众号搭建-网课题库接口
- 线性规划——产销平衡
- IDEA - 如何安装Statistic代码统计插件?
- Java Day24
- 学习云计算有什么用处 该怎么学好云计算技术
- zabbix 自带模板监控mysql_zabbix使用自带模板监控mysql
- linux下使用man命令查看系统调用
- 计算机连接宽带还需要登陆用户,每次电脑启动都需要手动连接宽带怎么办? 爱问知识人...
- CSS语法与CSS选择器