写给大家的话

最近我收到很多读者的来信,对如何学习分布式、如何进行项目实践和提高编程能力,存在很多疑问。

分布式那么难,怎么学?为什么看了那么多书还是掌握不了?

开源的框架比如Dubbo代码太多了,完全一头雾水,应该怎么学习?

在学校没有项目经验,找工作在即,怎么把这块补起来?你还仅限于XXX管理系统么「面试官都疲劳了」?

我在学校的时候,也有和大家一样的困惑。

毕业去阿里工作了几年后,通过参与实际的项目开发,关于如何学习新知识、如何快速上手并应用有了一些体会,在这里给大家分享一下:

  • 一定要动手,动手写代码,实现demo,去debug,在调试的过程中学习。没有必要抱着大块头的书看完了以后再动手写代码,在实践中学习是最快速的方法。
  • 在学习框架的过程中,尽量先从框架的初始版本开始看,因为开源框架往往功能复杂,代码庞大,很容易劝退。比如学习linux内核,可以从早期版本开始看。
  • 「造轮子」。掌握知识最好的办法是去做项目、实现它。关于项目,很多推荐XXX管理系统的,我认为,此类XXX管理系统些在简历上目前大厂是一点竞争力都没有的,面试官都疲了,培训机构清一色的XXX管理系统,springboot全家桶。必须得差异化竞争
  • 我在这里给大家推荐几个优秀的项目,后面我也会逐个实现给大家:自己实现spring ioc/aop、RPC框架、MQ框架、KV存储、分布式锁。这些项目和互联网大厂技术栈无缝结合,通过自己实现分布式组件「也就是大家平时说的造轮子」,为什么要造轮子?一方面是避免成为调包侠或CRUD工程师,另一方面是提高自己的技术深度,让自己的职业道路更宽。

这一系列文章我目前已经写了5篇,后面会在本公众号陆续分享给大家,大家可以关注我的宫伀号【编程学习指南】追更。

本文非常值得点赞+收藏,因为内容非常多,包含完整的代码实现,真的是手把手教你们怎么实现。要想让自己的简历让面试官眼前一亮,这些项目肯定是加分项。

Github地址: (欢迎star)

https://github.com/xiajunhust/tinywheel/tree/main/RPC%20framework


分布式RPC框架,WHY?

RPC是指远程过程调用(Remote Procedure Call)。可以使得我们在分布式环境下调用远程服务像调用本机服务一样方便。在分布式应用中使用非常广泛。

有人会问:“有了开源的RPC框架,为什么要自己去实现?

RPC基本原理不难,但是在实际实现的过程中还是会遇到很多坑,涉及很多知识点:线程模型、通信协议设计、负载均衡、动态代理等。

通过自己动手实现的方式一个简易的RPC框架,包含RPC的核心功能「麻雀虽小五脏俱全」,可以检验自己对知识的掌握情况,学会在实际中灵活运用,加深理解。当然了,生产环境中,建议大家还是用成熟的开源框架。


RPC框架理论基础

BRUCE JAY NELSON在其1984年的论文《Implementing Remote Procedure Calls》中描述到,当我们在程序中发起RPC调用时,会涉及5个模块:

  • user:发起调用的应用模块,发起rpc调用 会和发起本地调用一样,不感知rpc底层逻辑。
  • user-stub:负责调用请求的打包以及结果的解包。
  • RPCRuntime:RPC运行时,负责处理远程网络交互,如网络包重传、加密等。
  • server-stub:负责请求的解包以及结果的打包。
  • server:真正提供服务处理的应用模块。

这5部分的关系如下图所示:

我的宫伀号【编程学习指南】有各种学习资料,关注即可在菜单栏领取


主流开源RPC框架

(1)dubbo:阿里巴巴出品的RPC框架,经历了电商海量场景的考验,github 36.7star。支持java语言。

官网:https://dubbo.apache.org/zh/

github:https://github.com/apache/dubbo

(2)grpc:谷歌开源rpc框架,支持多种语言。github star 33.2k。

官网:https://grpc.io/

github:https://github.com/grpc/grpc

(3)motan:新浪开源的rpc框架,仅支持java语言。

github:https://github.com/weibocom/motan

(4)spring cloud:Pivotal公司2014年对外开源的RPC框架,仅支持Java。

(5)brpc:百度开源的rpc框架,C++实现。

github:https://github.com/apache/incubator-brpc


RPC框架设计

整体结构如下图,给大家展示了一个「麻雀虽小五脏俱全」的RPC框架,去除了管控平台等辅助功能。通过对核心功能进行设计和实现,理解整个RPC框架的设计原理。

我的宫伀号【编程学习指南】有各种学习资料,关注即可在菜单栏领取

涉及核心技术:

  • 注册中心:服务端将发布的服务注册到注册中心,调用端从注册中心订阅服务,获得服务的地址,才能发起调用。
  • 分布式环境不同服务器之间需要通过网络通信(RPC client)。
  • 网络通信必然涉及到编解码
  • 避免每次寻址都需要调用注册中心,服务调用端还需要对服务信息进行缓存
  • 动态代理:方便对客户端调用透明化。

详细设计&技术实现

01

技术选型

  • spring-boot,依赖管理,强大的配置化能力。可以方便制作RPC框架的starter,集成使用起来非常便捷。
  • netty
  • zookeeper
  • protobuf

