基于Flink1.8版本,分析Flink各节点之间的RPC实现:

  • 介绍RPC相关的主要接口
  • RPC节点之间的通信方式

Flink老版本处理Rpc时,各节点通过继承FlinkActor接口,接收Actor消息,根据消息类型进行不同的业务处理。此种方式将流程业务和具体通信组件耦合在一起,不利于后期更换通信组件(如使用netty),因此Flink引入了RPC调用,各节点通过GateWay方式回调,隐藏通信组件的细节,实现解耦。

RPC相关的主要接口

  • RpcEndpoint
  • RpcService
  • RpcGateway

RpcEndpoint:远程过程调用(remote procedure calls) 的基类

RpcEndpoint是Flink RPC终端的基类,所有提供远程过程调用的分布式组件必须扩展RpcEndpoint, RpcEndpoint功能由RpcService支持。

RpcEndpoint子类


如上图所示,RpcEndpoint的子类只有四类组件:Dispatcher,JobMaster,ResourceManager,TaskExecutor,即Flink中只有这四个组件有RPC的能力,换句话说只有这四个组件有RPC的这个需求。

这也对应了Flink这的四大组件:Dispatcher,JobMaster,ResourceManager,TaskExecutor,彼此之间的通信需要依赖RPC实现。(目前通信组件依然是Akka)

RpcService:RPC服务提供者

RpcServer是RpcEndpoint的成员变量,为RpcService提供RPC服务,连接远程Server,其只有一个子类实现:AkkaRpcService,可见目前Flink的通信方式依然是Akka。

RpcServer用于启动和连接到RpcEndpoint, 连接到rpc服务器将返回一个RpcGateway,可用于调用远程过程。

Flink四大组件Dispatcher,JobMaster,ResourceManager,TaskExecutor,都是RpcEndpoint的实现,所以构建四大组件时,同步需要初始化RpcServer。

如JobManager的构造方式,第一个参数就是需要知道RpcService :

public JobMaster(RpcService rpcService,JobMasterConfiguration jobMasterConfiguration,ResourceID resourceId,JobGraph jobGraph,HighAvailabilityServices highAvailabilityService,SlotPoolFactory slotPoolFactory,SchedulerFactory schedulerFactory,JobManagerSharedServices jobManagerSharedServices,HeartbeatServices heartbeatServices,JobManagerJobMetricGroupFactory jobMetricGroupFactory,OnCompletionActions jobCompletionActions,FatalErrorHandler fatalErrorHandler,ClassLoader userCodeLoader){}

所有的RpcService都是通过AkkaRpcServiceUtils这个工具类的createRpcService方法创建的。

RpcGateway:RPC调用的网关

RpcGateway主要实现接口有:FencedRpcEndpoint和TaskExecutorGateway,而这两个接口又分别被Flink四大组件继承,即Dispatcher,JobMaster,ResourceManager,TaskExecutor可通过各自的Gateway实现RPC调用。

  • Rpc gateway interface,所有Rpc组件的网关,定义了各组件的Rpc接口
  • 常见的就是Rpc实现,如JobMasterGateway,DispatcherGateway,ResourceManagerGateway,TaskExecutorGateway等
  • 各组件类的成员变量都有需要通信的其他组件的GateWay实现类,便于Rpc调用

以JobMaster为例,JobMaster实现JobMasterGateway接口,JobMasterGateway接口中定义的方法如下:

public interface JobMasterGateway extendsCheckpointCoordinatorGateway,FencedRpcGateway<JobMasterId>,KvStateLocationOracle,KvStateRegistryGateway {/*** 取消正在执行的任务(与TaskExecutorGateway交互)*/CompletableFuture<Acknowledge> cancel(@RpcTimeout Time timeout);/*** 取消正在执行的任务(与TaskExecutorGateway交互)*/CompletableFuture<Acknowledge> stop(@RpcTimeout Time timeout);/*** 修改正在运行的任务的并行度(与TaskExecutorGateway交互)*/CompletableFuture<Acknowledge> rescaleJob(int newParallelism,RescalingBehaviour rescalingBehaviour,@RpcTimeout Time timeout);/*** 修改指定算子的并行度(与TaskExecutorGateway交互)*/CompletableFuture<Acknowledge> rescaleOperators(Collection<JobVertexID> operators,int newParallelism,RescalingBehaviour rescalingBehaviour,@RpcTimeout Time timeout);CompletableFuture<Acknowledge> updateTaskExecutionState(final TaskExecutionState taskExecutionState);CompletableFuture<SerializedInputSplit> requestNextInputSplit(final JobVertexID vertexID,final ExecutionAttemptID executionAttempt);CompletableFuture<ExecutionState> requestPartitionState(final IntermediateDataSetID intermediateResultId,final ResultPartitionID partitionId);CompletableFuture<Acknowledge> scheduleOrUpdateConsumers(final ResultPartitionID partitionID,@RpcTimeout final Time timeout);CompletableFuture<Acknowledge> disconnectTaskManager(ResourceID resourceID, Exception cause);/*** 和ResourceManager断开连接(与ResourceManager交互)*/void disconnectResourceManager(final ResourceManagerId resourceManagerId,final Exception cause);/*** Offers the given slots to the job manager. The response contains the set of accepted slots.** @param taskManagerId identifying the task manager* @param slots         to offer to the job manager* @param timeout       for the rpc call* @return Future set of accepted slots.*/CompletableFuture<Collection<SlotOffer>> offerSlots(final ResourceID taskManagerId,final Collection<SlotOffer> slots,@RpcTimeout final Time timeout);void failSlot(final ResourceID taskManagerId,final AllocationID allocationId,final Exception cause);
ableFuture<RegistrationResponse> registerTaskManager(final String taskManagerRpcAddress,final TaskManagerLocation taskManagerLocation,@RpcTimeout final Time timeout);void heartbeatFromTaskManager(final ResourceID resourceID,final AccumulatorReport accumulatorReport);/*** Sends heartbeat request from the resource manager.** @param resourceID unique id of the resource manager*/void heartbeatFromResourceManager(final ResourceID resourceID);CompletableFuture<JobDetails> requestJobDetails(@RpcTimeout Time timeout);CompletableFuture<JobStatus> requestJobStatus(@RpcTimeout Time timeout);CompletableFuture<ArchivedExecutionGraph> requestJob(@RpcTimeout Time timeout);CompletableFuture<String> triggerSavepoint(@Nullable final String targetDirectory,final boolean cancelJob,@RpcTimeout final Time timeout);CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(JobVertexID jobVertexId);void notifyAllocationFailure(AllocationID allocationID, Exception cause);CompletableFuture<Object> updateGlobalAggregate(String aggregateName, Object aggregand, byte[] serializedAggregationFunction);
}

上面JobMasterGateway 定义的方法有两类返回值:Void和CompletableFuture

  • Void返回值:表示从其他组件(如Dispatcher)触发动作,JobMaster中定义此方法作为Dispatcher的回调;
  • CompletableFuture返回值:表示将此方法的实现由JobManager主动调用,并且该方法中一般都有其他组件的Gateway调用

总结:

之前版本跨节点的通信是直接基于Akka,现在Flink1.8基于业务需要,定义各组件的GateWay,方便直接使用Rpc,但是底层依然是Akka。好处在于,GateWay在具体组件中排出了Akka相关代码,将业务和通信方式进行分离,便于后期更换通信方式,如netty

