Flink Source

进入flink的数据源大致分为以下几类:

  1. 集合 Collection
  2. 文件 File
  3. Kafka
  4. UDF

一般都是使用前三个source源即可,如果想要使用其他数据源就可以自定义数据源即UDF。

Collection

public static void main(String[] args) throws Exception {//指定运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置全局并行度env.setParallelism(1);
//1.使用fromCollection函数DataStreamSource<Integer> inputStream = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5));
//2.使用fromElements函数DataStreamSource<Integer> inputStream1 = env.fromElements(1, 2, 3, 4, 5);
//打印输出inputStream.print();inputStream1.print();
//执行任务env.execute();}

File

 public static void main(String[] args) throws Exception {//获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置全局并行度env.setParallelism(1);//读取数据String s = "hello.text";DataStream<String> inputDataStream = env.readTextFile(s);//打印数据inputDataStream.print("out");//执行任务env.execute("jobname");}

Kafka

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//kafka连接参数配置Properties prop = new Properties();prop.put("bootstrap.servers","hadoop102:9092");//设置需要连接的主机prop.put("zookeeper.connect","hadoop102:2181");prop.put("group.id","first");prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");prop.put("value.serializer","org.apache.kafka.common,serialization.StringSerializer");prop.put("auto.offset.rest","latest");//通过addSource获取一般数据源//注意这里使用了kafka与flink的连接器,如果没有在pom.xml文件中依赖中添加:
//<dependency>
//    <groupId>org.apache.flink</groupId>
//    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
//    <version>${flink.version}</version>
//</dependency>DataStreamSource<String> inputStream = env.addSource(new FlinkKafkaConsumer<String>("first", new SimpleStringSchema(), prop));//注意获取的数据是String类型.inputStream.print();env.execute();}

UDF(自定义source源)

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//使用自定义source,需要实现一个SourceFunction,或者他的富函数RichSourceFunctionDataStreamSource<String> inputStream= env.addSource(new MysourceFunciton());inputStream.print();env.execute();}public static class MysourceFunciton extends RichSourceFunction<String> {private Integer start;@Overridepublic void open(Configuration parameters) throws Exception {start=1;}@Overridepublic void run(SourceContext<String> ctx) throws Exception {while(start!=0){ctx.collect("hello wolrd");}}

Flink专题-Source相关推荐

  1. Flink的Source端和Sink端大全

    Flink和各种组件 enviroment Source flink + kafka (flink 消费 kafka 中的数据) Transform Transformation 的介绍 复杂的方法 ...

  2. Flink之Source

    Flink 可以从各种来源获取数据,然后构建 DataStream 进行转换处理.一般将数据的输入来源称为数据源,而读取数据的算子就是源算子(Source).所以,Source 就是我们整个处理程序的 ...

  3. 自定义flink es source

    1. 需求 增量导入elasticsearch的数据到kafka. 2. 解决方式 1) 自定义一个flume的essource 2)使用spark 的 es rdd 3) 自定义flink的es s ...

  4. Flink专题-BaseTransform

    Flink BaseTransform Map FlatMap Filter Map 特点:一对一,可以改变数据类型 一对一:一个数据只返回一个数据 eg:a->b 虽然数据不同了但是还是一个 ...

  5. Flink专题四:Flink DataStream 窗口介绍及使用

    由于工作需要最近学习flink 现记录下Flink介绍和实际使用过程 这是flink系列的第四篇文章 Flink DataStream 窗口介绍及使用 窗口介绍 时间窗口 翻滚窗口(数据以一个时间断为 ...

  6. flink source和sink

    flink中的source作为整个stream中的入口,而sink作为整个stream的终点. SourceFunction为所有flink中source的根接口,其定义了run()方法和cancel ...

  7. 【flink】flink Source Coordinator Thread already exists driving actions Source Coordinator. Existing

    文章目录 1.概述 1.概述 一个flink程序,什么也没做 ,昨天晚上9点的时候 集群突然就挂了,我7点下班的时候 还好好的 然后报错:Source Coordinator Thread alrea ...

  8. 《从0到1学习Flink》—— 如何自定义 Data Source ?

    前言 在 <从0到1学习Flink>-- Data Source 介绍 文章中,我给大家介绍了 Flink Data Source 以及简短的介绍了一下自定义 Data Source,这篇 ...

  9. 1.31.Flink自定义rocketmq(source/sink)+自定义redis source和sink

    1.31.Flink自定义rocketmq(source/sink)+自定义redis+自定义 1.31.1.工程结构 1.31.2.定义pom.xml文件 1.31.3.log4j2.propert ...

最新文章

  1. sizeof()与strlen()的区别与联系
  2. Segment Routing — SRv6 — Overview
  3. mysql更新记录删除_mysql 插入-更新-删除
  4. Spring3 @ResponseBody注解引起头部Accept-Charset过大
  5. 数据挖掘之关联分析五(序列模式)
  6. 易到追债贾跃亭 乐视回应:对方无耻甩锅
  7. [Twisted] Protocols协议和Protocol Factories 协议工厂
  8. 避免在ASP.NET Core中使用服务定位器模式
  9. jQuery 左侧滑动
  10. C#实现关机的两种方法
  11. 计算机工程科学计算与仿真,BGPLUS实地科研 |中科院|计算机科学、计算机工程:计算机算法与数值建模实训...
  12. 姓名大全 导入mysql_My SQL常用操作汇总详解
  13. 成为UiBot Store推广员,解锁全新赚钱方式
  14. Axure RP9授权码适合3658版本,亲测可用
  15. 如何群发电子邮件?群发邮箱账号批发吗?
  16. 数字图像处理笔记(一)空间分辨率与灰度分辨率
  17. javascript如何监听 form.submit()事件
  18. 字节跳动-数据分析-实习面经
  19. Upload 上传:图片上传
  20. js 中的this指针

热门文章

  1. 4.extern关键字.rs
  2. 透过汇编另眼看世界之DLL导出函数调用
  3. 1_2 AbstractFactoryMode 抽象工厂模式
  4. 互斥体CMutex的使用
  5. Linux 多线程(二)线程安全:线程安全、互斥与互斥锁、死锁、同步与条件变量
  6. Python中re.sub()实现替换文本字符串
  7. Java程序员面试必备的一些流程图
  8. [五]RabbitMQ-客户端源码之AMQChannel
  9. ​冲刺最后一公里——音视频场景下的边缘计算实践
  10. Xilinx 拥抱“新基建” 发力大中华区核心市场