02

RPC调用流程分析

一次RPC调用整个过程,到底发生了什么事情呢?如下通过序列图的方式展示了详细步骤:

我的宫伀号【编程学习指南】有各种学习资料,关注即可在菜单栏领取

03

工程模块依赖

代码模块分层如下:

我的宫伀号【编程学习指南】有各种学习资料,关注即可在菜单栏领取

  • util:基础工具类。
  • model:基础领域模型。
  • annotation:注解。提供注解功能,可以非常方便的发布RPC服务和引用RPC服务。
  • registry:注册中心,给出了zk的实现。
  • io:编码和解码实现。
  • provider:服务提供者实现。
  • consumer:服务消费者实现。

代码包详细情况:

04

代码详细介绍

采用spring-boot框架。将RPC框架实现为一个starter,方便集成使用。

注解

为了方便使用此RPC框架,我们通过定义注解,让使用者能直接通过一行注解进行服务的发布和引用。

/*** RPC provider注解*/
@Retention(RetentionPolicy.RUNTIME)
//注解打在类上
@Target(ElementType.TYPE)
@Component
public @interface SimpleRpcProvider {Class<?> serviceInterface() default Object.class;String serviceVersion() default "1.0.0";
}/*** RPC consumer** @author summer* @version $Id: SimpleRpcProviderBean.java, v 0.1 2022年01月16日 11:53 AM summer Exp $*/
@Retention(RetentionPolicy.RUNTIME)
//注解打在属性上
@Target(ElementType.FIELD)
@Component
public @interface SimpleRpcConsumer {/*** 服务版本号* @return*/String serviceVersion() default "1.0.0";/*** 注册中心类型-默认zk* @return*/String registerType() default "zookeeper";/*** 注册中心地址* @return*/String registerAddress() default "127.0.0.1:2181";
}

注册中心

常见的注册中心有很多种,比如zookepper、eureka、nacos、consul等。注册中心的原理不是本文的重点,因此不做详细描述。

我的宫伀号【编程学习指南】有各种学习资料,关注即可在菜单栏领取

此处采用zookeeper的实现,有兴趣的童鞋可以自行进行其他实现,只需要实现一个子类即可。

/*** 注册中心服务接口定义*/
public interface ServiceRegistry {/*** 注册服务** @param serviceMetaConfig 服务元数据配置* @throws Exception*/void register(ServiceMetaConfig serviceMetaConfig) throws Exception;/*** 取消注册服务** @param serviceMetaConfig 服务元数据配置* @throws Exception*/void unRegister(ServiceMetaConfig serviceMetaConfig) throws Exception;/*** 服务发现** @param serviceName 服务名* @return* @throws Exception*/ServiceMetaConfig discovery(String serviceName) throws Exception;
}

zk实现(采用curator):

import com.summer.simplerpc.registry.cache.ServiceProviderCache;
import com.summer.simplerpc.registry.model.ServiceMetaConfig;
import com.summer.simplerpc.registry.ServiceRegistry;
import com.summer.simplerpc.util.ServiceUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.*;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.apache.curator.x.discovery.strategies.RoundRobinStrategy;/*** 服务注册中心-zk实现*/
public class ZkServiceRegistry implements ServiceRegistry {/*** zk base path*/private final static String ZK_BASE_PATH = "/simplerpc";/*** serviceProvider锁*/private final Object lock = new Object();/*** zk framework client*/private CuratorFramework client;/*** 服务发现*/private ServiceDiscovery<ServiceMetaConfig> serviceDiscovery;/*** serviceProvider缓存*/private ServiceProviderCache serviceProviderCache;/*** 构造函数** @param address 地址*/public ZkServiceRegistry(String address, ServiceProviderCache serviceProviderCache) throws Exception {this.client = CuratorFrameworkFactory.newClient(address, new ExponentialBackoffRetry(1000, 3));this.client.start();this.serviceProviderCache = serviceProviderCache;JsonInstanceSerializer<ServiceMetaConfig> serializer = new JsonInstanceSerializer<>(ServiceMetaConfig.class);serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceMetaConfig.class).client(client).serializer(serializer).basePath(ZK_BASE_PATH).build();serviceDiscovery.start();}@Overridepublic void register(ServiceMetaConfig serviceMetaConfig) throws Exception {ServiceInstanceBuilder<ServiceMetaConfig> serviceInstanceBuilder = ServiceInstance.builder();ServiceInstance<ServiceMetaConfig> serviceInstance = serviceInstanceBuilder.name(ServiceUtils.buildServiceKey(serviceMetaConfig.getName(), serviceMetaConfig.getVersion())).address(serviceMetaConfig.getAddress()).port(serviceMetaConfig.getPort()).payload(serviceMetaConfig).uriSpec(new UriSpec("{scheme}://{address}:{port}")).build();serviceDiscovery.registerService(serviceInstance);}@Overridepublic void unRegister(ServiceMetaConfig serviceMetaConfig) throws Exception {ServiceInstanceBuilder<ServiceMetaConfig> serviceInstanceBuilder = ServiceInstance.builder();ServiceInstance<ServiceMetaConfig> serviceInstance = serviceInstanceBuilder.name(ServiceUtils.buildServiceKey(serviceMetaConfig.getName(), serviceMetaConfig.getVersion())).address(serviceMetaConfig.getAddress()).port(serviceMetaConfig.getPort()).payload(serviceMetaConfig).uriSpec(new UriSpec("{scheme}://{address}:{port}")).build();serviceDiscovery.unregisterService(serviceInstance);}@Overridepublic ServiceMetaConfig discovery(String serviceName) throws Exception {//先读缓存ServiceProvider<ServiceMetaConfig> serviceProvider = serviceProviderCache.queryCache(serviceName);//缓存miss,需要调serviceDiscoveryif (serviceProvider == null) {synchronized (lock) {serviceProvider = serviceDiscovery.serviceProviderBuilder().serviceName(serviceName).providerStrategy(new RoundRobinStrategy<>()).build();serviceProvider.start();//更新缓存serviceProviderCache.updateCache(serviceName, serviceProvider);}}ServiceInstance<ServiceMetaConfig> serviceInstance = serviceProvider.getInstance();return serviceInstance != null ? serviceInstance.getPayload() : null;}
}

