Flink 1.7.2 DataSet transformation examples
Source code
- https://github.com/opensourceteams/flink-maven-scala
Overview
- Flink transformation examples
- map, flatMap, filter, reduce, groupBy, reduceGroup, combineGroup, Aggregate (sum, max, min)
- distinct, join, join function, leftOuterJoin, rightOuterJoin, fullOuterJoin, union, first, coGroup, cross
Transformations
map
- Applies a function to every element of the DataSet, one element at a time
- Example: upper-case each line and append a suffix string
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.map

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements("c a b d a c", "d c a b c d")

    val dataSet2 = dataSet.map(_.toUpperCase + "字符串连接")
    dataSet2.print()
  }
}
- Output
C A B D A C字符串连接
D C A B C D字符串连接
flatMap
- Applies a function to every element and flattens the resulting sub-collections into one DataSet
- Example: split each line into words and collect the words of all lines into a single DataSet
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.flatmap

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements("c a b d a c", "d c a b c d")

    val dataSet2 = dataSet.flatMap(_.toUpperCase().split(" "))
    dataSet2.print()
  }
}
- Output
C
A
B
D
A
C
D
C
A
B
C
D
filter
- Evaluates a predicate on every element and keeps only the elements for which it returns true
- Example: drop the empty strings produced by splitting
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.filter

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
  * filter: keeps only the elements that satisfy the predicate
  */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements("c a b d a c", "d c a b c d")

    val dataSet2 = dataSet.flatMap(_.toUpperCase().split(" ")).filter(_.nonEmpty)
    dataSet2.print()
  }
}
- Output
C
A
B
D
A
C
D
C
A
B
C
D
reduce
- Combines the elements pairwise with the reduce function until a single value remains
- Example: sum all elements
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.reduce

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
  * Accumulates all elements pairwise, i.e. computes their sum
  */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(3, 5, 8, 9)

    // 3 + 5 + 8 + 9
    val dataSet2 = dataSet.reduce((a, b) => {
      println(s"${a} + ${b} = ${a + b}")
      a + b
    })

    dataSet2.print()
  }
}
- Output
3 + 5 = 8
8 + 8 = 16
16 + 9 = 25
25
reduce (after groupBy)
- Groups the elements by the given key, then runs reduce within each group
- Example: sum the values per key
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.reduce

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
  * Groups by key, then accumulates (sums) the values within each group
  */
object ReduceGroupRun2 {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 1), ("b", 1), ("c", 1), ("a", 1), ("c", 1), ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    val dataSet2 = dataSet.groupBy(0).reduce((x, y) => {
      (x._1, x._2 + y._2)
    })

    dataSet2.print()
  }
}
- Output
(d,1)
(a,2)
(f,2)
(b,1)
(c,2)
(g,1)
groupBy (case class fields)
- Groups the elements by a field of a case class
- Example: sum the counts per key
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.groupByClassFields

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
  * Groups by the case class field "word", then sums the counts within each group
  */
object ReduceGroupRun {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements("a", "b", "c", "a", "c", "d", "f", "g", "f")

    // map each word to WordCount(word, 1), then group on the "word" field
    val dataSet2 = dataSet.map(WordCount(_, 1))
      .groupBy("word")
      .reduce((x, y) => WordCount(x.word, x.count + y.count))

    dataSet2.print()
  }

  case class WordCount(word: String, count: Int)
}
- Output
WordCount(d,1)
WordCount(a,2)
WordCount(f,2)
WordCount(b,1)
WordCount(c,2)
WordCount(g,1)
groupBy (key selector)
- Groups the elements using a key selector function
- Example: sum the counts per key
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.groupByKeySelector

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
  * Groups by a key selector function, then sums the counts within each group
  */
object ReduceGroupRun {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements("a", "b", "c", "a", "c", "d", "f", "g", "f")

    // map each word to (word, 1), then group by the first tuple field
    val dataSet2 = dataSet.map((_, 1))
      .groupBy(_._1)
      .reduce((x, y) => (x._1, x._2 + y._2))

    dataSet2.print()
  }
}
- Output (this variant works on plain tuples, so the results print as tuples)
(d,1)
(a,2)
(f,2)
(b,1)
(c,2)
(g,1)
reduceGroup
- Groups the elements by the given key; all elements that share a key are passed together, as one group, to the reduceGroup() function
- Example: sum the counts per key
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.reduceGroup

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector

/**
  * All elements with the same key are passed to the function together
  */
object ReduceGroupRun {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val dataSet = env.fromElements("a", "a", "c", "b", "a")

    // intermediate data: (a,1), (a,1), (c,1), (b,1), (a,1)
    val result = dataSet.map((_, 1)).groupBy(0)
      .reduceGroup((in, out: Collector[(String, Int)]) => {
        var count = 0
        var word = ""
        while (in.hasNext) {
          val next = in.next()
          word = next._1
          count = count + next._2
        }
        out.collect((word, count))
      })

    result.print()
  }
}
- Output
(a,3)
(b,1)
(c,1)
combineGroup
- Like reduceGroup, but the grouping function may also run locally as a combine step, merging elements before they are shuffled across the network (see the chaining sketch after this section's output)
- Example: sum the counts per key
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.combineGroup

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector

/**
  * All elements with the same key are passed to the function together
  */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val dataSet = env.fromElements("a", "a", "c", "b", "a")

    // intermediate data: (a,1), (a,1), (c,1), (b,1), (a,1)
    val result = dataSet.map((_, 1)).groupBy(0)
      .combineGroup((in, out: Collector[(String, Int)]) => {
        var count = 0
        var word = ""
        while (in.hasNext) {
          val next = in.next()
          word = next._1
          count = count + next._2
        }
        out.collect((word, count))
      })

    result.print()
  }
}
- Output
(a,3)
(b,1)
(c,1)
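- Note: combineGroup is only a local pre-aggregation. With parallelism greater than 1, each parallel instance may emit its own partial result per key, so a global result still needs a subsequent reduce or reduceGroup. A minimal sketch of that chaining (the object name is illustrative, not from the source repo):

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector

object CombineThenReduce {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val words = env.fromElements("a", "a", "c", "b", "a").map((_, 1))

    val result = words
      .groupBy(0)
      // local pre-aggregation: may emit one partial sum per key and per parallel instance
      .combineGroup { (in: Iterator[(String, Int)], out: Collector[(String, Int)]) =>
        val elems = in.toSeq
        out.collect((elems.head._1, elems.map(_._2).sum))
      }
      .groupBy(0)
      // global aggregation over the partial sums
      .reduce((x, y) => (x._1, x._2 + y._2))

    result.print()
  }
}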
Aggregate sum
- Sums field 1 (the value) of Tuple2(String, Int) over the whole DataSet, without grouping; the String field of the result is simply taken from one input element (here the last one), which is why the output below reads (f,15)
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.sum

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
  * Sums field 1 over the whole DataSet (no grouping)
  */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1), ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // 3 + 1 + 5 + 1 + 1 + 1 + 1 + 1 + 1 = 15
    val dataSet2 = dataSet.sum(1)
    dataSet2.print()
  }
}
- Output
(f,15)
Aggregate max
- Takes the maximum of field 1 (the value) over the whole DataSet, without grouping; the String field of the result again comes from one input element, not necessarily the element holding the maximum
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.max

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
  * Takes the maximum of field 1 over the whole DataSet (no grouping)
  */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1), ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    val dataSet2 = dataSet.max(1)
    dataSet2.print()
  }
}
- Output
(f,5)
Aggregate min
- Takes the minimum of field 1 (the value) over the whole DataSet, without grouping; the String field of the result again comes from one input element
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.min

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
  * Takes the minimum of field 1 over the whole DataSet (no grouping)
  */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1), ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    val dataSet2 = dataSet.min(1)
    dataSet2.print()
  }
}
- Output
(f,1)
Aggregate sum (groupBy)
- Groups by key, then sums field 1 (the value) of Tuple2(String, Int) within each group
- Example: sum of the values per key
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.sum

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
  * Groups by key, then sums the values within each group
  */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 1), ("b", 1), ("c", 1), ("a", 1), ("c", 1), ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    val dataSet2 = dataSet.groupBy(0).sum(1)
    dataSet2.print()
  }
}
- Output
(d,1)
(a,2)
(f,2)
(b,1)
(c,2)
(g,1)
Aggregate max (groupBy), equivalent to maxBy here
- Groups by key, then takes the maximum of field 1 within each group
- Example: maximum value per key
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.max

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
  * Groups by key, then takes the maximum of field 1 within each group
  */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 2), ("b", 1), ("c", 4), ("a", 1), ("c", 1), ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    val dataSet2 = dataSet.groupBy(0).max(1)
    dataSet2.print()
  }
}
- Output
(d,1)
(a,2)
(f,1)
(b,1)
(c,4)
(g,1)
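- As the heading notes, for a Tuple2 grouped on field 0, max(1) yields the same results as maxBy(1): maxBy returns a complete existing element, while max only guarantees the aggregated field, and here the remaining field is the grouping key, so the two coincide. A minimal maxBy sketch under that assumption (the object name is illustrative):

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object MaxByRun {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 2), ("b", 1), ("c", 4), ("a", 1), ("c", 1), ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // per group, keep the whole element whose field 1 is largest
    val dataSet2 = dataSet.groupBy(0).maxBy(1)
    dataSet2.print()
  }
}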
Aggregate min (groupBy), equivalent to minBy here
- Groups by key, then takes the minimum of field 1 within each group
- Example: minimum value per key
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.min

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
  * Groups by key, then takes the minimum of field 1 within each group
  */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 2), ("b", 1), ("c", 4), ("a", 1), ("c", 1), ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    val dataSet2 = dataSet.groupBy(0).min(1)
    dataSet2.print()
  }
}
- Output
(d,1)
(a,1)
(f,1)
(b,1)
(c,1)
(g,1)
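- The same equivalence holds for min(1) and minBy(1) on this data; a minimal sketch (illustrative object name):

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object MinByRun {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 2), ("b", 1), ("c", 4), ("a", 1), ("c", 1), ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // per group, keep the whole element whose field 1 is smallest
    val dataSet2 = dataSet.groupBy(0).minBy(1)
    dataSet2.print()
  }
}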
distinct
- Removes duplicate elements, comparing on the specified field
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.distinct

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
  * Deduplicates the DataSet on field 1 (the value)
  */
object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1), ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // only the first element with each distinct value in field 1 is kept
    val dataSet2 = dataSet.distinct(1)
    dataSet2.print()
  }
}
- Output
(a,3)
(b,1)
(c,5)
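- distinct can also be called without arguments (deduplicate on the whole element) or with a key selector function. A minimal sketch (illustrative, not from the source repo):

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object DistinctVariants {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("a", 3), ("a", 1))

    // deduplicate on the complete tuple: (a,3) survives only once
    dataSet.distinct().print()

    // deduplicate by a key selector: one element per distinct key
    dataSet.distinct(_._1).print()
  }
}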
join
- Inner join: pairs up the elements of two DataSets whose keys match
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.join

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1), ("d", 1), ("f", 1), ("g", 1), ("f", 1))
    val dataSet2 = env.fromElements(("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // inner join on field 0 of both DataSets
    val dataSet3 = dataSet.join(dataSet2).where(0).equalTo(0)
    dataSet3.print()
  }
}
- Output
((d,1),(d,1))
((f,1),(f,1))
((f,1),(f,1))
((f,1),(f,1))
((f,1),(f,1))
((g,1),(g,1))
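- where/equalTo also accept key selector functions instead of field positions, which is handy for non-tuple types. A minimal sketch (illustrative names, not from the source repo):

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object JoinWithKeySelectors {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val left = env.fromElements(("d", 1), ("f", 2))
    val right = env.fromElements(("f", 10), ("g", 20))

    // the same inner join, expressed with key selector functions
    val joined = left.join(right).where(_._1).equalTo(_._1)
    joined.print()
  }
}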
join (Function)
- Inner join with a join function that maps each matched pair to one output element
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.joinFunction

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1), ("d", 1), ("f", 2), ("g", 5))
    val dataSet2 = env.fromElements(("g", 1), ("f", 1))

    // inner join on field 0; the join function builds each output element
    val dataSet3 = dataSet.join(dataSet2).where(0).equalTo(0) {
      (x, y) => (x._1, x._2 + y._2)
    }

    dataSet3.print()
  }
}
- Output
(f,3)
(g,6)
leftOuterJoin
- Left outer join: every element of the left DataSet is joined with the matching elements of the right DataSet; when there is no match, the right side is null
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.leftOuterJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1), ("d", 1), ("f", 2), ("g", 5))
    val dataSet2 = env.fromElements(("g", 1), ("f", 1))

    // left outer join on field 0; y is null when a left element has no match
    val dataSet3 = dataSet.leftOuterJoin(dataSet2).where(0).equalTo(0) {
      (x, y) => {
        var count = 0
        if (y != null) {
          count = y._2
        }
        (x._1, x._2 + count)
      }
    }

    dataSet3.print()
  }
}
- Output
(d,1)
(a,3)
(a,1)
(f,3)
(b,1)
(c,5)
(c,1)
(g,6)
rightOuterJoin
- Right outer join: every element of the right DataSet is joined with the matching elements of the left DataSet; when there is no match, the left side is null
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.rightOuterJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1), ("d", 1), ("f", 2), ("g", 5))
    val dataSet2 = env.fromElements(("g", 1), ("f", 1))

    // right outer join on field 0; x is null when a right element has no match,
    // so the key must be taken from y
    val dataSet3 = dataSet.rightOuterJoin(dataSet2).where(0).equalTo(0) {
      (x, y) => {
        var count = 0
        if (x != null) {
          count = x._2
        }
        (y._1, y._2 + count)
      }
    }

    dataSet3.print()
  }
}
- Output
(f,3)
(g,6)
fullOuterJoin
- Full outer join: the elements of both DataSets are joined; the side without a match is null
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.fullOuterJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1), ("d", 1), ("f", 2), ("g", 5))
    val dataSet2 = env.fromElements(("g", 1), ("f", 1))

    // full outer join on field 0; the unmatched side is null,
    // so the key is taken from whichever side is present
    val dataSet3 = dataSet.fullOuterJoin(dataSet2).where(0).equalTo(0) {
      (x, y) => {
        var countX = 0
        if (x != null) {
          countX = x._2
        }
        var countY = 0
        if (y != null) {
          countY = y._2
        }
        val key = if (x != null) x._1 else y._1
        (key, countX + countY)
      }
    }

    dataSet3.print()
  }
}
- Output (every right-side key also appears on the left here, so the result matches the left outer join)
(d,1)
(a,3)
(a,1)
(f,3)
(b,1)
(c,5)
(c,1)
(g,6)
union
- Union: concatenates two DataSets; duplicates are kept
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.union

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 1), ("g", 1), ("f", 1))
    val dataSet2 = env.fromElements(("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // concatenate the two DataSets; duplicates are kept
    val dataSet3 = dataSet.union(dataSet2)
    dataSet3.print()
  }
}
- Output
(a,1)
(d,1)
(g,1)
(f,1)
(f,1)
(g,1)
(f,1)
first(n)
- Returns the first n elements of the DataSet
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.first

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1), ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // take the first 3 elements
    val dataSet3 = dataSet.first(3)
    dataSet3.print()
  }
}
- Output
(a,3)
(b,1)
(c,5)
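- first(n) can also be applied per group, and combined with sortGroup it gives a per-key top-n. A minimal sketch (illustrative, not from the source repo):

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object FirstPerGroup {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("a", 1), ("b", 5), ("b", 2), ("b", 4))

    // per key: sort the group by field 1 descending, then keep the first 2 elements
    val topN = dataSet.groupBy(0).sortGroup(1, Order.DESCENDING).first(2)
    topN.print()
  }
}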
coGroup
- Takes every distinct key across the two DataSets; for each key, the elements with that key from the first DataSet are collected into one iterable and the elements with that key from the second DataSet into another, and both are passed to the CoGroupFunction
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.cogroup

import java.lang

import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 1), ("g", 1), ("a", 1))
    val dataSet2 = env.fromElements(("a", 1), ("f", 1))

    // coGroup on field 0: for each key, both groups are handed to the function;
    // this example only prints the groups and never collects output
    val dataSet3 = dataSet.coGroup(dataSet2).where(0).equalTo(0) {
      new CoGroupFunction[(String, Int), (String, Int), (String, Int)] {
        override def coGroup(first: lang.Iterable[(String, Int)], second: lang.Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
          println("============== begin")
          println("first")
          println(first)
          val iteratorFirst = first.iterator()
          while (iteratorFirst.hasNext()) {
            println(iteratorFirst.next())
          }

          println("second")
          println(second)
          val iteratorSecond = second.iterator()
          while (iteratorSecond.hasNext()) {
            println(iteratorSecond.next())
          }
          println("============== end")
        }
      }
    }

    dataSet3.print()
  }
}
- Output
============== begin
first
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator@3500e7b0
(a,1)
(a,1)
second
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator@41230ea2
(a,1)
============== end
============== begin
first
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator@14602d0a
(g,1)
second
[]
============== end
============== begin
first
[]
second
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator@2b0a15b5
(f,1)
============== end
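- Instead of implementing CoGroupFunction, the Scala API also accepts a closure over the two groups. A sketch that sums the counts per key across both DataSets (illustrative names, not from the source repo):

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector

object CoGroupClosure {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val left = env.fromElements(("a", 1), ("g", 1), ("a", 1))
    val right = env.fromElements(("a", 1), ("f", 1))

    // per key: sum the counts of both sides, treating a missing side as empty
    val summed = left.coGroup(right).where(0).equalTo(0) {
      (first, second, out: Collector[(String, Int)]) =>
        val l = first.toSeq
        val r = second.toSeq
        val key = (l ++ r).head._1
        out.collect((key, l.map(_._2).sum + r.map(_._2).sum))
    }

    summed.print()
  }
}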
cross
- Cross: builds the Cartesian product of two DataSets
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.cross

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 1), ("g", 1), ("f", 1))
    val dataSet2 = env.fromElements(("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // Cartesian product of the two DataSets
    val dataSet3 = dataSet.cross(dataSet2)
    dataSet3.print()
  }
}
- Output
((a,1),(d,1))
((a,1),(f,1))
((a,1),(g,1))
((a,1),(f,1))
((g,1),(d,1))
((g,1),(f,1))
((g,1),(g,1))
((g,1),(f,1))
((f,1),(d,1))
((f,1),(f,1))
((f,1),(g,1))
((f,1),(f,1))
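- cross also takes a function that builds each output element directly, and crossWithTiny/crossWithHuge hint to the optimizer which side is small. A minimal sketch (illustrative, not from the source repo):

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object CrossWithFunction {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val left = env.fromElements(("a", 1), ("g", 2))
    val right = env.fromElements(("d", 3), ("f", 4))

    // build each output element directly instead of nesting tuples
    val products = left.cross(right) { (x, y) => (x._1 + y._1, x._2 * y._2) }
    products.print()
  }
}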