【README】

本文记录了flink读取不同数据源的编码方式,数据源包括;

  • 集合(元素列表);
  • 文件
  • kafka;
  • 自定义数据源;

本文使用的flink为 1.14.4 版本;maven依赖如下:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.14.4</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.14.4</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.4</version></dependency>

【1】从集合读取数据

【1.1】代码

/*** @Description flink从集合读取数据 * @author xiao tang* @version 1.0.0* @createTime 2022年04月15日*/
public class SourceTest1_Collection {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从集合读取数据DataStream<SensorReading> sensorStream = env.fromCollection(Arrays.asList(new SensorReading("sensor_1", 12341561L, 36.1), new SensorReading("sensor_2", 12341562L, 33.5), new SensorReading("sensor_3", 12341563L, 39.9), new SensorReading("sensor_4", 12341564L, 31.2)));// 打印输出sensorStream.print("sensor");// 从元素列表读取数据DataStream<Integer> intStream = env.fromElements(1, 2, 3, 7, 8, 2, 100, 34, 3);intStream.print("intStream");// 执行env.execute("sensorJob");}
}
/*** @Description 传感器温度读数* @author xiao tang* @version 1.0.0* @createTime 2022年04月15日*/
public class SensorReading {private String id;private Long timestamp;private double temperature;public SensorReading() {}public SensorReading(String id, Long timestamp, double temperature) {this.id = id;this.timestamp = timestamp;this.temperature = temperature;}

打印结果:

intStream:6> 8
intStream:5> 7
intStream:7> 2
sensor:8> SensorReading{id='sensor_2', timestamp=12341562, temperature=33.5}
intStream:1> 34
sensor:1> SensorReading{id='sensor_3', timestamp=12341563, temperature=39.9}
intStream:3> 2
intStream:4> 3
intStream:2> 1
intStream:2> 3
sensor:7> SensorReading{id='sensor_1', timestamp=12341561, temperature=36.1}
intStream:8> 100
sensor:2> SensorReading{id='sensor_4', timestamp=12341564, temperature=31.2}


【2】 从文件读取数据

【2.1】代码

/*** @Description flink从文件读取数据* @author xiao tang* @version 1.0.0* @createTime 2022年04月15日*/
public class SourceTest2_File {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1); // 设置全局并行度为1// 从文件读取数据DataStream<String> fileStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");// 打印输出fileStream.print("sensor");// 执行env.execute("sensorJob");}
}

sensor.txt 如下:

sensor_1,12341561,36.1
sensor_2,12341562,33.5
sensor_3,12341563,39.9
sensor_4,12341564,31.2

打印结果:

sensor> sensor_1,12341561,36.1
sensor> sensor_2,12341562,33.5
sensor> sensor_3,12341563,39.9
sensor> sensor_4,12341564,31.2


【3】从kafka读取数据

1)引入maven依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.14.4</version></dependency>

2)flink作为消费者连接到kafka

