netty实现简单的rpc,支持服务集群
netty实现简单的rpc,支持服务集群
- 前言
- 简介
- 环境准备
- Netty 处理器链设计
- 消费者RPC代理工厂设计
- netty rpc消费者核心设计
- netty rpc生产者核心设计
- 服务注册、发现以集群
- 演示Demo
- 尾言
- 相关链接
前言
简介
最近了解了下netty相关知识,简单实现一个基于netty的rpc demo,参考了几篇文章,其中这篇清幽之地大佬的RPC基本原理以及如何用Netty来实现RPC 非常不错 ,给我不少启迪,关于rpc的相关知识可以阅读该文,本文主要对如何阐述如何使用netty实现rpc。与清幽之地大佬的demo相比,增添
1. 按服务名加载集群,采用随机选择服务方式负载
2. 每个实例可以同时作为消费者和生产者
demo主要围绕以下两图开发:
Dubbo 架构图
rpc调用示意图
环境准备
主要环境netty,SpringBoot,zookeeper、zkclient
- netty 作为通信基础框架
- SpringBoot 作为基础框架,打包、测试、运行
- zookeeper 作为服务注册中心
- zkclient zookeeper客户端
Netty 处理器链设计
netty开发,处理器链pipeline设计是非常重要的一个过程,从byte数据入站,设计不同的入站处理器将其一步步转换我们可直接使用的数据类型,且因为tcp连接保证数据传输的先后顺序,但由于网络传输拥塞窗口等原因影响,可能会发生拆包、粘包情况,可以借助netty框架内置的处理器和自定义数据协议方式解决。出站处理器设计则是逆向上述过程,将我们的数据转化为byte的过程。
使用到的处理器和相关实体类
1. ByteMsgToDataMsgDecoder 借助netty本身的ReplayingDecoder 加自定义数据协议的方式解决拆、粘包问题,将入站数数解码为DataMsg类型的解码器
public class ByteMsgToDataMsgDecoder extends ReplayingDecoder<DataMsg> {protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {int length = in.readInt();byte[] content = new byte[length];in.readBytes(content);DataMsg msg = new DataMsg(length, content);out.add(msg);}
}
数据传输实体DataMsg
@Data
@AllArgsConstructor
public class DataMsg {//数据长度private int length;//数据private byte[] data;
}
2. DataMsgToByteMsgEncoder 将出站DataMsg编码为Byte的解码器
public class DataMsgToByteMsgEncoder extends MessageToByteEncoder<DataMsg> {protected void encode(ChannelHandlerContext ctx, DataMsg msg, ByteBuf out) throws Exception {out.writeInt(msg.getLength());out.writeBytes(msg.getData());}
}
3.DataMsgToRequestOrResponseConvert 根据入参将DataMsg其中数据转换为请求、响应数据,根据服务消费者和生产者入站数据不同,进行不同的反序列化操作。
public class DataMsgToRequestOrResponseConvert extends SimpleChannelInboundHandler<DataMsg> {private Logger logger= LoggerFactory.getLogger(DataMsgToRequestOrResponseConvert.class);//需要转换目标类,即Response 或者 Requestprivate Class<?> target;public DataMsgToRequestOrResponseConvert(Class<?> target) {this.target = target;}protected void channelRead0(ChannelHandlerContext ctx, DataMsg msg) throws Exception {final byte[] data = msg.getData();Object object = JsonUtil.bytesToObject(data, target);ctx.fireChannelRead(object);logger.info("{}数据:{}",target.getSimpleName(),JsonUtil.objectToJsonString(object));}
}
请求数据类Request 封装消费者rpc服务时的参数
@Data
public class Request {//请求IDprivate String id;// 类名private String className;// 函数名称private String methodName;// 参数类型private Class<?>[] parameterTypes;// 参数列表private Object[] parameters;}
响应数据类Response 封装rpc生产者返回的数据
@Data
public class Response {//请求IDprivate String requestId;//代码private int code;//错误信息private String errorMsg;//数据private Object data;
}
- NettyServerRequestHandler 生产者核心处理器(后续切入)
- NettyClientResponseHandler 消费者核心处理(后续切入)
服务生产者链式示意图:
服务消费者链式示意图:
消费者RPC代理工厂设计
为所有服务消费者对目标服务生产者的调用,生成代理,从代理转入NettyClientResponseHandler类处理,借助spring ApplicationContextAware、以及jdk的动态代理实现上述操作。
相关核心代码
spring bean工具类
@Component
public class SpringBeanUtil implements ApplicationContextAware, InitializingBean {static ApplicationContext context;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {SpringBeanUtil.context = applicationContext;}public static <T> T getBeanByClass(Class<?> clazz) {return (T) context.getBean(clazz);}@Overridepublic void afterPropertiesSet() throws Exception {//获取含有指定注解的beanMap<String, Object> beans = context.getBeansWithAnnotation(RpcClientTarget.class);Iterator<Object> iterator = beans.values().iterator();while (iterator.hasNext()) {Object target = iterator.next();Field[] fields = target.getClass().getDeclaredFields();for (Field field : fields) {//获取目标bean有需要被代理的fieldif (field.getAnnotationsByType(MyResource.class) != null) {field.setAccessible(true);try {field.set(target, ProxyBeanUtil.getProxy(field.getType()));} catch (IllegalAccessException e) {e.printStackTrace();}}}}}
}
jdk代理工具类
@Component
public class RpcFactory implements InvocationHandler {@AutowiredNettyClient client;Logger logger = LoggerFactory.getLogger(this.getClass());public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {if (method.getDeclaringClass() == Object.class) {logger.info("method:{}代理执行,为Object声明的不进行代理执行", method.getName());return method.invoke(proxy, args);}logger.info("method:{}代理执行", method.getName());Request request = new Request();request.setClassName(method.getDeclaringClass().getName());request.setMethodName(method.getName());request.setParameters(args);request.setParameterTypes(method.getParameterTypes());request.setId(UUID.randomUUID().toString());String serverName = method.getDeclaringClass().getAnnotation(RpcClient.class).name();Response response = client.sendMsgV2(serverName,request);Class<?> returnType = method.getReturnType();//返回错误if (response.getCode() == ErrorCodeEnums.FAIL.getCode()) {throw new Exception(response.getErrorMsg());}/**1.返回结果是基础类型或者String类型,直接返回*2.其他类型转换为该类型*/if (returnType.isPrimitive() || String.class.isAssignableFrom(returnType)) {return response.getData();} else if (Collection.class.isAssignableFrom(returnType)) {return JsonUtil.jsonStringToObject(response.getData().toString(), Object.class);} else if (Map.class.isAssignableFrom(returnType)) {return JsonUtil.jsonStringToObject(response.getData().toString(), Map.class);} else {Object data = response.getData();return JsonUtil.jsonStringToObject(data.toString(), returnType);}}
}
相关注解类集合
/*** @author jxy* @className RpcClientTarget* @description 含有rpc的目标类* @date 2021/3/12 21:52*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcClientTarget {}
/*** @author : porteryao* @description : Rpc客户端注解* @date : 2021/3/11 11:14*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcClient {String name();
}
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MyResource {}
demo Controller
@RestController
@RpcClientTarget //方便命中
public class TestController {@MyResourceprivate StuService stuService;@GetMapping("/test")public String test() {StuInfo info = new StuInfo("jxt", 18);return stuService.sayHello(info);}
}
service 接口
@RpcClient(name = "stuservice")
public interface StuService {String sayHello(StuInfo stuInfo);
}
上述操作可将需要代理执行rpc的接口,转到我们自定义的netty 相关代码执行
netty rpc消费者核心设计
- 根据需要代理执行service上注解RpcClient获取其服务名,并从连接管理器ConnectManager获取通道
- 通道不为空,则发送数据,并在阻塞队列中等待返回
- 将返回结果按照service 上的method封装
相关方法代码:
public Response sendMsgV2(String appName, Request request) throws InterruptedException {Channel channel = connectManage.chooseChannelByServiceName(appName);if (channel == null || (!channel.isActive())) {logger.error(appName + "无可用服务!");Response res = new Response();res.setCode(ErrorCodeEnums.FAIL.getCode());res.setErrorMsg(appName + "无可用服务!");return res;}//发送请求return clientResponseHandler.sendMsg(channel, request).take();}/*** 等待响应队列*/private Map<String, BlockingQueue<Response>> questMap = new ConcurrentHashMap<>();/*** 将返回结果放在阻塞队列*/public BlockingQueue<Response> sendMsg(Channel channel, Request request) {return getResponses(channel, request, questMap);}static BlockingQueue<Response> getResponses(Channel channel, Request request,Map<String, BlockingQueue<Response>> questMap) {BlockingQueue<Response> blockingQueue = new LinkedBlockingQueue<>();questMap.put(request.getId(), blockingQueue);byte[] bytes = JsonUtil.objectToJsonByteArray(request);DataMsg dataMsg = new DataMsg(bytes.length, bytes);channel.writeAndFlush(dataMsg);return blockingQueue;}static BlockingQueue<Response> getResponses(Channel channel, Request request,Map<String, BlockingQueue<Response>> questMap) {BlockingQueue<Response> blockingQueue = new LinkedBlockingQueue<>();questMap.put(request.getId(), blockingQueue);byte[] bytes = JsonUtil.objectToJsonByteArray(request);DataMsg dataMsg = new DataMsg(bytes.length, bytes);channel.writeAndFlush(dataMsg);return blockingQueue;}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) {BlockingQueue<Response> responseBlockingQueue = questMap.get(response.getRequestId());responseBlockingQueue.add(response);questMap.remove(response.getRequestId());}
由于netty是异步执行,所以这里需要发送者等待,笔者这里take()是无限期等待,可以设置一定时间等待,避免服务端长时间无响应造成调用方阻塞(后续会优化)。
致此,我们完成了rpc调用示意图,关于消费者调用设计的部分。
netty rpc生产者核心设计
生产者这块相对来说比较简单,根据消费者请求数据执行方法,并返回数据。
- 启动时将自身netty连接信息注册到注册中心
- 接受请求时按Request 其中class类型查询目标bean
- 执行目标bean Request 请求的方法
- 将结果序列化返回
核心方法
protected void channelRead0(ChannelHandlerContext ctx, Request request)throws Exception {if (serviceMap.isEmpty()) {logger.warn("没有需要rpc调用的服务");return;}final Object instance = serviceMap.get(request.getClassName());if (instance != null) {Class<?> instanceClass = instance.getClass();Method method = instanceClass.getDeclaredMethod(request.getMethodName(), request.getParameterTypes());method.setAccessible(true);//避免权限问题Object o = method.invoke(instance,parametersConvert(request.getParameterTypes(), request.getParameters()));Response response = new Response();response.setCode(ErrorCodeEnums.SUCCESS.getCode());response.setData(o);response.setErrorMsg(ErrorCodeEnums.SUCCESS.getMsg());response.setRequestId(request.getId());String jsonString = JsonUtil.objectToJsonString(response);byte[] data = jsonString.getBytes(CharsetUtil.UTF_8);DataMsg dataMsg = new DataMsg(data.length, data);ctx.writeAndFlush(dataMsg);}}
服务注册、发现以集群
ps:本文篇幅较长,且这块由于逻辑处理复杂,笔者会在闲暇之余补充在单独的文章
主要步骤:
- 服务提供者按照application name在zookeeper注册其节点,节点类型为临时节点,便于zookeeper客户端感知
- 消费者根据服务调用时检测目标服务是否已存在通道,初次需从zookeeper获取,并设置监听
- 根据zookeeper节点信息创建通道连接
- zookeeper客户端监听到某服务注册的节点发送变化,并更新对应服务通道
演示Demo
到这了,来看看演示吧:
zookeeper已注册节点
idea启动情况:
clientApp:9002实例调用ServerApp
由于是随机数负载,可能需要尝试几次才能看到效果
ServerApp:9007实例调用clientApp
上述demo,即一个实例即可成为消费者也可成为生产者,并在调用时负载。
尾言
笔者仅做个人学习,措辞不合理,知识不够深入,请勿怪!Thank
相关链接
参考文献:
[1]: https://www.jianshu.com/p/8876c9f3cd7f
仓库地址
gitee https://gitee.com/junxiaoyao/jxy_netty_rpc_study
注:master分支为不包含 zookeeper注册中心的demo,如需查看请切换至feature/auto_registry_zookeepe分支
netty实现简单的rpc,支持服务集群相关推荐
- .net core下简单构建高可用服务集群
一说到集群服务相信对普通开发者来说肯定想到很复杂的事情,如zeekeeper ,反向代理服务网关等一系列的搭建和配置等等:总得来说需要有一定经验和规划的团队才能应用起来.在这文章里你能看到在.net ...
- 分布式IM及Netty服务集群解决方案
一.概述 使用netty开发分布式Im,提供分布netty集群解决方案.服务端通过负载均衡策略与服务集群建立连接,消息发送通过服务间集群的通信进行消息转发. 二.集群架构 三.项目地址 https:/ ...
- 【初识Netty使用Netty实现简单的客户端与服务端的通信操作Netty框架中一些重要的类以及方法的解析】
一.Netty是什么? Netty 由 Trustin Lee(韩国,Line 公司)2004 年开发 本质:网络应用程序框架 实现:异步.事件驱动 特性:高性能.可维护.快速开发 用途:开发服务器和 ...
- linux的RHCS服务集群之Heartbeat集群简单搭建
搭建Heartbeat服务器 Heartbeat名词解析: 所谓Heartbeat,顾名思义就是心跳同步的意思.在现在的网络中,是很重要稳定高效的时代.在很多的服务上都是通过搭建服务集群来提高效率:并 ...
- 在滴滴云快速搭建自己的简易服务集群(入门版)
引言 万物互联的时代,各行各业都或多或少的接入线上,作为开发人员,我们有了小而美的产品或服务方面的想法,就把它她做出来放到线上,让它发展壮大. 我以滴滴云为例,教你一步一步的搭建自己的服务器集群,包括 ...
- HOD服务集群 torque maui
前言 本文的目的在于从无到有的搭建一套HOD服务集群.在参考本文之前假设读者已经对hadoop系统及其下面 DFSShell,HDFS,MapReduce等已经有了相当的了解.由于Hadoop doc ...
- 简单部署 rancher 管理kubernetes集群(3)
rancher 简单使用 运行docker 容器 环境部署 关闭防火墙与selinux systemctl stop firewalld systemctl disable firewalldsed ...
- 轻松实现基于Heartbeat的高可用web服务集群
高可用集群就是为了保证某项服务能够时时在线,我们可以通过几个9来衡量一个高可用集群提供服务的稳定性,例如5个9的高可用集群必须保证服务一年在线的时间占99.999%,也就是说一年的时间中仅允许服务电线 ...
- Centos 6.4下 MySQL配置主从服务(集群)
Centos 6.4下 MySQL配置主从服务(集群) 我们前面两篇文章都分别介绍了Mysql的安装及配置.备份及还原,今天我们继续前面的环境介绍一下Centos 6.4下MySQL配置主从实现数据同 ...
最新文章
- 分页存储过程性能比较 二分法
- 《深度学习导论及案例分析》一2.11概率图模型的推理
- 美国诚实签经验——着装,戒指,手表装土豪,医生预约单,流水、房产和工作证明...
- python pyyaml模块使用示例:读取yaml文件内容
- mac+修改+ssh文件夹权限_用SSH指令批量修改文件夹 文件权限和拥有者
- vs需要迁移_这可能是目前最全面的无服务器迁移实践
- 将计算机设置成交换机主机名,CISCO2950交换机的配置(设置密码、IP地址、主机名)...
- php数据库备份脚本
- LINUX 错误代码
- 车间调度建模系列2|复杂车间调度问题描述
- 终端一直显示 (master) ,即终端一直处于master分支下的取消办法
- css pseudo elements,CSS 伪元素 (Pseudo-elements)
- 优酷古永锵:最大对手是土豆网 做好内容监管
- 论文阅读: [3d]Audio-driven Talking Face Video Generation with Learning-based Personalized Head Pose
- linux虚拟机怎么联网
- tcp可靠传输的机制有哪些(面试必看
- postgrsql 增加字段
- AXI总线之AXI-LITE总线分析与实现
- 联想G50-70装机过程
- U3D中 TextMeshPro 的超链接使用笔记