目录

原算子

准备工作,环境搭建

读取数据

从文件中读取数据

从集合中读取数据

从元素中读取数据

从source文件中读取数据

从kafka中读取数据

自定义source类型输出

转换算子

map转换

Filter转换

FlatMap转换


原算子

准备工作,环境搭建

为了更好地理解,我们先构建一个实际应用场景。比如网站的访问操作,可以抽象成一个三元组(用户名,用户访问的 urrl,用户访问 url 的时间戳),所以在这里,我们可以创建一个类 Event,将用户行为包装成它的一个对象。

import java.sql.Timestamp;
/*
应用场景*/
public class Event {public String user;public String ur1;//用户访问的urlpublic Long timestape;//用户访问url的时间public Event(){};public Event(String user,String ur1,Long timestape){this.timestape=timestape;this.ur1=ur1;this.user=user;}@Overridepublic String toString() {return "Event{" +"user='" + user + '\'' +", ur1='" + ur1 + '\'' +", timestape=" + new Timestamp(timestape)  +'}';}
}

读取数据

要先创建读取数据的环境

//创建执行环境StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);

从文件中读取数据

运用readTextFile的方法

1.创建一个文件clicks.txt

Mary, ./home, 1000
DOUDOU, ./cart, 2000
Bob, ./porp?id=100, 3000
DOUDOU, ./home, 4000

2.读取文件中的数据

 //从文件中读取数据DataStreamSource<String> stream1=env.readTextFile("input/clicks.txt");stream1.print("1");env.execute();

从集合中读取数据

 //从集合里读取数据ArrayList<Event> events=new ArrayList<>();events.add(new Event("DOUDOU","./home",1000L));DataStreamSource<Event> stream2=env.fromCollection(events);stream2.print("2");env.execute();

从元素中读取数据

//从元素中读取数据DataStreamSource<Event> stream3=env.fromElements(new Event("DOUDOU","./home",1000L));

从source文件中读取数据

首先要打开hadoop102的端口

  //从socket文本流中读取数据DataStreamSource<String> stream4=env.socketTextStream("hadoop102",7777);stream4.print("4");

从kafka中读取数据

//从kafka中读取数据Properties properties=new Properties();properties.setProperty("bootstrap.servers","hadoop102:9092");properties.setProperty("group.id", "consumer-group");properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("auto.offset.reset", "latest");DataStreamSource<String> kafkaStream=env.addSource(new FlinkKafkaConsumer<String>("clicks",new SimpleStringSchema(),properties));kafkaStream.print();env.execute();

开启zookeeper和kafka

zk.sh start
kf.sh start

创建用户

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic clicks

自定义source类型输出

1.创建一个实现SourceFunction<Event>的类,创造数据

import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Calendar;
import java.util.Random;public class ClicksSource implements SourceFunction<Event> {//声明一个标志位private Boolean running=true;@Overridepublic void run(SourceContext<Event> sourceContext) throws Exception {//生成随机数据Random random=new Random();//自定义选取的数据集String[] users = {"Mary", "Alice", "Bob", "Cary"};String[] urls = {"./home", "./cart", "./fav", "./prod?id=1","./prod?id=2"};//循环生成的数据while (running){String user=users[random.nextInt(users.length)];String ur1=urls[random.nextInt(urls.length)];Long timestap= Calendar.getInstance().getTimeInMillis();sourceContext.collect(new Event(user,ur1,timestap));Thread.sleep(1000L);}}@Overridepublic void cancel() {running=false;}
}

2.实现自定义source输出

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*
用户自定义source测试*/
public class SourceCustomTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Event> customStream=env.addSource(new ClicksSource());customStream.print();env.execute();}
}

转换算子

map转换

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class TransformMapTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//从元素中读取数据DataStreamSource<Event> stream3=env.fromElements(new Event("DOUDOU","./home",1000L));//进行转换计算,提取user字段//1.使用自定义类实现MapFunction接口SingleOutputStreamOperator<String> result=stream3.map(new MyMapper());//2.使用匿名类实现SingleOutputStreamOperator<String> result2=stream3.map(new MapFunction<Event, String>() {@Overridepublic String map(Event event) throws Exception {return event.user;}});//3.传入Lambda表达式SingleOutputStreamOperator<String> relult3=stream3.map(data -> data.user);result2.print();result.print();relult3.print();env.execute();}//自定义MapFunctionpublic static class MyMapper implements MapFunction<Event,String>{@Overridepublic String map(Event event) throws Exception {return event.user;}}
}

Filter转换

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class TransformFilterTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//从元素中读取数据DataStreamSource<Event> stream3=env.fromElements(new Event("DOUDOU","./home",1000L));//1.自定义类对象SingleOutputStreamOperator<Event> result1= stream3.filter(new MyFilter());//2.传入匿名类SingleOutputStreamOperator<Event> result2=stream3.filter(new FilterFunction<Event>() {@Overridepublic boolean filter(Event event) throws Exception {return event.user.equals("DOUDOU");}});//输入Lambda表达式SingleOutputStreamOperator<Event> result3=stream3.filter(data ->data.user.equals("DOUDOU"));result3.print();result1.print();result2.print();env.execute();}//自定义对象public static class MyFilter implements FilterFunction<Event>{@Overridepublic boolean filter(Event event) throws Exception {return event.user.equals("DOUDOU");}}
}