核心领域模型和本地缓存:

/*** 服务元数据配置领域模型*/
@Data
public class ServiceMetaConfig {/*** 服务名*/private String name;/*** 服务版本*/private String version;/*** 服务地址*/private String address;/*** 服务端口*/private Integer port;
}/**** @author summer* @version $Id: ServiceProviderCache.java, v 0.1 2022年01月16日 11:41 AM summer Exp $*/
public interface ServiceProviderCache {/*** 查询缓存* @param serviceName* @return*/ServiceProvider<ServiceMetaConfig> queryCache(String serviceName);/*** 更新缓存** @param serviceName 服务名* @param serviceProvider 服务provider* @return*/void updateCache(String serviceName, ServiceProvider<ServiceMetaConfig> serviceProvider);
}/*** 本地缓存实现** @author summer* @version $Id: ServiceProviderLocalCache.java, v 0.1 2022年01月16日 11:43 AM summer Exp $*/
public class ServiceProviderLocalCache implements ServiceProviderCache {/*** 本地缓存map*/private Map<String, ServiceProvider<ServiceMetaConfig>> serviceProviderMap = new ConcurrentHashMap<>();@Overridepublic ServiceProvider<ServiceMetaConfig> queryCache(String serviceName) {return serviceProviderMap.get(serviceName);}@Overridepublic void updateCache(String serviceName, ServiceProvider<ServiceMetaConfig> serviceProvider) {serviceProviderMap.put(serviceName, serviceProvider);}
}

服务提供方

我前面提到过,在实际使用的时候会通过注解的方式来发布服务。那么,我们需要在bean初始化后去扫描带SimpleRpcProvider注解的bean,将服务注册到注册中心。另外,我们还需要在初始化后启动netty服务端。因此,我定义服务提供方bean实现SimpleRpcProviderBean,继承InitializingBean、BeanPostProcessor:

  • 在postProcessAfterInitialization方法中判断bean是否带SimpleRpcProvider注解,如果是则解析服务信息,注册到注册中心。
  • 在afterPropertiesSet方法中启动netty服务端。
  • 接收服务调用请求,通过动态代理执行实际调用
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.summer.simplerpc.annotation.SimpleRpcProvider;
import com.summer.simplerpc.io.RPCDecoder;
import com.summer.simplerpc.io.RPCEncoder;
import com.summer.simplerpc.registry.ServiceRegistry;
import com.summer.simplerpc.registry.model.ServiceMetaConfig;
import com.summer.simplerpc.util.ServiceUtils;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.BeanPostProcessor;import java.util.Map;
import java.util.concurrent.*;/*** rpc provider功能实现。** 负责扫描服务provider注解bean,注册服务到注册中心,启动netty监听。* 提供RPC请求实际处理。*/
@Slf4j
public class SimpleRpcProviderBean implements InitializingBean, BeanPostProcessor {/*** 地址*/private String          address;/*** 服务注册中心*/private ServiceRegistry serviceRegistry;/*** 服务提供bean的缓存map*/private Map<String, Object> providerBeanMap = new ConcurrentHashMap<>(64);/*** 处理实际rpc请求的线程池*/private static ThreadPoolExecutor rpcThreadPoolExecutor;private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("simplerpc-provider-pool-%d").build();/*** netty相关*/private EventLoopGroup bossGroup   = null;private EventLoopGroup workerGroup = null;/*** 构造函数** @param address 地址* @param serviceRegistry 服务注册中心*/public SimpleRpcProviderBean(String address, ServiceRegistry serviceRegistry) {this.address = address;this.serviceRegistry = serviceRegistry;}@Overridepublic void afterPropertiesSet() throws Exception {//启动netty服务监听new Thread(() -> {try {startNettyServer();} catch (InterruptedException e) {log.error("startNettyServer exception,", e);}}).start();}/*** 提交rpc处理任务** @param task 任务*/public static void submit(Runnable task) {if (rpcThreadPoolExecutor == null) {synchronized (SimpleRpcProviderBean.class) {if (rpcThreadPoolExecutor == null) {rpcThreadPoolExecutor = new ThreadPoolExecutor(100, 100,600L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000),threadFactory);}}}rpcThreadPoolExecutor.submit(task);}/*** 启动netty服务监听** @throws InterruptedException*/private void startNettyServer() throws InterruptedException {if (workerGroup != null && bossGroup != null) {return;}log.info("startNettyServer begin");bossGroup = new NioEventLoopGroup();workerGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) {socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,0,4,0,0)).addLast(new RPCDecoder()).addLast(new RPCEncoder()).addLast(new SimpleRpcProviderNettyHandler(providerBeanMap));}}).option(ChannelOption.SO_BACKLOG, 512).childOption(ChannelOption.SO_KEEPALIVE, true);String[] array = address.split(":");String host = array[0];int port = Integer.parseInt(array[1]);//启动服务ChannelFuture future = serverBootstrap.bind(host, port).sync();log.info(String.format("startNettyServer,host=%s,port=%s", host, port));future.channel().closeFuture().sync();}@Overridepublic Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {return bean;}@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {//获取bean上的注解SimpleRpcProvider simpleRpcProvider = bean.getClass().getAnnotation(SimpleRpcProvider.class);if (simpleRpcProvider == null) {//无注解直接return原始的beanreturn bean;}//缓存保存String serviceName = simpleRpcProvider.serviceInterface().getName();String version = simpleRpcProvider.serviceVersion();providerBeanMap.put(ServiceUtils.buildServiceKey(serviceName, version), bean);log.info("postProcessAfterInitialization find a simpleRpcProvider[" + serviceName + "," + version + "]");//将服务注册到注册中心String[] addressArray = address.split(ServiceUtils.SPLIT_CHAR);String host = addressArray[0];String port = addressArray[1];ServiceMetaConfig serviceMetaConfig = new ServiceMetaConfig();serviceMetaConfig.setAddress(host);serviceMetaConfig.setName(serviceName);serviceMetaConfig.setVersion(version);serviceMetaConfig.setPort(Integer.parseInt(port));try {serviceRegistry.register(serviceMetaConfig);log.info("register service success,serviceMetaConfig=" + serviceMetaConfig.toString());} catch (Exception e) {log.error("register service fail,serviceMetaConfig=" + serviceMetaConfig.toString(), e);}return bean;}
}

netty ChannelPipeline设计:

  • LengthFieldBasedFrameDecoder:解码器,解决自定义长度TCP粘包问题
  • RPCDecoder:解码器,解析出RPC请求参数对象
  • SimpleRpcProviderNettyHandler:实际的RPC请求处理逻辑,接收请求参数,返回RPC响应结果
  • RPCEncoder:编码器,将RPC响应结果编码序列化,返回

RPC核心逻辑处理handler-SimpleRpcProviderNettyHandler

import com.summer.simplerpc.model.SimpleRpcRequest;
import com.summer.simplerpc.model.SimpleRpcResponse;
import com.summer.simplerpc.util.ServiceUtils;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cglib.reflect.FastClass;
import java.util.Map;@Slf4j
public class SimpleRpcProviderNettyHandler extends SimpleChannelInboundHandler<SimpleRpcRequest> {/*** 提供rpc服务的实例缓存map*/private Map<String, Object> handlerMap;/*** 构造函数** @param handlerMap*/public SimpleRpcProviderNettyHandler(Map<String, Object> handlerMap) {this.handlerMap = handlerMap;}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, SimpleRpcRequest simpleRpcRequest) throws Exception {SimpleRpcProviderBean.submit(() -> {log.debug("Receive rpc request {}", simpleRpcRequest.getBizNO());SimpleRpcResponse simpleRpcResponse = new SimpleRpcResponse();simpleRpcResponse.setBizNO(simpleRpcRequest.getBizNO());try {Object result = doHandle(simpleRpcRequest);simpleRpcResponse.setData(result);} catch (Throwable throwable) {simpleRpcResponse.setMsg(throwable.toString());log.error("handle rpc request error", throwable);}channelHandlerContext.writeAndFlush(simpleRpcResponse).addListener((ChannelFutureListener) channelFuture ->log.info("return response for request " + simpleRpcRequest.getBizNO() + ",simpleRpcResponse=" + simpleRpcResponse));});}/*** 通过反射,执行实际的rpc请求* @param simpleRpcRequest* @return*/private Object doHandle(SimpleRpcRequest simpleRpcRequest) throws Exception {String key = ServiceUtils.buildServiceKey(simpleRpcRequest.getClassName(), simpleRpcRequest.getServiceVersion());if (handlerMap == null || handlerMap.get(key) == null) {log.error("doHandle,the provider {0} not exist,", simpleRpcRequest.getClassName(), simpleRpcRequest.getServiceVersion());throw new RuntimeException("the provider not exist");}log.info("doHandle,simpleRpcRequest=" + simpleRpcRequest.toString());Object provider = handlerMap.get(key);//通过动态代理执行实际的调用FastClass fastClass = FastClass.create(provider.getClass());return fastClass.invoke(fastClass.getIndex(simpleRpcRequest.getMethodName(), simpleRpcRequest.getParamTypes()),provider, simpleRpcRequest.getParamValues());}
}

前面我提到过,我实现的是一个框架,需要很方便被集成和使用,因此会实现为一个springboot的starter:

import com.summer.simplerpc.model.RpcCommonProperty;
import com.summer.simplerpc.registry.ServiceRegistry;
import com.summer.simplerpc.registry.cache.ServiceProviderCache;
import com.summer.simplerpc.registry.cache.ServiceProviderLocalCache;
import com.summer.simplerpc.registry.zk.ZkServiceRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@Slf4j
public class SimplerRpcProviderAutoConfiguration {@Beanpublic SimpleRpcProviderBean initRpcProvider() throws Exception {RpcCommonProperty rpcCommonProperty = new RpcCommonProperty();rpcCommonProperty.setServiceAddress("127.0.0.1:50001");rpcCommonProperty.setRegistryAddress("127.0.0.1:2181");log.info("===================SimplerRpcProviderAutoConfiguration init,rpcCommonProperty=" + rpcCommonProperty.toString());ServiceProviderCache serviceProviderCache = new ServiceProviderLocalCache();ServiceRegistry zkServiceRegistry = new ZkServiceRegistry(rpcCommonProperty.getRegistryAddress(), serviceProviderCache);return new SimpleRpcProviderBean(rpcCommonProperty.getServiceAddress(), zkServiceRegistry);}
}

IO

IO主要是序列化和反序列化,常见的序列化工具有很多,这里采用Hessian,对于不同序列化工具的详细比对这里不做赘述,后续单独开章节讲述。

服务端和消费端分别会实现编码器和解码器,加入到netty的ChannelPipeline中,具体见服务端和消费端讲解。

服务消费方

使用此框架进行服务消费,同样是通过注解,将注解打在一个bean上,那么则完成了对一个服务的引用。可以像直接使用本地bean一样发起RPC调用。其他操作都由RPC框架来实现:

  • 扫描所有带SimpleRpcConsumer注解的bean
  • 重定义BeanDefinition,使用代理类重新注入spring容器
  • 发起RPC服务调用,从本地缓存或注册中心拿到远端服务详情,发起网络调用
  • 获取服务返回结果

SimpleRpcConsumer注解

import org.springframework.stereotype.Component;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** RPC consumer注解*/
@Retention(RetentionPolicy.RUNTIME)
//注解打在属性上
@Target(ElementType.FIELD)
@Component
public @interface SimpleRpcConsumer {/*** 服务版本号* @return*/String serviceVersion() default "1.0.0";/*** 注册中心类型-默认zk* @return*/String registerType() default "zookeeper";/*** 注册中心地址* @return*/String registerAddress() default "127.0.0.1:2181";
}

生成代理类的FactoryBean:

import com.summer.simplerpc.registry.ServiceRegistry;
import com.summer.simplerpc.registry.cache.ServiceProviderCache;
import com.summer.simplerpc.registry.cache.ServiceProviderLocalCache;
import com.summer.simplerpc.registry.zk.ZkServiceRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.FactoryBean;import java.lang.reflect.Proxy;/*** 生成rpc consumer代理bean的FactoryBean*/
@Slf4j
public class SimpleRpcConsumerFactoryBean implements FactoryBean {/*** 调用的服务接口类*/private Class<?> interfaceClass;/*** 服务版本号*/private String serviceVersion;/*** 注册中心类型*/private String registryType;/*** 注册中心地址*/private String registryAddress;/*** 实际的bean*/private Object object;/*** init方法,通过动态代理生成bean** @throws Exception*/public void init() throws Exception {ServiceProviderCache serviceProviderCache = new ServiceProviderLocalCache();ServiceRegistry zkServiceRegistry = new ZkServiceRegistry(registryAddress, serviceProviderCache);//动态代理this.object = Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class<?>[] {interfaceClass},new SimpleRpcInvokeHandler<>(this.serviceVersion, zkServiceRegistry));log.info("SimpleRpcConsumerFactoryBean getObject {}", interfaceClass.getName());}/*** 返回创建的bean实例** @return* @throws Exception*/@Overridepublic Object getObject() throws Exception {return this.object;}/*** 创建的bean实例的类型** @return*/@Overridepublic Class<?> getObjectType() {return interfaceClass;}/*** 创建的bean实例的作用域** @return*/@Overridepublic boolean isSingleton() {return true;}public void setInterfaceClass(Class<?> interfaceClass) {this.interfaceClass = interfaceClass;}public void setServiceVersion(String serviceVersion) {this.serviceVersion = serviceVersion;}public void setRegistryType(String registryType) {this.registryType = registryType;}public void setRegistryAddress(String registryAddress) {this.registryAddress = registryAddress;}
}

SimpleRpcInvokeHandler-执行实际网络调用的Handler:

import com.summer.simplerpc.model.SimpleRpcRequest;
import com.summer.simplerpc.model.SimpleRpcResponse;
import com.summer.simplerpc.registry.ServiceRegistry;
import lombok.extern.slf4j.Slf4j;import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.UUID;/*** RPC调用动态代理handler实现*/
@Slf4j
public class SimpleRpcInvokeHandler<T> implements InvocationHandler {/*** 服务版本号*/private String serviceVersion;/*** 注册中心*/private ServiceRegistry serviceRegistry;/*** 默认构造函数*/public SimpleRpcInvokeHandler() {}public SimpleRpcInvokeHandler(String serviceVersion, ServiceRegistry serviceRegistry) {this.serviceVersion = serviceVersion;this.serviceRegistry = serviceRegistry;}@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {SimpleRpcRequest simpleRpcRequest = new SimpleRpcRequest();simpleRpcRequest.setBizNO(UUID.randomUUID().toString());simpleRpcRequest.setClassName(method.getDeclaringClass().getName());simpleRpcRequest.setServiceVersion(this.serviceVersion);simpleRpcRequest.setMethodName(method.getName());simpleRpcRequest.setParamTypes(method.getParameterTypes());simpleRpcRequest.setParamValues(args);log.info("begin simpleRpcRequest=" + simpleRpcRequest.toString());SimpleRpcConsumerNettyHandler simpleRpcConsumerNettyHandler = new SimpleRpcConsumerNettyHandler(this.serviceRegistry);SimpleRpcResponse simpleRpcResponse = simpleRpcConsumerNettyHandler.sendRpcRequest(simpleRpcRequest);log.info("result simpleRpcResponse=" + simpleRpcResponse);return simpleRpcResponse.getData();}
}

由SimpleRpcConsumerNettyHandler发起netty网络调用,客户端的netty ChannelPipeline比服务端简单:

核心在于SimpleRpcConsumerNettyHandler:

import com.summer.simplerpc.io.RPCDecoder;
import com.summer.simplerpc.io.RPCEncoder;
import com.summer.simplerpc.model.SimpleRpcRequest;
import com.summer.simplerpc.model.SimpleRpcResponse;
import com.summer.simplerpc.registry.ServiceRegistry;
import com.summer.simplerpc.registry.model.ServiceMetaConfig;
import com.summer.simplerpc.util.ServiceUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;/*** consumer netty handler*/
@Slf4j
public class SimpleRpcConsumerNettyHandler extends SimpleChannelInboundHandler<SimpleRpcResponse> {/*** 注册中心*/private ServiceRegistry serviceRegistry;/*** netty EventLoopGroup*/private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(4);/*** netty channel*/private Channel channel;/*** rpc response*/private SimpleRpcResponse rpcResponse;/*** lock*/private final Object lock = new Object();/*** 构造函数** @param serviceRegistry*/public SimpleRpcConsumerNettyHandler(ServiceRegistry serviceRegistry) {this.serviceRegistry = serviceRegistry;}/*** 发起RPC网络调用请求** @param simpleRpcRequest 请求参数* @return*/public SimpleRpcResponse sendRpcRequest(SimpleRpcRequest simpleRpcRequest) {try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new RPCEncoder()).addLast(new RPCDecoder())//通过.class获取此类型的实例(https://www.cnblogs.com/penglee/p/3993033.html).addLast(SimpleRpcConsumerNettyHandler.this);}});String key = ServiceUtils.buildServiceKey(simpleRpcRequest.getClassName(), simpleRpcRequest.getServiceVersion());ServiceMetaConfig serviceMetaConfig = this.serviceRegistry.discovery(key);if (serviceMetaConfig == null) {log.error("sendRpcRequest fail,serviceMetaConfig not found");throw new Exception("serviceMetaConfig not found in registry");}log.info("sendRpcRequest begin,serviceMetaConfig=" + serviceMetaConfig.toString() + ",key=" + key);final ChannelFuture channelFuture = bootstrap.connect(serviceMetaConfig.getAddress(), serviceMetaConfig.getPort()).sync();channelFuture.addListener((ChannelFutureListener)args0 -> {if (channelFuture.isSuccess()) {log.info("rpc invoke success,");} else {log.info("rpc invoke fail," + channelFuture.cause().getStackTrace());eventLoopGroup.shutdownGracefully();}});this.channel = channelFuture.channel();this.channel.writeAndFlush(simpleRpcRequest).sync();synchronized (this.lock) {log.info("sendRpcRequest lock.wait");this.lock.wait();}log.info("get rpc response=" + rpcResponse.toString());return this.rpcResponse;} catch (Exception e) {log.error("sendRpcRequest exception,", e);return null;} finally {//关闭相关连接if (this.channel != null) {this.channel.close();}if (this.eventLoopGroup != null) {this.eventLoopGroup.shutdownGracefully();}}}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, SimpleRpcResponse simpleRpcResponse) throws Exception {this.rpcResponse = simpleRpcResponse;log.info("rpc consumer netty handler,channelRead0,rpcResponse=" + rpcResponse);//收到远程网络的rpc response,通知调用端synchronized (lock) {log.info("channelRead0 simpleRpcResponse lock.notifyAll");lock.notifyAll();}}
}

starter定义:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** rpc consumer starter*/
@Configuration
@Slf4j
public class SimplerConsumerAutoConfiguration {@Beanpublic static BeanFactoryPostProcessor initRpcConsumer() throws Exception {return new SimpleRpcConsumerPostProcessor();}
}

05

RPC框架的集成和使用

上述RPC框架代码,通过springboot打包安装到本地mvn仓库,然后新建一个springboot工程来集成和测试。

mvn依赖:

<dependency><groupId>com.summer</groupId><artifactId>simplerpc-starter</artifactId><version>0.0.1-SNAPSHOT</version>
</dependency>

用来测试的服务很简单,参数是一个String,然后服务端会构造返回值:入参拼接上一个随机UUUID字符串。

服务定义和实现:

/*** 服务接口定义*/
public interface HelloworldService {/*** 示例方法* @param param* @return*/String buildHelloworld(String param);
}

服务实现:

import com.summer.simplerpc.annotation.SimpleRpcProvider;
import com.summer.simplerpctest.consumer.HelloworldService;
import lombok.extern.slf4j.Slf4j;
import java.util.UUID;/*** HelloworldService接口实现*/
@SimpleRpcProvider(serviceInterface=HelloworldService.class)
@Slf4j
public class HelloworldServiceImpl implements HelloworldService {@Overridepublic String buildHelloworld(String param) {log.info("HelloworldServiceImpl begin");return param + "_" + UUID.randomUUID().toString();}
}

我们定义一个bean,在其中发起对RPC服务的调用:

import com.summer.simplerpc.annotation.SimpleRpcConsumer;
import com.summer.simplerpctest.consumer.HelloworldService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;/*** 发起对HelloWorldService调用示例*/
@Slf4j
@Component
public class ConsumerSample {@SimpleRpcConsumer@Resourceprivate HelloworldService helloworldService;public String invokeHelloworldService() {String result = helloworldService.buildHelloworld("qwert");return result;}
}

然后我们开一个Controller,启动springboot工程,这样我们在浏览器中直接发起测试即可:

import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;/*** 测试controller** url:http://127.0.0.1:8004/helloworld/do*/
@RestController
@RequestMapping("/helloworld")
public class TestController {@Resourceprivate ConsumerSample consumerSample;@GetMapping(value = "/do")public String say(){String helloServiceRes = consumerSample.invokeHelloworldService();return helloServiceRes;}
}

只需要在浏览器中输入如下url,则可以发起对rpc服务的调用:

http://127.0.0.1:8004/helloworld/do

注意一些前置工作:

  • 需要启动zk。

IDEA控制台日志打印:

2022-01-25 09:27:22.581  INFO 30366 --- [nio-8004-exec-1] c.s.s.consumer.SimpleRpcInvokeHandler    : begin simpleRpcRequest=SimpleRpcRequest(bizNO=46154373-2cf7-4731-b4c0-208d6ca28b87, className=com.summer.simplerpctest.consumer.HelloworldService, methodName=buildHelloworld, serviceVersion=1.0.0, paramTypes=[class java.lang.String], paramValues=[qwert])
2022-01-25 09:27:22.698  INFO 30366 --- [nio-8004-exec-1] c.s.s.c.SimpleRpcConsumerNettyHandler    : sendRpcRequest begin,serviceMetaConfig=ServiceMetaConfig(name=com.summer.simplerpctest.consumer.HelloworldService, version=1.0.0, address=127.0.0.1, port=50001),key=com.summer.simplerpctest.consumer.HelloworldService:1.0.0
2022-01-25 09:27:22.715  INFO 30366 --- [ntLoopGroup-4-1] c.s.s.c.SimpleRpcConsumerNettyHandler    : rpc invoke success,
2022-01-25 09:27:22.759  INFO 30366 --- [nio-8004-exec-1] c.s.s.c.SimpleRpcConsumerNettyHandler    : sendRpcRequest lock.wait
2022-01-25 09:27:22.771  INFO 30366 --- [provider-pool-0] c.s.s.p.SimpleRpcProviderNettyHandler    : doHandle,simpleRpcRequest=SimpleRpcRequest(bizNO=46154373-2cf7-4731-b4c0-208d6ca28b87, className=com.summer.simplerpctest.consumer.HelloworldService, methodName=buildHelloworld, serviceVersion=1.0.0, paramTypes=[class java.lang.String], paramValues=[qwert])
2022-01-25 09:27:22.772  INFO 30366 --- [provider-pool-0] c.s.s.provider.HelloworldServiceImpl     : HelloworldServiceImpl begin
2022-01-25 09:27:22.774  INFO 30366 --- [ntLoopGroup-3-1] c.s.s.p.SimpleRpcProviderNettyHandler    : return response for request 46154373-2cf7-4731-b4c0-208d6ca28b87,simpleRpcResponse=SimpleRpcResponse(bizNO=46154373-2cf7-4731-b4c0-208d6ca28b87, msg=null, data=qwert_3ca010cc-cb14-49d0-b9d4-23168e7786e4)
2022-01-25 09:27:22.774  INFO 30366 --- [ntLoopGroup-4-1] c.s.s.c.SimpleRpcConsumerNettyHandler    : rpc consumer netty handler,channelRead0,rpcResponse=SimpleRpcResponse(bizNO=46154373-2cf7-4731-b4c0-208d6ca28b87, msg=null, data=qwert_3ca010cc-cb14-49d0-b9d4-23168e7786e4)
2022-01-25 09:27:22.775  INFO 30366 --- [ntLoopGroup-4-1] c.s.s.c.SimpleRpcConsumerNettyHandler    : channelRead0 simpleRpcResponse lock.notifyAll
2022-01-25 09:27:22.775  INFO 30366 --- [nio-8004-exec-1] c.s.s.c.SimpleRpcConsumerNettyHandler    : get rpc response=SimpleRpcResponse(bizNO=46154373-2cf7-4731-b4c0-208d6ca28b87, msg=null, data=qwert_3ca010cc-cb14-49d0-b9d4-23168e7786e4)
2022-01-25 09:27:22.776  INFO 30366 --- [nio-8004-exec-1] c.s.s.consumer.SimpleRpcInvokeHandler    : result simpleRpcResponse=SimpleRpcResponse(bizNO=46154373-2cf7-4731-b4c0-208d6ca28b87, msg=null, data=qwert_3ca010cc-cb14-49d0-b9d4-23168e7786e4)

这篇文章花了我挺长时间写出来的,看看这些详细的代码实现,你们应该能感受到我想把你教会的诚意了吧~~~

如果觉得有用的话,点赞+转发+收藏,一键三连防止走丢哇~

还发愁项目经验吗?基于Netty实现分布式RPC框架[附完整代码]相关推荐

