文章目录

  • Flink的DataStream API(基础篇)
    • Source源算子
      • 从集合中读取数据
      • 从文件中读取数据
      • 从Scoket中读取数据
      • 从Kafka中读取数据
      • 自定义Source

Flink的DataStream API(基础篇)

Flink程序主要是分为
Source -> Transform -> Sink
本篇文章主要介绍的是Flink的源算子

Source源算子

POJO类的定义:

POJO类定义为一个数据类型,Flink会把这样的类作为一个特殊的POJO数据类型,方便数据的解析和序列化

POJO的规范:

  • 是公有的
  • 有一个无参的构造方法
  • 所有的属性都是公有的
  • 所有属性的数据类型都是可以序列化的

从集合中读取数据

package com.dcit.chacpter01;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;public class StreamSource {public static void main(String[] args) throws Exception {/*** 从集合中读取数据*/// 1.创建Flink运行时环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();ArrayList<Event> source = new ArrayList<>();// Event是自定义的POJO数据类    url  user timestampsource.add(new Event("./home","chaochao",1000L));source.add(new Event("./home","lisi",2000L));// 从集合中直接读取数据DataStreamSource<Event> eventDataStreamSource = env.fromCollection(source);// 打印eventDataStreamSource.print();// 执行env.execute();}
}

从文件中读取数据

env.readTextFile(“xxxx.txt”)

从Scoket中读取数据

DataStream<String> stream = env.socketTextStream("hadoop102",8888);

从Kafka中读取数据

想要以 Kafka 作为数据源获取数据,我们只需要引入 Kafka 连接器的依赖。Flink 官

方提供的是一个通用的 Kafka 连接器,它会自动跟踪最新版本的 Kafka 客户端。目前最新版本

只支持 0.10.0 版本以上的 Kafka,读者使用时可以根据自己安装的 Kafka 版本选定连接器的依

赖版本。这里我们需要导入的依赖如下。

需要添加依赖

然后调用 env.addSource(),传入 FlinkKafkaConsumer 的对象实例就可以了。

pom.xml文件

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>
        Properties properties = new Properties();properties.setProperty("bootstrap.servers","hadoop102:9092");properties.setProperty("group.id", "consumer-group");properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("auto.offset.reset", "latest");DataStreamSource<String> DS = env.addSource(new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties));

创建 FlinkKafkaConsumer 时需要传入三个参数:

  • 第一个参数 topic,定义了从哪些主题中读取数据。可以是一个 topic,也可以是 topic

列表,还可以是匹配所有想要读取的 topic 的正则表达式。当从多个 topic 中读取数据

时,Kafka 连接器将会处理所有 topic 的分区,将这些分区的数据放到一条流中去。

  • 第二个参数是一个 DeserializationSchema 或者 KeyedDeserializationSchema。Kafka 消

息被存储为原始的字节数据,所以需要反序列化成 Java 或者 Scala 对象。上面代码中

使用的 SimpleStringSchema,是一个内置的 DeserializationSchema,它只是将字节数

组简单地反序列化成字符串。DeserializationSchema 和 KeyedDeserializationSchema 是

公共接口,所以我们也可以自定义反序列化逻辑。

  • 第三个参数是一个 Properties 对象,设置了 Kafka 客户端的一些属性。

自定义Source

env.addSource(new SourceFunction())

定义一个ClickSource 实现SourceFunction接口 实现run方法和canel方法

然后在main方法里的addSource(new CliceSource()) 将ClicksSouce传入即可

