数据源可以通过StreamExecutionEnvironment.addSource(sourceFunction)方式来创建,Flink也提供了一些内置的数据源方便使用,例如readTextFile(path) readFile(),当然,也可以写一个自定义的数据源(可以通过实现SourceFunction方法,但是无法并行执行。或者实现可以并行实现的接口ParallelSourceFunction或者继承RichParallelSourceFunction)

入门

首先做一个简单入门,建立一个DataStreamSourceApp

Scala

object DataStreamSourceApp {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentsocketFunction(env)env.execute("DataStreamSourceApp")}def socketFunction(env: StreamExecutionEnvironment): Unit = {val data=env.socketTextStream("192.168.152.45", 9999)data.print()}
}

这个方法将会从socket中读取数据,因此我们需要在192.168.152.45中开启服务:

nc -lk 9999

然后运行DataStreamSourceApp,在服务器上输入:

iie4bu@swarm-manager:~$ nc -lk 9999
apache
flink
spark

在控制台中也会输出:

3> apache
4> flink
1> spark

前面的 341表示的是并行度。可以通过设置setParallelism来操作:

data.print().setParallelism(1)

Java

public class JavaDataStreamSourceApp {public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();socketFunction(environment);environment.execute("JavaDataStreamSourceApp");}public static void socketFunction(StreamExecutionEnvironment executionEnvironment){DataStreamSource<String> data = executionEnvironment.socketTextStream("192.168.152.45", 9999);data.print().setParallelism(1);}
}

自定义添加数据源方式

Scala

实现SourceFunction接口

这种方式不能并行处理。

新建一个自定义数据源

class CustomNonParallelSourceFunction extends SourceFunction[Long]{var count=1Lvar isRunning = trueoverride def run(ctx: SourceFunction.SourceContext[Long]): Unit = {while (isRunning){ctx.collect(count)count+=1Thread.sleep(1000)}}override def cancel(): Unit = {isRunning = false}
}

这个方法首先定义一个初始值count=1L,然后执行的run方法,方法主要是输出count,并且执行加一操作,当执行cancel方法时结束。调用方法如下:

  def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//    socketFunction(env)nonParallelSourceFunction(env)env.execute("DataStreamSourceApp")}def nonParallelSourceFunction(env: StreamExecutionEnvironment): Unit = {val data=env.addSource(new CustomNonParallelSourceFunction())data.print()}

输出结果就是控制台一直输出count值。

无法设置并行度,除非设置并行度是1.

val data=env.addSource(new CustomNonParallelSourceFunction()).setParallelism(3)

那么控制台报错:

Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel sourceat org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:55)at com.vincent.course05.DataStreamSourceApp$.nonParallelSourceFunction(DataStreamSourceApp.scala:16)at com.vincent.course05.DataStreamSourceApp$.main(DataStreamSourceApp.scala:11)at com.vincent.course05.DataStreamSourceApp.main(DataStreamSourceApp.scala)

继承ParallelSourceFunction方法

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}class CustomParallelSourceFunction extends ParallelSourceFunction[Long]{var isRunning = truevar count = 1Loverride def run(ctx: SourceFunction.SourceContext[Long]): Unit = {while(isRunning){ctx.collect(count)count+=1Thread.sleep(1000)}}override def cancel(): Unit = {isRunning=false}
}

方法的功能跟上面是一样的。main方法如下:

  def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//    socketFunction(env)
//    nonParallelSourceFunction(env)parallelSourceFunction(env)env.execute("DataStreamSourceApp")}def parallelSourceFunction(env: StreamExecutionEnvironment): Unit = {val data=env.addSource(new CustomParallelSourceFunction()).setParallelism(3)data.print()}

可以设置并行度3,输出结果如下:

2> 1
1> 1
2> 1
2> 2
3> 2
3> 2
3> 3
4> 3
4> 3

继承RichParallelSourceFunction方法

class CustomRichParallelSourceFunction extends RichParallelSourceFunction[Long] {var isRunning = truevar count = 1Loverride def run(ctx: SourceFunction.SourceContext[Long]): Unit = {while (isRunning) {ctx.collect(count)count += 1Thread.sleep(1000)}}override def cancel(): Unit = {isRunning = false}
}
  def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//    socketFunction(env)//    nonParallelSourceFunction(env)
