Flink专题-Source
Flink Source
进入flink的数据源大致分为以下几类:
- 集合 Collection
- 文件 File
- Kafka
- UDF
一般都是使用前三个source源即可,如果想要使用其他数据源就可以自定义数据源即UDF。
Collection
public static void main(String[] args) throws Exception {//指定运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置全局并行度env.setParallelism(1);
//1.使用fromCollection函数DataStreamSource<Integer> inputStream = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5));
//2.使用fromElements函数DataStreamSource<Integer> inputStream1 = env.fromElements(1, 2, 3, 4, 5);
//打印输出inputStream.print();inputStream1.print();
//执行任务env.execute();}
File
public static void main(String[] args) throws Exception {//获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置全局并行度env.setParallelism(1);//读取数据String s = "hello.text";DataStream<String> inputDataStream = env.readTextFile(s);//打印数据inputDataStream.print("out");//执行任务env.execute("jobname");}
Kafka
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//kafka连接参数配置Properties prop = new Properties();prop.put("bootstrap.servers","hadoop102:9092");//设置需要连接的主机prop.put("zookeeper.connect","hadoop102:2181");prop.put("group.id","first");prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");prop.put("value.serializer","org.apache.kafka.common,serialization.StringSerializer");prop.put("auto.offset.rest","latest");//通过addSource获取一般数据源//注意这里使用了kafka与flink的连接器,如果没有在pom.xml文件中依赖中添加:
//<dependency>
// <groupId>org.apache.flink</groupId>
// <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
// <version>${flink.version}</version>
//</dependency>DataStreamSource<String> inputStream = env.addSource(new FlinkKafkaConsumer<String>("first", new SimpleStringSchema(), prop));//注意获取的数据是String类型.inputStream.print();env.execute();}
UDF(自定义source源)
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//使用自定义source,需要实现一个SourceFunction,或者他的富函数RichSourceFunctionDataStreamSource<String> inputStream= env.addSource(new MysourceFunciton());inputStream.print();env.execute();}public static class MysourceFunciton extends RichSourceFunction<String> {private Integer start;@Overridepublic void open(Configuration parameters) throws Exception {start=1;}@Overridepublic void run(SourceContext<String> ctx) throws Exception {while(start!=0){ctx.collect("hello wolrd");}}
Flink专题-Source相关推荐
- Flink的Source端和Sink端大全
Flink和各种组件 enviroment Source flink + kafka (flink 消费 kafka 中的数据) Transform Transformation 的介绍 复杂的方法 ...
- Flink之Source
Flink 可以从各种来源获取数据,然后构建 DataStream 进行转换处理.一般将数据的输入来源称为数据源,而读取数据的算子就是源算子(Source).所以,Source 就是我们整个处理程序的 ...
- 自定义flink es source
1. 需求 增量导入elasticsearch的数据到kafka. 2. 解决方式 1) 自定义一个flume的essource 2)使用spark 的 es rdd 3) 自定义flink的es s ...
- Flink专题-BaseTransform
Flink BaseTransform Map FlatMap Filter Map 特点:一对一,可以改变数据类型 一对一:一个数据只返回一个数据 eg:a->b 虽然数据不同了但是还是一个 ...
- Flink专题四:Flink DataStream 窗口介绍及使用
由于工作需要最近学习flink 现记录下Flink介绍和实际使用过程 这是flink系列的第四篇文章 Flink DataStream 窗口介绍及使用 窗口介绍 时间窗口 翻滚窗口(数据以一个时间断为 ...
- flink source和sink
flink中的source作为整个stream中的入口,而sink作为整个stream的终点. SourceFunction为所有flink中source的根接口,其定义了run()方法和cancel ...
- 【flink】flink Source Coordinator Thread already exists driving actions Source Coordinator. Existing
文章目录 1.概述 1.概述 一个flink程序,什么也没做 ,昨天晚上9点的时候 集群突然就挂了,我7点下班的时候 还好好的 然后报错:Source Coordinator Thread alrea ...
- 《从0到1学习Flink》—— 如何自定义 Data Source ?
前言 在 <从0到1学习Flink>-- Data Source 介绍 文章中,我给大家介绍了 Flink Data Source 以及简短的介绍了一下自定义 Data Source,这篇 ...
- 1.31.Flink自定义rocketmq(source/sink)+自定义redis source和sink
1.31.Flink自定义rocketmq(source/sink)+自定义redis+自定义 1.31.1.工程结构 1.31.2.定义pom.xml文件 1.31.3.log4j2.propert ...
最新文章
- sizeof()与strlen()的区别与联系
- Segment Routing — SRv6 — Overview
- mysql更新记录删除_mysql 插入-更新-删除
- Spring3 @ResponseBody注解引起头部Accept-Charset过大
- 数据挖掘之关联分析五(序列模式)
- 易到追债贾跃亭 乐视回应:对方无耻甩锅
- [Twisted] Protocols协议和Protocol Factories 协议工厂
- 避免在ASP.NET Core中使用服务定位器模式
- jQuery 左侧滑动
- C#实现关机的两种方法
- 计算机工程科学计算与仿真,BGPLUS实地科研 |中科院|计算机科学、计算机工程:计算机算法与数值建模实训...
- 姓名大全 导入mysql_My SQL常用操作汇总详解
- 成为UiBot Store推广员,解锁全新赚钱方式
- Axure RP9授权码适合3658版本,亲测可用
- 如何群发电子邮件?群发邮箱账号批发吗?
- 数字图像处理笔记(一)空间分辨率与灰度分辨率
- javascript如何监听 form.submit()事件
- 字节跳动-数据分析-实习面经
- Upload 上传:图片上传
- js 中的this指针
热门文章
- 4.extern关键字.rs
- 透过汇编另眼看世界之DLL导出函数调用
- 1_2 AbstractFactoryMode 抽象工厂模式
- 互斥体CMutex的使用
- Linux 多线程(二)线程安全:线程安全、互斥与互斥锁、死锁、同步与条件变量
- Python中re.sub()实现替换文本字符串
- Java程序员面试必备的一些流程图
- [五]RabbitMQ-客户端源码之AMQChannel
- ​冲刺最后一公里——音视频场景下的边缘计算实践
- Xilinx 拥抱“新基建” 发力大中华区核心市场