DataStream API
目录
原算子
准备工作,环境搭建
读取数据
从文件中读取数据
从集合中读取数据
从元素中读取数据
从source文件中读取数据
从kafka中读取数据
自定义source类型输出
转换算子
map转换
Filter转换
FlatMap转换
原算子
准备工作,环境搭建
为了更好地理解,我们先构建一个实际应用场景。比如网站的访问操作,可以抽象成一个三元组(用户名,用户访问的 urrl,用户访问 url 的时间戳),所以在这里,我们可以创建一个类 Event,将用户行为包装成它的一个对象。
import java.sql.Timestamp;
/*
应用场景*/
public class Event {public String user;public String ur1;//用户访问的urlpublic Long timestape;//用户访问url的时间public Event(){};public Event(String user,String ur1,Long timestape){this.timestape=timestape;this.ur1=ur1;this.user=user;}@Overridepublic String toString() {return "Event{" +"user='" + user + '\'' +", ur1='" + ur1 + '\'' +", timestape=" + new Timestamp(timestape) +'}';}
}
读取数据
要先创建读取数据的环境
//创建执行环境StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);
从文件中读取数据
运用readTextFile的方法
1.创建一个文件clicks.txt
Mary, ./home, 1000
DOUDOU, ./cart, 2000
Bob, ./porp?id=100, 3000
DOUDOU, ./home, 4000
2.读取文件中的数据
//从文件中读取数据DataStreamSource<String> stream1=env.readTextFile("input/clicks.txt");stream1.print("1");env.execute();
从集合中读取数据
//从集合里读取数据ArrayList<Event> events=new ArrayList<>();events.add(new Event("DOUDOU","./home",1000L));DataStreamSource<Event> stream2=env.fromCollection(events);stream2.print("2");env.execute();
从元素中读取数据
//从元素中读取数据DataStreamSource<Event> stream3=env.fromElements(new Event("DOUDOU","./home",1000L));
从source文件中读取数据
首先要打开hadoop102的端口
//从socket文本流中读取数据DataStreamSource<String> stream4=env.socketTextStream("hadoop102",7777);stream4.print("4");
从kafka中读取数据
//从kafka中读取数据Properties properties=new Properties();properties.setProperty("bootstrap.servers","hadoop102:9092");properties.setProperty("group.id", "consumer-group");properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("auto.offset.reset", "latest");DataStreamSource<String> kafkaStream=env.addSource(new FlinkKafkaConsumer<String>("clicks",new SimpleStringSchema(),properties));kafkaStream.print();env.execute();
开启zookeeper和kafka
zk.sh start
kf.sh start
创建用户
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic clicks
自定义source类型输出
1.创建一个实现SourceFunction<Event>的类,创造数据
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Calendar;
import java.util.Random;public class ClicksSource implements SourceFunction<Event> {//声明一个标志位private Boolean running=true;@Overridepublic void run(SourceContext<Event> sourceContext) throws Exception {//生成随机数据Random random=new Random();//自定义选取的数据集String[] users = {"Mary", "Alice", "Bob", "Cary"};String[] urls = {"./home", "./cart", "./fav", "./prod?id=1","./prod?id=2"};//循环生成的数据while (running){String user=users[random.nextInt(users.length)];String ur1=urls[random.nextInt(urls.length)];Long timestap= Calendar.getInstance().getTimeInMillis();sourceContext.collect(new Event(user,ur1,timestap));Thread.sleep(1000L);}}@Overridepublic void cancel() {running=false;}
}
2.实现自定义source输出
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*
用户自定义source测试*/
public class SourceCustomTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Event> customStream=env.addSource(new ClicksSource());customStream.print();env.execute();}
}
转换算子
map转换
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class TransformMapTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//从元素中读取数据DataStreamSource<Event> stream3=env.fromElements(new Event("DOUDOU","./home",1000L));//进行转换计算,提取user字段//1.使用自定义类实现MapFunction接口SingleOutputStreamOperator<String> result=stream3.map(new MyMapper());//2.使用匿名类实现SingleOutputStreamOperator<String> result2=stream3.map(new MapFunction<Event, String>() {@Overridepublic String map(Event event) throws Exception {return event.user;}});//3.传入Lambda表达式SingleOutputStreamOperator<String> relult3=stream3.map(data -> data.user);result2.print();result.print();relult3.print();env.execute();}//自定义MapFunctionpublic static class MyMapper implements MapFunction<Event,String>{@Overridepublic String map(Event event) throws Exception {return event.user;}}
}
Filter转换
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class TransformFilterTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//从元素中读取数据DataStreamSource<Event> stream3=env.fromElements(new Event("DOUDOU","./home",1000L));//1.自定义类对象SingleOutputStreamOperator<Event> result1= stream3.filter(new MyFilter());//2.传入匿名类SingleOutputStreamOperator<Event> result2=stream3.filter(new FilterFunction<Event>() {@Overridepublic boolean filter(Event event) throws Exception {return event.user.equals("DOUDOU");}});//输入Lambda表达式SingleOutputStreamOperator<Event> result3=stream3.filter(data ->data.user.equals("DOUDOU"));result3.print();result1.print();result2.print();env.execute();}//自定义对象public static class MyFilter implements FilterFunction<Event>{@Overridepublic boolean filter(Event event) throws Exception {return event.user.equals("DOUDOU");}}
}
FlatMap转换
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class TransformFlatMapTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//从元素中读取数据DataStreamSource<Event> stream3=env.fromElements(new Event("DOUDOU","./home",1000L));//1.自定义stream3.flatMap(new MyFlatMap()).print();//2.传入Lamba表达式stream3.flatMap((Event value ,Collector<String> out) -> {if (value.user.equals("DOUDOU"))out.collect(value.ur1);else if (value.user.equals("DOUDOU")){out.collect(value.user);out.collect(value.ur1);out.collect(value.timestape.toString());}}) .returns(new TypeHint<String>() {}).print("2");env.execute();}//自定义类public static class MyFlatMap implements FlatMapFunction<Event,String>{@Overridepublic void flatMap(Event event, Collector<String> collector) throws Exception {collector.collect(event.user);collector.collect(event.ur1);collector.collect(event.timestape.toString());}}
}
DataStream API相关推荐
- flink DataStream API使用及原理
传统的大数据处理方式一般是批处理式的,也就是说,今天所收集的数据,我们明天再把今天收集到的数据算出来,以供大家使用,但是在很多情况下,数据的时效性对于业务的成败是非常关键的. Spark 和 Flin ...
- Flink 1.13,面向流批一体的运行时与 DataStream API 优化
简介:在 1.13 中,针对流批一体的目标,Flink 优化了大规模作业调度以及批执行模式下网络 Shuffle 的性能,以及在 DataStream API 方面完善有限流作业的退出语义. 本文由社 ...
- flink fi java_Flink DataStream API编程指南
Flink中的DataStream程序是实现数据流转换的常规程序(例如:filtering, updating state, defining windows, aggregating).数据流最初是 ...
- DataStream API及源算子
一个Flink程序,其实就是对DataStream的各种转换.具体来说,代码基本上都由以下几部分构成 获取执行环境(execution environment) 读取数据源(source) 定义基于数 ...
- [Flink]Flink DataStream API 概览
目录 什么是 DataStream 什么能被转化为流 流式Flink程序的开发流程 DataStream的数据源 迭代数据流 配置运行时参数 什么是 DataStream Datastream API ...
- (十八)Flink Table API SQL 编程指南 Table API 和Datastream API 集成
文章目录 DataStream 和 Table 之间的转换 依赖项和导入 配置 执行行为 datastream API table API 批处理运行时模式 Changelog统一 处理(仅插入)流 ...
- Flink DataStream API 介绍
Flink DataStream API 介绍 StreamExecutionEnvironment #mermaid-svg-JKeWa22W2vWA4zBS {font-family:" ...
- DataStream API【3】
Sink数据输出 Flink可以使用DataStream API将数据流输出到文件.Socket.外部系统等.Flink自带了各种内置的输出格式,说明如下. writeAsText():将元素转为St ...
- 【基础】Flink -- DataStream API
Flink -- DataStream API 执行环境 Execution Environment 创建执行环境 设置执行模式 触发程序执行 源算子 Source 从集合中读取数据 从文件读取数据 ...
- DataStream API【1】
一个Flink程序,就是对DataStream的各种转换.代码基本由以下几部分构成: 获取执行环境 读取数据源 定义基于数据的各种转换操作 定义计算结果的输出位置 触发程序执行 执行环境 创建执行环境 ...
最新文章
- C++知识点总结(纯C++!!)
- iOS 10、Xcode 8 遇到部分问题解决记录(包括控制台日志不输出)
- 虚拟内存——Windows核心编程学习手札之十四
- 高一数学集合知识点整理_高一数学 | 高一数学函数图像知识点总结,实用!
- 搭建Jenkins+Sonarqub+Mysql+Android(上篇)
- Shell 基础介绍 [1]
- 剑指offer58 二叉树的下一个结点
- 静音抑制_正在研究利润以抑制创新
- T430s BIOS白名单破解
- 详解傅里叶变换与拉普拉斯,Z变化的联系
- 数字经济是党和国家定下的重要发展战略
- I.MX RT1176笔记(3)-- 双核启动和通信 MU
- 喉咙肿痛症状似流感 常州一男子延误治疗险送命
- 讯飞语音转文字 PHP demo
- JS实现:哔哩哔哩2020校园招聘技术类笔试卷(二)
- JSON快速学习入门
- 前端性能优化,之还在为多种多样的知识点整理苦恼吗,进来看看吧。
- Lua判断一个对象是否为空,包含userdata
- 安卓免ROOT卸载预装应用程序简要流程
- 用python获取某年某月/(当前)的天数
热门文章
- 前端项目的总结——为什么要组件化?
- 【基于obs插件-4】-音频频谱
- ssis 转换中文字符乱码_SSIS软件包中的字符映射转换
- 每日一犬 · 布鲁克浣熊猎犬
- 首个搭载8MP摄像头的单SoC行泊一体方案来袭,已拿下多家车企定点
- Visual C++ 6.0 ( VC 6 )带 SP6 中英文双语版 下载
- XJTU_ 西安交通大学2020大学计算机作业-第六周
- windows平台下的oracle ORA-01031的解决方法
- docker部署环境
- 大漠Android模拟器中控,最新如意大漠多线程中控模板,适用于手游模拟器脚本...