Flink API之Source入门
从集合构建
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}object Sensor extends App {//创建环境private val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//导入隐式转换import org.apache.flink.api.scala._//从集合构建DataStreamprivate val stream: DataStream[SensorReading] = env.fromCollection(List(SensorReading("sensor1", 1547718199, 38.222),SensorReading("sensor2", 1547718199, 40.222),SensorReading("sensor3", 1747718199, 56.222)))//输出DataStreamstream.print().setParallelism(1)//执行env.execute()
}case class SensorReading(id:String,timestamp:Long,temperature:Double)
从文件读取数据
val value1: DataStream[String] = env.readTextFile("")
从socket读取
val textDstream: DataStream[String] = env.socketTextStream(host, port)
从kafka读取
首先添加pom
依赖,其中link-connector-kafka
这个依赖就是用来搞定kafka
的,注意和flink
版本要一样!
<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>1.7.2</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.7.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.7.2</version></dependency></dependencies>
测试代码
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011import java.util.Propertiesobject KafkaSource extends App {val properties = new Properties()properties.setProperty("bootstrap.servers", "mypc01:9092,mypc02:9092,mypc03:9092")properties.setProperty("group.id", "consumer-group")properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("auto.offset.reset", "latest")private val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//创建kafka消费者,消费cat topicprivate val consumer = new FlinkKafkaConsumer011[String]("cat", new SimpleStringSchema(), properties)import org.apache.flink.api.scala._private val stream: DataStream[String] = env.addSource(consumer)stream.print().setParallelism(1)//执行env.execute()
}
Flink+kafka是如何实现exactly-once语义的?
Flink
通过checkpoint
来保存数据是否处理完成的状态。
由JobManager
协调各个TaskManager
进行checkpoint
存储,checkpoint
保存在 StateBackend
中,默认StateBackend
是内存级的,也可以改为文件级的进行持久化保存。
执行过程实际上是一个两段式提交,每个算子执行完成,会进行“预提交”,直到执行完sink操作,会发起“确认提交”,如果执行失败,预提交会放弃掉。
如果宕机需要通过StateBackend
进行恢复,只能恢复所有确认提交的操作。
总结
flink
可以接受各种常见source
,比如文件,集合,kafka,socket
等等
Flink API之Source入门相关推荐
- flink API之Sink入门
kafka sink 添加依赖 <dependency><groupId>org.apache.flink</groupId><artifactId>f ...
- flink Table API 与SQL入门实战
流处理和批处理都可以用,是非常的方便! 导入依赖 <dependency><groupId>org.apache.flink</groupId><artifa ...
- Flink 教程 gitbook 从入门到入土(详细教程)
Flink从入门到入土(详细教程) 和其他所有的计算框架一样,flink也有一些基础的开发步骤以及基础,核心的API,从开发步骤的角度来讲,主要分为四大部分 1.Environment Flink J ...
- 【Flink】Flink SQL 自定义 Source format
1.概述 转载:Flink SQL 自定义 Source format 1.背景 由于 kafka 中的 json 属于嵌套,又不想二次序列化再把它展开,故自定义 format. 2.步骤 1.自定义 ...
- 1.31.Flink自定义rocketmq(source/sink)+自定义redis source和sink
1.31.Flink自定义rocketmq(source/sink)+自定义redis+自定义 1.31.1.工程结构 1.31.2.定义pom.xml文件 1.31.3.log4j2.propert ...
- 初识Django —Python API接口编程入门
初识Django -Python API接口编程入门 一.WEB架构的简单介绍 Django是什么? Django是一个开放源代码的Web应用框架,由Python写成.我们的目标是用Python语言, ...
- Cannot resolve method ‘getTableEnvironment(org.apache.flink.api.java.ExecutionEnvironment)‘
代码如下: public class UDTF {public static void main(String[] args) throws Exception{ExecutionEnvironmen ...
- NoClassDefFoundError: org/apache/flink/api/scala/typeutils/CaseClassTypeInfo
完整报错如下: Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/sc ...
- cannot find or load main class org.apache.flink.api.scala.FlinkShell
报错复现如下: (Python3.6) appleyuchi@Desktop:bin$ ./start-scala-shell.sh 錯誤: 找不到或無法載入主要類別 org.apache.flink ...
最新文章
- pandas使用groupby函数计算dataframe数据中每个分组的滚动统计值(rolling statistics)的语法:例如分组的N天滚动平均值、滚动中位数、滚动最大最小值、滚动加和等
- OpenGL 字体颜色问题
- AVR 又一个网址推荐
- Otsu algorithm
- leetcode 95. Unique Binary Search Trees II | 96. Unique Binary Search Trees
- 最全三大框架整合(使用映射)——applicationContext.xml里面的配置
- WinCE5.0 SMDK2410 BSP在GEC2410开发板上的移植(11)-BINFS在Nand上的实现(Multi-Bin的实现)
- EASYUI- EASYUI左移右移 GRID中值
- 泰安本地话听不懂,为何后来能勉强听懂?
- vi/vim命令使用
- java c 语言之父_Java之父评价C语言之父:我用尽了形容词
- 火车进出栈【卡特兰数】【高精度】【压位】【压int位】
- 仿牛客社区项目3.2——发布帖子(异步通信技术AJAX)
- Python在数字后端中的应用(一)
- c1TrueDBGrid在C#中的研究
- 【MPPT】基于MPPT的风力发电系统simulink仿真
- react---收藏的点击和取消(刷新还会存在)--demo
- Git修改文件大小写的修改
- 百度灵医智惠渗透基层,以AI赋能中国医疗
- cesium中缓冲区分析
热门文章
- python基础系统性学习
- 主板开启网络唤醒(Wake on lan)
- 切换账号_微软 Edge 更新:自动切换工作 / 生活账号,移动端上线集锦功能
- DecimalFormat的使用
- java 不刷新页面_java – 更新jsp页面的内容而不刷新
- html布局文字设置,div css布局中css中文字体设置
- 驱动级的自动按键_太牛了!业余单片机爱好者用DS3231制作自动亮度的4位数码管时钟...
- select函数fdwrite用法_通俗易懂的学会:SQL窗口函数
- abaqus生成adams柔性体_Abaqus和STAR-CCM+流固耦合
- nessus8.9.0百度网盘_免费百度网盘SVIP共享20.1.19