Flink系列文档-(YY02)-Flink编程基础-入门示例
1 环境准备-创建项目引入依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.14.4</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.14.4</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.4</version>
</dependency>
如需使用scala API,则替换上面的: flink-java 为 flink-scala_2.12 flink-streaming-java_2.12 为 flink-streaming-scala_2.12
2 flink的DataStream抽象
- DataStream代表一个数据流,它可以是无界的,也可以是有界的;
- DataStream类似于spark的rdd,它是不可变的(immutable);
- 无法对一个datastream进行自由的添加或删除或修改元素;
- 只能通过算子对datastream中的数据进行转换,将一个datastream转成另一个datastream;
- datastream可以通过source算子加载、映射外部数据而来;或者从已存在的datastream转换而来
3 flink编程模板
YY-无论简单与复杂,flink程序都由如下几个部分组成
- 获取一个编程、执行入口环境env
- 通过数据源组件,加载、创建datastream
- 对datastream调用各种处理算子表达计算逻辑
- 通过sink算子指定计算结果的输出方式
- 在env上触发程序提交运行
4 WordCount代码示例
YY01--批处理示例
package cn.doitedu.base01;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;/*** @Date: 22.11.07* @Author: Hang.Nian.YY* @qq: 598196583* @Tips: 学大数据 ,到多易教育* @Description: 批处理 统计文件中单词出现的次数*/
public class Base01_BatchWordCount {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();conf.setInteger("rest.port", 8081);// 流式处理数据的环境对象// StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);// 获取批处理的环境对象ExecutionEnvironment batchEnv = ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);DataSource<String> ds = batchEnv.readTextFile("data/a.txt");/** * 泛型1 输入数据类型 String* 泛型2 返回数据类型 Tuple2<String, Integer>*/ds.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.split("\\s+");for (String word : words) {out.collect(Tuple2.of(word, 1));}}}).groupBy(0).sum(1).print();}
}
使用Lamda表达式编写成 ,凡是单方法的接口都可以使用Lamda表达式书写, 但是数据Java中的要指定返回的数据类型 !
FlatMapOperator<String, Tuple2<String, Integer>> ds2 = ds.flatMap((String line, Collector<Tuple2<String, Integer>> collector) -> {String[] words = line.split("\\s+");for (String word : words) {collector.collect(Tuple2.of(word, 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT));// 指定数据类型// 方法有返回值 ,指定返回值的类型// .returns(new TypeHint<Tuple2<String, Integer>>() {}) // 类型提示// 这种是最通用的方法// .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})) // 类型信息ds2.groupBy(tp -> tp.f0).sum(1).print();
YY02-流处理示例
<测试时,需要用nc -lk 9999(linux)或者 nc -L -p 9999(windows)在本机开启一个9999的sockect服务>
/*** @Date: 22.11.07* @Author: Hang.Nian.YY* @qq: 598196583* @Tips: 学大数据 ,到多易教育* @Description: */
public class Base03_StreamWordCount {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();// 设置本地的运行环境 . 且带webUIconf.setInteger("rest.port" , 8081);// 获取流式处理数据的对象StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);// 设置并行度 为1 默认并行度是本地机器的cpu的核数env.setParallelism(1) ;SingleOutputStreamOperator<String> source = env.socketTextStream("doitedu01", 8899);// .setParallelism(1).slotSharingGroup("g1");// 同一个槽位共享组// 使用算子对数据进行各种转换操作SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = source.flatMap(// 传入转换算子函数对象new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {// 处理每行数据 , 切割出单词 String[] words = value.split("\\s+");for (String word : words) {// 将单词组装成 元组 收集记录数据 并转发返回out.collect(Tuple2.of(word, 1));}}});// 将数据按照单词分组KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}});keyed.sum("f1").print();env.execute() ;}
}
5 flink程序的并行概念
- flink程序中,每一个算子都可以成为一个独立任务(task);
- flink程序中,视上下游算子间数据分发规则、并行度、共享槽位设置,可组成算子链成为一个task
- 每个任务在运行时都可拥有多个并行的运行实例(subTask);
- 且每个算子任务的并行度都可以在代码中显式设置;
Flink系列文档-(YY02)-Flink编程基础-入门示例相关推荐
- Flink系列文档-(YY03)-Flink编程基础API-Source
1 Flink编程入口 首先获取flink编程的核心入口对象 /*** 获取批处理入口对象*/// 1) 普通的批处理对象ExecutionEnvironment environment1 = E ...
- Flink系列文档-(YY05)-Flink编程API-多流算子
1 多流连接 connect connect连接(DataStream,DataStream→ConnectedStreams) connect翻译成中文意为连接,可以将两个数据类型一样也可以类型 ...
- Flink系列文档-(YY08)-Flink核心概念
1 核心概念 1.1 基础概念 用户通过算子api所开发的代码,会被flink任务提交客户端解析成jobGraph 然后,jobGraph提交到集群JobManager,转化成ExecutionGra ...
- Flink系列文档-(YY01)-初识Flink
1. 离线批计算与实时流式计算 批计算与流式计算,本质上就是对有界流和无界流的计算 YY-批计算 针对有界流:由于在产出计算结果前可以看到整个(完整)数据集,因而如下计算都可以实现:对数据排序,计 ...
- 玩转MFC文档视图架构编程1——深入浅出MFC文档/视图架构之基本概念深入浅出MFC文档/视图架构之文档
原创地址: 深入浅出MFC文档/视图架构之基本概念 http://iis.xrtvu.com/Tech/ShowArticle.asp?ArticleID=276 深入浅出MFC文档/视图架构之文档模 ...
- 老猿学5G扫盲贴:3GPP规范文档命名规则及同系列文档阅读指南
专栏:Python基础教程目录 专栏:使用PyQt开发图形界面Python应用 专栏:PyQt入门学习 老猿Python博文目录 老猿学5G博文目录 在学习5G规范过程中,有些内容把握不定的时候,有时 ...
- 价值1000元的Python原创文档,涉及“Python基础“、“Python自动化“等,免费分享!
感谢您来到,黄同学的<原创作品>所在地,这里将会给大家分享很多实在的干货文档,供大家学习. 这四个文档分别是:<Python自动化办公手册>.<Excel数据透视表大全手 ...
- java openoffice 打印_java调用openoffice将office系列文档转换为PDF的示例方法
前导: 发过程中经常会使用java将office系列文档转换为PDF, 一般都使用微软提供的openoffice+jodconverter 实现转换文档. openoffice既有windows版本也 ...
- Ext JS 6学习文档-第3章-基础组件
Ext JS 6学习文档-第3章-基础组件 基础组件 在本章中,你将学习到一些 Ext JS 基础组件的使用.同时我们会结合所学创建一个小项目.这一章我们将学习以下知识点: 熟悉基本的组件 – 按钮, ...
最新文章
- c语言 long和short区别,5分钟读懂Android 中的toast short 和long的区别
- Qt-捕获Windows消息
- vtk类之vtkImageReslice:基本算法,对体数据沿着轴进行切片
- 第2章 DOS循环:for命令详解
- 腾讯上海安全团队招聘实习生
- 基坑监测日报模板_静兴项目部工程日报(2020/7/7)
- C 标准库 –stdio.h 简介
- 摩尔斯电码对照表—Morse code
- 金融风控建模评分卡系列:机器学习特征选择方法
- 如何用js实现数组倒序输出
- SD卡、TF卡、MMC卡以及eMMC芯片的介绍
- 奈奎斯特定理和香农定理解释
- android 平板z97,生命在于折腾 iGame Z97也能玩Android
- 试图加载格式不正确的程序 解决方法
- 数学建模用python分析gdp_数学建模·中国GDP趋势分析与预测
- vue+h5仿微信网页版聊天室vueWebChat项目
- 基于标定板的手眼标定
- python中hasattr,getattr,setattr的区别
- WORD中的多级列表详解
- php 根路由器,Pux