  1. 基于Opencv的虚拟键盘(附完整代码及报告)

    用到的库:opencv.cvzone.pynput 一.从Opencv到计算机视觉领域: Opencv是传统计算机视觉库,OpenCV用C++语言编写,它具有C ++,Python,Java和MATL ...

  2. 基于MATLAB的求解线性方程组(附完整代码和例题)

    目录 前言 一. 直接求解:矩阵除法 例题1 例题2 例题3 二. 直接求解:判断求解 2.1 m=n且rank(A)=rank(C)=n 2.2 rank(A)=rank(C)=r<> ...

  3. 基于Opencv-python人脸口罩检测(附完整代码)

    目录 一.开发环境 二.设计要求 三.设计原理 四.程序代码 五.结果展示 六.结论 一.开发环境 python 3.6.6 opencv-python 4.5.1 二.设计要求 · 1.使用open ...

  4. 基于MATLAB计算MIMO信道容量(附完整代码与分析)

    目录 一.介绍 二. 代码 三. 运行结果及分析 3.1  MIMO信道容量:固定发射天线数为4 3.2 MIMO信道容量:固定接收天线数为4 3.3 AWGN信道与瑞利信道容量 四. 总结 一.介绍 ...

  5. Matlab深度学习入门实例:基于AlexNet的红绿灯识别(附完整代码)

