-     前言     - 

单纯netty结合protostuff进行rpc对象传输的demo网上有很多,大部分都是一个模子刻出来的,一开始我也是抄了一个,本地测试畅通无阻,未发生任何异常。

部署预发环境,进行压测后,问题巨多,各种报错层出不穷。当然,压测时我用的数据量大、发送请求非常密集,单机是每秒前100ms发送2万个对象,其他900ms歇息,死循环发送,共计40台机器作为客户端,同时往2台netty Server服务器发送对象,那么平均每个server每秒大概要接收40万个对象,由于后面还有业务逻辑,逻辑每秒只能处理35万实测。

对于网上的代码,进行了多次修改,反复测试,最终是达到了不报错无异常,单机秒级接收35万个对象以上,故写篇文章记录一下,文中代码会和线上逻辑保持一致。

-     Protostuff 序列化和反序列化     - 

这个没什么特殊的,网上找个工具类就好了。

引入pom:

<protostuff.version>1.7.2</protostuff.version>
<dependency><groupId>io.protostuff</groupId><artifactId>protostuff-core</artifactId><version>${protostuff.version}</version>
</dependency><dependency><groupId>io.protostuff</groupId><artifactId>protostuff-runtime</artifactId><version>${protostuff.version}</version>
</dependency>

public class ProtostuffUtils {/*** 避免每次序列化都重新申请Buffer空间* 这句话在实际生产上没有意义,耗时减少的极小,但高并发下,如果还用这个buffer,会报异常说buffer还没清空,就又被使用了*/
//    private static LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);/*** 缓存Schema*/private static Map<Class<?>, Schema<?>> schemaCache = new ConcurrentHashMap<>();/*** 序列化方法,把指定对象序列化成字节数组** @param obj* @param <T>* @return*/@SuppressWarnings("unchecked")public static <T> byte[] serialize(T obj) {Class<T> clazz = (Class<T>) obj.getClass();Schema<T> schema = getSchema(clazz);LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);byte[] data;try {data = ProtobufIOUtil.toByteArray(obj, schema, buffer);
//            data = ProtostuffIOUtil.toByteArray(obj, schema, buffer);} finally {buffer.clear();}return data;}/*** 反序列化方法,将字节数组反序列化成指定Class类型** @param data* @param clazz* @param <T>* @return*/public static <T> T deserialize(byte[] data, Class<T> clazz) {Schema<T> schema = getSchema(clazz);T obj = schema.newMessage();ProtobufIOUtil.mergeFrom(data, obj, schema);
//        ProtostuffIOUtil.mergeFrom(data, obj, schema);return obj;}@SuppressWarnings("unchecked")private static <T> Schema<T> getSchema(Class<T> clazz) {Schema<T> schema = (Schema<T>) schemaCache.get(clazz);if (Objects.isNull(schema)) {//这个schema通过RuntimeSchema进行懒创建并缓存//所以可以一直调用RuntimeSchema.getSchema(),这个方法是线程安全的schema = RuntimeSchema.getSchema(clazz);if (Objects.nonNull(schema)) {schemaCache.put(clazz, schema);}}return schema;}
}

此处有坑,就是最上面大部分网上代码都是用了static的buffer。在单线程情况下没有问题。在多线程情况下,非常容易出现buffer一次使用后尚未被clear,就再次被另一个线程使用,会抛异常。而所谓的避免每次都申请buffer空间,实测性能影响极其微小。

另里面两次ProtostuffIOUtil都改成了ProtobufIOUtil,因为也是出过异常,修改后未见有异常。

-     自定义序列化方式     - 

解码器decoder:

import com.jd.platform.hotkey.common.model.HotKeyMsg;
import com.jd.platform.hotkey.common.tool.ProtostuffUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;import java.util.List;/*** @author wuweifeng* @version 1.0* @date 2020-07-29*/
public class MsgDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> list) {try {byte[] body = new byte[in.readableBytes()];  //传输正常in.readBytes(body);list.add(ProtostuffUtils.deserialize(body, HotKeyMsg.class));//            if (in.readableBytes() < 4) {
//                return;
//            }
//            in.markReaderIndex();
//            int dataLength = in.readInt();
//            if (dataLength < 0) {
//                channelHandlerContext.close();
//            }
//            if (in.readableBytes() < dataLength) {
//                in.resetReaderIndex();
//                return;
//            }
//
//            byte[] data = new byte[dataLength];
//            in.readBytes(data);
//
//            Object obj = ProtostuffUtils.deserialize(data, HotKeyMsg.class);
//            list.add(obj);} catch (Exception e) {e.printStackTrace();}}
}

编码器 encoder:


import com.jd.platform.hotkey.common.model.HotKeyMsg;
import com.jd.platform.hotkey.common.tool.Constant;
import com.jd.platform.hotkey.common.tool.ProtostuffUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;/*** @author wuweifeng* @version 1.0* @date 2020-07-30*/
public class MsgEncoder extends MessageToByteEncoder {@Overridepublic void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) {if (in instanceof HotKeyMsg) {byte[] bytes = ProtostuffUtils.serialize(in);byte[] delimiter = Constant.DELIMITER.getBytes();byte[] total = new byte[bytes.length + delimiter.length];System.arraycopy(bytes, 0, total, 0, bytes.length);System.arraycopy(delimiter, 0, total, bytes.length, delimiter.length);out.writeBytes(total);}}
}

先看Decoder解码器,这个是用来netty收到消息后,进行解码,将字节转为对象(自定义的HotKeyMsg)用的。里面有一堆被我注释掉了,注释掉的,应该在网上找到的帖子都是那么写的。这种方式本身在普通场景下是没问题的,解码还算正常,但是当上几十万时非常容易出现粘包问题。所以我是在这个解码器前增加了一个DelimiterBasedFrameDecoder分隔符解码器。

当收到消息时,先过这个分隔符解码器,之后到MsgDecoder那里时,就是已经分隔好的一个对象字节流了,就可以直接用proto工具类进行反序列化的。Constant.DELIMITER是我自定义的一个特殊字符串,用来做分隔符。

再看encoder,编码器,首先将要传输的对象用ProtostuffUtils序列化为byte[],然后在尾巴上挂上我自定义的那个分隔符。这样在对外发送对象时,就会走这个编码器,并被加上分隔符。

对应的server端代码大概是这样:

之后在Handler里就可以直接使用这个传输的对象了。

再看client端:

和Server端是一样的,也是这几个编解码器,没有区别。因为netty和server之间通讯,我都是用的同一个对象定义。

同理handler也是一样的。

-     单机和集群     - 

以上都写完后,其实就可以测试了,我们可以启动一个client,一个server,然后搞个死循环往Server发这个对象了,然后你在server端在收到这个对象后,再直接把这个对象也写回来,原样发送到客户端。会发现运行的很顺畅,每秒发N万个没问题,编解码都正常,client和server端都比较正常,当前前提是ProtoBuf的工具类和我的一样,不要共享那个buffer。网上找的文章基本上到这样也就结束了,随便发几个消息没问题也就算OK。然而实际上,这种代码上线后,会坑的不要不要的。

其实本地测试也很容易,再启动几个客户端,都连同一个Server,然后给他死循环发对象,再看看两端会不会有异常。这种情况下,和第一种的区别其实客户端没什么变化,Server端就有变化了,之前同时只给一个client发消息,现在同时给两个client发消息,这一步如果不谨慎就会出问题了,建议自行尝试。

之后,我们再加点料,我启动两个Server,分别用两个端口,线上其实是两台不同的server服务器,client会同时往两台server死循环发对象,如下图代码。

发消息,我们常用的就是channel.writeAndFlush(),大家可以把那个sync去掉,然后跑一下代码看看。会发现异常抛的一坨一坨的。我们明明是往两个不同的channel发消息,只不过时间是同时,结果就是发生了严重的粘包。server端收到的消息很多都是不规范的,会大量报错。如果在两个channel发送间隔100ms,情况就解决了。当然,最终我们可以使用sync同步发送,这样就不会抛异常了。

以上代码经测试,40台client,2台Server,平均每个server每秒大概接收40万个对象,可以持续稳定运行。

作者:天涯泪小武

https://blog.csdn.net/tianyaleixiaowu/article/details/107714868

