The previous posts covered how the batch WordCount is executed:

<Analyzing Flink components from flink-example (1): WordCount batch in practice and source code analysis>

<Analyzing Flink components from flink-example (2): WordCount batch in practice and source code analysis: how does Flink execute locally?>

This post starts from the streaming version of WordCount.

/**
 * Implements the "WordCount" program that computes a simple word occurrence
 * histogram over text files in a streaming fashion.
 *
 * <p>The input is a plain text file with lines separated by newline characters.
 *
 * <p>Usage: <code>WordCount --input &lt;path&gt; --output &lt;path&gt;</code><br>
 * If no parameters are provided, the program is run with default data from
 * {@link WordCountData}.
 *
 * <p>This example shows how to:
 * <ul>
 * <li>write a simple Flink Streaming program,
 * <li>use tuple data types,
 * <li>write and use user-defined functions.
 * </ul>
 */
public class WordCount {

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        // Checking input parameters
        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataStream<String> text;
        if (params.has("input")) {
            // read the text file from given input path
            text = env.readTextFile(params.get("input"));
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            // get default test text data
            text = env.fromElements(WordCountData.WORDS);
        }

        DataStream<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word,1)
            text.flatMap(new Tokenizer())
            // group by the tuple field "0" and sum up tuple field "1"
            .keyBy(0).sum(1);                                                     //1

        // emit result
        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }

        // execute program
        env.execute("Streaming WordCount");                                       //2
    }

    // *************************************************************************
    // USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a
     * user-defined FlatMapFunction. The function takes a line (String) and
     * splits it into multiple pairs in the form of "(word,1)" ({@code Tuple2<String,
     * Integer>}).
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

The whole execution flow is shown in the figure below:

Steps 1-4: the main method reads the input file and registers the operators.

    private <OUT> DataStreamSource<OUT> createFileInput(FileInputFormat<OUT> inputFormat,
                                                        TypeInformation<OUT> typeInfo,
                                                        String sourceName,
                                                        FileProcessingMode monitoringMode,
                                                        long interval) {

        Preconditions.checkNotNull(inputFormat, "Unspecified file input format.");
        Preconditions.checkNotNull(typeInfo, "Unspecified output type information.");
        Preconditions.checkNotNull(sourceName, "Unspecified name for the source.");
        Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode.");

        Preconditions.checkArgument(monitoringMode.equals(FileProcessingMode.PROCESS_ONCE) ||
                interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
            "The path monitoring interval cannot be less than " +
                ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms.");

        ContinuousFileMonitoringFunction<OUT> monitoringFunction =
            new ContinuousFileMonitoringFunction<>(inputFormat, monitoringMode, getParallelism(), interval);

        ContinuousFileReaderOperator<OUT> reader =
            new ContinuousFileReaderOperator<>(inputFormat);

        SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName)
                .transform("Split Reader: " + sourceName, typeInfo, reader);                //1

        return new DataStreamSource<>(source);
    }
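readTextFile ends up here with FileProcessingMode.PROCESS_ONCE: a ContinuousFileMonitoringFunction source that enumerates file splits, chained to a ContinuousFileReaderOperator that reads them (the transform call marked //1). The same machinery can be driven in watch mode through the public readFile method. A minimal sketch, with a made-up path and interval, where env is the StreamExecutionEnvironment from the WordCount main:

    // Same two-operator source as readTextFile, but re-scanning the path periodically.
    // "/tmp/words" and the 10 s interval are example values.
    TextInputFormat format = new TextInputFormat(new Path("/tmp/words"));
    DataStream<String> text = env.readFile(
        format,
        "/tmp/words",
        FileProcessingMode.PROCESS_CONTINUOUSLY,  // readTextFile uses PROCESS_ONCE instead
        10_000L);                                 // monitoring interval in milliseconds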

The method below registers an operator; operators registered this way are only executed when execute() is called.

    /**
     * Adds an operator to the list of operators that should be executed when calling
     * {@link #execute}.
     *
     * <p>When calling {@link #execute()} only the operators that where previously added to the list
     * are executed.
     *
     * <p>This is not meant to be used by users. The API methods that create operators must call
     * this method.
     */
    @Internal
    public void addOperator(StreamTransformation<?> transformation) {
        Preconditions.checkNotNull(transformation, "transformation must not be null.");
        this.transformations.add(transformation);
    }
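Every fluent call on a DataStream (flatMap, keyBy(...).sum(...), writeAsText, ...) wraps its operator in a StreamTransformation and registers it through this method; execute() later walks this.transformations to build the graph. Schematically, DataStream.transform does roughly the following (a paraphrase, not the verbatim Flink source):

    // Paraphrase of DataStream.transform(...) to show where addOperator is called:
    public <R> SingleOutputStreamOperator<R> transform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            OneInputStreamOperator<T, R> operator) {

        // wrap the operator and its upstream transformation in a new graph node
        OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
            this.transformation, operatorName, operator, outTypeInfo, environment.getParallelism());

        SingleOutputStreamOperator<R> returnStream =
            new SingleOutputStreamOperator<>(environment, resultTransform);

        // the call into addOperator(...) shown above: the node is only recorded here;
        // nothing runs until execute() builds and submits the graph
        getExecutionEnvironment().addOperator(resultTransform);

        return returnStream;
    }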

Step 5: build the StreamGraph and derive the JobGraph from it, i.e. translate the streaming program into a JobGraph.

        // transform the streaming program into a JobGraph
        StreamGraph streamGraph = getStreamGraph();
        streamGraph.setJobName(jobName);

        JobGraph jobGraph = streamGraph.getJobGraph();
        jobGraph.setAllowQueuedScheduling(true);
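To inspect what these two steps produce for a given pipeline without running it, the plan can be dumped as JSON; getExecutionPlan() builds the same StreamGraph under the hood:

    // Dump the StreamGraph as a JSON plan instead of executing the job; the
    // output can be pasted into Flink's plan visualizer to see operator chaining.
    String plan = env.getExecutionPlan();
    System.out.println(plan);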

Steps 6-8: start the MiniCluster in preparation for executing the job.

/**
 * Starts the mini cluster, based on the configured properties.
 *
 * @throws Exception This method passes on any exception that occurs during the startup of
 *                   the mini cluster.
 */
public void start() throws Exception {
    synchronized (lock) {
        checkState(!running, "MiniCluster is already running");

        LOG.info("Starting Flink Mini Cluster");
        LOG.debug("Using configuration {}", miniClusterConfiguration);

        final Configuration configuration = miniClusterConfiguration.getConfiguration();
        final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;

        try {
            initializeIOFormatClasses(configuration);

            LOG.info("Starting Metrics Registry");
            metricRegistry = createMetricRegistry(configuration);

            // bring up all the RPC services
            LOG.info("Starting RPC Service(s)");

            AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);

            final RpcServiceFactory dispatcherResourceManagreComponentRpcServiceFactory;

            if (useSingleRpcService) {
                // we always need the 'commonRpcService' for auxiliary calls
                commonRpcService = createRpcService(akkaRpcServiceConfig, false, null);

                final CommonRpcServiceFactory commonRpcServiceFactory = new CommonRpcServiceFactory(commonRpcService);
                taskManagerRpcServiceFactory = commonRpcServiceFactory;
                dispatcherResourceManagreComponentRpcServiceFactory = commonRpcServiceFactory;
            } else {
                // we always need the 'commonRpcService' for auxiliary calls
                commonRpcService = createRpcService(akkaRpcServiceConfig, true, null);

                // start a new service per component, possibly with custom bind addresses
                final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress();
                final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();

                dispatcherResourceManagreComponentRpcServiceFactory = new DedicatedRpcServiceFactory(akkaRpcServiceConfig, jobManagerBindAddress);
                taskManagerRpcServiceFactory = new DedicatedRpcServiceFactory(akkaRpcServiceConfig, taskManagerBindAddress);
            }

            RpcService metricQueryServiceRpcService = MetricUtils.startMetricsRpcService(
                configuration,
                commonRpcService.getAddress());
            metricRegistry.startQueryService(metricQueryServiceRpcService, null);

            ioExecutor = Executors.newFixedThreadPool(
                Hardware.getNumberCPUCores(),
                new ExecutorThreadFactory("mini-cluster-io"));
            haServices = createHighAvailabilityServices(configuration, ioExecutor);

            blobServer = new BlobServer(configuration, haServices.createBlobStore());
            blobServer.start();

            heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

            blobCacheService = new BlobCacheService(
                configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort()));

            startTaskManagers();

            MetricQueryServiceRetriever metricQueryServiceRetriever = new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService());

            dispatcherResourceManagerComponents.addAll(createDispatcherResourceManagerComponents(
                configuration,
                dispatcherResourceManagreComponentRpcServiceFactory,
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry,
                metricQueryServiceRetriever,
                new ShutDownFatalErrorHandler()));

            resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
            dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();
            webMonitorLeaderRetrievalService = haServices.getWebMonitorLeaderRetriever();

            dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                commonRpcService,
                DispatcherGateway.class,
                DispatcherId::fromUuid,
                20,
                Time.milliseconds(20L));
            resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
                commonRpcService,
                ResourceManagerGateway.class,
                ResourceManagerId::fromUuid,
                20,
                Time.milliseconds(20L));
            webMonitorLeaderRetriever = new LeaderRetriever();

            resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
            dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
            webMonitorLeaderRetrievalService.start(webMonitorLeaderRetriever);
        }
        catch (Exception e) {
            // cleanup everything
            try {
                close();
            } catch (Exception ee) {
                e.addSuppressed(ee);
            }
            throw e;
        }

        // create a new termination future
        terminationFuture = new CompletableFuture<>();

        // now officially mark this as running
        running = true;

        LOG.info("Flink Mini Cluster started successfully");
    }
}
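For reference, this is how the MiniCluster is driven when used directly (a minimal sketch against the Flink 1.8-era API; builder defaults and method names vary across versions, and jobGraph is assumed to be the JobGraph produced in step 5):

    // Stand up a local cluster by hand and run the job on it.
    MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
        .setNumTaskManagers(1)           // one in-process TaskManager
        .setNumSlotsPerTaskManager(1)    // one slot suffices for WordCount at parallelism 1
        .build();
    try (MiniCluster miniCluster = new MiniCluster(cfg)) {
        miniCluster.start();                       // the start() sequence shown above
        miniCluster.executeJobBlocking(jobGraph);  // steps 9-12 below
    }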

Steps 9-12: execute the job.

    /**
     * This method runs a job in blocking mode. The method returns only after the job
     * completed successfully, or after it failed terminally.
     *
     * @param job  The Flink job to execute
     * @return The result of the job execution
     *
     * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
     *         or if the job terminally failed.
     */
    @Override
    public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
        checkNotNull(job, "job is null");

        final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);

        final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
            (JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));

        final JobResult jobResult;

        try {
            jobResult = jobResultFuture.get();
        } catch (ExecutionException e) {
            throw new JobExecutionException(job.getJobID(), "Could not retrieve JobResult.", ExceptionUtils.stripExecutionException(e));
        }

        try {
            return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
        } catch (IOException | ClassNotFoundException e) {
            throw new JobExecutionException(job.getJobID(), e);
        }
    }

The jar files are uploaded first, which requires a DispatcherGateway to carry out the upload; the client then waits asynchronously for the execution result.
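Schematically, the submission inside MiniCluster.submitJob composes three futures: upload the jars to the BlobServer, hand the JobGraph to the Dispatcher over RPC, and map the acknowledgement to a JobSubmissionResult. The sketch below paraphrases that flow; the helper names follow the Flink 1.8 sources but are quoted from memory, not verbatim:

    // Paraphrase of MiniCluster.submitJob(...), not the verbatim source.
    CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();
    CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);

    // 1. upload the job's jars/artifacts to the BlobServer and record the blob keys in the JobGraph
    CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);

    // 2. once the upload completes, hand the JobGraph to the Dispatcher over RPC
    CompletableFuture<Acknowledge> ackFuture = jarUploadFuture
        .thenCombine(
            dispatcherGatewayFuture,
            (Void ignored, DispatcherGateway gateway) -> gateway.submitJob(jobGraph, rpcTimeout))
        .thenCompose(Function.identity());

    // 3. the caller sees a JobSubmissionResult only after the Dispatcher has acknowledged the job
    return ackFuture.thenApply(ignored -> new JobSubmissionResult(jobGraph.getJobID()));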

Summary:

The batch and streaming execution flows are very similar, yet differ in places.

Difference: streaming programs pass a DataStream between operators, while batch programs pass a DataSet.

Similarity: both are translated into a JobGraph for execution.
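For comparison, the core of the batch WordCount (see post (1) above) differs only in the environment and collection types; the same Tokenizer works in both because FlatMapFunction is shared by the DataSet and DataStream APIs:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class BatchWordCount {
        public static void main(String[] args) throws Exception {
            // ExecutionEnvironment/DataSet instead of StreamExecutionEnvironment/DataStream
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<String> text = env.fromElements(WordCountData.WORDS);

            DataSet<Tuple2<String, Integer>> counts =
                text.flatMap(new WordCount.Tokenizer())  // same user function as the streaming job
                    .groupBy(0)                          // groupBy instead of keyBy
                    .sum(1);                             // one final count per word, not running updates

            // in the DataSet API, print() itself triggers execution; no env.execute() needed
            counts.print();
        }
    }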

Reposted from: https://www.cnblogs.com/davidwang456/p/11015594.html
