进行这个章节之前,需要去看一下RMI的实现哈,如果了解过的童鞋可以直接跳过,如果没有或者不知道RMI的童鞋,移驾到下面的链接看完之后再回来继续看这篇

RPC系列之入门_阿小冰的博客-CSDN博客RPC系列之入门https://blog.csdn.net/qq_38377525/article/details/123507599?spm=1001.2014.3001.5502


介绍

说到RPC,应该能想的到Dubbo吧, Dubbo的底层是使用了Netty作为网络通讯框架,Netty赋予了Dubbo使用RPC远程调用服务,那接下来我们体验一下用Netty实现一个简单的RPC框架,消费者和提供者约定接口和协议,消费者远程调用提供者的服务

1、创建一个接口,定义一个抽象方法,用于约定消费者和提供者之间的调用

2、创建一个提供者,需要监听消费者的请求,并按照1中的约束返回数据

3、创建一个消费者,需要透明的调用自己不存在的方法,内部需要使用Netty实现数据通信

4、提供者与消费者数据传输使用json字符串数据格式

5、提供者使用netty集成SpringBoot环境来实现需求

需求案例

客户端远程调用服务端提供一个根据id查询订单的方法

代码实现

1、服务端代码

  • 编写Rpc注解

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface Rpc {
    }
  • 编写接口类
    public interface IOrderService {Order getById(int id);
    }
  • 编写实现类
    @Service
    @Rpc
    public class OrderServiceImpl implements IOrderService {Map<Object, Order> orderMap = new HashMap();@Overridepublic Order getById(int id) {Order order1 = new Order();order1.setId(1);order1.setTitle("铅笔盒");Order order2 = new Order();order2.setId(1);order2.setTitle("A4");orderMap.put(order1.getId(), order1);orderMap.put(order2.getId(), order2);return orderMap.get(id);}
    }
  • 服务业务处理类RpcServerHandler
    @Component
    @ChannelHandler.Sharable
    public class RpcServerHandler extends SimpleChannelInboundHandler<String> implements ApplicationContextAware {//本地缓存private static final Map SERVICE_INSTANCE_MAP = new ConcurrentHashMap();/*** @Description: 将@Rpc修饰的bean缓存起来*/@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {Map<String, Object> serviceMap = applicationContext.getBeansWithAnnotation(Rpc.class);if (serviceMap != null && serviceMap.size() > 0) {Set<Map.Entry<String, Object>> entries = serviceMap.entrySet();for (Map.Entry<String, Object> entry : entries) {Object entryValue = entry.getValue();if (entryValue.getClass().getInterfaces().length == 0) {throw new RuntimeException("服务必须实现接口");}//默认获取第一个接口作为缓存bean的名称String name = entryValue.getClass().getInterfaces()[0].getName();SERVICE_INSTANCE_MAP.put(name, entryValue);}}}/*** @Description: 通道读取就绪时间*/@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {//接收客户端请求,将msg装换为RpcRequest对象RpcRequest request = JSON.parseObject(s, RpcRequest.class);RpcResponse response = new RpcResponse();response.setRequestId(request.getRequestId());try {//业务处理response.setResult(handler(request));} catch (Exception e) {e.printStackTrace();response.setError(e.getMessage());}//返回客户端channelHandlerContext.write(JSON.toJSONString(response));}/*** 业务处理逻辑 ** @return */public Object handler(RpcRequest rpcRequest) throws InvocationTargetException {// 3.根据传递过来的beanName从缓存中查找到对应的beanObject serviceBean = SERVICE_INSTANCE_MAP.get(rpcRequest.getClassName());if (serviceBean == null) {throw new RuntimeException("根据beanName找不到服务,beanName:" + rpcRequest.getClassName());}//4.解析请求中的方法名称. 参数类型 参数信息Class<?> serviceBeanClass = serviceBean.getClass();String methodName = rpcRequest.getMethodName();Class<?>[] parameterTypes = rpcRequest.getParameterTypes();Object[] parameters = rpcRequest.getParameters();//5.反射调用bean的方法- CGLIB反射调用FastClass fastClass = FastClass.create(serviceBeanClass);FastMethod method = fastClass.getMethod(methodName, parameterTypes);return method.invoke(serviceBean, parameters);}}
  • 编写Netty启动类RpcServer
    @Service
    public class RpcServer implements DisposableBean {private NioEventLoopGroup bossGroup;private NioEventLoopGroup workerGroup;@AutowiredRpcServerHandler rpcServerHandler;public void startServer(String ip, int port) {bossGroup = new NioEventLoopGroup(1);workerGroup = new NioEventLoopGroup();//绑定端口try {//2.创建服务端启动助手ServerBootstrap serverBootstrap = new ServerBootstrap();//3.设置参数serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());//业务处理pipeline.addLast(rpcServerHandler);}});ChannelFuture channelFuture = serverBootstrap.bind(port).sync();System.out.println("====================服务端启动成功!=====================");channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}finally {if (bossGroup != null) {bossGroup.shutdownGracefully();}if (workerGroup != null) {workerGroup.shutdownGracefully();}}}@Overridepublic void destroy() throws Exception {if (bossGroup != null) {bossGroup.shutdownGracefully();}if (workerGroup != null) {workerGroup.shutdownGracefully();}}
    }
  • 编写启动类ServerBootstrapApplication 
    @SpringBootApplication
    public class ServerBootstrapApplication implements CommandLineRunner {@AutowiredRpcServer rpcServer;public static void main(String[] args) {SpringApplication.run(ServerBootstrapApplication.class, args);}@Overridepublic void run(String... args) throws Exception {new Thread(new Runnable() {@Overridepublic void run() {rpcServer.startServer("127.0.0.1",8099);}}).start();}
    }
  • 编写客户端业务处理类RpcClientHandler
    /*** 发送消息* 接收消息**/
    public class RpcClientHandler extends SimpleChannelInboundHandler<String> implements Callable {ChannelHandlerContext context;//客户端的消息String requestMsg;//服务端的消息String responseMsg;public void setRequestMsg(String requestMsg) {this.requestMsg = requestMsg;}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {context = ctx;}/*** @Description: 通道准备读取事件*/@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {requestMsg = s;notify();}@Overridepublic Object call() throws Exception {//消息发送context.writeAndFlush(requestMsg);//线程等待wait();return responseMsg;}
    }
  • 编写客户端Netty启动类 
    /*** 客户端* 1.连接Netty服务端* 2.提供给调用者主动关闭资源的方法* 3.提供消息发送的方法*/
    public class RpcClient {private EventLoopGroup group;private Channel channel;private String ip;private int port;private RpcClientHandler rpcClientHandler = new RpcClientHandler();//线程池private ExecutorService executorService = Executors.newCachedThreadPool();public RpcClient(String ip, int port) {this.ip = ip;this.port = port;initClient();}public void initClient() {//创建线程组group = new NioEventLoopGroup();//创建启动助手Bootstrap bootstrap = new Bootstrap();//设置启动参数bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel channel) throws Exception {ChannelPipeline pipeline = channel.pipeline();//String类型编解码器pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());//添加客户端处理类pipeline.addLast(rpcClientHandler);}});//4.连接Netty服务端try {channel = bootstrap.connect(ip, port).sync().channel();} catch (InterruptedException e) {e.printStackTrace();if (channel != null) {channel.close();}if (group != null) {group.shutdownGracefully();}}}/*** @Description: 提供给调用者主动关闭资源的方法*/public void close() {if (channel != null) {channel.close();}if (group != null) {group.shutdownGracefully();}}/*** @Description: 提供消息发送的方法*/public Object send(String msg) throws ExecutionException, InterruptedException {rpcClientHandler.setRequestMsg(msg);Future submit = executorService.submit(rpcClientHandler);return submit.get();}
    }
  • 编写Rpc代理类
    /*** 客户端代理类-创建代理对象* 1.封装request请求对象* 2.创建RpcClient对象* 3.发送消息* 4.返回结果* */
    public class RpcClientProxy {public static Object createProxy(Class cla) {return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{cla}, new InvocationHandler() {@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {//封装request请求对象RpcRequest request = new RpcRequest();request.setRequestId(UUID.randomUUID().toString());request.setClassName(method.getDeclaringClass().getName());request.setMethodName(method.getName());request.setParameterTypes(method.getParameterTypes());request.setParameters(args);//创建RpcClient对象RpcClient rpcClient = new RpcClient("127.0.0.1", 8099);try {//发送消息Object requestMsg = rpcClient.send(JSON.toJSONString(request));RpcResponse response = JSON.parseObject(requestMsg.toString(), RpcResponse.class);if (response.getError() != null) {throw new RuntimeException(response.getError());}//返回结果Object result = response.getResult();return JSON.parseObject(result.toString(), method.getReturnType());} catch (Exception e) {throw e;} finally {rpcClient.close();}}});}
    }
  • 编写客户端启动类ClinetBootStrapApplication 
    public class ClinetBootStrapApplication {public static void main(String[] args) {IOrderService orderService=(IOrderService)RpcClientProxy.createProxy(IOrderService.class);Order order = orderService.getById(1);System.out.println(order);}
    }
  • 启动服务端,在启动客户端,观察日志 

到此就完成了使用Netty自定义RPC框架

RPC系列之Netty实现自定义RPC框架相关推荐

  1. Netty之自定义RPC

    需求分析 使用netty实现方法远程调用, 在client调用本地接口中方法时, 使用反射进行远程调用, server执行完结果后, 将执行结果进行封装, 发送到client RPC调用模型: 1. ...

  2. 【RPC】---- 基于Netty实现的RPC

    基于Netty实现的RPC 一.Netty服务端和客户端 1.服务端server 1.1 NettyServer 1.2 NettyServerHandler 2.客户端client 2.1 Nett ...

  3. 【专栏】RPC系列(实战)-负重前行的“动态代理”

    关注公众号:离心计划,一起离开地球表面  [RPC系列合集] [专栏]RPC系列(理论)-夜的第一章 [专栏]RPC系列(理论)-协议与序列化 [专栏]RPC系列(理论)-动态代理 [专栏]RPC系列 ...

  4. 手撕RPC系列(2)—客户端基于stub动态代理的RPC

    一.前言 二.原理 三.前置基础 四.举例说明 五.总结 一.前言 上一节 手撕RPC系列(1)-最原始的RPC通俗理解 中讲了一个最最简单的rpc思想的例子.那种方法的缺陷太多,平常写代码一般不会那 ...

  5. 还发愁项目经验吗?基于Netty实现分布式RPC框架[附完整代码]

    写给大家的话 最近我收到很多读者的来信,对如何学习分布式.如何进行项目实践和提高编程能力,存在很多疑问. 分布式那么难,怎么学?为什么看了那么多书还是掌握不了? 开源的框架比如Dubbo代码太多了,完 ...

  6. 分布式理论、架构设计(自定义RPC一 NIO NETTY)

    分布式理论.架构设计自定义RPC 第一部分-RPC框架设计 1. Socket回顾与I/0模型 1.1 Socket网络编程回顾 1.1.1 Socket概述 1.1.2 Socket整体流程 1.1 ...

  7. 自定义 RPC框架4——RMI+Zookeeper实现RPC框架

    准备工作 这次我们用RMI+Zookeeper实现一个远程调用的RPC框架,RMI实现远程调用,Zookeeper作为注册中心,具体的操作之前的文章都提到过,这里不再做过多赘述. 自定义 RPC框架2 ...

  8. java基础巩固-宇宙第一AiYWM:为了维持生计,手写RPC~Version07(RPC原理、序列化框架们、网络协议框架们 、RPC 能帮助我们做什么呢、RPC异常排查:ctrl+F搜超时)整起

    上次Version06说到了咱们手写迷你版RPC的大体流程, 对咱们的迷你版RPC的大体流程再做几点补充: 为什么要封装网络协议,别人说封装好咱们就要封装?Java有这个特性那咱就要用?好像是这样.看 ...

  9. 分布式理论、架构设计(自定义RPC)

    会不断更新!冲冲冲!跳转连接 https://blog.csdn.net/qq_35349982/category_10317485.html 分布式理论.架构设计(自定义RPC) 1.分布式架构 1 ...

最新文章

  1. 2019-03-20 Python爬取需要登录的有验证码的网站
  2. 「杂谈」如何系统性地学习生成对抗网络GAN
  3. graphic头文件函数_graphics.h头文件详解
  4. bootstrap缩小后div互相叠加_纯 JS 实现放大缩小拖拽踩坑之旅
  5. Django之路——6 Django的模型层(二)
  6. 团队项目个人进展——Day08
  7. iOS之深入解析Xcode的拼写检查
  8. Nacos深入浅出(三)
  9. 【渝粤教育】 国家开放大学2020年春季 1373特殊教育概论 参考试题
  10. sqlite3使用sqlite2创建的数据库
  11. java 获取文件大小_利用百度AI OCR图片识别,Java实现PDF中的图片转换成文字
  12. 【iCore4 双核心板_uC/OS-II】例程八:消息邮箱
  13. 【2014年计划】IT之路
  14. opengl 多边形线框_OpenGL - 在纹理多边形上创建边框
  15. 计算机视觉之边缘提取
  16. Android开发中长度单位简介
  17. 【Designing ML Systems】第 11 章 :机器学习的人性方面
  18. 【漏洞复现】绿盟BAS日志数据安全性分析系统未授权访问
  19. gps定位,根据经纬度;
  20. iOS开发之Objective-C(面试篇)-李飞-专题视频课程

热门文章

  1. Solidity基础四
  2. unicode UTF-8和GBK编码
  3. 个人爱好-摄影入门书籍思维整理
  4. 学计算机的感想300字,计算机毕业的自我鉴定范文300字(精选5篇)
  5. openGauss数据库备库重启报错,有没有解决方案呢
  6. [ 数据集 ] CIFAR-10 数据集介绍
  7. SAT OG阅读填空题真题词汇
  8. 特征工程到底是什么?
  9. 灼口综合征的症状和危害要找到解决办法
  10. Python之基础详解(十一):PTL各模块讲解:Image、ImageFilter、ImageChops、ImageColor、ImageEnhance、ImageOps、ImageDraw