1 环境准备-创建项目引入依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.14.4</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.14.4</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.4</version>
</dependency>

如需使用scala API,则替换上面的: flink-java 为 flink-scala_2.12 flink-streaming-java_2.12 为 flink-streaming-scala_2.12

2 flink的DataStream抽象

  • DataStream代表一个数据流,它可以是无界的,也可以是有界的;
  • DataStream类似于spark的rdd,它是不可变的(immutable);
  • 无法对一个datastream进行自由的添加或删除或修改元素;
  • 只能通过算子对datastream中的数据进行转换,将一个datastream转成另一个datastream;
  • datastream可以通过source算子加载、映射外部数据而来;或者从已存在的datastream转换而来

3 flink编程模板

  YY-无论简单与复杂,flink程序都由如下几个部分组成

  • 获取一个编程、执行入口环境env
  • 通过数据源组件,加载、创建datastream
  • 对datastream调用各种处理算子表达计算逻辑
  • 通过sink算子指定计算结果的输出方式
  • 在env上触发程序提交运行

4 WordCount代码示例

  YY01--批处理示例

package cn.doitedu.base01;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;/*** @Date: 22.11.07* @Author: Hang.Nian.YY* @qq: 598196583* @Tips: 学大数据 ,到多易教育* @Description: 批处理  统计文件中单词出现的次数*/
public class Base01_BatchWordCount {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();conf.setInteger("rest.port", 8081);// 流式处理数据的环境对象//  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);// 获取批处理的环境对象ExecutionEnvironment batchEnv = ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);DataSource<String> ds = batchEnv.readTextFile("data/a.txt");/**  * 泛型1     输入数据类型   String* 泛型2     返回数据类型  Tuple2<String, Integer>*/ds.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.split("\\s+");for (String word : words) {out.collect(Tuple2.of(word, 1));}}}).groupBy(0).sum(1).print();}
}