弄清Flink1.8的远程过程调用(RPC)相关推荐

  1. 转:传入的表格格式数据流(TDS)远程过程调用(RPC)协议流不正确 .

    近期在做淘宝客的项目,大家都知道,淘宝的商品详细描述字符长度很大,所以就导致了今天出现了一个问题 VS的报错是这样子的  " 传入的表格格式数据流(TDS)远程过程调用(RPC)协议流不正确 ...

  2. 修复远程过程调用 (RPC) 时发生的各种问题KB908521

    当系统出现RPC通讯问题时可以尝安装KB908521进行修复. 安装本更新程序可以解决当您在 Microsoft Windows Server 2003 和 Microsoft Windows XP ...

  3. 传入的表格格式数据流(TDS)远程过程调用(RPC)协议流不正确。参数 1 (“@xx“): 对于类型特定的元数据,数据类型 0x62 (sql_variant)的类型无效。

    传入的表格格式数据流(TDS)远程过程调用(RPC)协议流不正确.参数 1 ("@xx"): 对于类型特定的元数据,数据类型 0x62 (sql_variant)的类型无效. 解决 ...

  4. 传入的表格格式数据流(TDS)远程过程调用(RPC)协议流不正确

    记录一次很少见的Sql异常排查,异常内容:         传入的表格格式数据流(TDS)远程过程调用(RPC)协议流不正确.参数 5 ("@TestValue"): 提供的值不是 ...

  5. com.microsoft.sqlserver.jdbc.SQLServerException: 传入的表格格式数据流(TDS)远程过程调用(RPC)协议流不正确。此 RPC 请求中提供了过多的参数。

    sqlserver在做批量插入的时候出现这个错误: com.microsoft.sqlserver.jdbc.SQLServerException: 传入的表格格式数据流(TDS)远程过程调用(RPC ...

  6. RabbitMQ远程过程调用(RPC)

    RabbitMQ远程过程调用(RPC) 准备环境 使用Pika RabbitMQ客户端,客户端版本(1.0.0以上) 客户端接口示例 FibonacciRpcClient是一个简单的客户端类,它暴露了 ...

  7. 整合rpc远程调用_远程过程调用(RPC)

    分布式系统的主要特点是能够将一台机器上的一个任务分解到系统中其他的机器上运行,实现多个CPU的协同工作.远程过程调用RPC就是实现这一特点的有效方法之一 1.什么是RPC RPC的基本思想 (1984 ...

  8. 进程间通信 IPC 的本地过程调用 LPC(Local Procedure Call)和远程过程调用 RPC(Remote Procedure Call)

    进程间通信(IPC:Inter-Process Communication)是在多任务操作系统或联网的计算机之间运行的程序和进程所用的通信技术.有两种类型的进程间通信(IPC). 本地过程调用(LPC ...

  9. [译]RabbitMQ教程C#版 - 远程过程调用(RPC)

    先决条件 本教程假定 RabbitMQ 已经安装,并运行在localhost标准端口(5672).如果你使用不同的主机.端口或证书,则需要调整连接设置. 从哪里获得帮助 如果您在阅读本教程时遇到困难, ...

最新文章

  1. 【一周算法实践集训】_【模型构建】_baseline
  2. 160个Crackme036
  3. Java并发包--线程池框架
  4. Kubernetes 稳定性保障手册(极简版)
  5. mysql基础之四:int(M)中M的含义
  6. 接口管理工具 - 资源篇
  7. fedora 16 x64 安装gnustep object-c开发环境
  8. 服务应用突然宕机了?别怕,Dubbo 帮你自动搞定服务隔离!
  9. 计算机性能怎么测试软件,如何测试电脑性能|检测电脑性能的方法
  10. 关于c# naudio的几个注意事项
  11. 【自然语言处理】词性标注
  12. [微信小程序]搜索功能实现,搜索框样式
  13. 孩子为什么不能玩抖音精彩回答,共勉
  14. 大连大学计算机科学与技术研究生毕业工资,大学研究生毕业的你,现在一个月的月薪多少?现实让人想哭!...
  15. python埃及分数_送你一份低折扣书单,Python就占了6本,人工智能2本
  16. Win11安装 eNSP模拟器
  17. 祖玛游戏python
  18. 微信O2O,卡在了“连接一切”的迷信上
  19. 什么是TSN,如何搭建TSN验证环境?
  20. 全国计算机考试cad,国家CAD考试CAD试题库.pdf

热门文章

  1. java JDBC使用简易教程
  2. 360日历精选弹窗如何关闭?
  3. 本地拷贝文件到服务器卡死,本地文件拷贝到服务器
  4. 私密计算机,360隐私安全计算机版
  5. 驱动人生2014 v6.0.9.70 绿色版
  6. 探索性数据分析的思路整理
  7. 中国石油暴跌近7%带动 石油板块跌幅居前
  8. 读取图片做OpenCV画的雷达图的背景
  9. LaTex使用笔记(转载)
  10. app能不能跳转外部h5_APP内部H5页面跳转 H5唤起APP 怎么做?