Flink 读取文本文件,聚合每一行的uid
文本数据大约30W行,内容格式如下:
001 jack
001 jack
001 rose
004 tom
004 jerry
001 sofia
005 natasha
006 catalina
006 jennifer
要求输出结果如下:
001 [jack,rose,sofia]
004 [tom,jerry]
005 [natasha]
006 [catalina, jennifer]
首先将文件的格式进行整理
public class Test2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dataStreamSource = env.readTextFile("E:/test/uid_person.txt");SingleOutputStreamOperator<Tuple2<String, Set<String>>> map = dataStreamSource.map(new MapFunction<String, Tuple2<String, Set<String>>>() {@Overridepublic Tuple2<String, Set<String>> map(String s) throws Exception {String[] split = s.split("\t");String uid = split[0];String name = split[1];Set<String> set = new HashSet();set.add(name);return Tuple2.of(uid, set);}});map.writeAsText("E:/test/mytest.txt").setParallelism(1);env.execute("Test");}
}
输出文件内容:
(004,[tom])
(004,[jerry])
(001,[sofia])
(001,[jack])
(001,[jack])
(001,[rose])
(006,[jennifer])
(005,[natasha])
(006,[catalina])
每行数据都变为Tuple2<String, Set<String>>,它主要是用来将两个同类型的值操作为一个同类型的值,第一个参数为前面reduce的结果,第二参数为当前的元素,注意reduce操作只能对相同类型的数据进行处理。将数据合并成一个新的数据,返回单个的结果值,
最每行数据进行keyBy-reduce操作
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dataStreamSource = env.readTextFile("E:/test/uid_person.txt");SingleOutputStreamOperator<Tuple2<String, Set<String>>> map = dataStreamSource.map(new MapFunction<String, Tuple2<String, Set<String>>>() {@Overridepublic Tuple2<String, Set<String>> map(String s) throws Exception {String[] split = s.split("\t");String uid = split[0];String name = split[1];Set<String> set = new HashSet();set.add(name);return Tuple2.of(uid, set);}});map.keyBy(0).reduce(new ReduceFunction<Tuple2<String, Set<String>>>() {@Overridepublic Tuple2<String, Set<String>> reduce(Tuple2<String, Set<String>> stringSetTuple2, Tuple2<String, Set<String>> t1) throws Exception {stringSetTuple2.f1.addAll(t1.f1);return Tuple2.of(stringSetTuple2.f0, stringSetTuple2.f1);}}).writeAsText("E:/test/mytest.txt").setParallelism(1);env.execute("Test");}
输出结果如下:
(001,[sofia])
(001,[sofia, jack])
(001,[sofia, jack])
(001,[sofia, rose, jack])
(006,[catalina])
(006,[jennifer, catalina])
(005,[natasha])
(004,[tom])
(004,[tom, jerry])
这样每个uid的最右一条数据就是最完整的数据。
Flink 读取文本文件,聚合每一行的uid相关推荐
- python 读取文本文件遍历每一行
本文将利用readlines 读取 以txt为格式的文本文件里的内容, 将每一行提取出来 把每一行的前导/尾随空格符删除 并将每一行存入了列表中. 利用strip split 等python自带的字符 ...
- java读取文件跳过_在Java中读取文本文件-为什么跳过行?
我是新来的,只是努力尝试读取文本文件.每行上都有一个单词和相应的数字代码.想法是将其读取并将代码和单词放在单独的变量中.我对这个领域了解不多,但是我一直在网上四处寻找,并提出了以下建议: try{ F ...
- python怎么读取txt文件第二行-Python:如何选择文本文件的第一行,以及第二行……?...
本问题已经有最佳答案,请猛点这里访问. 我遇到了一个关于Python的问题.我有一个文本文件(textfile1.txt),有几行例子: 1 2 3This is the line 1 This is ...
- python读取txt文件第一行-python读取文本文件数据
本文要点刚要: (一)读文本文件格式的数据函数:read_csv,read_table 1.读不同分隔符的文本文件,用参数sep 2.读无字段名(表头)的文本文件 ,用参数names 3.为文本文件制 ...
- flink读取不到文件_日处理数据量超10亿:友信金服基于Flink构建实时用户画像系统的实践...
简介: 友信金服公司推行全域的数据体系战略,通过打通和整合集团各个业务线数据,利用大数据.人工智能等技术构建统一的数据资产,如 ID-Mapping.用户标签等.友信金服用户画像项目正是以此为背景成立 ...
- flink读取不到文件_flink批处理从0到1
一.DataSet API之Data Sources(消费者之数据源) 介绍: flink提供了大量的已经实现好的source方法,你也可以自定义source 通过实现sourceFunction接口 ...
- 如何使用 System.IO 和 Visual C# 读取文本文件
本文分步介绍了如何从文本文件 (.txt) 检索信息,然后使用 ArrayList 类向用户显示该信息. 回到顶端 要求 Microsoft Visual C# 2005 或 Microsoft Vi ...
- 《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL
<!-- more --> 前言 之前其实在 <从0到1学习Flink>-- 如何自定义 Data Sink ? 文章中其实已经写了点将数据写入到 MySQL,但是一些配置化的 ...
- JAVA——读取文本文件的倒数第N行(模拟Linux中tail命令)
解决方案 1.引入jar包commons-io.jar /*** 读取文件的倒数第3行,并打印其内容*/import java.io.File; import java.io.IOException; ...
最新文章
- c语言实现文件数据删除视频,如何用c语言实现删除文件中指定的数据;例如
- C#提取HTML代码中的文字(转)
- frame buffer编程--画点功能和新增字符串代替RGBT
- Cookie和会话Session
- 小程序影视APP/追剧吧/脱离微擎/可用火车头采集小程序影视
- python程序设计与应用第2章实验
- OkHttp3详细使用教程(2)
- java 使用vsphere 创建虚拟机‘_Java数组的创建及使用
- MySQL字符集详解
- mysql索引卡死_SQL2000数据库运行缓慢、卡、死锁!请重建索引!
- 笔记本独显无输出_笔记本屏幕太小?如何拓展视野边界?
- 文件大小超过配置限制(2560000),代码洞察功能不可用怎么办?
- r语言 linux使用教程,R语言初级教程: R编程环境的搭建
- h5实现海报分享功能
- C#实现百度AI-实时语音识别转写-附源码
- Ubuntu关闭自动更新
- html实现太极图效果
- 女生学大数据好就业吗?前景如何?
- Investigating Typed Syntactic Dependencies for Targeted Sentiment Classification Using GAT(2020)
- 腾讯云服务器IP地址打不开网站注意80端口的问题