package com.dcit.chacpter01;import org.apache.flink.streaming.api.functions.source.SourceFunction;
import sun.security.util.Length;import java.util.Calendar;
import java.util.Random;public class ClickSource implements SourceFunction<Event> {public volatile boolean running = true;@Overridepublic void run(SourceContext<Event> sourceContext) throws Exception {//随机数据Random random = new Random();//定义选取的数据String [] users = {"Mary","Alice","Bob","Cary"};String[] urls = {"./home", "./cart", "./fav", "./prod?id=1","./prod?id=2"};while (running){Thread.sleep(2000);long time = Calendar.getInstance().getTimeInMillis();sourceContext.collect(new Event(urls[random.nextInt(urls.length)],users[random.nextInt(users.length)],time));}}@Overridepublic void cancel() {running = false;}
}
    DataStreamSource<Event> ds = env.addSource(new ClickSource());

注意:实现的SourceFunction这个接口是不能设置并行度的

​ 如果需要调整并行度那么要继承 ParallelSourceFunction 接口 里面的重写方法和之前那个一样

Flink自带的Source源算子以及自定义数据源Source相关推荐

  1. Flink流处理Demo(含源码)

    Flink流处理的Source 基于集合 基于文件 基于Socket 自定义数据源 使用Kafka作为数据源 使用MySql作为数据源 Flink流处理的Transformation keyby co ...

  2. DataStream API及源算子

    一个Flink程序,其实就是对DataStream的各种转换.具体来说,代码基本上都由以下几部分构成 获取执行环境(execution environment) 读取数据源(source) 定义基于数 ...

  3. 源码面前没有秘密,推荐 9 个带你阅读源码的开源项目

    在文章开始之前,请各位先回忆下在日常开发过程中,都使用或依赖了哪些开源项目?是不是发现,开源项目已经完全融入到日常开发! 如今大多数的程序员技术栈和工具箱里,或多或少都有开源项目的身影:大到操作系统. ...

  4. 《从0到1学习Flink》—— 如何自定义 Data Source ?

    前言 在 <从0到1学习Flink>-- Data Source 介绍 文章中,我给大家介绍了 Flink Data Source 以及简短的介绍了一下自定义 Data Source,这篇 ...

  5. 1.31.Flink自定义rocketmq(source/sink)+自定义redis source和sink

    1.31.Flink自定义rocketmq(source/sink)+自定义redis+自定义 1.31.1.工程结构 1.31.2.定义pom.xml文件 1.31.3.log4j2.propert ...

  6. 直播带货app源码,进行直播平台的环境部署

    直播项目环境部署 最近总是接到直播带货app源码的开发,在环境部署的过程中踩了不少坑.现在我将环境部署的完整教程分享给大家. 一 .搭建前期准备 注:操作系统centos7.0以上 64位,直播带货a ...

  7. 【转】Android事件分发机制完全解析,带你从源码的角度彻底理解(下)

    转载请注明出处:http://blog.csdn.net/guolin_blog/article/details/9153761 记得在前面的文章中,我带大家一起从源码的角度分析了Android中Vi ...

  8. Android -- 带你从源码角度领悟Dagger2入门到放弃(一)

    1,以前的博客也写了两篇关于Dagger2,但是感觉自己使用的时候还是云里雾里的,更不谈各位来看博客的同学了,所以今天打算和大家再一次的入坑试试,最后一次了,保证最后一次了. 2,接入项目 在项目的G ...

  9. [学习总结]7、Android AsyncTask完全解析,带你从源码的角度彻底理解

    我们都知道,Android UI是线程不安全的,如果想要在子线程里进行UI操作,就需要借助Android的异步消息处理机制.之前我也写过了一篇文章从源码层面分析了Android的异步消息处理机制,感兴 ...

最新文章

  1. C++ Public, Protected, Private
  2. Spark ML - 协同过滤
  3. 如何保证 HBase 服务的高可用?看看这份 HBase 可用性分析与高可用实践吧!
  4. DreamFactory 第7章 限制和记录API请求
  5. 单片机和微型计算机硬件组成的异同,嵌入式和单片机的区别是什么?两者有什么联系...
  6. 嵌入式操作系统内核原理和开发(通用优先级调度)
  7. Java 如何抛出异常、自定义异常、手动或主动抛出异常
  8. 聚类算法实践——层次、K-means聚类
  9. Axure RP8下载以及注册
  10. 行内元素之间产生的间隙
  11. 什么编程语言的开发者平均年薪高达94万?
  12. 在EXCEL表格中快速自动求和
  13. python3__机器学习__神经网络基础算法__偏执项b
  14. 苹果电脑安装双系统Mac和Win7,详细教程
  15. Reincarnation
  16. vue 调起浏览器打印
  17. 商品详情页实现价格区间价
  18. 论文笔记:PSGAN
  19. 分享一份软件测试项目(Python项目)
  20. 【decode()】

热门文章

  1. 【前端】【学习】HTML+CSS的W3Cschool网站的实战知识点
  2. 计算机设备管理器老是闪烁,电脑经常闪屏的原因和解决方法
  3. 开源小游戏app源码和H5小游戏源码大全
  4. c++疑难杂症(未解决),高手路过瞧瞧
  5. 50个面试常见问题技巧回答
  6. Python3.8.0语法汉化规范思路1.0版
  7. 【IEEE论文投稿word中双栏情况下插入单栏效果图片】
  8. 2017-10-16 集训总结
  9. c++实现编译原理词法分析实验(含代码)
  10. Visual Studio Code 运行命令行,无法加载文件 C:\**.ps1,因为在此系统上禁止运行脚本的解决方法