【2】Flink DataStream Transformation Operators
【README】
This article covers Flink's data stream transformation operations, including:
- basic transformations: map, flatMap, filter;
- rolling aggregations (min, minBy, max, maxBy, sum);
- reduce aggregation;
- splitting a stream;
- connecting streams with connect;
- merging streams with union;
- rich functions;
- repartitioning;
This article uses Flink 1.14.4; the Maven dependencies are as follows:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.14.4</version>
</dependency>
Parts of this article reference the official Flink documentation (the DataStream operators overview):
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/datastream/operators/overview/
【1】Basic Transformation Operators
These include map (transform), flatMap (flatten), and filter;
1) The code is as follows:
/**
 * @Description Basic transformations on a Flink data stream
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022-04-15
 */
public class TransformTest1_Base {
    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // read data from a text file
        DataStream<String> baseStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");
        // 1-map (transform/project): map each string to its length
        // DataStream<Integer> mapStream = baseStream.map(x -> x.length());
        DataStream<Integer> mapStream = baseStream.map(String::length);
        // 2-flatMap (flatten): split each line on commas and emit every field;
        // returns(Types.STRING) is needed because the lambda's output type is erased at runtime
        DataStream<String> flatMapStream = baseStream.flatMap((String raw, Collector<String> collector) -> {
            for (String rd : raw.split(",")) {
                collector.collect(rd);
            }
        }).returns(Types.STRING);
        // 3-filter: keep only records starting with sensor_1
        DataStream<String> filterStream = baseStream.filter(x -> x.startsWith("sensor_1"));
        // print output
        mapStream.print("mapStream");
        flatMapStream.print("flatMapStream");
        filterStream.print("filterStream");
        // execute
        env.execute("BaseTransformStreamJob");
    }
}
The sensor text file is as follows:
sensor_1,12341561,36.1
sensor_2,12341562,33.5
sensor_3,12341563,39.9
sensor_1,12341573,43.1
Printed results:
mapStream> 22
flatMapStream> sensor_1
flatMapStream> 12341561
flatMapStream> 36.1
filterStream> sensor_1,12341561,36.1
mapStream> 22
flatMapStream> sensor_2
flatMapStream> 12341562
flatMapStream> 33.5
mapStream> 22
flatMapStream> sensor_3
flatMapStream> 12341563
flatMapStream> 39.9
mapStream> 22
flatMapStream> sensor_1
flatMapStream> 12341573
flatMapStream> 43.1
filterStream> sensor_1,12341573,43.1
【2】Rolling Aggregation Operators
The keyBy operator groups a data stream by key; a stream must be keyed before it can be aggregated, similar to SQL's GROUP BY.
What keyBy does:
- it logically splits the stream into disjoint partitions (while still remaining one stream); all records with the same key (the same hash) are assigned to the same partition;
- internally, keyBy() is implemented with hash partitioning on the key.
keyBy produces a KeyedStream;
rolling aggregation operators can then be applied to the KeyedStream; they are (a sum sketch follows this list):
- sum
- min
- max
- minBy
- maxBy
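Only max and maxBy are demonstrated in 【2.1】 below; as a quick illustration of sum, here is a minimal sketch, assuming the same StreamExecutionEnvironment env as in the full examples (the tuple values are invented purely for illustration):
// minimal sketch: rolling sum on a keyed stream of (sensorId, count) tuples
DataStream<Tuple2<String, Integer>> countStream = env.fromElements(
        Tuple2.of("sensor_1", 1), Tuple2.of("sensor_2", 1), Tuple2.of("sensor_1", 1));
// key by the sensor id (field f0), then roll up field 1;
// every incoming element emits a new running total for its key
DataStream<Tuple2<String, Integer>> sumStream = countStream
        .keyBy(t -> t.f0)
        .sum(1);
sumStream.print("sumStream"); // sensor_1 emits (sensor_1,1) and later (sensor_1,2)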
【2.1】Code Example
/**
 * @Description Rolling aggregation operators
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022-04-15
 */
public class TransformTest2_RollingAgg {
    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // read data from a text file
        DataStream<String> inputStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");
        // convert to the SensorReading POJO type
        DataStream<SensorReading> sensorStream = inputStream.map(x -> {
            String[] arr = x.split(",");
            return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));
        });
        // keyBy groups the stream for rolling aggregation (single-field key)
        KeyedStream<SensorReading, String> keyedStream = sensorStream.keyBy(SensorReading::getId);
        // keyBy with a multi-field (tuple) key:
        // KeyedStream<SensorReading, Tuple1<String>> keyedStream = sensorStream.keyBy(new KeySelector<SensorReading, Tuple1<String>>() {
        //     @Override
        //     public Tuple1<String> getKey(SensorReading sensorReading) throws Exception {
        //         return Tuple1.of(sensorReading.getId());
        //     }
        // });
        // max aggregation
        DataStream<SensorReading> maxTemperatureStream = keyedStream.max("temperature");
        // maxBy aggregation
        DataStream<SensorReading> maxbyTemperatureStream = keyedStream.maxBy("temperature");
        // print output (labels match the results shown below)
        maxTemperatureStream.print("max");
        maxbyTemperatureStream.print("maxBy");
        // execute
        env.execute("maxTemperatureStreamJob");
    }
}
The sensor text file:
sensor_1,11,36.1
sensor_2,21,33.1
sensor_1,12,36.2
sensor_1,13,36.3
sensor_2,22,33.2
Printed results of the max aggregation:
max> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
max> SensorRd{id='sensor_2', timestamp=21, temperature=33.1}
max> SensorRd{id='sensor_1', timestamp=11, temperature=36.2}
max> SensorRd{id='sensor_1', timestamp=11, temperature=36.3}
max> SensorRd{id='sensor_2', timestamp=21, temperature=33.2}
Printed results of the maxBy aggregation:
maxBy> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
maxBy> SensorRd{id='sensor_2', timestamp=21, temperature=33.1}
maxBy> SensorRd{id='sensor_1', timestamp=12, temperature=36.2}
maxBy> SensorRd{id='sensor_1', timestamp=13, temperature=36.3}
maxBy> SensorRd{id='sensor_2', timestamp=22, temperature=33.2}
Summary: the difference between max and maxBy:
- max: takes the aggregated field (the maximum temperature), while the other fields stay those of the first record for that key;
- maxBy: takes the aggregated field together with the other fields of the record holding the maximum temperature;
min and minBy behave analogously and are not demonstrated here.
Note: a stream must be keyed before aggregation, either by a single field or by several fields;
the commented-out code above shows keying by a tuple of fields; such a composite key is a Tuple:
a 1-field key is a Tuple1, a 2-field key a Tuple2, and so on (see the sketch below).
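As a minimal sketch of a key built from two fields, here is a Tuple2 variant of the commented-out KeySelector above; keying a SensorReading by id and timestamp together is contrived and chosen purely to show the syntax:
// minimal sketch: a two-field (Tuple2) key, extending the Tuple1 example above
KeyedStream<SensorReading, Tuple2<String, Long>> multiKeyedStream =
        sensorStream.keyBy(new KeySelector<SensorReading, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> getKey(SensorReading rd) throws Exception {
                // the composite key: (id, timestamp) -- contrived, for syntax only
                return Tuple2.of(rd.getId(), rd.getTimestamp());
            }
        });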
【2.2】Reduce Aggregation
Definition:
A rolling reduce on a keyed data stream: it combines the current element with the last reduced value and emits the new value.
Scenario: after keying by sensor id to form a KeyedStream, compute the maximum temperature together with the latest timestamp, i.e. several aggregates at once;
Code:
/**
 * @Description The reduce aggregation operator
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022-04-15
 */
public class TransformTest3_Reduce {
    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // read data from a text file
        DataStream<String> inputStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");
        // convert to the SensorReading POJO type
        DataStream<SensorReading> sensorStream = inputStream.map(x -> {
            String[] arr = x.split(",");
            return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));
        });
        // keyBy groups the stream (single-field key)
        KeyedStream<SensorReading, String> keyedStream = sensorStream.keyBy(SensorReading::getId);
        // reduce: maximum temperature combined with the latest timestamp
        DataStream<SensorReading> reduceStream = keyedStream.reduce((a, b) -> new SensorReading(
                a.getId(),
                Math.max(a.getTimestamp(), b.getTimestamp()),
                Math.max(a.getTemperature(), b.getTemperature())));
        // print output
        reduceStream.print("reduceStream");
        // execute
        env.execute("reduceStreamJob");
    }
}
The sensor text file:
sensor_1,11,36.1
sensor_2,21,33.1
sensor_1,32,36.2
sensor_1,23,30.3
sensor_2,22,31.2
Printed results:
reduceStream> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
reduceStream> SensorRd{id='sensor_2', timestamp=21, temperature=33.1}
reduceStream> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}
reduceStream> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}
reduceStream> SensorRd{id='sensor_2', timestamp=22, temperature=33.1}
【3】Splitting a Stream (one stream into several)
Flink 1.14.4 removed the split operator; refer to https://issues.apache.org/jira/browse/FLINK-19083
Splitting is now implemented with side outputs; refer to
Side Outputs | Apache Flink
【3.1】Splitting a stream (since Flink removed the split method, use side outputs instead)
1) Code: a temperature above 30 degrees counts as high, anything else as low;
implemented with a ProcessFunction;
public class TransformTest4_SplitStream {
    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // read data from a text file
        DataStream<String> inputStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");
        // convert to the SensorReading POJO type
        DataStream<SensorReading> sensorStream = inputStream.map(x -> {
            String[] arr = x.split(",");
            return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));
        });
        // 1: split into a high-temperature and a low-temperature stream (threshold 30 degrees)
        OutputTag<SensorReading> highTag = new OutputTag<SensorReading>("high") {};
        OutputTag<SensorReading> lowTag = new OutputTag<SensorReading>("low") {};
        SingleOutputStreamOperator<SensorReading> splitStream = sensorStream.process(new ProcessFunction<SensorReading, SensorReading>() {
            @Override
            public void processElement(SensorReading record, Context context, Collector<SensorReading> collector) throws Exception {
                // send the record to a side output
                context.output(record.getTemperature() > 30 ? highTag : lowTag, record);
                // send the record to the regular output
                collector.collect(record);
            }
        });
        // 2: select each side output and print it
        splitStream.getSideOutput(highTag).print("high");
        splitStream.getSideOutput(lowTag).print("low");
        // execute
        env.execute("splitStreamJob");
    }
}
The sensor text file:
sensor_1,11,36.1
sensor_2,21,23.1
sensor_1,32,36.2
sensor_1,23,30.3
sensor_2,22,11.2
Printed results:
high> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
low> SensorRd{id='sensor_2', timestamp=21, temperature=23.1}
high> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}
high> SensorRd{id='sensor_1', timestamp=23, temperature=30.3}
low> SensorRd{id='sensor_2', timestamp=22, temperature=11.2}
The splitting code above refers to: Process function: a versatile tool in Flink datastream API | Develop Paper
【4】Connecting Streams with connect
1) Definition: connect joins two streams into one connected stream; the sub-streams may have different element types;
2) Steps:
- connect the two streams to form a ConnectedStreams;
- map the connected stream (with a CoMapFunction) to obtain a regular DataStream;
Code:
/**
 * @Description connect: connecting two streams
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022-04-16
 */
public class TransformTest5_ConnectStream {
    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // read data from a text file
        DataStream<String> inputStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");
        // convert to the SensorReading POJO type
        DataStream<SensorReading> sensorStream = inputStream.map(x -> {
            String[] arr = x.split(",");
            return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));
        });
        // 1: split into a high-temperature and a low-temperature stream (threshold 30 degrees)
        OutputTag<SensorReading> highTag = new OutputTag<SensorReading>("high") {};
        OutputTag<SensorReading> lowTag = new OutputTag<SensorReading>("low") {};
        SingleOutputStreamOperator<SensorReading> splitStream = sensorStream.process(new ProcessFunction<SensorReading, SensorReading>() {
            @Override
            public void processElement(SensorReading record, Context context, Collector<SensorReading> collector) throws Exception {
                // send the record to a side output
                context.output(record.getTemperature() > 30 ? highTag : lowTag, record);
                // send the record to the regular output
                collector.collect(record);
            }
        });
        // obtain the high- and low-temperature streams
        DataStream<SensorReading> highStream = splitStream.getSideOutput(highTag);
        DataStream<SensorReading> lowStream = splitStream.getSideOutput(lowTag);
        // 2: connect the two streams (sub-stream 1 maps to a 3-tuple, sub-stream 2 to a 2-tuple)
        ConnectedStreams<SensorReading, SensorReading> connectedStreams = highStream.connect(lowStream);
        DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<SensorReading, SensorReading, Object>() {
            @Override
            public Object map1(SensorReading rd) throws Exception {
                // map1 applies to the first stream, highStream
                return new Tuple3<>(rd.getId(), rd.getTemperature(), "high");
            }
            @Override
            public Object map2(SensorReading rd) throws Exception {
                // map2 applies to the second stream, lowStream
                return new Tuple2<>(rd.getId(), rd.getTemperature());
            }
        });
        // 3: print the result
        resultStream.print("connectedStream");
        // execute
        env.execute("connectedStreamJob");
    }
}
The sensor text file:
sensor_1,11,36.1
sensor_2,21,23.1
sensor_1,32,36.2
sensor_1,23,30.3
sensor_2,22,11.2
Printed results:
connectedStream> (sensor_1,36.1,high)
connectedStream> (sensor_2,23.1)
connectedStream> (sensor_1,36.2,high)
connectedStream> (sensor_2,11.2)
connectedStream> (sensor_1,30.3,high)
【5】Merging Streams with union
connect above joins only two streams; merging more would require chaining several connects, which is awkward;
to merge several streams, use union; the prerequisite is that all the streams have the same element type;
1) Code (continuing the connect example above, reusing highStream and lowStream):
// 2: union three streams into one
DataStream<SensorReading> inputStream2 = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor2.txt")
        .map(x -> {
            String[] arr = x.split(",");
            return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));
        });
DataStream<SensorReading> unionStream = highStream.union(lowStream, inputStream2);
// 3: print the result
unionStream.print("unionStream");
// execute
env.execute("unionStreamJob");
Printed results:
unionStream> SensorRd{id='sensor2_1', timestamp=11, temperature=36.1}
unionStream> SensorRd{id='sensor2_2', timestamp=21, temperature=23.1}
unionStream> SensorRd{id='sensor2_1', timestamp=32, temperature=36.2}
unionStream> SensorRd{id='sensor2_1', timestamp=23, temperature=30.3}
unionStream> SensorRd{id='sensor2_2', timestamp=22, temperature=11.2}
unionStream> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
unionStream> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}
unionStream> SensorRd{id='sensor_1', timestamp=23, temperature=30.3}
unionStream> SensorRd{id='sensor_2', timestamp=21, temperature=23.1}
unionStream> SensorRd{id='sensor_2', timestamp=22, temperature=11.2}
【6】User-Defined Functions (UDF)
Flink exposes interfaces for all UDFs, such as MapFunction, FilterFunction, and ProcessFunction; they can be understood as functional interfaces in the sense introduced by Java 8 (see the sketch below);
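For example, since each of these interfaces has a single abstract method, a UDF can be supplied as a named class, an anonymous class, or a lambda; a minimal sketch with FilterFunction (the HighTempFilter name and the 30-degree threshold are invented for illustration, and sensorStream is the stream from the examples above):
// minimal sketch: a FilterFunction UDF, first as a named class, then as a lambda
public static class HighTempFilter implements FilterFunction<SensorReading> {
    @Override
    public boolean filter(SensorReading rd) throws Exception {
        // the 30-degree threshold is made up for illustration
        return rd.getTemperature() > 30;
    }
}
// usage: both lines produce the same stream, because FilterFunction
// has a single abstract method and therefore also accepts a lambda
DataStream<SensorReading> viaClass  = sensorStream.filter(new HighTempFilter());
DataStream<SensorReading> viaLambda = sensorStream.filter(rd -> rd.getTemperature() > 30);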
See the official UDF documentation:
User-defined Functions | Apache Flink — https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/functions/udfs/
【6.1】Rich Functions
1) A rich function can access the runtime context, which a plain function cannot;
Code:
/**
 * @Description Rich functions
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022-04-16
 */
public class TransformTest7_RichFunction {
    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        // read data from a text file
        DataStream<String> inputStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");
        // convert to the SensorReading POJO type
        DataStream<SensorReading> sensorStream = inputStream.map(x -> {
            String[] arr = x.split(",");
            return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));
        });
        // apply the custom rich function
        DataStream<Tuple3<String, Integer, Integer>> richMapStream = sensorStream.map(new MyRichMapFunction());
        // print the result
        richMapStream.print("richMapStream");
        // execute
        env.execute("richMapStreamJob");
    }

    // the rich function class
    static class MyRichMapFunction extends RichMapFunction<SensorReading, Tuple3<String, Integer, Integer>> {
        @Override
        public Tuple3<String, Integer, Integer> map(SensorReading record) throws Exception {
            // a rich function can access the runtime context via getRuntimeContext(); a plain map function cannot
            return new Tuple3<>(record.getId(), record.getId().length(), getRuntimeContext().getIndexOfThisSubtask());
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // initialization work, typically defining state or opening a database connection
            System.out.println("open db conn");
        }

        @Override
        public void close() throws Exception {
            // close connections, clear state
            System.out.println("close db conn");
        }
    }
}
The sensor text file:
sensor_1,11,36.1
sensor_2,21,23.1
sensor_1,32,36.2
sensor_1,23,30.3
sensor_2,22,11.2
Printed results:
open db conn
open db conn
richMapStream:1> (sensor_1,8,0)
richMapStream:2> (sensor_1,8,1)
richMapStream:1> (sensor_2,8,0)
richMapStream:2> (sensor_2,8,1)
richMapStream:1> (sensor_1,8,0)
close db conn
close db conn
The output shows that every subtask (thread) runs the open and close methods; the third field of the Tuple3 is the subtask index taken from the runtime context (which only a rich function can access);
【7】Data Repartitioning in Flink
1) A partition in Flink refers to a slot in a TaskManager, i.e. a thread;
repartitioning operations include:
- shuffle: random (shuffled) partitioning;
- keyBy: partition by key;
- global: send all data to the first partition;
2) Code:
/**
 * @Description Repartitioning
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022-04-16
 */
public class TransformTest8_Partition2 {
    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        // read data from a text file
        DataStream<String> inputStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");
        // convert to the SensorReading POJO type
        DataStream<SensorReading> sensorStream = inputStream.map(x -> {
            String[] arr = x.split(",");
            return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));
        });
        // the prints below were enabled one at a time to produce the outputs shown further down
        // 0-native partitioning (before repartitioning)
        // sensorStream.print("rawStream");
        // 1-shuffle: random partitioning
        DataStream<SensorReading> shuffleStream = sensorStream.shuffle();
        shuffleStream.print("shuffleStream");
        // 2-keyBy: partition by key
        DataStream<SensorReading> keybyStream = sensorStream.keyBy(SensorReading::getId);
        // keybyStream.print("keybyStream");
        // 3-global: send all data to the first partition
        DataStream<SensorReading> globalStream = sensorStream.global();
        // globalStream.print("globalStream");
        // execute
        env.execute("partitionJob");
    }
}
The sensor text file:
sensor_1,11,36.1
sensor_2,21,23.1
sensor_1,32,36.2
sensor_1,23,30.3
sensor_2,22,11.2
Native partitioning results (before repartitioning):
rawStream:1> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
rawStream:2> SensorRd{id='sensor_1', timestamp=23, temperature=30.3}
rawStream:1> SensorRd{id='sensor_2', timestamp=21, temperature=23.1}
rawStream:2> SensorRd{id='sensor_2', timestamp=22, temperature=11.2}
rawStream:1> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}
shuffle (random partitioning) results:
shuffleStream:2> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
shuffleStream:1> SensorRd{id='sensor_2', timestamp=22, temperature=11.2}
shuffleStream:2> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}
shuffleStream:1> SensorRd{id='sensor_2', timestamp=21, temperature=23.1}
shuffleStream:2> SensorRd{id='sensor_1', timestamp=23, temperature=30.3}
keyBy (partition by key) results:
keybyStream:1> SensorRd{id='sensor_2', timestamp=21, temperature=23.1}
keybyStream:2> SensorRd{id='sensor_1', timestamp=23, temperature=30.3}
keybyStream:1> SensorRd{id='sensor_2', timestamp=22, temperature=11.2}
keybyStream:2> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
keybyStream:2> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}
global (all data to the first partition) results:
globalStream:1> SensorRd{id='sensor_1', timestamp=23, temperature=30.3}
globalStream:1> SensorRd{id='sensor_2', timestamp=22, temperature=11.2}
globalStream:1> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
globalStream:1> SensorRd{id='sensor_2', timestamp=21, temperature=23.1}
globalStream:1> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}