1.简介

Pigeon是一个分布式服务通信框架(RPC),在美团点评内部广泛使用,是美团点评最基础的底层框架之一

2.主要特色

除了支持spring schema等配置方式,也支持代码annotation方式发布服务、引用远程服务,并提供原生api接口的用法。

支持http协议,方便非java应用调用pigeon的服务。

序列化方式除了hessian,还支持thrift等。

提供了服务器单机控制台pigeon-console,包含单机服务测试工具。

创新的客户端路由策略,提供服务预热功能,解决线上流量大的service重启时大量超时的问题。

记录每个请求的对象大小、返回对象大小等监控信息。

服务端可对方法设置单独的线程池进行服务隔离,可配置客户端应用的最大并发数进行限流

github地址:https://github.com/dianping/pigeon

3.组成模块

1.pigeon-common

2.pigeon-config

3.pigeon-console

4.pigeon-extensions

5.pigeon-monitor

6.pigeon-registry

7.pigeon-remoting

4.pigeon-remoting模块

描述:具体实现各个通信模块

4.1 Netty通道(DefaultNettyChannel)

类图如下:

生命周期:

4.2 编解码器(codec)

备注:各个具体Serializer实现不同方法来序列化和反序列化对应类型

4.3 Config初始化

自定义SpringApplicationRefreshListener对象AnnotationBean,实现BeanFactoryPostProcessor,BeanPostProcessor等接口

类型定义:

public class AnnotationBean extends ServiceInitializeListener implements DisposableBean,BeanFactoryPostProcessor, BeanPostProcessor, ApplicationContextAware

postProcessBeanFactory:添加“pigeon.provider.interface.packages”配置项配置的包路径到ApplicationContext扫描内容中

