IoT是什么

The Internet of things的简称IoT,即是物联网的意思

IoT推送系统的设计

比如说,像一些智能设备,需要通过APP或者微信中的小程序等,给设备发送一条指令,让这个设备下载或者播放音乐,那么需要做什么才可以完成上面的任务呢?

首先需要推送服务器,这个服务器主要负责消息的分发,不处理业务消息;设备会连接到推送服务器,APP通过把指令发送到推送服务器,然后推送服务器再把指令分发给相应的设备。

可是,当买设备的人越来越多,推送服务器所能承受的压力就越大,这个时候就需要对推送服务器做集群,一台不行,就搞十台,那么还有一个问题,就是推送服务器增加了,设备如何找到相应的服务器,然后和服务器建立连接呢,注册中心可以解决这个问题,每一台服务器都注册到注册中心上,设备会请求注册中心,得到推送服务器的地址,然后再和服务器建立连接。

而且还会有相应的redis集群,用来记录设备订阅的主题以及设备的信息;APP发送指令到设备,其实就是发送了一串数据,相应的会提供推送API,提供一些接口,通过接口把数据发送过去;而推送API不是直接去连接推送服务器的,中间还会有MQ集群,主要用来消息的存储,推送API推送消息到MQ,推送服务器从MQ中订阅消息,以上就是简单的IoT推送系统的设计。

下面看下结构图:

注意:设备连接到注册中心的是短连接,设备和推送服务器建立的连接是长连接

心跳检测机制

简述心跳检测

心跳检测,就是判断对方是否还存活,一般采用定时的发送一些简单的包,如果在指定的时间段内没有收到对方的回应,则判断对方已经挂掉

Netty提供了IdleStateHandler类来实现心跳,简单的使用如下:

pipeline.addFirst(new IdleStateHandler(0, 0, 1, TimeUnit.SECONDS));

下面是IdleStateHandler的构造函数:

public IdleStateHandler(            long readerIdleTime, long writerIdleTime, long allIdleTime,            TimeUnit unit) {        this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);}

四个参数说明:

  1. readerIdleTime,读超时时间
  2. writerIdleTime,写超时时间
  3. allIdleTime,所有事件超时时间
  4. TimeUnit unit,超时时间单位

心跳检测机制代码示例

简单示例: 服务端:

static final int BEGIN_PORT = 8088;    static final int N_PORT = 100;    public static void main(String[] args) {        new PingServer().start(BEGIN_PORT, N_PORT);    }    public void start(int beginPort, int nPort) {        System.out.println("启动服务....");        EventLoopGroup bossGroup = new NioEventLoopGroup(1);        EventLoopGroup workerGroup = new NioEventLoopGroup();        ServerBootstrap bootstrap = new ServerBootstrap();        bootstrap.handler(new LoggingHandler(LogLevel.INFO));        bootstrap.group(bossGroup, workerGroup);        bootstrap.channel(NioServerSocketChannel.class);        bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);        bootstrap.childHandler(new ChannelInitializer() {            @Override            protected void initChannel(SocketChannel ch) throws Exception {                ChannelPipeline pipeline = ch.pipeline();                pipeline.addFirst(new IdleStateHandler(0, 0, 1, TimeUnit.SECONDS));                pipeline.addLast(new PingHandler());                //每个连接都有个ConnectionCountHandler对连接记数进行增加                pipeline.addLast(new ConnectionCountHandler());            }        });        bootstrap.bind(beginPort).addListener((ChannelFutureListener) future -> {            System.out.println("端口绑定成功: " + beginPort);        });        System.out.println("服务已启动!");}
public class PingHandler extends SimpleUserEventChannelHandler {    private static final ByteBuf PING_BUF = Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer("ping".getBytes()));    private int count;    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        ByteBuf buf = (ByteBuf) msg;        byte[] data = new byte[buf.readableBytes()];        buf.readBytes(data);        String str = new String(data);        if ("pong".equals(str)) {            System.out.println(ctx + " ---- " + str);            count--;        }        ctx.fireChannelRead(msg);    }    @Override    protected void eventReceived(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {        if (evt.state() == ALL_IDLE) {            if (count >= 3) {                System.out.println("检测到客户端连接无响应,断开连接:" + ctx.channel());                ctx.close();                return;            }            count++;            System.out.println(ctx.channel() + " ---- ping");            ctx.writeAndFlush(PING_BUF.duplicate());        }        ctx.fireUserEventTriggered(evt);    }}

客户端:

//服务端的IP private static final String SERVER_HOST = "localhost"; static final int BEGIN_PORT = 8088; static final int N_PORT = 100; public static void main(String[] args) {     new PoneClient().start(BEGIN_PORT, N_PORT); } public void start(final int beginPort, int nPort) {     System.out.println("客户端启动....");     EventLoopGroup eventLoopGroup = new NioEventLoopGroup();     final Bootstrap bootstrap = new Bootstrap();     bootstrap.group(eventLoopGroup);     bootstrap.channel(NioSocketChannel.class);     bootstrap.option(ChannelOption.SO_REUSEADDR, true);     bootstrap.handler(new ChannelInitializer() {         @Override         protected void initChannel(SocketChannel ch) {             ch.pipeline().addLast(new PongHandler());         }     });     int index = 0;     int port;     String serverHost = System.getProperty("server.host", SERVER_HOST);     ChannelFuture channelFuture = bootstrap.connect(serverHost, beginPort);     channelFuture.addListener((ChannelFutureListener) future -> {         if (!future.isSuccess()) {             System.out.println("连接失败,退出!");             System.exit(0);         }     });     try {         channelFuture.get();     } catch (ExecutionException e) {         e.printStackTrace();     } catch (InterruptedException e) {         e.printStackTrace();     } }
public class PongHandler extends SimpleChannelInboundHandler {    private static final ByteBuf PONG_BUF = Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer("pong".getBytes()));    @Override    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {        ByteBuf buf = (ByteBuf) msg;        byte[] data = new byte[buf.readableBytes()];        buf.readBytes(data);        String str = new String(data);        if ("ping".equals(str)) {            ctx.writeAndFlush(PONG_BUF.duplicate());        }    }}

服务端输出结果:

百万长连接优化

连接优化代码示例

服务端:

 static final int BEGIN_PORT = 11000;    static final int N_PORT = 100;    public static void main(String[] args) {        new Server().start(BEGIN_PORT, N_PORT);    }    public void start(int beginPort, int nPort) {        System.out.println("启动服务....");        EventLoopGroup bossGroup = new NioEventLoopGroup(1);        EventLoopGroup workerGroup = new NioEventLoopGroup();        ServerBootstrap bootstrap = new ServerBootstrap();        bootstrap.group(bossGroup, workerGroup);        bootstrap.channel(NioServerSocketChannel.class);        bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);        bootstrap.childHandler(new ChannelInitializer() {            @Override            protected void initChannel(SocketChannel ch) throws Exception {                ChannelPipeline pipeline = ch.pipeline();                //每个连接都有个ConnectionCountHandler对连接记数进行增加                pipeline.addLast(new ConnectionCountHandler());            }        });        //这里开启 10000到100099这100个端口        for (int i = 0; i < nPort; i++) {            int port = beginPort + i;            bootstrap.bind(port).addListener((ChannelFutureListener) future -> {                System.out.println("端口绑定成功: " + port);            });        }        System.out.println("服务已启动!");    }

客户端:

//服务端的IP    private static final String SERVER_HOST = "192.168.231.129";    static final int BEGIN_PORT = 11000;    static final int N_PORT = 100;    public static void main(String[] args) {        new Client().start(BEGIN_PORT, N_PORT);    }    public void start(final int beginPort, int nPort) {        System.out.println("客户端启动....");        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();        final Bootstrap bootstrap = new Bootstrap();        bootstrap.group(eventLoopGroup);        bootstrap.channel(NioSocketChannel.class);        bootstrap.option(ChannelOption.SO_REUSEADDR, true);        int index = 0;        int port;        String serverHost = System.getProperty("server.host", SERVER_HOST);        //从10000的端口开始,按端口递增的方式进行连接        while (!Thread.interrupted()) {            port = beginPort + index;            try {                ChannelFuture channelFuture = bootstrap.connect(serverHost, port);                channelFuture.addListener((ChannelFutureListener) future -> {                    if (!future.isSuccess()) {                        System.out.println("连接失败,退出!");                        System.exit(0);                    }                });                channelFuture.get();            } catch (Exception e) {            }            if (++index == nPort) {                index = 0;            }        }    }

ConnectionCountHandler类:

public class ConnectionCountHandler extends ChannelInboundHandlerAdapter {    //这里用来对连接数进行记数,每两秒输出到控制台    private static final AtomicInteger nConnection = new AtomicInteger();    static {        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {            System.out.println("连接数: " + nConnection.get());        }, 0, 2, TimeUnit.SECONDS);    }    @Override    public void channelActive(ChannelHandlerContext ctx) {        nConnection.incrementAndGet();    }    @Override    public void channelInactive(ChannelHandlerContext ctx) {        nConnection.decrementAndGet();    }}

上述的代码会打包成jar放到linux上运行,对于上述的优化来说,程序方面的就暂时不做,下面会从操作系统层面进行优化,让其支撑起百万连接。

TCP连接四元组

在优化之前先来看下网络里的一个小知识,TCP连接四元组: 服务器的IP+服务器的POST+客户端的IP+客户端的POST

端口的范围一般是1到65535:

配置优化

现在在虚拟机上安装两个linux系统,配置分别是:

地址CPU内存JDK作用192.168.15.130VM-4核8G1.8客户端192.168.15.128VM-4核8G1.8服务端

启动服务端: java -Xmx4g -Xms4g -cp network-study-1.0-SNAPSHOT-jar-with-dependencies.jar com.dongnaoedu.network.netty.million.Server > out.log 2>&1 & 启动客户端: java -Xmx4g -Xms4g -Dserver.host=192.168.15.128 -cp network-study-1.0-SNAPSHOT-jar-with-dependencies.jar com.dongnaoedu.network.netty.million.Client

启动服务端后可以使用tail -f命令查看out.log中的日志:

客户端启动后,如果报了以下错误,需要修改系统的文件最大句柄和进程的文件最大句柄:

Caused by: java.io.IOException: Too many open files        at sun.nio.ch.FileDispatcherImpl.init(Native Method)        at sun.nio.ch.FileDispatcherImpl.(FileDispatcherImpl.java:35)        ... 8 more

优化系统最大句柄: 查看操作系统最大文件句柄数,执行命令cat /proc/sys/fs/file-max,查看最大句柄数是否满足需要,如果不满足,通过vim /etc/sysctl.conf命令插入如下配置:

fs.file-max = 1000000
  1. 设置单进程打开的文件最大句柄数,执行命令ulimit -a查看当前设置是否满足要求:
[root@test-server2 download]# ulimit -a | grep "open files"open files                      (-n) 1024

当并发接入的Tcp连接数超过上限时,就会提示“Too many open files”,所有的新客户端接入将会失败。通过vim /etc/security/limits.conf 修改配置参数:

* soft nofile 1000000* hard nofile 1000000

修改配置参数后注销生效。

  • 如果程序被中断,或报了异常
java.io.IOException: 设备上没有空间    at sun.nio.ch.EPollArrayWrapper.epollCtl(Native Method)    at sun.nio.ch.EPollArrayWrapper.updateRegistrations(EPollArrayWrapper.java:299)    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:268)    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)    at sun.nio.ch.SelectorImpl.selectNow(SelectorImpl.java:105)    at io.netty.channel.nio.SelectedSelectionKeySetSelector.selectNow(SelectedSelectionKeySetSelector.java:56)    at io.netty.channel.nio.NioEventLoop.selectNow(NioEventLoop.java:750)    at io.netty.channel.nio.NioEventLoop$1.get(NioEventLoop.java:71)    at io.netty.channel.DefaultSelectStrategy.calculateStrategy(DefaultSelectStrategy.java:30)    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:426)    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)    at java.lang.Thread.run(Thread.java:748)
  • 此时可以查看操作系统的日志more /var/log/messages,或在程序启动时执行tail -f /var/log/messages 监控日志。如果日志中出现以下内容,说明需要优化TCP/IP参数
Jun  4 16:55:01 localserver kernel: TCP: too many orphaned socketsJun  4 16:55:01 localserver kernel: TCP: too many orphaned socketsJun  4 16:55:01 localserver kernel: TCP: too many orphaned socketsJun  4 16:55:01 localserver kernel: TCP: too many orphaned socketsJun  4 16:55:01 localserver kernel: TCP: too many orphaned socketsJun  4 16:55:01 localserver kernel: TCP: too many orphaned socketsJun  4 16:55:01 localserver kernel: TCP: too many orphaned sockets

==优化TCP/IP相关参数:==

  • 查看客户端端口范围限制
cat /proc/sys/net/ipv4/ip_local_port_range
  • 通过vim /etc/sysctl.conf 修改网络参数
  • 客户端修改端口范围的限制
net.ipv4.ip_local_port_range = 1024 65535
  • 优化TCP参数
net.ipv4.tcp_mem = 786432 2097152 3145728net.ipv4.tcp_wmem = 4096 4096 16777216net.ipv4.tcp_rmem = 4096 4096 16777216net.ipv4.tcp_keepalive_time = 1800net.ipv4.tcp_keepalive_intvl = 20net.ipv4.tcp_keepalive_probes = 5net.ipv4.tcp_tw_reuse = 1net.ipv4.tcp_tw_recycle = 1net.ipv4.tcp_fin_timeout = 30

==参数说明:==

net.ipv4.tcp_mem: 分配给tcp连接的内存,单位是page(1个Page通常是4KB,可以通过getconf PAGESIZE命令查看),三个值分别是最小、默认、和最大。比如以上配置中的最大是3145728,那分配给tcp的最大内存=31457284 / 1024 / 1024 = 12GB。一个TCP连接大约占7.5KB,粗略可以算出百万连接≈7.51000000/4=1875000 3145728足以满足测试所需。

net.ipv4.tcp_wmem: 为每个TCP连接分配的写缓冲区内存大小,单位是字节。三个值分别是最小、默认、和最大。

net.ipv4.tcp_rmem: 为每个TCP连接分配的读缓冲区内存大小,单位是字节。三个值分别是最小、默认、和最大。

net.ipv4.tcp_keepalive_time: 最近一次数据包发送与第一次keep alive探测消息发送的事件间隔,用于确认TCP连接是否有效。

net.ipv4.tcp_keepalive_intvl: 在未获得探测消息响应时,发送探测消息的时间间隔。

net.ipv4.tcp_keepalive_probes: 判断TCP连接失效连续发送的探测消息个数,达到之后判定连接失效。

net.ipv4.tcp_tw_reuse: 是否允许将TIME_WAIT Socket 重新用于新的TCP连接,默认为0,表示关闭。

net.ipv4.tcp_tw_recycle: 是否开启TIME_WAIT Socket 的快速回收功能,默认为0,表示关闭。

net.ipv4.tcp_fin_timeout: 套接字自身关闭时保持在FIN_WAIT_2 状态的时间。默认为60。

转载于:https://juejin.im/post/6861560765200105486

作者:狐言不胡言

bootstrap外不引用连接_网络编程Netty IoT百万长连接优化,万字长文精讲相关推荐

  1. plsql保持长连接_知乎千万级高性能长连接网关是如何搭建的

    作者:@faceair @安江泽原文:https://zhuanlan.zhihu.com/p/66807833 实时的响应总是让人兴奋的,就如你在微信里看到对方正在输入,如你在王者峡谷里一呼百应,如 ...

  2. 【性能优化】网络编程 - PHP - 使用TCP长连接的一种优化思路 - 学习/实践

    1.应用场景 主要了解学习如何基于PHP使用TCP长连接的一种优化思路. 2.学习/参考 文档阅读 PHP - CGI, Fast-FGI, PHP-FPM - 学习/实践 php使用tcp长连接的一 ...

  3. JAVA网络编程Socket常见问题 【长连接专题】

    一. 网络程序运行过程中的常见异常及处理 第1个异常是 java.net.BindException:Address already in use: JVM_Bind. 该异常发生在服务器端进行new ...

  4. mqtt连接失败_Netty实战:如何让单机下Netty支持百万长连接?

    单机下能不能让我们的网络应用支持百万连接?可以,但是有很多的工作要做.而且要考虑到单机的系统资源消耗能否支撑百万并发 一.操作系统优化 首先就是要突破操作系统的限制. 在Linux平台上,无论编写客户 ...

  5. python网络编程自学_五分钟搞定Python网络编程实现TCP和UDP连接

    Python网络编程实现TCP和UDP连接, 使用socket模块, 所有代码在python3下测试通过. 实现TCP#!/usr/bin/env python3 # -*- coding: utf- ...

  6. 【Linux网络编程笔记】TCP短连接产生大量TIME_WAIT导致无法对外建立新TCP连接的原因及解决方法—实践篇

    1. 查看系统网络配置和当前TCP状态         在定位并处理应用程序出现的网络问题时,了解系统默认网络配置是非常必要的.以x86_64平台Linux kernelversion 2.6.9的机 ...

  7. python requests 异步调用_构建高效的python requests长连接池详解

    前文: 最近在搞全网的CDN刷新系统,在性能调优时遇到了requests长连接的一个问题,以前关注过长连接太多造成浪费的问题,但因为系统都是分布式扩展的,针对这种各别问题就懒得改动了. 现在开发的缓存 ...

  8. 复工后一次百万长连接压测Nginx与OOM的问题排查分析,我裂开了!

    在最近的一次百万长连接压测中,32C 128G 的四台 Nginx 频繁出现 OOM,出现问题时的内存监控如下所示. 排查的过程记录如下. 现象描述 这是一个 websocket 百万长连接收发消息的 ...

  9. 一次百万长连接压测 Nginx OOM 的问题排查分析

    转载来源 : 一次百万长连接压测 Nginx OOM 的问题排查分析 : https://mp.weixin.qq.com/s/thSoTHeS26Y4Nf7ryUtygg 在最近的一次百万长连接压测 ...

最新文章

  1. Apache Kylin v2.5.0正式发布,开源分布式分析引擎
  2. oracle:sql约束
  3. 【Paper】2021_Observer-based distributed consensus for multi-agent systems with directed networks and
  4. Kafka、RabbitMQ、RocketMQ等消息中间件的对比 —— 消息发送性能和区别
  5. android mvc mvp 简书,浅析 MVP,MVC,MVVM模式(Android)
  6. 深入理解 main 方法
  7. JavaScript学习总结(14)——12个令人惊叹的JavaScript技巧
  8. linux查看某个时间段的日志
  9. linux ls 参数列表过长,ls提示参数列表过长解决办法
  10. 山东烟建借沟通CTBS实现财务数据大集中
  11. WPF 程序加载PGIS性能问题
  12. 千方百剂创建账套服务器文件,如何修改sql server 2000身份验证模式和系统管理员_数据库技巧...
  13. Java类加载机制与反射 jvm学习
  14. 锂电池是什么材料做的
  15. 计算机丢失MSVCR71.dll处理方法
  16. iPhone/苹果手机不用数据线传输文件到电脑的方法/步骤
  17. 四叶草的python代码_python绘图四叶草
  18. darknet源码理解(二)---图片的读取
  19. root后怎么删除授权管理,Root授权管理
  20. Python 爬虫 批量下载论坛图片

热门文章

  1. 数据数据库学通MongoDB——第一天 基础入门
  2. (转)解读NTFS(一)
  3. 用python绘制玫瑰花的代码_python也能玩出玫瑰花!程序员的表白代码
  4. 程序员的自我修养——远离“外包思维”
  5. 内网安装nginx+keepalived环境配置及简单使用
  6. 超越培训班同学的独门绝技
  7. oracle 多个with as
  8. php 取数值整数的函数是,PHP取整数函数常用的四种方法小结
  9. timer purge_Java Timer purge()方法与示例
  10. Python | 如何使用pip升级所有Python软件包?