netty实现简单的rpc,支持服务集群

  • 前言
    • 简介
    • 环境准备
    • Netty 处理器链设计
    • 消费者RPC代理工厂设计
    • netty rpc消费者核心设计
    • netty rpc生产者核心设计
    • 服务注册、发现以集群
    • 演示Demo
    • 尾言
      • 相关链接

前言

简介

最近了解了下netty相关知识,简单实现一个基于nettyrpc demo,参考了几篇文章,其中这篇清幽之地大佬的RPC基本原理以及如何用Netty来实现RPC 非常不错 ,给我不少启迪,关于rpc的相关知识可以阅读该文,本文主要对如何阐述如何使用netty实现rpc。与清幽之地大佬的demo相比,增添

1. 按服务名加载集群,采用随机选择服务方式负载
2. 每个实例可以同时作为消费者和生产者

demo主要围绕以下两图开发:

Dubbo 架构图


rpc调用示意图

环境准备

主要环境nettySpringBootzookeeperzkclient

  • netty 作为通信基础框架
  • SpringBoot 作为基础框架,打包、测试、运行
  • zookeeper 作为服务注册中心
  • zkclient zookeeper客户端

Netty 处理器链设计

netty开发,处理器链pipeline设计是非常重要的一个过程,从byte数据入站,设计不同的入站处理器将其一步步转换我们可直接使用的数据类型,且因为tcp连接保证数据传输的先后顺序,但由于网络传输拥塞窗口等原因影响,可能会发生拆包、粘包情况,可以借助netty框架内置的处理器和自定义数据协议方式解决。出站处理器设计则是逆向上述过程,将我们的数据转化为byte的过程。

使用到的处理器和相关实体类
1. ByteMsgToDataMsgDecoder 借助netty本身的ReplayingDecoder 加自定义数据协议的方式解决拆、粘包问题,将入站数数解码为DataMsg类型的解码器

public class ByteMsgToDataMsgDecoder extends ReplayingDecoder<DataMsg> {protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {int length = in.readInt();byte[] content = new byte[length];in.readBytes(content);DataMsg msg = new DataMsg(length, content);out.add(msg);}
}

数据传输实体DataMsg

@Data
@AllArgsConstructor
public class DataMsg {//数据长度private int length;//数据private byte[] data;
}

2. DataMsgToByteMsgEncoder 将出站DataMsg编码为Byte的解码器

public class DataMsgToByteMsgEncoder extends MessageToByteEncoder<DataMsg> {protected void encode(ChannelHandlerContext ctx, DataMsg msg, ByteBuf out) throws Exception {out.writeInt(msg.getLength());out.writeBytes(msg.getData());}
}

3.DataMsgToRequestOrResponseConvert 根据入参将DataMsg其中数据转换为请求、响应数据,根据服务消费者和生产者入站数据不同,进行不同的反序列化操作。

public class DataMsgToRequestOrResponseConvert extends SimpleChannelInboundHandler<DataMsg> {private Logger logger= LoggerFactory.getLogger(DataMsgToRequestOrResponseConvert.class);//需要转换目标类,即Response 或者 Requestprivate Class<?> target;public DataMsgToRequestOrResponseConvert(Class<?> target) {this.target = target;}protected void channelRead0(ChannelHandlerContext ctx, DataMsg msg) throws Exception {final byte[] data = msg.getData();Object object = JsonUtil.bytesToObject(data, target);ctx.fireChannelRead(object);logger.info("{}数据:{}",target.getSimpleName(),JsonUtil.objectToJsonString(object));}
}

请求数据类Request 封装消费者rpc服务时的参数

