Flink 1.7.2 DataSet Transformation Examples

Source code

  • https://github.com/opensourceteams/flink-maven-scala

Overview

  • Flink transformation examples
  • map, flatMap, filter, reduce, groupBy, reduceGroup, combineGroup, Aggregate (sum, max, min)
  • distinct, join, join function, leftOuterJoin, rightOuterJoin, fullOuterJoin, union, first, coGroup, cross

transformation

map

  • Applies a function to each element of the DataSet, one element at a time.
  • Example: convert each line to upper case and append a suffix string.

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.map

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements("c a b d a c", "d c a b c d")

    // append a suffix to the upper-cased line
    val dataSet2 = dataSet.map(_.toUpperCase + " suffix")
    dataSet2.print()
  }
}

  • Output

C A B D A C suffix
D C A B C D suffix

flatMap

  • Applies a function to each element and flattens the resulting collections into a single DataSet.
  • Example: split each line into words, then collect the words from all lines into one DataSet.

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.flatmap

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements("c a b d a c", "d c a b c d")

    // split each line on spaces; the resulting words are flattened into one DataSet
    val dataSet2 = dataSet.flatMap(_.toUpperCase().split(" "))
    dataSet2.print()
  }
}

  • Output
C
A
B
D
A
C
D
C
A
B
C
D

filter

  • Applies a predicate to each element and keeps only the elements that satisfy it.
  • Example: drop the empty strings produced by splitting lines that contain consecutive spaces.

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.filter

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
 * filter: keeps only the elements that match the predicate
 */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements("c a b d a    c", "d c   a b c d")

    // splitting on a single space yields empty strings for runs of spaces; filter them out
    val dataSet2 = dataSet.flatMap(_.toUpperCase().split(" ")).filter(_.nonEmpty)
    dataSet2.print()
  }
}

  • Output
C
A
B
D
A
C
D
C
A
B
C
D

reduce

  • Combines the elements of the DataSet pairwise with the reduce function until a single value remains.
  • Example: compute the sum of all elements.

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.reduce

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
 * Accumulates all elements pairwise: a sum over the whole DataSet
 */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(3, 5, 8, 9)

    //  3 + 5 + 8 + 9
    val dataSet2 = dataSet.reduce((a, b) => {
      println(s"$a + $b = ${a + b}")
      a + b
    })
    dataSet2.print()
  }
}

  • Output
3 + 5 = 8
8 + 8 = 16
16 + 9 = 25
25

reduce (after groupBy)

  • Groups the elements by the given key, then runs reduce within each group.
  • Example: group by key and sum the values per key.

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.reduce

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
 * Groups by key, then accumulates the values within each group: a per-key sum
 */
object ReduceGroupRun2 {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 1), ("b", 1), ("c", 1), ("a", 1), ("c", 1),
      ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // group on the first tuple field, then sum the second field within each group
    val dataSet2 = dataSet.groupBy(0).reduce((x, y) => (x._1, x._2 + y._2))
    dataSet2.print()
  }
}

  • Output
(d,1)
(a,2)
(f,2)
(b,1)
(c,2)
(g,1)

groupBy (case class fields)

  • Groups the elements by a named field of a case class.
  • Example: group by key and sum the counts per key.

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.groupByClassFields

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
 * Groups by the case class field "word", then sums the counts within each group
 */
object ReduceGroupRun {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements("a", "b", "c", "a", "c", "d", "f", "g", "f")

    // map each word to WordCount(word, 1), group on the "word" field, then sum the counts
    val dataSet2 = dataSet.map(WordCount(_, 1))
      .groupBy("word")
      .reduce((x, y) => WordCount(x.word, x.count + y.count))
    dataSet2.print()
  }

  case class WordCount(word: String, count: Int)
}

  • Output
WordCount(d,1)
WordCount(a,2)
WordCount(f,2)
WordCount(b,1)
WordCount(c,2)
WordCount(g,1)

groupBy (key selector)

  • Groups the elements using a key selector function.
  • Example: group by key and sum the values per key.

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.groupByKeySelector

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
 * Groups by a key selector function, then sums the values within each group
 */
object ReduceGroupRun {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements("a", "b", "c", "a", "c", "d", "f", "g", "f")

    // map each word to (word, 1), group with a key selector on the first field, then sum
    val dataSet2 = dataSet.map((_, 1))
      .groupBy(_._1)
      .reduce((x, y) => (x._1, x._2 + y._2))
    dataSet2.print()
  }
}

  • Output

(d,1)
(a,2)
(f,2)
(b,1)
(c,2)
(g,1)

reduceGroup

  • Groups the elements by key; all elements that share a key are passed together, as an iterator, to the reduceGroup() function. A shorter Scala form is sketched after the output below.
  • Example: group by key and sum the values per key.

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.reduceGroup

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector

/**
 * All elements with the same key are handed to the function in one call
 */
object ReduceGroupRun {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val dataSet = env.fromElements("a", "a", "c", "b", "a")

    // intermediate data: (a,1) (a,1) (c,1) (b,1) (a,1)
    val result = dataSet.map((_, 1))
      .groupBy(0)
      .reduceGroup((in, out: Collector[(String, Int)]) => {
        // sum the counts of one key group
        var count = 0
        var word = ""
        while (in.hasNext) {
          val next = in.next()
          word = next._1
          count = count + next._2
        }
        out.collect((word, count))
      })
    result.print()
  }
}

  • Output
(a,3)
(b,1)
(c,1)
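
Since the grouped elements arrive as a Scala Iterator, the same per-key sum can be written more compactly. A minimal sketch, reusing the dataSet from the example above:

// concise form: reduceGroup with a plain Iterator => element function
val compact = dataSet.map((_, 1))
  .groupBy(0)
  .reduceGroup(in => in.reduce((x, y) => (x._1, x._2 + y._2)))
compact.print()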

combineGroup

  • Like reduceGroup, but the function runs as a local pre-aggregation (combine) on each partition before any data is shuffled; the result is therefore partial and normally needs a final aggregation, as sketched after the output below.
  • Example: group by key and sum the values per key.

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.combineGroup

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector

/**
 * All elements of one key that live on the same partition are handed to the function in one call
 */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val dataSet = env.fromElements("a", "a", "c", "b", "a")

    // intermediate data: (a,1) (a,1) (c,1) (b,1) (a,1)
    val result = dataSet.map((_, 1))
      .groupBy(0)
      .combineGroup((in, out: Collector[(String, Int)]) => {
        // locally pre-sum the counts of one key group
        var count = 0
        var word = ""
        while (in.hasNext) {
          val next = in.next()
          word = next._1
          count = count + next._2
        }
        out.collect((word, count))
      })
    result.print()
  }
}

  • Output
(a,3)
(b,1)
(c,1)
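
With parallelism 1 the combine already sees every element of each key, so the output above is complete. With parallelism > 1 each partition emits its own partial sums, so a final aggregation is still required. A minimal sketch of the usual pattern, reusing the dataSet from the example above:

// combineGroup pre-sums locally; the following groupBy + reduceGroup merges the partial sums
val total = dataSet.map((_, 1))
  .groupBy(0)
  .combineGroup((in, out: Collector[(String, Int)]) =>
    out.collect(in.reduce((x, y) => (x._1, x._2 + y._2))))
  .groupBy(0)
  .reduceGroup(in => in.reduce((x, y) => (x._1, x._2 + y._2)))
total.print()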

Aggregate sum

  • Sums field 1 (the Int value) over the whole DataSet. Without a groupBy the entire DataSet is treated as one group, so the String field of the result is unspecified (here it happens to come from the last element). The values 3+1+5+1+1+1+1+1+1 sum to 15.

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.sum

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
 * Sums the value field over all elements; no grouping
 */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1),
      ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // sum field 1 across the whole DataSet
    val dataSet2 = dataSet.sum(1)
    dataSet2.print()
  }
}

  • Output
(f,15)

Aggregate max

  • Takes the maximum of field 1 (the Int value) over the whole DataSet; as with sum, the String field of the result is unspecified.

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.max

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
 * Maximum of the value field over all elements; no grouping
 */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1),
      ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // max of field 1 across the whole DataSet
    val dataSet2 = dataSet.max(1)
    dataSet2.print()
  }
}

  • Output
(f,5)

Aggregate min

  • Takes the minimum of field 1 (the Int value) over the whole DataSet; as with sum, the String field of the result is unspecified.

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.min

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
 * Minimum of the value field over all elements; no grouping
 */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1),
      ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // min of field 1 across the whole DataSet
    val dataSet2 = dataSet.min(1)
    dataSet2.print()
  }
}

  • Output
(f,1)

Aggregate sum (groupBy)

  • Groups by key, then sums the value field of Tuple2(String, Int) within each group. An equivalent form using aggregate() is sketched after the output below.
  • Example: group by key and sum the values per key.

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.sum

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
 * Groups by key, then sums the values within each group
 */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 1), ("b", 1), ("c", 1), ("a", 1), ("c", 1),
      ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // group on field 0, then sum field 1 within each group
    val dataSet2 = dataSet.groupBy(0).sum(1)
    dataSet2.print()
  }
}

  • Output
(d,1)
(a,2)
(f,2)
(b,1)
(c,2)
(g,1)
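
sum(1) is shorthand for the generic aggregate operator. A minimal equivalent sketch, reusing the dataSet from the example above:

import org.apache.flink.api.java.aggregation.Aggregations

// the same per-key sum, written with the generic aggregate operator
val dataSet3 = dataSet.groupBy(0).aggregate(Aggregations.SUM, 1)
dataSet3.print()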

Aggregate max (groupBy) vs maxBy

  • Groups by key, then takes the maximum of the value field within each group. For two-field tuples like these the result coincides with maxBy; in general max(1) only guarantees the aggregated field, while maxBy returns an entire original element, as sketched after the output below.
  • Example: group by key and take the per-key maximum.

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.max

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
 * Groups by key, then takes the maximum value within each group
 */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 2), ("b", 1), ("c", 4), ("a", 1), ("c", 1),
      ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // group on field 0, then take the max of field 1 within each group
    val dataSet2 = dataSet.groupBy(0).max(1)
    dataSet2.print()
  }
}

  • Output
(d,1)
(a,2)
(f,1)
(b,1)
(c,4)
(g,1)
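
A minimal sketch of the maxBy variant, reusing the dataSet from the example above. Unlike max(1), maxBy(1) always returns one of the original elements, which matters once the tuples carry more fields than the key and the aggregated value:

// maxBy returns the complete element holding the per-group maximum of field 1
val dataSet3 = dataSet.groupBy(0).maxBy(1)
dataSet3.print()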

Aggregate min (groupBy) vs minBy

  • Groups by key, then takes the minimum of the value field within each group; the relationship to minBy mirrors max/maxBy above.
  • Example: group by key and take the per-key minimum.

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.min

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
 * Groups by key, then takes the minimum value within each group
 */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 2), ("b", 1), ("c", 4), ("a", 1), ("c", 1),
      ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // group on field 0, then take the min of field 1 within each group
    val dataSet2 = dataSet.groupBy(0).min(1)
    dataSet2.print()
  }
}

  • Output
(d,1)
(a,1)
(f,1)
(b,1)
(c,1)
(g,1)

distinct

  • Deduplicates by the specified field: for each distinct value of that field, the first element seen is kept. A variant without arguments is sketched after the output below.

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.distinct

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/**
 * Deduplicates the DataSet on field 1 (the Int value)
 */
object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1),
      ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // keep one element per distinct value of field 1: the values 3, 1, 5 survive
    val dataSet2 = dataSet.distinct(1)
    dataSet2.print()
  }
}

  • Output

(a,3)
(b,1)
(c,5)
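
Called with no arguments, distinct deduplicates on the entire element instead of a single field. A minimal sketch, reusing the dataSet from the example above:

// distinct() keeps one copy of each complete tuple; here only the duplicate (f,1) is dropped
val dataSet3 = dataSet.distinct()
dataSet3.print()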

join

  • Inner join: pairs up the elements of both DataSets whose join keys match.

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.join

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1),
      ("d", 1), ("f", 1), ("g", 1), ("f", 1))
    val dataSet2 = env.fromElements(("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // inner join on the first field of both DataSets
    val dataSet3 = dataSet.join(dataSet2).where(0).equalTo(0)
    dataSet3.print()
  }
}

  • Output

((d,1),(d,1))
((f,1),(f,1))
((f,1),(f,1))
((f,1),(f,1))
((f,1),(f,1))
((g,1),(g,1))

join (Function)

  • Inner join with a join function that merges each matched pair into a single result element.

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.joinFunction

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1),
      ("d", 1), ("f", 2), ("g", 5))
    val dataSet2 = env.fromElements(("g", 1), ("f", 1))

    // inner join on the first field; the join function sums the two values
    val dataSet3 = dataSet.join(dataSet2).where(0).equalTo(0) {
      (x, y) => (x._1, x._2 + y._2)
    }
    dataSet3.print()
  }
}

  • Output

(f,3)
(g,6)

leftOuterJoin

  • Left outer join: every element of the left DataSet is kept; when there is no matching element on the right, the join function receives null for the right side.

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.leftOuterJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1),
      ("d", 1), ("f", 2), ("g", 5))
    val dataSet2 = env.fromElements(("g", 1), ("f", 1))

    // left outer join on the first field; y is null for unmatched left elements
    val dataSet3 = dataSet.leftOuterJoin(dataSet2).where(0).equalTo(0) {
      (x, y) => {
        var count = 0
        if (y != null) {
          count = y._2
        }
        (x._1, x._2 + count)
      }
    }
    dataSet3.print()
  }
}

  • Output

(d,1)
(a,3)
(a,1)
(f,3)
(b,1)
(c,5)
(c,1)
(g,6)

rightOuterJoin

  • Right outer join: every element of the right DataSet is kept; when there is no matching element on the left, the join function receives null for the left side.

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.rightOuterJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1),
      ("d", 1), ("f", 2), ("g", 5))
    val dataSet2 = env.fromElements(("g", 1), ("f", 1))

    // right outer join on the first field; x is null for unmatched right elements,
    // so the result key must be taken from y
    val dataSet3 = dataSet.rightOuterJoin(dataSet2).where(0).equalTo(0) {
      (x, y) => {
        var count = 0
        if (x != null) {
          count = x._2
        }
        (y._1, y._2 + count)
      }
    }
    dataSet3.print()
  }
}

  • Output

(f,3)
(g,6)

fullOuterJoin

  • Full outer join: elements from both sides are kept; for each unmatched element, the join function receives null for the missing side.

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.fullOuterJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1),
      ("d", 1), ("f", 2), ("g", 5))
    val dataSet2 = env.fromElements(("g", 1), ("f", 1))

    // full outer join on the first field; either x or y can be null,
    // so the key is taken from whichever side is present
    val dataSet3 = dataSet.fullOuterJoin(dataSet2).where(0).equalTo(0) {
      (x, y) => {
        var countY = 0
        if (y != null) {
          countY = y._2
        }
        var countX = 0
        if (x != null) {
          countX = x._2
        }
        val key = if (x != null) x._1 else y._1
        (key, countX + countY)
      }
    }
    dataSet3.print()
  }
}

  • Output

(d,1)
(a,3)
(a,1)
(f,3)
(b,1)
(c,5)
(c,1)
(g,6)

union

  • Union: concatenates two DataSets of the same element type; duplicates are kept.

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.union

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 1), ("g", 1), ("f", 1))
    val dataSet2 = env.fromElements(("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // concatenate the two DataSets; no deduplication is performed
    val dataSet3 = dataSet.union(dataSet2)
    dataSet3.print()
  }
}

  • Output

(a,1)
(d,1)
(g,1)
(f,1)
(f,1)
(g,1)
(f,1)

first n

  • Returns the first n elements of the DataSet. Combined with grouping and sorting it yields a per-group top-n, as sketched after the output below.

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.first

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 3), ("b", 1), ("c", 5), ("a", 1), ("c", 1),
      ("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // take the first 3 elements
    val dataSet3 = dataSet.first(3)
    dataSet3.print()
  }
}

  • Output

(a,3)
(b,1)
(c,5)
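
A minimal sketch of the grouped variant, reusing the dataSet from the example above: sorting each group before first(1) keeps the largest value per key.

import org.apache.flink.api.common.operators.Order

// per-key top-1: sort each group on field 1 descending, then take the first element
val topPerKey = dataSet.groupBy(0)
  .sortGroup(1, Order.DESCENDING)
  .first(1)
topPerKey.print()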

coGroup

  • Conceptually: take the distinct keys of both DataSets; for each key, gather that key's elements from the first DataSet into one iterable and that key's elements from the second DataSet into another, then pass both iterables to the coGroup function. A more concise Scala form is sketched after the output below.

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.cogroup

import java.lang

import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector

object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 1), ("g", 1), ("a", 1))
    val dataSet2 = env.fromElements(("a", 1), ("f", 1))

    // coGroup on the first field; the function only prints both sides and collects nothing,
    // so dataSet3 is empty and all visible output comes from the println calls
    val dataSet3 = dataSet.coGroup(dataSet2).where(0).equalTo(0) {
      new CoGroupFunction[(String, Int), (String, Int), (String, Int)] {
        override def coGroup(first: lang.Iterable[(String, Int)],
                             second: lang.Iterable[(String, Int)],
                             out: Collector[(String, Int)]): Unit = {
          println("============== begin")
          println("first")
          println(first)
          val iteratorFirst = first.iterator()
          while (iteratorFirst.hasNext()) {
            println(iteratorFirst.next())
          }
          println("second")
          println(second)
          val iteratorSecond = second.iterator()
          while (iteratorSecond.hasNext()) {
            println(iteratorSecond.next())
          }
          println("============== end")
        }
      }
    }
    dataSet3.print()
  }
}

  • Output

============== begin
first
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator@3500e7b0
(a,1)
(a,1)
second
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator@41230ea2
(a,1)
============== end
============== begin
first
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator@14602d0a
(g,1)
second
[]
============== end
============== begin
first
[]
second
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator@2b0a15b5
(f,1)
============== end
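
The Scala API also accepts a plain function of two iterators plus a collector, which avoids the anonymous CoGroupFunction. A minimal sketch that sums the values found on either side of each key, reusing the two DataSets from the example above:

// concise coGroup: per key, sum the values from both sides
val summed = dataSet.coGroup(dataSet2).where(0).equalTo(0) {
  (first, second, out: Collector[(String, Int)]) =>
    val all = first.toList ++ second.toList
    if (all.nonEmpty) out.collect((all.head._1, all.map(_._2).sum))
}
summed.print()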

cross

  • Cross: builds the Cartesian product of the two DataSets, pairing every element of the first with every element of the second. A variant with a cross function is sketched after the output below.

package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.cross

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object Run {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements(("a", 1), ("g", 1), ("f", 1))
    val dataSet2 = env.fromElements(("d", 1), ("f", 1), ("g", 1), ("f", 1))

    // Cartesian product: 3 x 4 = 12 result pairs
    val dataSet3 = dataSet.cross(dataSet2)
    dataSet3.print()
  }
}

  • Output
((a,1),(d,1))
((a,1),(f,1))
((a,1),(g,1))
((a,1),(f,1))
((g,1),(d,1))
((g,1),(f,1))
((g,1),(g,1))
((g,1),(f,1))
((f,1),(d,1))
((f,1),(f,1))
((f,1),(g,1))
((f,1),(f,1))
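
Instead of producing nested pairs, a cross function can shape each combination directly. A minimal sketch, reusing the two DataSets from the example above:

// combine each pair into a flat tuple: both keys plus the summed values
val flat = dataSet.cross(dataSet2) {
  (x, y) => (x._1, y._1, x._2 + y._2)
}
flat.print()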