使用Lamda表达式编写成 ,凡是单方法的接口都可以使用Lamda表达式书写, 但是数据Java中的要指定返回的数据类型 !

  FlatMapOperator<String, Tuple2<String, Integer>> ds2 = ds.flatMap((String line, Collector<Tuple2<String, Integer>> collector) -> {String[] words = line.split("\\s+");for (String word : words) {collector.collect(Tuple2.of(word, 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT));// 指定数据类型// 方法有返回值  ,指定返回值的类型//  .returns(new TypeHint<Tuple2<String, Integer>>() {})  // 类型提示// 这种是最通用的方法//  .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})) // 类型信息ds2.groupBy(tp -> tp.f0).sum(1).print();

YY02-流处理示例

<测试时,需要用nc -lk 9999(linux)或者 nc -L -p 9999(windows)在本机开启一个9999的sockect服务>

/*** @Date: 22.11.07* @Author: Hang.Nian.YY* @qq: 598196583* @Tips: 学大数据 ,到多易教育* @Description: */
public class Base03_StreamWordCount {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();// 设置本地的运行环境 . 且带webUIconf.setInteger("rest.port" , 8081);// 获取流式处理数据的对象StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);// 设置并行度 为1   默认并行度是本地机器的cpu的核数env.setParallelism(1) ;SingleOutputStreamOperator<String> source = env.socketTextStream("doitedu01", 8899);//  .setParallelism(1).slotSharingGroup("g1");// 同一个槽位共享组// 使用算子对数据进行各种转换操作SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = source.flatMap(// 传入转换算子函数对象new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {// 处理每行数据   , 切割出单词  String[] words = value.split("\\s+");for (String word : words) {// 将单词组装成  元组    收集记录数据   并转发返回out.collect(Tuple2.of(word, 1));}}});// 将数据按照单词分组KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}});keyed.sum("f1").print();env.execute() ;}
}

flink程序的并行概念

  1. flink程序中,每一个算子都可以成为一个独立任务(task);
  2. flink程序中,视上下游算子间数据分发规则、并行度、共享槽位设置,可组成算子链成为一个task
  3. 每个任务在运行时都可拥有多个并行的运行实例(subTask);
  4. 且每个算子任务的并行度都可以在代码中显式设置;

Flink系列文档-(YY02)-Flink编程基础-入门示例相关推荐

  1. Flink系列文档-(YY03)-Flink编程基础API-Source

    1 Flink编程入口 首先获取flink编程的核心入口对象   /*** 获取批处理入口对象*/// 1) 普通的批处理对象ExecutionEnvironment environment1 = E ...

  2. Flink系列文档-(YY05)-Flink编程API-多流算子

    1 多流连接 connect   connect连接(DataStream,DataStream→ConnectedStreams) connect翻译成中文意为连接,可以将两个数据类型一样也可以类型 ...

  3. Flink系列文档-(YY08)-Flink核心概念

    1 核心概念 1.1 基础概念 用户通过算子api所开发的代码,会被flink任务提交客户端解析成jobGraph 然后,jobGraph提交到集群JobManager,转化成ExecutionGra ...

  4. Flink系列文档-(YY01)-初识Flink

    1. 离线批计算与实时流式计算 批计算与流式计算,本质上就是对有界流和无界流的计算   YY-批计算 针对有界流:由于在产出计算结果前可以看到整个(完整)数据集,因而如下计算都可以实现:对数据排序,计 ...

  5. 玩转MFC文档视图架构编程1——深入浅出MFC文档/视图架构之基本概念深入浅出MFC文档/视图架构之文档

    原创地址: 深入浅出MFC文档/视图架构之基本概念 http://iis.xrtvu.com/Tech/ShowArticle.asp?ArticleID=276 深入浅出MFC文档/视图架构之文档模 ...

  6. 老猿学5G扫盲贴:3GPP规范文档命名规则及同系列文档阅读指南

    专栏:Python基础教程目录 专栏:使用PyQt开发图形界面Python应用 专栏:PyQt入门学习 老猿Python博文目录 老猿学5G博文目录 在学习5G规范过程中,有些内容把握不定的时候,有时 ...

  7. 价值1000元的Python原创文档,涉及“Python基础“、“Python自动化“等,免费分享!

    感谢您来到,黄同学的<原创作品>所在地,这里将会给大家分享很多实在的干货文档,供大家学习. 这四个文档分别是:<Python自动化办公手册>.<Excel数据透视表大全手 ...

  8. java openoffice 打印_java调用openoffice将office系列文档转换为PDF的示例方法

    前导: 发过程中经常会使用java将office系列文档转换为PDF, 一般都使用微软提供的openoffice+jodconverter 实现转换文档. openoffice既有windows版本也 ...

  9. Ext JS 6学习文档-第3章-基础组件

    Ext JS 6学习文档-第3章-基础组件 基础组件 在本章中,你将学习到一些 Ext JS 基础组件的使用.同时我们会结合所学创建一个小项目.这一章我们将学习以下知识点: 熟悉基本的组件 – 按钮, ...

最新文章

  1. c语言 long和short区别,5分钟读懂Android 中的toast short 和long的区别
  2. Qt-捕获Windows消息
  3. vtk类之vtkImageReslice:基本算法,对体数据沿着轴进行切片
  4. 第2章 DOS循环:for命令详解
  5. 腾讯上海安全团队招聘实习生
  6. 基坑监测日报模板_静兴项目部工程日报(2020/7/7)
  7. C 标准库 –stdio.h 简介
  8. 摩尔斯电码对照表—Morse code
  9. 金融风控建模评分卡系列:机器学习特征选择方法
  10. 如何用js实现数组倒序输出
  11. SD卡、TF卡、MMC卡以及eMMC芯片的介绍
  12. 奈奎斯特定理和香农定理解释
  13. android 平板z97,生命在于折腾 iGame Z97也能玩Android
  14. 试图加载格式不正确的程序 解决方法
  15. 数学建模用python分析gdp_数学建模·中国GDP趋势分析与预测
  16. vue+h5仿微信网页版聊天室vueWebChat项目
  17. 基于标定板的手眼标定
  18. python中hasattr,getattr,setattr的区别
  19. WORD中的多级列表详解
  20. php 根路由器,Pux

热门文章

  1. LTE 各频段对应频点以及频率,频点号与频率之间的转换关系
  2. 发现新大陆>think-addons可以在自己的应用中做模块化开发
  3. PhpStorm中实现代码自动换行
  4. 为何国外的人都爱用电子邮箱?注册电子邮箱有哪些好处呢?
  5. 微信小程序商城购物车页 二维数组怎么做
  6. R_Studio(学生成绩)对数据缺失值md.pattern()、异常值分析(箱线图)
  7. 几何画板在教学中的常见应用
  8. MySQL 据库管理系统
  9. drupal 6.0 入门教程 - 第一章
  10. 推荐系统常用的策略算法—Bandits