@Data
public class Request {//请求IDprivate String id;// 类名private String className;// 函数名称private String methodName;// 参数类型private Class<?>[] parameterTypes;// 参数列表private Object[] parameters;}

响应数据类Response 封装rpc生产者返回的数据

@Data
public class Response {//请求IDprivate String requestId;//代码private int code;//错误信息private String errorMsg;//数据private Object data;
}
  1. NettyServerRequestHandler 生产者核心处理器(后续切入)
  2. NettyClientResponseHandler 消费者核心处理(后续切入)

服务生产者链式示意图:

服务消费者链式示意图:

消费者RPC代理工厂设计

为所有服务消费者对目标服务生产者的调用,生成代理,从代理转入NettyClientResponseHandler类处理,借助spring ApplicationContextAware、以及jdk的动态代理实现上述操作。
相关核心代码
spring bean工具类

@Component
public class SpringBeanUtil implements ApplicationContextAware, InitializingBean {static ApplicationContext context;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {SpringBeanUtil.context = applicationContext;}public static <T> T getBeanByClass(Class<?> clazz) {return (T) context.getBean(clazz);}@Overridepublic void afterPropertiesSet() throws Exception {//获取含有指定注解的beanMap<String, Object> beans = context.getBeansWithAnnotation(RpcClientTarget.class);Iterator<Object> iterator = beans.values().iterator();while (iterator.hasNext()) {Object target = iterator.next();Field[] fields = target.getClass().getDeclaredFields();for (Field field : fields) {//获取目标bean有需要被代理的fieldif (field.getAnnotationsByType(MyResource.class) != null) {field.setAccessible(true);try {field.set(target, ProxyBeanUtil.getProxy(field.getType()));} catch (IllegalAccessException e) {e.printStackTrace();}}}}}
}

jdk代理工具类

@Component
public class RpcFactory implements InvocationHandler {@AutowiredNettyClient client;Logger logger = LoggerFactory.getLogger(this.getClass());public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {if (method.getDeclaringClass() == Object.class) {logger.info("method:{}代理执行,为Object声明的不进行代理执行", method.getName());return method.invoke(proxy, args);}logger.info("method:{}代理执行", method.getName());Request request = new Request();request.setClassName(method.getDeclaringClass().getName());request.setMethodName(method.getName());request.setParameters(args);request.setParameterTypes(method.getParameterTypes());request.setId(UUID.randomUUID().toString());String serverName = method.getDeclaringClass().getAnnotation(RpcClient.class).name();Response response = client.sendMsgV2(serverName,request);Class<?> returnType = method.getReturnType();//返回错误if (response.getCode() == ErrorCodeEnums.FAIL.getCode()) {throw new Exception(response.getErrorMsg());}/**1.返回结果是基础类型或者String类型,直接返回*2.其他类型转换为该类型*/if (returnType.isPrimitive() || String.class.isAssignableFrom(returnType)) {return response.getData();} else if (Collection.class.isAssignableFrom(returnType)) {return JsonUtil.jsonStringToObject(response.getData().toString(), Object.class);} else if (Map.class.isAssignableFrom(returnType)) {return JsonUtil.jsonStringToObject(response.getData().toString(), Map.class);} else {Object data = response.getData();return JsonUtil.jsonStringToObject(data.toString(), returnType);}}
}

相关注解类集合

/*** @author jxy* @className RpcClientTarget* @description 含有rpc的目标类* @date 2021/3/12 21:52*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcClientTarget {}
/*** @author : porteryao* @description : Rpc客户端注解* @date : 2021/3/11 11:14*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcClient {String name();
}
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MyResource {}

demo Controller

@RestController
@RpcClientTarget //方便命中
public class TestController {@MyResourceprivate StuService stuService;@GetMapping("/test")public String test() {StuInfo info = new StuInfo("jxt", 18);return stuService.sayHello(info);}
}

service 接口

@RpcClient(name = "stuservice")
public interface StuService {String sayHello(StuInfo stuInfo);
}

上述操作可将需要代理执行rpc的接口,转到我们自定义的netty 相关代码执行

netty rpc消费者核心设计

  1. 根据需要代理执行service上注解RpcClient获取其服务名,并从连接管理器ConnectManager获取通道
  2. 通道不为空,则发送数据,并在阻塞队列中等待返回
  3. 将返回结果按照service 上的method封装

相关方法代码:

public Response sendMsgV2(String appName, Request request) throws InterruptedException {Channel channel = connectManage.chooseChannelByServiceName(appName);if (channel == null || (!channel.isActive())) {logger.error(appName + "无可用服务!");Response res = new Response();res.setCode(ErrorCodeEnums.FAIL.getCode());res.setErrorMsg(appName + "无可用服务!");return res;}//发送请求return clientResponseHandler.sendMsg(channel, request).take();}/*** 等待响应队列*/private Map<String, BlockingQueue<Response>> questMap = new ConcurrentHashMap<>();/*** 将返回结果放在阻塞队列*/public BlockingQueue<Response> sendMsg(Channel channel, Request request) {return getResponses(channel, request, questMap);}static BlockingQueue<Response> getResponses(Channel channel, Request request,Map<String, BlockingQueue<Response>> questMap) {BlockingQueue<Response> blockingQueue = new LinkedBlockingQueue<>();questMap.put(request.getId(), blockingQueue);byte[] bytes = JsonUtil.objectToJsonByteArray(request);DataMsg dataMsg = new DataMsg(bytes.length, bytes);channel.writeAndFlush(dataMsg);return blockingQueue;}static BlockingQueue<Response> getResponses(Channel channel, Request request,Map<String, BlockingQueue<Response>> questMap) {BlockingQueue<Response> blockingQueue = new LinkedBlockingQueue<>();questMap.put(request.getId(), blockingQueue);byte[] bytes = JsonUtil.objectToJsonByteArray(request);DataMsg dataMsg = new DataMsg(bytes.length, bytes);channel.writeAndFlush(dataMsg);return blockingQueue;}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) {BlockingQueue<Response> responseBlockingQueue = questMap.get(response.getRequestId());responseBlockingQueue.add(response);questMap.remove(response.getRequestId());}

由于netty是异步执行,所以这里需要发送者等待,笔者这里take()是无限期等待,可以设置一定时间等待,避免服务端长时间无响应造成调用方阻塞(后续会优化)。
致此,我们完成了rpc调用示意图,关于消费者调用设计的部分。

netty rpc生产者核心设计

生产者这块相对来说比较简单,根据消费者请求数据执行方法,并返回数据。

