文本数据大约30W行,内容格式如下:

001  jack
001 jack
001 rose
004 tom
004 jerry
001 sofia
005 natasha
006 catalina
006 jennifer

要求输出结果如下:

001  [jack,rose,sofia]
004 [tom,jerry]
005 [natasha]
006 [catalina, jennifer]

首先将文件的格式进行整理

public class Test2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dataStreamSource = env.readTextFile("E:/test/uid_person.txt");SingleOutputStreamOperator<Tuple2<String, Set<String>>> map = dataStreamSource.map(new MapFunction<String, Tuple2<String, Set<String>>>() {@Overridepublic Tuple2<String, Set<String>> map(String s) throws Exception {String[] split = s.split("\t");String uid = split[0];String name = split[1];Set<String> set = new HashSet();set.add(name);return Tuple2.of(uid, set);}});map.writeAsText("E:/test/mytest.txt").setParallelism(1);env.execute("Test");}
}

输出文件内容:

(004,[tom])
(004,[jerry])
(001,[sofia])
(001,[jack])
(001,[jack])
(001,[rose])
(006,[jennifer])
(005,[natasha])
(006,[catalina])

每行数据都变为Tuple2<String, Set<String>>,它主要是用来将两个同类型的值操作为一个同类型的值,第一个参数为前面reduce的结果,第二参数为当前的元素,注意reduce操作只能对相同类型的数据进行处理。将数据合并成一个新的数据,返回单个的结果值,

最每行数据进行keyBy-reduce操作

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dataStreamSource = env.readTextFile("E:/test/uid_person.txt");SingleOutputStreamOperator<Tuple2<String, Set<String>>> map = dataStreamSource.map(new MapFunction<String, Tuple2<String, Set<String>>>() {@Overridepublic Tuple2<String, Set<String>> map(String s) throws Exception {String[] split = s.split("\t");String uid = split[0];String name = split[1];Set<String> set = new HashSet();set.add(name);return Tuple2.of(uid, set);}});map.keyBy(0).reduce(new ReduceFunction<Tuple2<String, Set<String>>>() {@Overridepublic Tuple2<String, Set<String>> reduce(Tuple2<String, Set<String>> stringSetTuple2, Tuple2<String, Set<String>> t1) throws Exception {stringSetTuple2.f1.addAll(t1.f1);return Tuple2.of(stringSetTuple2.f0, stringSetTuple2.f1);}}).writeAsText("E:/test/mytest.txt").setParallelism(1);env.execute("Test");}

输出结果如下:

(001,[sofia])
(001,[sofia, jack])
(001,[sofia, jack])
(001,[sofia, rose, jack])
(006,[catalina])
(006,[jennifer, catalina])
(005,[natasha])
(004,[tom])
(004,[tom, jerry])

这样每个uid的最右一条数据就是最完整的数据。

Flink 读取文本文件,聚合每一行的uid相关推荐

  1. python 读取文本文件遍历每一行

    本文将利用readlines 读取 以txt为格式的文本文件里的内容, 将每一行提取出来 把每一行的前导/尾随空格符删除 并将每一行存入了列表中. 利用strip split 等python自带的字符 ...

  2. java读取文件跳过_在Java中读取文本文件-为什么跳过行?

    我是新来的,只是努力尝试读取文本文件.每行上都有一个单词和相应的数字代码.想法是将其读取并将代码和单词放在单独的变量中.我对这个领域了解不多,但是我一直在网上四处寻找,并提出了以下建议: try{ F ...

  3. python怎么读取txt文件第二行-Python:如何选择文本文件的第一行,以及第二行……?...

    本问题已经有最佳答案,请猛点这里访问. 我遇到了一个关于Python的问题.我有一个文本文件(textfile1.txt),有几行例子: 1 2 3This is the line 1 This is ...

  4. python读取txt文件第一行-python读取文本文件数据

    本文要点刚要: (一)读文本文件格式的数据函数:read_csv,read_table 1.读不同分隔符的文本文件,用参数sep 2.读无字段名(表头)的文本文件 ,用参数names 3.为文本文件制 ...

  5. flink读取不到文件_日处理数据量超10亿:友信金服基于Flink构建实时用户画像系统的实践...

    简介: 友信金服公司推行全域的数据体系战略,通过打通和整合集团各个业务线数据,利用大数据.人工智能等技术构建统一的数据资产,如 ID-Mapping.用户标签等.友信金服用户画像项目正是以此为背景成立 ...

  6. flink读取不到文件_flink批处理从0到1

    一.DataSet API之Data Sources(消费者之数据源) 介绍: flink提供了大量的已经实现好的source方法,你也可以自定义source 通过实现sourceFunction接口 ...

  7. 如何使用 System.IO 和 Visual C# 读取文本文件

    本文分步介绍了如何从文本文件 (.txt) 检索信息,然后使用 ArrayList 类向用户显示该信息. 回到顶端 要求 Microsoft Visual C# 2005 或 Microsoft Vi ...

  8. 《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL

    <!-- more --> 前言 之前其实在 <从0到1学习Flink>-- 如何自定义 Data Sink ? 文章中其实已经写了点将数据写入到 MySQL,但是一些配置化的 ...

  9. JAVA——读取文本文件的倒数第N行(模拟Linux中tail命令)

    解决方案 1.引入jar包commons-io.jar /*** 读取文件的倒数第3行,并打印其内容*/import java.io.File; import java.io.IOException; ...

最新文章

  1. c语言实现文件数据删除视频,如何用c语言实现删除文件中指定的数据;例如
  2. C#提取HTML代码中的文字(转)
  3. frame buffer编程--画点功能和新增字符串代替RGBT
  4. Cookie和会话Session
  5. 小程序影视APP/追剧吧/脱离微擎/可用火车头采集小程序影视
  6. python程序设计与应用第2章实验
  7. OkHttp3详细使用教程(2)
  8. java 使用vsphere 创建虚拟机‘_Java数组的创建及使用
  9. MySQL字符集详解
  10. mysql索引卡死_SQL2000数据库运行缓慢、卡、死锁!请重建索引!
  11. 笔记本独显无输出_笔记本屏幕太小?如何拓展视野边界?
  12. 文件大小超过配置限制(2560000),代码洞察功能不可用怎么办?
  13. r语言 linux使用教程,R语言初级教程: R编程环境的搭建
  14. h5实现海报分享功能
  15. C#实现百度AI-实时语音识别转写-附源码
  16. Ubuntu关闭自动更新
  17. html实现太极图效果
  18. 女生学大数据好就业吗?前景如何?
  19. Investigating Typed Syntactic Dependencies for Targeted Sentiment Classification Using GAT(2020)
  20. 腾讯云服务器IP地址打不开网站注意80端口的问题

热门文章

  1. 数据库表扩展字段设计思路
  2. 通用权限实现的核心设计思想
  3. java kettle log_kettle使用log4j管理输出日志
  4. MySQL 使用 OR 条件导致索引失效
  5. xdebug 远程调试
  6. Python3教程Web开发实战梳理-day7(看着不错)
  7. redis和kafka读取代码
  8. nginx lua调用redis和mongo
  9. 为什么你总成为不了架构师?
  10. xss劫持 HTML 表单,XSS 之 form表单劫持(通用明文记录)