//    parallelSourceFunction(env)richParallelSourceFunction(env)env.execute("DataStreamSourceApp")}def richParallelSourceFunction(env: StreamExecutionEnvironment): Unit = {val data = env.addSource(new CustomRichParallelSourceFunction()).setParallelism(3)data.print()}

Java

实现SourceFunction接口

import org.apache.flink.streaming.api.functions.source.SourceFunction;public class JavaCustomNonParallelSourceFunction implements SourceFunction<Long> {boolean isRunning = true;long count = 1;@Overridepublic void run(SourceFunction.SourceContext ctx) throws Exception {while (isRunning) {ctx.collect(count);count+=1;Thread.sleep(1000);}}@Overridepublic void cancel() {isRunning=false;}
}
    public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
//        socketFunction(environment);nonParallelSourceFunction(environment);environment.execute("JavaDataStreamSourceApp");}public static void nonParallelSourceFunction(StreamExecutionEnvironment executionEnvironment){DataStreamSource data = executionEnvironment.addSource(new JavaCustomNonParallelSourceFunction());data.print().setParallelism(1);}

当设置并行度时:

        DataStreamSource data = executionEnvironment.addSource(new JavaCustomNonParallelSourceFunction()).setParallelism(2);

那么报错异常:

Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel sourceat org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:55)at com.vincent.course05.JavaDataStreamSourceApp.nonParallelSourceFunction(JavaDataStreamSourceApp.java:16)at com.vincent.course05.JavaDataStreamSourceApp.main(JavaDataStreamSourceApp.java:10)

实现ParallelSourceFunction接口

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;public class JavaCustomParallelSourceFunction implements ParallelSourceFunction<Long> {boolean isRunning = true;long count = 1;@Overridepublic void run(SourceContext ctx) throws Exception {while (isRunning) {ctx.collect(count);count+=1;Thread.sleep(1000);}}@Overridepublic void cancel() {isRunning=false;}
}
    public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
//        socketFunction(environment);
//        nonParallelSourceFunction(environment);parallelSourceFunction(environment);environment.execute("JavaDataStreamSourceApp");}public static void parallelSourceFunction(StreamExecutionEnvironment executionEnvironment){DataStreamSource data = executionEnvironment.addSource(new JavaCustomParallelSourceFunction()).setParallelism(2);data.print().setParallelism(1);}

可以设置并行度,输出结果:

1
1
2
2
3
3
4
4
5
5

继承抽象类RichParallelSourceFunction

public class JavaCustomRichParallelSourceFunction extends RichParallelSourceFunction<Long> {boolean isRunning = true;long count = 1;@Overridepublic void run(SourceContext ctx) throws Exception {while (isRunning) {ctx.collect(count);count+=1;Thread.sleep(1000);}}@Overridepublic void cancel() {isRunning=false;}
}
    public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
//        socketFunction(environment);
//        nonParallelSourceFunction(environment);
//        parallelSourceFunction(environment);richpParallelSourceFunction(environment);environment.execute("JavaDataStreamSourceApp");}public static void richpParallelSourceFunction(StreamExecutionEnvironment executionEnvironment){DataStreamSource data = executionEnvironment.addSource(new JavaCustomRichParallelSourceFunction()).setParallelism(2);data.print().setParallelism(1);}

输出结果:

1
1
2
2
3
3
4
4
5
5
6
6

SourceFunction  ParallelSourceFunction  RichParallelSourceFunction类之间的关系