Netty 如何做到单机秒级接收 35 万个对象?相关推荐

  1. 使用Netty如何做到单机秒级接收35万个对象

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 来源 | https://urlify.cn/Bvueuq ...

  2. Netty结合Protostuff传输对象案例,单机压测秒级接收35万个对象

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 单纯netty结合protostuff进行rpc对象传输的 ...

  3. 一秒级接收20W+消息落库比Mysql快1000倍

    项目地址  flowback: 亿级消息落库,大数据收集,秒级10w+数据落库,亿级数据检索秒级响应解决方案 最近有大数据落库需求秒级达到10w+于是写了个开源项目 总体使用Netty 与 堆外内存 ...

  4. 京东热 Key 0.4 发布,单机 QPS 提升至 35 万

    点击▲关注 "爪哇笔记"   给公众号标星置顶 更多精彩 第一时间直达 发布 HotKey在618稳定版0.2版基础上,引入了proto序列化方式,并优化了传输对象. worker ...

  5. 京东热 key 探测框架新版发布,单机 QPS 可达 35 万

    △Hollis, 一个对Coding有着独特追求的人△ 这是Hollis的第 300 篇原创分享 作者 l Hollis 来源 l Hollis(ID:hollischuang) 对于大型的分布式系统 ...

  6. 京东App秒级百G日志传输存储架构设计与实战

    本文作者:平台业务研发部-武伟峰,数据与智能部-李阳 背景 在日常工作中,我们通常需要存储一些日志,譬如用户请求的出入参.系统运行时打印的一些info.error之类的日志,从而对系统在运行时出现的问 ...

  7. 直播回顾:准确性提升到 5 秒级,ssar 独创的 load5s 指标有多硬核?| 龙蜥技术

    简介: 你还在为分析机器负载高而苦恼?这款 ssar 工具独创 load5s 指标精准定位超硬核. 编者按:本文整理自龙蜥SIG技术周会,作者闻茂泉,阿里云计算平台事业部SRE运维专家,是龙蜥社区跟踪 ...

  8. 直播回顾:准确性提升到 5 秒级,ssar 独创的 load5s 指标有多硬核?| 龙蜥技术...

    编者按:本文整理自龙蜥SIG技术周会,作者闻茂泉,阿里云计算平台事业部SRE运维专家,是龙蜥社区跟踪诊断SIG核心成员.本文带你了解 ssar 的基本功能和使用.初步学习用 ssar 解决单机 OS ...

  9. PostgreSQL 百亿数据 秒级响应 正则及模糊查询

    原文: https://yq.aliyun.com/articles/7444?spm=5176.blog7549.yqblogcon1.6.2wcXO2 摘要: 正则匹配和模糊匹配通常是搜索引擎的特 ...

最新文章

  1. 扩增子图表解读6韦恩图:比较组间共有和特有OTU或分类单元
  2. 解决python多版本环境下pip报错Fatal error in launcher: Unable to create process using问题
  3. 获得最新纪录 sql
  4. mysql查询是否用index_如何查看sql查询是否用到索引(mysql)
  5. 华为鸿蒙与magic,如果荣耀Magic3搭载了屏下镜头和鸿蒙系统,你会做第一批吗?...
  6. .NET Core多平台项目模板eShopOnContainers编译手记
  7. MySQL常用语法记录
  8. jquery layout学习
  9. 原创音乐人炙手可热,中国原创音乐进入新周期?
  10. 修炼!!!——超越项目经理
  11. 制作一个小型linux
  12. Java之多线程断点下载的实现
  13. Java-综合项目-房屋出租系统(总结与反思)
  14. 大学本科基于html5毕业设计题目50例
  15. 苹果手机密码设置在哪里_oppo怎么设置SIM密码-oppo手机SIM卡密码设置详细教程
  16. win7打开计算机死机,win7系统进入桌面总是死机或者卡死怎么办
  17. wps永久关闭热点功能
  18. linux limits.conf 生效,linux修改limits.conf不生效
  19. IDEA运行web项目路径中去掉项目名称
  20. es的refresh和flush介绍

热门文章

  1. python写炒股软件_利用 Python 构建自己的股票投资系统
  2. office插件开发_Visual Studio Code有哪些你常用的插件?
  3. 简单介绍python迭代器和生成器
  4. PTA基础编程题目集-7-38 数列求和-加强版
  5. 图论 ---- CF1209F. Koala and Notebook(多位数字拆边+BFS)
  6. UVA1146 / LA3211(ACM-ICPC 2004 Europe - Southwestern) Now or later(2-SAT问题 + 二分答案)
  7. 蓝桥杯C++ AB组辅导课
  8. php持续写入文件,PHP:如何读取不断写入的文件
  9. asyncdata 获取参数_载入页面初始数据(asyncData)《 Nuxt.js:异步数据 》
  10. python内存管理机制_[转] Python内存管理机制