  1. 启动时将自身netty连接信息注册到注册中心
  2. 接受请求时按Request 其中class类型查询目标bean
  3. 执行目标bean Request 请求的方法
  4. 将结果序列化返回

核心方法

 protected void channelRead0(ChannelHandlerContext ctx, Request request)throws Exception {if (serviceMap.isEmpty()) {logger.warn("没有需要rpc调用的服务");return;}final Object instance = serviceMap.get(request.getClassName());if (instance != null) {Class<?> instanceClass = instance.getClass();Method method = instanceClass.getDeclaredMethod(request.getMethodName(), request.getParameterTypes());method.setAccessible(true);//避免权限问题Object o = method.invoke(instance,parametersConvert(request.getParameterTypes(), request.getParameters()));Response response = new Response();response.setCode(ErrorCodeEnums.SUCCESS.getCode());response.setData(o);response.setErrorMsg(ErrorCodeEnums.SUCCESS.getMsg());response.setRequestId(request.getId());String jsonString = JsonUtil.objectToJsonString(response);byte[] data = jsonString.getBytes(CharsetUtil.UTF_8);DataMsg dataMsg = new DataMsg(data.length, data);ctx.writeAndFlush(dataMsg);}}

服务注册、发现以集群

ps:本文篇幅较长,且这块由于逻辑处理复杂,笔者会在闲暇之余补充在单独的文章
主要步骤:

  1. 服务提供者按照application name在zookeeper注册其节点,节点类型为临时节点,便于zookeeper客户端感知
  2. 消费者根据服务调用时检测目标服务是否已存在通道,初次需从zookeeper获取,并设置监听
  3. 根据zookeeper节点信息创建通道连接
  4. zookeeper客户端监听到某服务注册的节点发送变化,并更新对应服务通道

演示Demo

到这了,来看看演示吧:

zookeeper已注册节点

idea启动情况:


clientApp:9002实例调用ServerApp


由于是随机数负载,可能需要尝试几次才能看到效果

ServerApp:9007实例调用clientApp



上述demo,即一个实例即可成为消费者也可成为生产者,并在调用时负载。

尾言

笔者仅做个人学习,措辞不合理,知识不够深入,请勿怪!Thank

相关链接

参考文献
[1]: https://www.jianshu.com/p/8876c9f3cd7f
仓库地址
gitee https://gitee.com/junxiaoyao/jxy_netty_rpc_study
注:master分支为不包含 zookeeper注册中心的demo,如需查看请切换至feature/auto_registry_zookeepe分支

netty实现简单的rpc,支持服务集群相关推荐

  1. .net core下简单构建高可用服务集群

    一说到集群服务相信对普通开发者来说肯定想到很复杂的事情,如zeekeeper ,反向代理服务网关等一系列的搭建和配置等等:总得来说需要有一定经验和规划的团队才能应用起来.在这文章里你能看到在.net ...

  2. 分布式IM及Netty服务集群解决方案

    一.概述 使用netty开发分布式Im,提供分布netty集群解决方案.服务端通过负载均衡策略与服务集群建立连接,消息发送通过服务间集群的通信进行消息转发. 二.集群架构 三.项目地址 https:/ ...

  3. 【初识Netty使用Netty实现简单的客户端与服务端的通信操作Netty框架中一些重要的类以及方法的解析】

    一.Netty是什么? Netty 由 Trustin Lee(韩国,Line 公司)2004 年开发 本质:网络应用程序框架 实现:异步.事件驱动 特性:高性能.可维护.快速开发 用途:开发服务器和 ...

  4. linux的RHCS服务集群之Heartbeat集群简单搭建

    搭建Heartbeat服务器 Heartbeat名词解析: 所谓Heartbeat,顾名思义就是心跳同步的意思.在现在的网络中,是很重要稳定高效的时代.在很多的服务上都是通过搭建服务集群来提高效率:并 ...

  5. 在滴滴云快速搭建自己的简易服务集群(入门版)

    引言 万物互联的时代,各行各业都或多或少的接入线上,作为开发人员,我们有了小而美的产品或服务方面的想法,就把它她做出来放到线上,让它发展壮大. 我以滴滴云为例,教你一步一步的搭建自己的服务器集群,包括 ...

  6. HOD服务集群 torque maui

    前言 本文的目的在于从无到有的搭建一套HOD服务集群.在参考本文之前假设读者已经对hadoop系统及其下面 DFSShell,HDFS,MapReduce等已经有了相当的了解.由于Hadoop doc ...

  7. 简单部署 rancher 管理kubernetes集群(3)

    rancher 简单使用 运行docker 容器 环境部署 关闭防火墙与selinux systemctl stop firewalld systemctl disable firewalldsed ...

  8. 轻松实现基于Heartbeat的高可用web服务集群

    高可用集群就是为了保证某项服务能够时时在线,我们可以通过几个9来衡量一个高可用集群提供服务的稳定性,例如5个9的高可用集群必须保证服务一年在线的时间占99.999%,也就是说一年的时间中仅允许服务电线 ...

  9. Centos 6.4下 MySQL配置主从服务(集群)

    Centos 6.4下 MySQL配置主从服务(集群) 我们前面两篇文章都分别介绍了Mysql的安装及配置.备份及还原,今天我们继续前面的环境介绍一下Centos 6.4下MySQL配置主从实现数据同 ...

最新文章

  1. 分页存储过程性能比较 二分法
  2. 《深度学习导论及案例分析》一2.11概率图模型的推理
  3. 美国诚实签经验——着装,戒指,手表装土豪,医生预约单,流水、房产和工作证明...
  4. python pyyaml模块使用示例:读取yaml文件内容
  5. mac+修改+ssh文件夹权限_用SSH指令批量修改文件夹 文件权限和拥有者
  6. vs需要迁移_这可能是目前最全面的无服务器迁移实践
  7. 将计算机设置成交换机主机名,CISCO2950交换机的配置(设置密码、IP地址、主机名)...
  8. php数据库备份脚本
  9. LINUX 错误代码
  10. 车间调度建模系列2|复杂车间调度问题描述
  11. 终端一直显示 (master) ,即终端一直处于master分支下的取消办法
  12. css pseudo elements,CSS 伪元素 (Pseudo-elements)
  13. 优酷古永锵:最大对手是土豆网 做好内容监管
  14. 论文阅读: [3d]Audio-driven Talking Face Video Generation with Learning-based Personalized Head Pose
  15. linux虚拟机怎么联网
  16. tcp可靠传输的机制有哪些(面试必看
  17. postgrsql 增加字段
  18. AXI总线之AXI-LITE总线分析与实现
  19. 联想G50-70装机过程
  20. U3D中 TextMeshPro 的超链接使用笔记

热门文章

  1. 关于svchost占用巨大内存的问题
  2. XMail 安装配置使用 (1.27 )
  3. 网络安全等级保护确定定级对象
  4. 一张图看懂VLAN数据帧接收流程中Access端口与Trunk端口的工作流程
  5. Redis操作str、list、hash、set、sortedset、bitmap
  6. 第七周--数据结构--队列数组
  7. win 32 APP 项目简单创建窗体
  8. 基于MATLAB的数论运算与编写函数(附完整代码)
  9. STM32任意IO模拟8080时序驱动TFTLCD屏
  10. 我的大学(2001-2005,从文艺青年到2B青年)