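Analyzing Flink components through flink-example (3): streaming WordCount in practice, with source-code analysis
The earlier posts covered how the batch WordCount is executed:
<Analyzing Flink components through flink-example (1): batch WordCount in practice, with source-code analysis>
<Analyzing Flink components through flink-example (2): batch WordCount in practice, with source-code analysis: how does Flink execute locally?>
This post starts from the streaming version of WordCount.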
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;

/**
 * Implements the "WordCount" program that computes a simple word occurrence
 * histogram over text files in a streaming fashion.
 *
 * <p>The input is a plain text file with lines separated by newline characters.
 *
 * <p>Usage: <code>WordCount --input <path> --output <path></code><br>
 * If no parameters are provided, the program is run with default data from
 * {@link WordCountData}.
 *
 * <p>This example shows how to:
 * <ul>
 * <li>write a simple Flink Streaming program,
 * <li>use tuple data types,
 * <li>write and use user-defined functions.
 * </ul>
 */
public class WordCount {

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        // Checking input parameters
        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataStream<String> text;
        if (params.has("input")) {
            // read the text file from given input path
            text = env.readTextFile(params.get("input"));
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            // get default test text data
            text = env.fromElements(WordCountData.WORDS);
        }

        DataStream<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word,1)
            text.flatMap(new Tokenizer())
            // group by the tuple field "0" and sum up tuple field "1"
            .keyBy(0).sum(1); //1

        // emit result
        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }

        // execute program
        env.execute("Streaming WordCount"); //2
    }

    // *************************************************************************
    // USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a
     * user-defined FlatMapFunction. The function takes a line (String) and
     * splits it into multiple pairs in the form of "(word,1)" ({@code Tuple2<String,
     * Integer>}).
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
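In the streaming version, keyBy(0).sum(1) does not wait for the end of the input: for every incoming (word,1) it updates the sum kept for that key and emits the new total, so the same word can appear several times in the output with growing counts. The sketch below models only that behavior in plain Java so it runs without Flink; the class and method names (RollingWordCountSketch, rollingCounts) are hypothetical, and it ignores parallelism, state backends and fault tolerance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical plain-Java model (no Flink dependency) of
 * text.flatMap(new Tokenizer()).keyBy(0).sum(1):
 * every incoming word produces an updated running count for its key.
 */
public class RollingWordCountSketch {

    /** Same normalization as the Tokenizer: lower-case, split on non-word characters, drop empties. */
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                words.add(token);
            }
        }
        return words;
    }

    /** Emits one "(word,count)" record per input word, like a keyed rolling sum. */
    static List<String> rollingCounts(List<String> lines) {
        Map<String, Integer> state = new HashMap<>(); // per-key state: word -> running sum
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : tokenize(line)) {
                int updated = state.merge(word, 1, Integer::sum); // add this word's 1 to the key's sum
                emitted.add("(" + word + "," + updated + ")");     // emit immediately, not at end of input
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        rollingCounts(Arrays.asList("To be, or not to be")).forEach(System.out::println);
    }
}
```

For the input "To be, or not to be" this model emits (to,1), (be,1), (or,1), (not,1), (to,2), (be,2); a batch job would instead report only the final totals.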
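The overall execution flow is shown in the figure below:
Steps 1-4: the main method reads the input file and adds the operators. The file source is created in createFileInput: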
private <OUT> DataStreamSource<OUT> createFileInput(FileInputFormat<OUT> inputFormat,
                                                    TypeInformation<OUT> typeInfo,
                                                    String sourceName,
                                                    FileProcessingMode monitoringMode,
                                                    long interval) {

    Preconditions.checkNotNull(inputFormat, "Unspecified file input format.");
    Preconditions.checkNotNull(typeInfo, "Unspecified output type information.");
    Preconditions.checkNotNull(sourceName, "Unspecified name for the source.");
    Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode.");

    Preconditions.checkArgument(monitoringMode.equals(FileProcessingMode.PROCESS_ONCE) ||
            interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
        "The path monitoring interval cannot be less than " +
            ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms.");

    ContinuousFileMonitoringFunction<OUT> monitoringFunction =
        new ContinuousFileMonitoringFunction<>(inputFormat, monitoringMode, getParallelism(), interval);

    ContinuousFileReaderOperator<OUT> reader =
        new ContinuousFileReaderOperator<>(inputFormat);

    SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName)
        .transform("Split Reader: " + sourceName, typeInfo, reader); //1

    return new DataStreamSource<>(source);
}
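createFileInput builds the file source out of two pieces: a ContinuousFileMonitoringFunction, added with addSource, which discovers what has to be read, and a ContinuousFileReaderOperator, attached with transform("Split Reader: ..."), which reads those splits. The plain-Java sketch below models that division of labor together with the interval precondition. Everything in it is an assumption for illustration: the class names, the line-based "splits" (Flink's real splits come from the FileInputFormat), and MIN_INTERVAL_MS = 1, which stands in for ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical two-stage model of the file source: a monitor finds splits, a reader reads them. */
public class TwoStageFileSourceSketch {

    enum Mode { PROCESS_ONCE, PROCESS_CONTINUOUSLY }

    /** Assumed value standing in for MIN_MONITORING_INTERVAL. */
    static final long MIN_INTERVAL_MS = 1L;

    /** Same rule as the checkArgument in createFileInput: the interval only matters when monitoring continuously. */
    static void checkInterval(Mode mode, long intervalMs) {
        if (!(mode == Mode.PROCESS_ONCE || intervalMs >= MIN_INTERVAL_MS)) {
            throw new IllegalArgumentException(
                "The path monitoring interval cannot be less than " + MIN_INTERVAL_MS + " ms.");
        }
    }

    /** A split here is a file path plus a half-open range [from, to) of line indices. */
    static final class Split {
        final String path;
        final int from;
        final int to;
        Split(String path, int from, int to) { this.path = path; this.from = from; this.to = to; }
    }

    /** Stage 1 (monitor): one scan that cuts every file into splits of at most splitSize lines. */
    static Queue<Split> discoverSplits(Map<String, List<String>> files, int splitSize) {
        Queue<Split> splits = new ArrayDeque<>();
        for (Map.Entry<String, List<String>> file : files.entrySet()) {
            int n = file.getValue().size();
            for (int start = 0; start < n; start += splitSize) {
                splits.add(new Split(file.getKey(), start, Math.min(start + splitSize, n)));
            }
        }
        return splits;
    }

    /** Stage 2 (reader): drain the split queue and emit the lines each split covers. */
    static List<String> readSplits(Queue<Split> splits, Map<String, List<String>> files) {
        List<String> records = new ArrayList<>();
        Split split;
        while ((split = splits.poll()) != null) {
            records.addAll(files.get(split.path).subList(split.from, split.to));
        }
        return records;
    }

    public static void main(String[] args) {
        Map<String, List<String>> files = new LinkedHashMap<>();
        files.put("part-1.txt", Arrays.asList("to be", "or not", "to be"));
        checkInterval(Mode.PROCESS_ONCE, -1L); // accepted: PROCESS_ONCE ignores the interval
        System.out.println(readSplits(discoverSplits(files, 2), files));
    }
}
```

Keeping discovery and reading apart lets one component decide what to read while the reading work is done separately, which is the same split the transform("Split Reader: ...") call expresses.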
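Operators are registered through addOperator: it appends a transformation to the environment's list, and the operators in that list are the ones executed once execute() is called.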
/**
 * Adds an operator to the list of operators that should be executed when calling
 * {@link #execute}.
 *
 * <p>When calling {@link #execute()} only the operators that were previously added to the list
 * are executed.
 *
 * <p>This is not meant to be used by users. The API methods that create operators must call
 * this method.
 */
@Internal
public void addOperator(StreamTransformation<?> transformation) {
    Preconditions.checkNotNull(transformation, "transformation must not be null.");
    this.transformations.add(transformation);
}
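The important point of addOperator is laziness: API calls such as flatMap or sum only record a transformation, and nothing is computed until execute() runs. The sketch below is a hypothetical, heavily simplified model of that "record now, run on execute()" pattern; LazyEnvironmentSketch and its methods are illustrative names, and unlike Flink it treats the recorded steps as a simple linear pipeline instead of a graph of transformations with inputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical model: API calls only append to a list; work happens in execute(). */
public class LazyEnvironmentSketch {

    private final List<String> transformations = new ArrayList<>();                  // recorded names, in order
    private final List<Function<List<String>, List<String>>> steps = new ArrayList<>(); // recorded work, not yet run

    /** Counterpart of addOperator: validate and remember the step, but do not run it. */
    LazyEnvironmentSketch addOperator(String name, Function<List<String>, List<String>> step) {
        if (name == null || step == null) {
            throw new NullPointerException("transformation must not be null.");
        }
        transformations.add(name);
        steps.add(step);
        return this;
    }

    List<String> getTransformations() {
        return Collections.unmodifiableList(transformations);
    }

    /** Only here are the recorded steps applied, in the order they were added. */
    List<String> execute(List<String> input) {
        List<String> data = input;
        for (Function<List<String>, List<String>> step : steps) {
            data = step.apply(data);
        }
        return data;
    }

    public static void main(String[] args) {
        LazyEnvironmentSketch env = new LazyEnvironmentSketch()
            .addOperator("lower-case", in -> in.stream().map(String::toLowerCase).collect(Collectors.toList()));
        System.out.println("recorded: " + env.getTransformations());
        System.out.println("result: " + env.execute(Arrays.asList("Flink", "Stream")));
    }
}
```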
Step 5: generate the StreamGraph and derive the JobGraph from it, i.e. translate the streaming program into a JobGraph:
// transform the streaming program into a JobGraph
StreamGraph streamGraph = getStreamGraph();
streamGraph.setJobName(jobName);

JobGraph jobGraph = streamGraph.getJobGraph();
jobGraph.setAllowQueuedScheduling(true);
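One thing that happens when a StreamGraph becomes a JobGraph is operator chaining: operators that can run back to back in the same task are fused into a single job vertex. The sketch below is a hypothetical, simplified model of that idea, not Flink's StreamingJobGraphGenerator; it only chains across an edge that is forward, connects operators with equal parallelism, and is the downstream operator's single input, while Flink checks further conditions (slot sharing group, chaining strategy, whether chaining is enabled). The node names and the parallelism of 4 in the example are illustrative assumptions loosely following WordCount.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of chaining StreamGraph nodes into JobGraph vertices. */
public class ChainingSketch {

    static final class Node {
        final String name;
        final int parallelism;
        Node(String name, int parallelism) { this.name = name; this.parallelism = parallelism; }
    }

    static final class Edge {
        final int from;
        final int to;
        final boolean forward; // false for e.g. a keyBy hash or a rebalance
        Edge(int from, int to, boolean forward) { this.from = from; this.to = to; this.forward = forward; }
    }

    /** Nodes must be in topological order; returns one "a -> b" string per resulting job vertex. */
    static List<String> chain(List<Node> nodes, List<Edge> edges) {
        int[] chainOf = new int[nodes.size()];                  // node index -> index of its chain's head
        Map<Integer, List<String>> chains = new LinkedHashMap<>();
        for (int i = 0; i < nodes.size(); i++) {
            List<Edge> inputs = new ArrayList<>();
            for (Edge e : edges) {
                if (e.to == i) inputs.add(e);
            }
            boolean chainable = inputs.size() == 1
                && inputs.get(0).forward
                && nodes.get(inputs.get(0).from).parallelism == nodes.get(i).parallelism;
            chainOf[i] = chainable ? chainOf[inputs.get(0).from] : i; // join upstream chain or start a new one
            chains.computeIfAbsent(chainOf[i], k -> new ArrayList<>()).add(nodes.get(i).name);
        }
        List<String> vertices = new ArrayList<>();
        for (List<String> members : chains.values()) {
            vertices.add(String.join(" -> ", members));
        }
        return vertices;
    }

    public static void main(String[] args) {
        List<Node> nodes = Arrays.asList(new Node("source", 1), new Node("reader", 4),
            new Node("flatMap", 4), new Node("sum", 4), new Node("sink", 4));
        List<Edge> edges = Arrays.asList(new Edge(0, 1, false), new Edge(1, 2, true),
            new Edge(2, 3, false), new Edge(3, 4, true));
        chain(nodes, edges).forEach(System.out::println);
    }
}
```

In this model five operators collapse into three vertices: the keyBy edge is a hash repartition, so it always starts a new chain.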
Steps 6-8: start the MiniCluster in preparation for running the job:
/**
 * Starts the mini cluster, based on the configured properties.
 *
 * @throws Exception This method passes on any exception that occurs during the startup of
 *                   the mini cluster.
 */
public void start() throws Exception {
    synchronized (lock) {
        checkState(!running, "MiniCluster is already running");

        LOG.info("Starting Flink Mini Cluster");
        LOG.debug("Using configuration {}", miniClusterConfiguration);

        final Configuration configuration = miniClusterConfiguration.getConfiguration();
        final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;

        try {
            initializeIOFormatClasses(configuration);

            LOG.info("Starting Metrics Registry");
            metricRegistry = createMetricRegistry(configuration);

            // bring up all the RPC services
            LOG.info("Starting RPC Service(s)");

            AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);

            final RpcServiceFactory dispatcherResourceManagreComponentRpcServiceFactory;

            if (useSingleRpcService) {
                // we always need the 'commonRpcService' for auxiliary calls
                commonRpcService = createRpcService(akkaRpcServiceConfig, false, null);
                final CommonRpcServiceFactory commonRpcServiceFactory = new CommonRpcServiceFactory(commonRpcService);
                taskManagerRpcServiceFactory = commonRpcServiceFactory;
                dispatcherResourceManagreComponentRpcServiceFactory = commonRpcServiceFactory;
            } else {
                // we always need the 'commonRpcService' for auxiliary calls
                commonRpcService = createRpcService(akkaRpcServiceConfig, true, null);

                // start a new service per component, possibly with custom bind addresses
                final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress();
                final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();

                dispatcherResourceManagreComponentRpcServiceFactory = new DedicatedRpcServiceFactory(akkaRpcServiceConfig, jobManagerBindAddress);
                taskManagerRpcServiceFactory = new DedicatedRpcServiceFactory(akkaRpcServiceConfig, taskManagerBindAddress);
            }

            RpcService metricQueryServiceRpcService = MetricUtils.startMetricsRpcService(
                configuration,
                commonRpcService.getAddress());
            metricRegistry.startQueryService(metricQueryServiceRpcService, null);

            ioExecutor = Executors.newFixedThreadPool(
                Hardware.getNumberCPUCores(),
                new ExecutorThreadFactory("mini-cluster-io"));
            haServices = createHighAvailabilityServices(configuration, ioExecutor);

            blobServer = new BlobServer(configuration, haServices.createBlobStore());
            blobServer.start();

            heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

            blobCacheService = new BlobCacheService(
                configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort())
            );

            startTaskManagers();

            MetricQueryServiceRetriever metricQueryServiceRetriever = new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService());

            dispatcherResourceManagerComponents.addAll(createDispatcherResourceManagerComponents(
                configuration,
                dispatcherResourceManagreComponentRpcServiceFactory,
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry,
                metricQueryServiceRetriever,
                new ShutDownFatalErrorHandler()
            ));

            resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
            dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();
            webMonitorLeaderRetrievalService = haServices.getWebMonitorLeaderRetriever();

            dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                commonRpcService,
                DispatcherGateway.class,
                DispatcherId::fromUuid,
                20,
                Time.milliseconds(20L));
            resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
                commonRpcService,
                ResourceManagerGateway.class,
                ResourceManagerId::fromUuid,
                20,
                Time.milliseconds(20L));
            webMonitorLeaderRetriever = new LeaderRetriever();

            resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
            dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
            webMonitorLeaderRetrievalService.start(webMonitorLeaderRetriever);
        }
        catch (Exception e) {
            // cleanup everything
            try {
                close();
            } catch (Exception ee) {
                e.addSuppressed(ee);
            }
            throw e;
        }

        // create a new termination future
        terminationFuture = new CompletableFuture<>();

        // now officially mark this as running
        running = true;

        LOG.info("Flink Mini Cluster started successfully");
    }
}
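Apart from the individual services, MiniCluster.start() follows a reusable start-up idiom: it holds a lock, refuses to start twice, starts services one after another, and if any step throws it calls close() to tear down what was already started, attaching any failure from close() to the original exception with addSuppressed. The sketch below isolates that idiom; GuardedStartSketch and its Service interface are hypothetical names, and stopping services in reverse start order is a design choice of this sketch rather than something the code above shows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of a guarded start with rollback, modeled on the idiom in MiniCluster.start(). */
public class GuardedStartSketch implements AutoCloseable {

    /** A startable component; illustrative only. */
    interface Service {
        void start() throws Exception;
        void stop() throws Exception;
    }

    private final Object lock = new Object();
    private final List<Service> toStart;
    private final List<Service> started = new ArrayList<>();
    private boolean running;

    GuardedStartSketch(List<Service> toStart) { this.toStart = toStart; }

    void start() throws Exception {
        synchronized (lock) {
            if (running) throw new IllegalStateException("already running"); // like checkState(!running, ...)
            try {
                for (Service s : toStart) {
                    s.start();
                    started.add(s);            // remember only services that actually came up
                }
            } catch (Exception e) {
                try {
                    close();                   // roll back the partial start
                } catch (Exception ee) {
                    e.addSuppressed(ee);       // keep the original cause, attach the cleanup failure
                }
                throw e;
            }
            running = true;                    // only mark running after everything succeeded
        }
    }

    boolean isRunning() {
        synchronized (lock) { return running; }
    }

    @Override
    public void close() throws Exception {
        synchronized (lock) {                  // reentrant, so start() may call close() while holding the lock
            Exception first = null;
            for (int i = started.size() - 1; i >= 0; i--) {   // stop in reverse start order
                try {
                    started.get(i).stop();
                } catch (Exception ex) {
                    if (first == null) first = ex; else first.addSuppressed(ex);
                }
            }
            started.clear();
            running = false;
            if (first != null) throw first;
        }
    }

    public static void main(String[] args) throws Exception {
        Service noisy = new Service() {
            public void start() { System.out.println("service up"); }
            public void stop() { System.out.println("service down"); }
        };
        try (GuardedStartSketch cluster = new GuardedStartSketch(Arrays.asList(noisy))) {
            cluster.start();
            System.out.println("running = " + cluster.isRunning());
        }
    }
}
```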
Steps 9-12: execute the job:
/**
 * This method runs a job in blocking mode. The method returns only after the job
 * completed successfully, or after it failed terminally.
 *
 * @param job  The Flink job to execute
 * @return The result of the job execution
 *
 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
 *         or if the job terminally failed.
 */
@Override
public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
    checkNotNull(job, "job is null");

    final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);

    final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
        (JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));

    final JobResult jobResult;

    try {
        jobResult = jobResultFuture.get();
    } catch (ExecutionException e) {
        throw new JobExecutionException(job.getJobID(), "Could not retrieve JobResult.", ExceptionUtils.stripExecutionException(e));
    }

    try {
        return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
    } catch (IOException | ClassNotFoundException e) {
        throw new JobExecutionException(job.getJobID(), e);
    }
}
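The jar files are uploaded first; the submission is carried out through the DispatcherGateway, and the result is then awaited asynchronously until the job has finished.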
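executeJobBlocking turns an asynchronous submission into a blocking call: thenCompose requests the job result only after submission has completed, get() blocks the calling thread, and the ExecutionException wrapper is removed before the error is reported (Flink does this with ExceptionUtils.stripExecutionException). The sketch below reproduces only that future-chaining shape with the JDK's CompletableFuture; submit and requestResult are hypothetical stand-ins that do no real work, and the sketch simply uses getCause() instead of Flink's helper.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Hypothetical sketch of the submit -> thenCompose -> get() pattern; not Flink's API. */
public class BlockingSubmitSketch {

    /** Stand-in for submitJob: completes with the job id, or fails if the id is empty. */
    static CompletableFuture<String> submit(String jobId) {
        return CompletableFuture.supplyAsync(() -> {
            if (jobId.isEmpty()) {
                throw new IllegalArgumentException("empty job id");
            }
            return jobId;
        });
    }

    /** Stand-in for requestJobResult: pretends the job has finished. */
    static CompletableFuture<String> requestResult(String jobId) {
        return CompletableFuture.supplyAsync(() -> "FINISHED:" + jobId);
    }

    static String executeBlocking(String jobId) throws InterruptedException {
        // ask for the result only once submission succeeded; a failed submission skips this step
        CompletableFuture<String> resultFuture = submit(jobId).thenCompose(ignored -> requestResult(jobId));
        try {
            return resultFuture.get();   // block the caller until the chain completes
        } catch (ExecutionException e) {
            // report the real cause rather than the ExecutionException wrapper
            throw new IllegalStateException("Could not retrieve result for " + jobId, e.getCause());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(executeBlocking("job-1"));
    }
}
```

thenCompose is used rather than thenApply because requestResult itself returns a future; thenApply would yield a nested CompletableFuture<CompletableFuture<String>>.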
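Summary:
The batch and streaming execution flows are very similar, but they also differ.
Difference: a streaming program passes DataStreams between operations, while a batch program passes DataSets.
Similarity: both are translated into a JobGraph for execution.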
Reposted from: https://www.cnblogs.com/davidwang456/p/11015594.html