public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {if (annotationPackage == null || annotationPackage.length() == 0) {return;}if (beanFactory instanceof BeanDefinitionRegistry) {try {// init scannerClass<?> scannerClass = ClassUtils.loadClass("org.springframework.context.annotation.ClassPathBeanDefinitionScanner");Object scanner = scannerClass.getConstructor(new Class<?>[] { BeanDefinitionRegistry.class, boolean.class }).newInstance(new Object[] { (BeanDefinitionRegistry) beanFactory, true });// add filterClass<?> filterClass = ClassUtils.loadClass("org.springframework.core.type.filter.AnnotationTypeFilter");Object filter = filterClass.getConstructor(Class.class).newInstance(Service.class);Method addIncludeFilter = scannerClass.getMethod("addIncludeFilter",ClassUtils.loadClass("org.springframework.core.type.filter.TypeFilter"));addIncludeFilter.invoke(scanner, filter);// scan packagesString[] packages = Constants.COMMA_SPLIT_PATTERN.split(annotationPackage);Method scan = scannerClass.getMethod("scan", new Class<?>[] { String[].class });scan.invoke(scanner, new Object[] { packages });} catch (Throwable e) {// spring 2.0}}}

postProcessAfterInitialization:根据实现Service注解的bean对象生成ProviderConfig对象,并添加到ServiceFactory Map

public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {Class<?> beanClass = AopUtils.getTargetClass(bean);if (beanClass == null || !isMatchPackage(beanClass.getName())) {return bean;}Service service = beanClass.getAnnotation(Service.class);if (service != null) {Class serviceInterface = service.interfaceClass();if (void.class.equals(service.interfaceClass())) {serviceInterface = ServiceConfigUtils.getServiceInterface(beanClass);}if (serviceInterface == null) {serviceInterface = beanClass;}ProviderConfig<Object> providerConfig = new ProviderConfig<Object>(serviceInterface, bean);providerConfig.setService(bean);providerConfig.setUrl(service.url());providerConfig.setVersion(service.version());providerConfig.setSharedPool(service.useSharedPool());providerConfig.setActives(service.actives());ServerConfig serverConfig = new ServerConfig();serverConfig.setPort(getDefaultPort(service.port()));serverConfig.setSuffix(service.group());serverConfig.setAutoSelectPort(service.autoSelectPort());providerConfig.setServerConfig(serverConfig);ServiceFactory.addService(providerConfig);}postProcessBeforeInitialization(bean, beanName);return bean;}

postProcessBeforeInitialization:使用根据Reference注解生成的InvokerConfig对象做为键,查找出ServiceFactory Map对应的Service值,并将Service值给Spring Bean对象中声明了Reference注解的字段

public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {if (!isMatchPackage(bean.getClass().getName())) {return bean;}Method[] methods = bean.getClass().getMethods();for (Method method : methods) {String name = method.getName();if (name.length() > 3 && name.startsWith("set") && method.getParameterTypes().length == 1&& Modifier.isPublic(method.getModifiers()) && !Modifier.isStatic(method.getModifiers())) {try {Reference reference = method.getAnnotation(Reference.class);if (reference != null) {Object value = refer(reference, method.getParameterTypes()[0]);if (value != null) {method.invoke(bean, new Object[] {});}}} catch (Throwable e) {logger.error("Failed to init remote service reference at method " + name + " in class "+ bean.getClass().getName() + ", cause: " + e.getMessage(), e);}}}Class<?> superClass = bean.getClass().getSuperclass();while (superClass != null && isMatchPackage(superClass)) {referFields(bean, superClass.getDeclaredFields());superClass = superClass.getSuperclass();}referFields(bean, bean.getClass().getDeclaredFields());return bean;}

4.4 领域对象

4.4.1 thrift

4.4.2 GenericRequest和GenericResponse

GenericRequest对象内容包括:1.消息类型(MessageType) 2.序列化(Serialize) 3.消息顺序ID(sequence) 4.上下文(context) 5.大小(size)

1.调用类型(CallType) 2.超时(timeout) 3.创建时间(CreateMillisTime) 4.服务名(ServiceName) 5.方法名(methodName) 6.参数类型(paramClassName)

7.参数(Parameters) 8.版本(version) 9.应用(app) 10.全局变量(globalvalues) 11.请求变量(requestValues)

GenericResponse对象内容包括:1.消息类型(MessageType) 2.序列化(Serialize) 3.消息顺序ID(seq和seqId) 4.上下文(globalcontext 和 localcontext) 5.大小(size)

1.创建时间(CreateMillisTime) 2.服务名(ServiceName) 3.方法名(methodName) 4.返回值(returnVal) 5.压缩类型(compressType) 6.端口号(port)

4.4.3  AbstractInvocationContext

4.5 monitor

监控数据整体分为两部分内容:监控数据和跟踪数据。监控数据和跟踪数据又根据服务提供者和服务消费者进行细分

4.6 channel

描述:该模块提供了池化Channel,并负责管理channel的生命周期(创建,选择,关闭和重连)

类图如下:

4.7. Invoker

描述:实现服务消费者

4.7.1 client初始化过程

在Spring框架中,service client是通过referenceBean(实现Spring框架提供的FactoryBean代理)初始化的

public void init() throws Exception {if (StringUtils.isBlank(interfaceName)) {throw new IllegalArgumentException("invalid interface:" + interfaceName);}this.objType = ClassUtils.loadClass(this.classLoader, this.interfaceName.trim());InvokerConfig<?> invokerConfig = new InvokerConfig(this.objType, this.url, this.timeout, this.callType,this.serialize, this.callback, this.suffix, this.writeBufferLimit, this.loadBalance, this.cluster,this.retries, this.timeoutRetry, this.vip, this.version, this.protocol);invokerConfig.setClassLoader(classLoader);invokerConfig.setSecret(secret);invokerConfig.setRegionPolicy(regionPolicy);if (!CollectionUtils.isEmpty(methods)) {Map<String, InvokerMethodConfig> methodMap = new HashMap<String, InvokerMethodConfig>();invokerConfig.setMethods(methodMap);for (InvokerMethodConfig method : methods) {methodMap.put(method.getName(), method);}}checkMock(); // 降级配置检查invokerConfig.setMock(mock);checkRemoteAppkey();invokerConfig.setRemoteAppKey(remoteAppKey);this.obj = ServiceFactory.getService(invokerConfig);configLoadBalance(invokerConfig);}

流程如下:

1.根据xml bean配置的interfaceName确定代理类型

2.根据xml bean配置内容创建InvokerConfig对象

3.检查降级配置和远程appkey

4.创建Invoker Service

5.配置InvokerConfig对象的负载均衡策略

Invoker Service创建过程如下:

1.启动InvokerBootStrap

2.通过代理方式创建InvokerService

代理格式:

3.将InvokerService添加到负载均衡管理器

4.将InvokerService添加到SET策略管理器

5.将InvokerService添加到客户端管理器

4.7.2 InvokerBootStrap生命周期

InvokerBootStrap生命周期包括两个活动:启动和关闭

InvokerBootStrap启动过程:todo

4.8  Provider

描述:实现服务提供者

Provider初始化过程:

ProviderProcessHandlerFactory初始化过程:

SerializerFactory初始化过程:登记序列化类型和生成器键值对

4.8.1 NettyServer

4.8.1.1 NettyServer架构

4.8.1.2 NettyServer启动过程

4.8.1.3 NettyServer添加Service过程

4.8.1.4  NettyServer请求处理过程

4.8.2 RequestThreadPoolProcessor

RequestThreadPoolProcessor生命周期

4.8.2.1  初始化过程

  1. 根据poolInfo的内容关闭旧的线程池和创建新的线程池(DynamicThreadPoolFactory)

  2. 创建apiPoolMapping

4.8.2.2  启动过程

1.  初始化Share线程池

4.8.2.3  处理请求过程

1.  建立Callable<InvocationRequest>

a.  选择ServiceInvocationHandler

b.  调用ServiceInvocationHandler.handle()

2.  选择线程池 ( 可使用的线程池包括 DynamicThreadPoolFactory, MethodThreadPools,ServiceThreadPools, Slow和Share)

a.  根据ProviderConfig中的Service和Method配置使用DynamicThreadPoolFactory的独立线程池

b.  MethodThreadPools→ServiceThreadPools

c.  根据lion配置使用DynamicThreadPoolFactory的独立线程池

d.  slow → Share

3.   提交执行

4.8.3  ServiceBean

4.8.3.1 初始化过程

4.8.3.2  组成部分

4.8.3.3 发布服务

ProviderBooStrap.startup:

ServicePublisher.addService:

pigeon-remoting模块功能分析相关推荐

  1. spring源码分析之spring-web remoting模块概况及基本概念

    spring-web总体分为三部分:caucho.httpinvoker.jaxws,其总体构造图如下: uml结构: 先看看网上搜索到的上述实现的原理吧:Spring RMI,Hessian/Bur ...

  2. python从指定文件夹导入模块_Python实现的在特定目录下导入模块功能分析

    本文实例讲述了Python实现的在特定目录下导入模块功能.分享给大家供大家参考,具体如下: 方法1.在指定的目录下导入特定模块,(tab.py换行自动补齐语法模块) root@kali:~# ls / ...

  3. 初学者python笔记(time模块、random模块功能分析)

    文章目录 一.time模块 1.时间戳: 2.两种结构化时间 二.random模块 1.说白了就是随机模块,得到随机数,或者随机的其他元素 2.写一个生成随机验证码的函数 本篇主要是关于Python中 ...

  4. Vpss 模块功能分析

    VPSS(Video Process Sub-System)支持对一幅输入图像进行统一预处理,如去噪.去隔行等,然后再对各通道分别进行缩放.锐化等处理,最后输出多种不同分辨率的图像. 我们按照这个流程 ...

  5. 动态数码管verilog模块功能分析

    学习正点原子FPGA开发板关于动态数码管章节: 实验任务是使用FPGA 开发板上的 6 位数码管以动态方式从 0 开始计数,每 100 ms 计数值增加 一,当计数值从 0 增加到 999999 后重 ...

  6. 好客租房153-地图找房模块功能分析

  7. 【HTCVR】VRTK插件模块功能分析之传送移动(一)

    在HTC Vive设备上,我们戴着头戴显示器可以在一定的范围内自由走动,在现实中的移动会反映到虚拟世界中,我们称这个范围为"房间".但是,在我们的游览项目中,虚拟场景里可移动的范围 ...

  8. 【HTCVR】VRTK插件模块功能分析之传送移动(二)

    上一篇中说明了两种传送方法:基础传送和高度调整传送,这一篇中我们将继续讨论其他的传送方法. 地形传送 示例场景 010_CameraRig_TerrainTeleporting  这个场景展示了在使用 ...

  9. spring源码分析之spring-jms模块详解

    0 概述 spring提供了一个jms集成框架,这个框架如spring 集成jdbc api一样,简化了jms api的使用. jms可以简单的分成两个功能区,消息的生产和消息的消费.JmsTempl ...

  10. spring源码分析之spring-web web模块分析

    0 概述 spring-web的web模块是更高一层的抽象,它封装了快速开发spring-web需要的基础组件.其结构如下: 1. 初始化Initializer部分 1.1  Servlet3.0 的 ...

最新文章

  1. 线段树练习 3P3372 【模板】线段树 1
  2. 无人驾驶还不如一个导盲犬可靠吗
  3. MySQL——基于CASE WHEN的常用查询
  4. MFC DLL 的三种类型
  5. QT使用插件开发界面
  6. Android开发:Menu选项菜单
  7. NIPS不改名再引风波:签名抗议活动开启,大咖纷纷发声支持
  8. The JRE you are running Eclipse with appears to not be a JDK .Spring Boot Live hovers will not work
  9. 安卓开发 实现文字渐变效果_AI教程!用网格工具做渐变字效
  10. linux配置命令route,linux路由配置命令route学习
  11. 毕业论文:vrml设计的虚拟校园
  12. 每次离开总是 装做轻松的样子 微笑着说回去吧 转身泪湿眼底
  13. An invalid domain [] was specified for this cookie问题解决
  14. AUC、KS值、PSI值理解
  15. HTML 和文字有关的标签(2)
  16. html5显示文件后缀,如何显示文件后缀名称
  17. 素数:素数判断、找素数、计算素数和、素数求和
  18. Python常用库的用法介绍都给大家整理出来啦,非常实用,建议收藏
  19. python人工智能计算器_python游戏dnf_招募:基于python的召唤师全时段全技能(含均值AI)计算器全程测试......
  20. 小程序开源项目库汇总

热门文章

  1. html5 正式发布 2014,万维网联盟将于2014年推出HTML5标准
  2. 爱荷华州立大学计算机科学,爱荷华州立计算机科学庆祝成立50周年
  3. php datedif,Datedif函数全面解析及BUG分析
  4. 人的精力是什么?如何强化精力
  5. java 保存文件 中文乱码_java文件中文乱码问题
  6. 通过无线网络实现两台计算机共享打印机共享,同一WiFi环境中两台电脑共享打印机技巧方法...
  7. 日本财险容灾案例揭示小公司的容灾之道
  8. mysql的month_MySQL month()函数
  9. 数字谐音记忆编码连连看网页应用
  10. 关于O、Θ、Ω、o、ω等数学符号