    AlexNet于2012年出现在ImageNet的图像分类比赛中,并取得了当年冠军,从此卷积神经网络开始受到人们的强烈关注.AlexNet是深度卷积神经网络研究热潮的开端,也是研究热点从传统视觉方法过 ...

  6. 【通信】基于Matlab实现延时波束形成附完整代码

    1 内容介绍 现代社会发展要求通信系统功能越来越强,性能越来越高,构成越来越复杂;另一方面,要求通信系统技术研究和产品开发缩短周期,降低成本,提高水平.这样尖锐对立的两个方面的要求,只有通过使用强大的 ...

  7. 基于Netty的分布式聊天系统

    基于Netty的分布式聊天系统 Gitee地址:https://gitee.com/yuyuuyuy/micro-mall 文章目录 基于Netty的分布式聊天系统 前言 一.IM系统架构的探讨 二. ...

  8. 【RPC】---- 基于Netty实现的RPC

    基于Netty实现的RPC 一.Netty服务端和客户端 1.服务端server 1.1 NettyServer 1.2 NettyServerHandler 2.客户端client 2.1 Nett ...

  9. 基于Netty手工实现springMVC框架-----两种方式加载控制器

    1.手写springMVC框架 本篇我们通过两种方式来加载控制器,一种是配置文件的方式:另外一种是通过注解的形式. 1.配置文件方式 1.自定义Controller配置文件XML 我定义的格式如下: ...

最新文章

  1. Rails 定时任务——whenever实现周期性任务
  2. JAX-RS 2.0:自定义内容处理
  3. boost 线程 linux,Boost Linux线程第一课
  4. 无法打开文件“python35_d.lib”
  5. 中间语言MicroSoft Intermediate Language(MSIL)
  6. c语言在中职的作用,C语言程序下的中职教学论文
  7. android访问setting权限,如何获得我的Android应用程序的可怕WRITE_SECURE_SETTINGS权限?...
  8. set vue 修改整个对象值_Vue修改对象或数据,页面没有相应更改
  9. Office | Office365 离线安装包选择安装word、ppt、excel
  10. 搭建Android开发环境——Eclipse
  11. OpenCV-Python图片叠加与融合,cv2.add与cv2.addWeighted的区别
  12. 2012年托福听力真题词汇总结
  13. 吞食天地2蜀汉英雄传1.5版图文攻略
  14. 一维码二维码识别(opencv c++)
  15. 离散数学实验(二)等值演算
  16. Ubuntu内核版本的降级
  17. 网狐荣耀斗地主等15合1(美女图)
  18. 论文阅读-Attention Bottlenecks for Multimodal Fusion(多模态特征融合)
  19. 服务器自带ddos工具,详解DDoS工具 一款流行DDoS木马工具
  20. settings.xml详解

热门文章

  1. add_argument函数action参数的store_true==》在运行程序添加参数时直接输入变量名,可以省略对应的默认值True或者False
  2. 湖北大学数学与计算机科学学院,2017年湖北大学数学与计算机科学学院811数据结构考研题库...
  3. 解决layui隐藏域:不显示问题(含案例、代码、截图)
  4. Eslint 配置 + 规则说明 - 综合引入篇
  5. 【zblog模板】随然响应式导航网址目录主题
  6. 二开版彩虹易支付全开源10套模板带风控实名系统源码
  7. 玫曦音乐播放器开源源码
  8. fidde调试手机_实操:手机上用Fiddler调试页面(嘎)
  9. 计算机代码图表,微信小程序图表插件(wx-charts)实例代码
  10. java 数组构造_java – 从数组构造(非二进制)树