Flink自带的Source源算子以及自定义数据源Source
文章目录
- Flink的DataStream API(基础篇)
- Source源算子
- 从集合中读取数据
- 从文件中读取数据
- 从Scoket中读取数据
- 从Kafka中读取数据
- 自定义Source
Flink的DataStream API(基础篇)
Flink程序主要是分为
Source -> Transform -> Sink
本篇文章主要介绍的是Flink的源算子
Source源算子
POJO类的定义:
POJO类定义为一个数据类型,Flink会把这样的类作为一个特殊的POJO数据类型,方便数据的解析和序列化
POJO的规范:
- 是公有的
- 有一个无参的构造方法
- 所有的属性都是公有的
- 所有属性的数据类型都是可以序列化的
从集合中读取数据
package com.dcit.chacpter01;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;public class StreamSource {public static void main(String[] args) throws Exception {/*** 从集合中读取数据*/// 1.创建Flink运行时环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();ArrayList<Event> source = new ArrayList<>();// Event是自定义的POJO数据类 url user timestampsource.add(new Event("./home","chaochao",1000L));source.add(new Event("./home","lisi",2000L));// 从集合中直接读取数据DataStreamSource<Event> eventDataStreamSource = env.fromCollection(source);// 打印eventDataStreamSource.print();// 执行env.execute();}
}
从文件中读取数据
env.readTextFile(“xxxx.txt”)
从Scoket中读取数据
DataStream<String> stream = env.socketTextStream("hadoop102",8888);
从Kafka中读取数据
想要以 Kafka 作为数据源获取数据,我们只需要引入 Kafka 连接器的依赖。Flink 官
方提供的是一个通用的 Kafka 连接器,它会自动跟踪最新版本的 Kafka 客户端。目前最新版本
只支持 0.10.0 版本以上的 Kafka,读者使用时可以根据自己安装的 Kafka 版本选定连接器的依
赖版本。这里我们需要导入的依赖如下。
需要添加依赖
然后调用 env.addSource(),传入 FlinkKafkaConsumer 的对象实例就可以了。
pom.xml文件
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>
Properties properties = new Properties();properties.setProperty("bootstrap.servers","hadoop102:9092");properties.setProperty("group.id", "consumer-group");properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("auto.offset.reset", "latest");DataStreamSource<String> DS = env.addSource(new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties));
创建 FlinkKafkaConsumer 时需要传入三个参数:
- 第一个参数 topic,定义了从哪些主题中读取数据。可以是一个 topic,也可以是 topic
列表,还可以是匹配所有想要读取的 topic 的正则表达式。当从多个 topic 中读取数据
时,Kafka 连接器将会处理所有 topic 的分区,将这些分区的数据放到一条流中去。
- 第二个参数是一个 DeserializationSchema 或者 KeyedDeserializationSchema。Kafka 消
息被存储为原始的字节数据,所以需要反序列化成 Java 或者 Scala 对象。上面代码中
使用的 SimpleStringSchema,是一个内置的 DeserializationSchema,它只是将字节数
组简单地反序列化成字符串。DeserializationSchema 和 KeyedDeserializationSchema 是
公共接口,所以我们也可以自定义反序列化逻辑。
- 第三个参数是一个 Properties 对象,设置了 Kafka 客户端的一些属性。
自定义Source
env.addSource(new SourceFunction())
定义一个ClickSource 实现SourceFunction接口 实现run方法和canel方法
然后在main方法里的addSource(new CliceSource()) 将ClicksSouce传入即可
package com.dcit.chacpter01;import org.apache.flink.streaming.api.functions.source.SourceFunction;
import sun.security.util.Length;import java.util.Calendar;
import java.util.Random;public class ClickSource implements SourceFunction<Event> {public volatile boolean running = true;@Overridepublic void run(SourceContext<Event> sourceContext) throws Exception {//随机数据Random random = new Random();//定义选取的数据String [] users = {"Mary","Alice","Bob","Cary"};String[] urls = {"./home", "./cart", "./fav", "./prod?id=1","./prod?id=2"};while (running){Thread.sleep(2000);long time = Calendar.getInstance().getTimeInMillis();sourceContext.collect(new Event(urls[random.nextInt(urls.length)],users[random.nextInt(users.length)],time));}}@Overridepublic void cancel() {running = false;}
}
DataStreamSource<Event> ds = env.addSource(new ClickSource());
注意:实现的SourceFunction这个接口是不能设置并行度的
如果需要调整并行度那么要继承 ParallelSourceFunction 接口 里面的重写方法和之前那个一样
Flink自带的Source源算子以及自定义数据源Source相关推荐
- Flink流处理Demo(含源码)
Flink流处理的Source 基于集合 基于文件 基于Socket 自定义数据源 使用Kafka作为数据源 使用MySql作为数据源 Flink流处理的Transformation keyby co ...
- DataStream API及源算子
一个Flink程序,其实就是对DataStream的各种转换.具体来说,代码基本上都由以下几部分构成 获取执行环境(execution environment) 读取数据源(source) 定义基于数 ...
- 源码面前没有秘密,推荐 9 个带你阅读源码的开源项目
在文章开始之前,请各位先回忆下在日常开发过程中,都使用或依赖了哪些开源项目?是不是发现,开源项目已经完全融入到日常开发! 如今大多数的程序员技术栈和工具箱里,或多或少都有开源项目的身影:大到操作系统. ...
- 《从0到1学习Flink》—— 如何自定义 Data Source ?
前言 在 <从0到1学习Flink>-- Data Source 介绍 文章中,我给大家介绍了 Flink Data Source 以及简短的介绍了一下自定义 Data Source,这篇 ...
- 1.31.Flink自定义rocketmq(source/sink)+自定义redis source和sink
1.31.Flink自定义rocketmq(source/sink)+自定义redis+自定义 1.31.1.工程结构 1.31.2.定义pom.xml文件 1.31.3.log4j2.propert ...
- 直播带货app源码,进行直播平台的环境部署
直播项目环境部署 最近总是接到直播带货app源码的开发,在环境部署的过程中踩了不少坑.现在我将环境部署的完整教程分享给大家. 一 .搭建前期准备 注:操作系统centos7.0以上 64位,直播带货a ...
- 【转】Android事件分发机制完全解析,带你从源码的角度彻底理解(下)
转载请注明出处:http://blog.csdn.net/guolin_blog/article/details/9153761 记得在前面的文章中,我带大家一起从源码的角度分析了Android中Vi ...
- Android -- 带你从源码角度领悟Dagger2入门到放弃(一)
1,以前的博客也写了两篇关于Dagger2,但是感觉自己使用的时候还是云里雾里的,更不谈各位来看博客的同学了,所以今天打算和大家再一次的入坑试试,最后一次了,保证最后一次了. 2,接入项目 在项目的G ...
- [学习总结]7、Android AsyncTask完全解析,带你从源码的角度彻底理解
我们都知道,Android UI是线程不安全的,如果想要在子线程里进行UI操作,就需要借助Android的异步消息处理机制.之前我也写过了一篇文章从源码层面分析了Android的异步消息处理机制,感兴 ...
最新文章
- C++ Public, Protected, Private
- Spark ML - 协同过滤
- 如何保证 HBase 服务的高可用?看看这份 HBase 可用性分析与高可用实践吧!
- DreamFactory 第7章 限制和记录API请求
- 单片机和微型计算机硬件组成的异同,嵌入式和单片机的区别是什么?两者有什么联系...
- 嵌入式操作系统内核原理和开发(通用优先级调度)
- Java 如何抛出异常、自定义异常、手动或主动抛出异常
- 聚类算法实践——层次、K-means聚类
- Axure RP8下载以及注册
- 行内元素之间产生的间隙
- 什么编程语言的开发者平均年薪高达94万?
- 在EXCEL表格中快速自动求和
- python3__机器学习__神经网络基础算法__偏执项b
- 苹果电脑安装双系统Mac和Win7,详细教程
- Reincarnation
- vue 调起浏览器打印
- 商品详情页实现价格区间价
- 论文笔记:PSGAN
- 分享一份软件测试项目(Python项目)
- 【decode()】
热门文章
- 【前端】【学习】HTML+CSS的W3Cschool网站的实战知识点
- 计算机设备管理器老是闪烁,电脑经常闪屏的原因和解决方法
- 开源小游戏app源码和H5小游戏源码大全
- c++疑难杂症(未解决),高手路过瞧瞧
- 50个面试常见问题技巧回答
- Python3.8.0语法汉化规范思路1.0版
- 【IEEE论文投稿word中双栏情况下插入单栏效果图片】
- 2017-10-16 集训总结
- c++实现编译原理词法分析实验(含代码)
- Visual Studio Code 运行命令行,无法加载文件 C:\**.ps1,因为在此系统上禁止运行脚本的解决方法