Apache Flink 零基础入门(十五)Flink DataStream编程(如何自定义DataSource)相关推荐

  1. Apache Flink 零基础入门(五)Flink开发实时处理应用程序

    使用Flink + java实现需求 环境 JDK:1.8 Maven:3.6.1(最低Maven 3.0.4) 使用上一节中的springboot-flink-train项目 开发步骤 第一步:创建 ...

  2. Apache Flink 零基础入门【转】

    Apache Flink 零基础入门(一):基础概念解析 Apache Flink 零基础入门(二):DataStream API 编程 转载于:https://www.cnblogs.com/dav ...

  3. sql数据库教程百度云_绘画自学零基础入门教程|五天学会绘画pdf百度云下载!...

    绘画自学零基础入门教程|五天学会绘画pdf百度云下载!画画是可以让人留存记忆的事情.我自己就很喜欢画画来记录生活中一些特别的日子.场景还有我的家人朋友们.有时候,比照片更有故事感和纪念意义-有空拿出来 ...

  4. 【学习记录】Python零基础入门(五)

    第五章 Python零基础入门之条件.循环及其他语句 本人自学的主要教材为Magnus Lie Hetland的Beginning Python:From Novice to Professional ...

  5. Apache Flink 零基础入门(二十)Flink部署与作业的提交

    之前我们都是基于Idea在本地进行开发,这种方式很适合开发以及测试,但是开发完之后,如何提交到服务器中运行? Flink单机部署方式 本地开发和测试过程中非常有用,只要把代码放到服务器直接运行. 前置 ...

  6. Apache Flink 零基础入门(十八)Flink Table APISQL

    什么是Flink关系型API? 虽然Flink已经支持了DataSet和DataStream API,但是有没有一种更好的方式去编程,而不用关心具体的API实现?不需要去了解Java和Scala的具体 ...

  7. Apache Flink 零基础入门(一):基础概念解析

    Apache Flink 的定义.架构及原理 Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行 ...

  8. Apache Flink 零基础入门(三)编写最简单的helloWorld

    实验环境 JDK 1.8 IDE Intellij idea Flink 1.8.1 实验内容 创建一个Flink简单Demo,可以从流数据中统计单词个数. 实验步骤 首先创建一个maven项目,其中 ...

  9. Apache Flink 零基础入门(二十)Flink kafka connector

    内置source和sink 内置source包括从文件读取,从文件夹读取,从socket中读取.从集合或者迭代器中读取.内置的sink包括写文件.控制台输出.socket 内置connectors A ...

最新文章

  1. 华为正式发布自有操作系统鸿蒙OS
  2. 算法经典书籍--计算机算法的设计与分析
  3. threejs添加立方体_前端图形学(三十)——从源码去看threejs中的光照模型
  4. MySQL主从失败 错误Got fatal error 1236解决方法
  5. Reverse Polish Notation
  6. mysql服务的启动和停止 net stop mysql net start mysql
  7. (转)IOS中获取各种文件的目录路径的方法
  8. 第八节: Quartz.Net五大构件之SimpleThreadPool及其四种配置方案
  9. 软件质量管理之困境与对策思考
  10. GitHub in vs2010、vs2013
  11. 【排序算法】堆排序——常规方法
  12. android ppsspp 存档位置,小鸡模拟器游戏存档在哪个文件夹
  13. 开源软件之lftp的使用
  14. Word文档快速翻译的方法,一分钟学会
  15. 数据库笔试——查出各部门超出部门平均薪资的员工的姓名,薪资,所在部门名称及部门平均薪水...
  16. 机器人,给我来一瓶82年的农夫山泉
  17. java Date.getDay()
  18. 【CSA STAR 对标分析】BCR-业务连续性管理与操作恢复
  19. Python基础学习1
  20. 笨方法学python 41:来自 Percal 25 号行星的哥顿人 (Gothons)

热门文章

  1. RabbitMQ中的消息确认ACK机制
  2. mysql获取当前时间,前一天,后一天(执行效率比较高)
  3. MySql数据库连接池
  4. centos nfs java_CentOS下安装配置NFS并通过Java进行文件上传下载
  5. java http get json_java实现Http post(参数json格式)、get 请求
  6. linux 条件变量函数,Linux线程同步之条件变量
  7. div置于页面底部_网易内部PPT模板有点丑,如何花最少的时间提高页面颜值?
  8. 您拒绝了位置共享服务器,共享服务器拒绝访问
  9. mysql blob 导出_mysql blob导出文本解密 | 学步园
  10. 【阿里云MPS】Demo