/*** @Description flink从kafka读取数据* @author xiao tang* @version 1.0.0* @createTime 2022年04月15日*/
public class SourceTest3_kafka {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1); // 设置全局并行度为1// 创建flink连接kafkaKafkaSource kafkaSource = KafkaSource.<String>builder().setValueOnlyDeserializer(new SimpleStringSchema()).setProperties(KafkaConsumerProps._INS.getProps()).setTopics("hello0415").setGroupId("flink").build();DataStream<String> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource");// 打印输出kafkaStream.print("kafkaStream");// 执行env.execute("kafkaStreamJob");}
}
public enum KafkaConsumerProps {_INS;/* 1.创建kafka生产者的配置信息 */Properties props = new Properties();private KafkaConsumerProps() {/*2.指定连接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, "G1");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");}public Properties getProps() {return props;}
}

3)打开 kafka生产者命令行:

kafka-console-producer.sh --broker-list centos201:9092,centos202:9092,centos203:9092 --topic hello0415

补充: 关于kafka集群,可以参见我的文章 :

kafka集群搭建_PacosonSWJTU的博客-CSDN博客


【4】自定义数据源

自定义数据源,可以用于自测 flinkjob 的场景中;

public class SourceTest4_UDF {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4); // 设置全局并行度为1// 创建自定义数据源DataStream<SensorReading> udfStream = env.addSource(new SourceFunction<SensorReading>() {int i = 1;int mod = 1000;Random random = new Random();boolean runnable = true;@Overridepublic void run(SourceContext<SensorReading> sourceContext) throws Exception {while (runnable) {sourceContext.collect(new SensorReading(String.valueOf(i++ % mod + 1), System.currentTimeMillis(), 30 + random.nextGaussian()));if (i % 5 == 0) TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {runnable = false;}});// 打印输出udfStream.print("udfStream");// 执行env.execute("udfStreamJob");}
}

打印结果:

udfStream:4> SensorReading{id='5', timestamp=1650030559865, temperature=31.015354380481117}
udfStream:1> SensorReading{id='2', timestamp=1650030559853, temperature=29.23797321841027}
udfStream:3> SensorReading{id='4', timestamp=1650030559865, temperature=31.148402161461384}
udfStream:2> SensorReading{id='3', timestamp=1650030559865, temperature=30.082462570224305}

【1】flink-source读取数据相关推荐

  1. Flink / Scala 实战 - 6.使用 Jedis、JedisPool 作为 Source 读取数据

    一.引言 现在有一批数据写入多台 Redis 相同 key 的队列中,需要消费 Redis 队列作为 Flink Source,为了提高可用性,下面基于 JedisPool 进行队列的消费.队列数据示 ...

  2. Flink 分别读取kafka和mysql作为source

    需求 首先从kafka中读取数据,然后从mysql中读取数据,然后将这两个数据进行合并处理. 环境 Flink 1.8.2 实现 public static void main(String[] ar ...

  3. 一、flink基础之数据读取

    Flink读取文件的几种方式 1.从文本文件中读取数据 2.从容器中读取数据 3.从流处理组件中读取数据 4.自定义源读取数据 1.从文本文件中读取数据 我们尝试读取一份用户访问网址的数据: 用户名 ...

  4. Flink使用KafkaSource从Kafka消息队列中读取数据

    Flink使用KafkaSource从Kafka消息队列中读取数据 使用KafkaSource从Kafka消息队列中读取数据 1.KafkaSource创建的DataStream是一个并行的DataS ...

  5. datahub数据source读取问题

    datahub读取数据的位置 datahub全部启动正常后有9个项目,在datahub-actions中是做数据读取等一系列操作的 在读取数据时候这个镜像内部有python3.9.9所以个人建议在使用 ...

  6. Flink原理解析50篇(四)-基于 Flink CDC 打通数据实时入湖

    在构建实时数仓的过程中,如何快速.正确的同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎Flink和数据湖Apache Iceberg两种技术,来解决业务数据实时入湖相关的问题. 0 ...

  7. 中读取数据_Flink入门实战 (中)

    一.Flink 流处理 API 1.Environment getExecutionEnvironment 创建一个执行环境,表示当前执行程序的上下文. 如果程序是独立调用的,则 此方法返回本地执行环 ...

  8. 【Flink】使用Flink实现索引数据到Elasticsearch

    1.概述 转载:使用Flink实现索引数据到Elasticsearch 建议看原文 使用Flink处理数据时,可以基于Flink提供的批式处理(Batch Processing)和流式处理(Strea ...

  9. 基于Flink CDC打通数据实时入湖

    作者 | 数据社       责编 | 欧阳姝黎 在构建实时数仓的过程中,如何快速.正确的同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎 Flink 和数据湖 Apache Ice ...

  10. Flink如何保证数据的一致性

    当在分布式系统中引入状态时,自然也引入了一致性问题.一致性实际上是"正确性级别"的另一种说法,也就是说在成功处理故障并恢复之后得到的结果,与没有发生任何故障时得到的结果相比,前者到 ...

最新文章

  1. 自然语言处理-Word2Vec
  2. 手把手教你使用spring cloud+dotnet core搭建微服务架构:服务治理(-)
  3. 打造属于自己的Vim神器
  4. spring+springmvc+mybatis实现图书管理系统_Spring、SpringMVC、Mybatis自学视频分享
  5. 03-28 弱网测试
  6. 互联网运营数据分析(2):转化分析
  7. [JLOI 2016]成绩比较
  8. 用Python筛选国考职位表
  9. html实现点击图片放大功能
  10. python 文件夹操作_Python之路(第九篇)Python文件操作
  11. 2018 中国Linux内核开发者大会
  12. 题目连接:http://acm.zznu.edu.cn/problem.php?id=1329
  13. 快速了解vue前端框架
  14. 2023年全国最新会计专业技术资格精选真题及答案9
  15. 学习Android笔记
  16. Axure RP9——【图片放大预览效果】
  17. 如何彻底禁用 werfalut.exe
  18. FusionCharts Demo
  19. 【博学谷学习记录】超强总结,用心分享 | Java入门级基础概述
  20. 2万字系统总结,实现Linux命令自由

热门文章

  1. CF 1635E Cars 二分图 + 拓扑
  2. Meeting HDU - 5521
  3. P7988-[USACO21DEC] HILO G【set,线段树】
  4. [2020.11.27NOIP模拟赛]拼图王【dp】
  5. P2679-子串【dp】
  6. vijos1056-图形面积【离散化】
  7. 【主席树】更为厉害(P3899)
  8. 【数位DP】B-number(HDU 3652)
  9. 2017 ACM Jordanian Collegiate J.Efficiency Test 动态规划、类倍增
  10. Spring Data之MongoDB配置