从集合构建

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}object Sensor extends App {//创建环境private val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//导入隐式转换import org.apache.flink.api.scala._//从集合构建DataStreamprivate val stream: DataStream[SensorReading] = env.fromCollection(List(SensorReading("sensor1", 1547718199, 38.222),SensorReading("sensor2", 1547718199, 40.222),SensorReading("sensor3", 1747718199, 56.222)))//输出DataStreamstream.print().setParallelism(1)//执行env.execute()
}case class SensorReading(id:String,timestamp:Long,temperature:Double)

从文件读取数据

val value1: DataStream[String] = env.readTextFile("")

从socket读取

 val textDstream: DataStream[String] = env.socketTextStream(host, port)

从kafka读取

首先添加pom依赖,其中link-connector-kafka这个依赖就是用来搞定kafka的,注意和flink版本要一样!

<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>1.7.2</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.7.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.7.2</version></dependency></dependencies>

测试代码

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011import java.util.Propertiesobject KafkaSource extends App {val properties = new Properties()properties.setProperty("bootstrap.servers", "mypc01:9092,mypc02:9092,mypc03:9092")properties.setProperty("group.id", "consumer-group")properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("auto.offset.reset", "latest")private val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//创建kafka消费者,消费cat topicprivate val consumer = new FlinkKafkaConsumer011[String]("cat", new SimpleStringSchema(), properties)import org.apache.flink.api.scala._private val stream: DataStream[String] = env.addSource(consumer)stream.print().setParallelism(1)//执行env.execute()
}

Flink+kafka是如何实现exactly-once语义的?

Flink通过checkpoint来保存数据是否处理完成的状态。

JobManager协调各个TaskManager进行checkpoint存储,checkpoint保存在 StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存。

执行过程实际上是一个两段式提交,每个算子执行完成,会进行“预提交”,直到执行完sink操作,会发起“确认提交”,如果执行失败,预提交会放弃掉。

如果宕机需要通过StateBackend进行恢复,只能恢复所有确认提交的操作。

总结

  • flink可以接受各种常见source,比如文件,集合,kafka,socket等等

Flink API之Source入门相关推荐

  1. flink API之Sink入门

    kafka sink 添加依赖 <dependency><groupId>org.apache.flink</groupId><artifactId>f ...

  2. flink Table API 与SQL入门实战

    流处理和批处理都可以用,是非常的方便! 导入依赖 <dependency><groupId>org.apache.flink</groupId><artifa ...

  3. Flink 教程 gitbook 从入门到入土(详细教程)

    Flink从入门到入土(详细教程) 和其他所有的计算框架一样,flink也有一些基础的开发步骤以及基础,核心的API,从开发步骤的角度来讲,主要分为四大部分 1.Environment Flink J ...

  4. 【Flink】Flink SQL 自定义 Source format

    1.概述 转载:Flink SQL 自定义 Source format 1.背景 由于 kafka 中的 json 属于嵌套,又不想二次序列化再把它展开,故自定义 format. 2.步骤 1.自定义 ...

  5. 1.31.Flink自定义rocketmq(source/sink)+自定义redis source和sink

    1.31.Flink自定义rocketmq(source/sink)+自定义redis+自定义 1.31.1.工程结构 1.31.2.定义pom.xml文件 1.31.3.log4j2.propert ...

  6. 初识Django —Python API接口编程入门

    初识Django -Python API接口编程入门 一.WEB架构的简单介绍 Django是什么? Django是一个开放源代码的Web应用框架,由Python写成.我们的目标是用Python语言, ...

  7. Cannot resolve method ‘getTableEnvironment(org.apache.flink.api.java.ExecutionEnvironment)‘

    代码如下: public class UDTF {public static void main(String[] args) throws Exception{ExecutionEnvironmen ...

  8. NoClassDefFoundError: org/apache/flink/api/scala/typeutils/CaseClassTypeInfo

    完整报错如下: Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/sc ...

  9. cannot find or load main class org.apache.flink.api.scala.FlinkShell

    报错复现如下: (Python3.6) appleyuchi@Desktop:bin$ ./start-scala-shell.sh 錯誤: 找不到或無法載入主要類別 org.apache.flink ...

最新文章

  1. pandas使用groupby函数计算dataframe数据中每个分组的滚动统计值(rolling statistics)的语法:例如分组的N天滚动平均值、滚动中位数、滚动最大最小值、滚动加和等
  2. OpenGL 字体颜色问题
  3. AVR 又一个网址推荐
  4. Otsu algorithm
  5. leetcode 95. Unique Binary Search Trees II | 96. Unique Binary Search Trees
  6. 最全三大框架整合(使用映射)——applicationContext.xml里面的配置
  7. WinCE5.0 SMDK2410 BSP在GEC2410开发板上的移植(11)-BINFS在Nand上的实现(Multi-Bin的实现)
  8. EASYUI- EASYUI左移右移 GRID中值
  9. 泰安本地话听不懂,为何后来能勉强听懂?
  10. vi/vim命令使用
  11. java c 语言之父_Java之父评价C语言之父:我用尽了形容词
  12. 火车进出栈【卡特兰数】【高精度】【压位】【压int位】
  13. 仿牛客社区项目3.2——发布帖子(异步通信技术AJAX)
  14. Python在数字后端中的应用(一)
  15. c1TrueDBGrid在C#中的研究
  16. 【MPPT】基于MPPT的风力发电系统simulink仿真
  17. react---收藏的点击和取消(刷新还会存在)--demo
  18. Git修改文件大小写的修改
  19. 百度灵医智惠渗透基层,以AI赋能中国医疗
  20. cesium中缓冲区分析

热门文章

  1. python基础系统性学习
  2. 主板开启网络唤醒(Wake on lan)
  3. 切换账号_微软 Edge 更新:自动切换工作 / 生活账号,移动端上线集锦功能
  4. DecimalFormat的使用
  5. java 不刷新页面_java – 更新jsp页面的内容而不刷新
  6. html布局文字设置,div css布局中css中文字体设置
  7. 驱动级的自动按键_太牛了!业余单片机爱好者用DS3231制作自动亮度的4位数码管时钟...
  8. select函数fdwrite用法_通俗易懂的学会:SQL窗口函数
  9. abaqus生成adams柔性体_Abaqus和STAR-CCM+流固耦合
  10. nessus8.9.0百度网盘_免费百度网盘SVIP共享20.1.19