FlatMap转换

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class TransformFlatMapTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//从元素中读取数据DataStreamSource<Event> stream3=env.fromElements(new Event("DOUDOU","./home",1000L));//1.自定义stream3.flatMap(new MyFlatMap()).print();//2.传入Lamba表达式stream3.flatMap((Event value ,Collector<String> out) -> {if (value.user.equals("DOUDOU"))out.collect(value.ur1);else if (value.user.equals("DOUDOU")){out.collect(value.user);out.collect(value.ur1);out.collect(value.timestape.toString());}}) .returns(new TypeHint<String>() {}).print("2");env.execute();}//自定义类public static class MyFlatMap implements FlatMapFunction<Event,String>{@Overridepublic void flatMap(Event event, Collector<String> collector) throws Exception {collector.collect(event.user);collector.collect(event.ur1);collector.collect(event.timestape.toString());}}
}

DataStream API相关推荐

  1. flink DataStream API使用及原理

    传统的大数据处理方式一般是批处理式的,也就是说,今天所收集的数据,我们明天再把今天收集到的数据算出来,以供大家使用,但是在很多情况下,数据的时效性对于业务的成败是非常关键的. Spark 和 Flin ...

  2. Flink 1.13,面向流批一体的运行时与 DataStream API 优化

    简介:在 1.13 中,针对流批一体的目标,Flink 优化了大规模作业调度以及批执行模式下网络 Shuffle 的性能,以及在 DataStream API 方面完善有限流作业的退出语义. 本文由社 ...

  3. flink fi java_Flink DataStream API编程指南

    Flink中的DataStream程序是实现数据流转换的常规程序(例如:filtering, updating state, defining windows, aggregating).数据流最初是 ...

  4. DataStream API及源算子

    一个Flink程序,其实就是对DataStream的各种转换.具体来说,代码基本上都由以下几部分构成 获取执行环境(execution environment) 读取数据源(source) 定义基于数 ...

  5. [Flink]Flink DataStream API 概览

    目录 什么是 DataStream 什么能被转化为流 流式Flink程序的开发流程 DataStream的数据源 迭代数据流 配置运行时参数 什么是 DataStream Datastream API ...

  6. (十八)Flink Table API SQL 编程指南 Table API 和Datastream API 集成

    文章目录 DataStream 和 Table 之间的转换 依赖项和导入 配置 执行行为 datastream API table API 批处理运行时模式 Changelog统一 处理(仅插入)流 ...

  7. Flink DataStream API 介绍

    Flink DataStream API 介绍 StreamExecutionEnvironment #mermaid-svg-JKeWa22W2vWA4zBS {font-family:" ...

  8. DataStream API【3】

    Sink数据输出 Flink可以使用DataStream API将数据流输出到文件.Socket.外部系统等.Flink自带了各种内置的输出格式,说明如下. writeAsText():将元素转为St ...

  9. 【基础】Flink -- DataStream API

    Flink -- DataStream API 执行环境 Execution Environment 创建执行环境 设置执行模式 触发程序执行 源算子 Source 从集合中读取数据 从文件读取数据 ...

  10. DataStream API【1】

    一个Flink程序,就是对DataStream的各种转换.代码基本由以下几部分构成: 获取执行环境 读取数据源 定义基于数据的各种转换操作 定义计算结果的输出位置 触发程序执行 执行环境 创建执行环境 ...

最新文章

  1. C++知识点总结(纯C++!!)
  2. iOS 10、Xcode 8 遇到部分问题解决记录(包括控制台日志不输出)
  3. 虚拟内存——Windows核心编程学习手札之十四
  4. 高一数学集合知识点整理_高一数学 | 高一数学函数图像知识点总结,实用!
  5. 搭建Jenkins+Sonarqub+Mysql+Android(上篇)
  6. Shell 基础介绍 [1]
  7. 剑指offer58 二叉树的下一个结点
  8. 静音抑制_正在研究利润以抑制创新
  9. T430s BIOS白名单破解
  10. 详解傅里叶变换与拉普拉斯,Z变化的联系
  11. 数字经济是党和国家定下的重要发展战略
  12. I.MX RT1176笔记(3)-- 双核启动和通信 MU
  13. 喉咙肿痛症状似流感 常州一男子延误治疗险送命
  14. 讯飞语音转文字 PHP demo
  15. JS实现:哔哩哔哩2020校园招聘技术类笔试卷(二)
  16. JSON快速学习入门
  17. 前端性能优化,之还在为多种多样的知识点整理苦恼吗,进来看看吧。
  18. Lua判断一个对象是否为空,包含userdata
  19. 安卓免ROOT卸载预装应用程序简要流程
  20. 用python获取某年某月/(当前)的天数

热门文章

  1. 前端项目的总结——为什么要组件化?
  2. 【基于obs插件-4】-音频频谱
  3. ssis 转换中文字符乱码_SSIS软件包中的字符映射转换
  4. 每日一犬 · 布鲁克浣熊猎犬
  5. 首个搭载8MP摄像头的单SoC行泊一体方案来袭,已拿下多家车企定点
  6. Visual C++ 6.0 ( VC 6 )带 SP6 中英文双语版 下载
  7. XJTU_ 西安交通大学2020大学计算机作业-第六周
  8. windows平台下的oracle ORA-01031的解决方法
  9. docker部署环境
  10. 大漠Android模拟器中控,最新如意大漠多线程中控模板,适用于手游模拟器脚本...