Apache Flink和Kafka入门
介绍
Apache Flink是用于分布式流和批处理数据处理的开源平台。 Flink是具有多个API的流数据流引擎,用于创建面向数据流的应用程序。
Flink应用程序通常使用Apache Kafka进行数据输入和输出。 本文将指导您逐步使用Apache Flink和Kafka。
先决条件
- Apache Kafka 0.9.x
- 吉特
- Maven 3.x或更高版本
创建您的Flink流项目
第一步是创建Java应用程序,最简单的方法是使用flink-quickstart-java原型,该原型包含核心依赖关系和打包任务。 本文与Apache Flink快速入门示例相似,重点明确介绍了MapR Streams的数据输入和输出。
在此应用程序中,我们将创建两个作业:
WriteToKafka
:生成随机字符串,然后使用Kafka Flink连接器及其Producer API将其发布到MapR Streams主题。ReadFromKafka
:读取相同的主题,并使用Kafka Flink连接器及其使用方在标准输出中显示消息。 API。
完整项目可在GitHub上找到:
- Flink和Kakfa应用
让我们使用Apache Maven创建项目:
mvn archetype:generate \-DarchetypeGroupId=org.apache.flink\-DarchetypeArtifactId=flink-quickstart-java \-DarchetypeVersion=1.1.2 \-DgroupId=com.grallandco.demos \-DartifactId=kafka-flink-101 \-Dversion=1.0-SNAPSHOT \-DinteractiveMode=false
Maven将创建以下结构:
tree kafka-flink-101/
kafka-flink-101/
├── pom.xml
└── src└── main├── java│ └── com│ └── grallandco│ └── demos│ ├── BatchJob.java│ ├── SocketTextStreamWordCount.java│ ├── StreamingJob.java│ └── WordCount.java└── resources└── log4j.properties7 directories, 6 files
该项目被配置为创建一个Jar文件,该文件包含您的flink项目代码,还包括运行该文件所需的所有依赖项。
该项目包含其他一些示例工作,本文不需要它们,您可以将其用于教育目的,也可以将其从项目中删除。
添加Kafka连接器
打开pom.xml
并将以下依赖项添加到您的项目中:
第一步,我们必须添加Flink Kafka连接器作为依赖项,以便我们可以使用Kafka接收器。 将此添加到“依赖项”部分的pom.xml文件中:
您现在必须添加Flink Kafka Connector依赖项才能使用Kafka接收器。 在<dependencies>
元素中添加以下条目:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.9_2.10</artifactId><version>${flink.version}</version></dependency>
现在,Flink项目已准备就绪,可以通过Kafka连接器使用DataStream,因此您可以从Apache Kafka发送和接收消息。
安装并启动Kafka
下载Kafka,在终端中输入以下命令:
curl -O http://www.us.apache.org/dist/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
tar -xzf kafka_2.11-0.9.0.0.tgz
cd kafka_2.11-0.9.0.0
Kafka使用ZooKeeper,如果您没有运行Zookeeper,则可以使用以下命令启动它:
./bin/zookeeper-server-start.sh config/zookeeper.properties
通过在新终端中运行以下命令来启动Kafka代理:
./bin/kafka-server-start.sh config/server.properties
在另一个终端中,运行以下命令来创建一个名为flink-demo
的Kafka主题:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-demo
使用Kafka工具将消息发布和使用到flink-demo
主题。
制片人
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-demo
消费者
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic flink-demo --from-beginning
在生产者窗口中,您可以发布一些消息,并在消费者窗口中查看它们。 我们将使用这些工具来跟踪Kafka和Flink之间的交互。
编写您的Flink应用程序
现在让我们使用Flink Kafka Connector将消息发送到Kafka并使用它们。
制片人
生产者使用SimpleStringGenerator()
类生成消息,并将该字符串发送到flink-demo
主题。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", “localhost:9092"); DataStream<String> stream = env.addSource(new SimpleStringGenerator());stream.addSink(new FlinkKafkaProducer09<>("flink-demo", new SimpleStringSchema(), properties));env.execute();}
SimpleStringGenerator()
方法代码在此处可用。
主要步骤是:
- 在任何Flink应用程序的基础上创建一个新的
StreamExecutionEnvironment
- 在应用程序环境中创建一个新的
DataStream
时,SimpleStringGenerator
类将Flink中所有流数据源的Source接口实现SourceFunction 。 - 将
FlinkKafkaProducer09
器添加到主题。
消费者
使用者只需从flink-demo
主题中读取消息,然后将它们打印到控制台中即可。
public static void main(String[] args) throws Exception {// create execution environmentStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", “localhost:9092");properties.setProperty("group.id", "flink_consumer");DataStream<String> stream = env.addSource(new FlinkKafkaConsumer09<>("flink-demo", new SimpleStringSchema(), properties) );stream.map(new MapFunction<String, String>() {private static final long serialVersionUID = -6867736771747690202L;@Overridepublic String map(String value) throws Exception {return "Stream Value: " + value;}}).print();env.execute();}
主要步骤是:
- 在任何Flink应用程序的基础上创建一个新的
StreamExecutionEnvironment
- 使用消费者信息创建一组属性,在此应用程序中,我们只能设置消费者
group.id
。 - 使用
FlinkKafkaConsumer09
从主题flink-demo
获取消息
生成并运行应用程序
让我们直接从Maven(或从您最喜欢的IDE)运行应用程序。
1-建立专案:
$ mvn clean package
2-运行Flink生产者作业
$ mvn exec:java -Dexec.mainClass=com.mapr.demos.WriteToKafka
3-运行Flink消费者工作
$ mvn exec:java -Dexec.mainClass=com.mapr.demos.ReadFromKafka
在终端中,您应该看到生产者生成的消息
现在,您可以在Flink群集上部署并执行此作业。
结论
在本文中,您学习了如何将Flink与kafka结合使用来写入和读取数据流。
翻译自: https://www.javacodegeeks.com/2016/10/getting-started-apache-flink-kafka.html
Apache Flink和Kafka入门相关推荐
- flink和kafka区别_Apache Flink和Kafka入门
flink和kafka区别 介绍 Apache Flink是用于分布式流和批处理数据处理的开源平台. Flink是具有多个API的流数据流引擎,用于创建面向数据流的应用程序. Flink应用程序通常使 ...
- Apache Flink 零基础入门【转】
Apache Flink 零基础入门(一):基础概念解析 Apache Flink 零基础入门(二):DataStream API 编程 转载于:https://www.cnblogs.com/dav ...
- Apache Flink 零基础入门(二十)Flink kafka connector
内置source和sink 内置source包括从文件读取,从文件夹读取,从socket中读取.从集合或者迭代器中读取.内置的sink包括写文件.控制台输出.socket 内置connectors A ...
- Apache Flink 零基础入门(十五)Flink DataStream编程(如何自定义DataSource)
数据源可以通过StreamExecutionEnvironment.addSource(sourceFunction)方式来创建,Flink也提供了一些内置的数据源方便使用,例如readTextFil ...
- Apache Flink 零基础入门(一):基础概念解析
Apache Flink 的定义.架构及原理 Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行 ...
- Apache Flink 结合 Kafka 构建端到端的 Exactly-Once 处理
文章目录: Apache Flink 应用程序中的 Exactly-Once 语义 Flink 应用程序端到端的 Exactly-Once 语义 示例 Flink 应用程序启动预提交阶段 在 Flin ...
- flink checkpoint 恢复_Flink解析 | Apache Flink结合Kafka构建端到端的ExactlyOnce处理
周凯波(宝牛) 阿里巴巴技术专家,四川大学硕士,2010年毕业后加入阿里搜索事业部,从事搜索离线平台的研发工作,参与将搜索后台数据处理架构从MapReduce到Flink的重构.目前在阿里计算平台事业 ...
- 5.Flink对接Kafka入门
Flink Connector Kafka 1. Kafka 1.1. [Kafka官网](http://kafka.apache.org/) 1.2. Kafka 简述 1.3. Kafka特性 1 ...
- Apache Flink 零基础入门(二十)Flink部署与作业的提交
之前我们都是基于Idea在本地进行开发,这种方式很适合开发以及测试,但是开发完之后,如何提交到服务器中运行? Flink单机部署方式 本地开发和测试过程中非常有用,只要把代码放到服务器直接运行. 前置 ...
最新文章
- linux 内核位置无关,Linux内核启动阶段虚实地址映射
- 强制html元素不随窗口缩小而换行
- Pycharm使用black作为Python代码格式化外部工具
- Spring 实战-第一章-基本概念
- Qt文档阅读笔记-Label QML Type官方解析及实例
- 分页池内存持续增长_鸿蒙内核源码分析(从进程/线程视角看内存)
- php制作的ios端 跳转url,ThinkPHP 简易开发思路 MVC和URL跳转
- 四部门联合约谈马云等蚂蚁集团有关人员,蚂蚁集团回应...
- 改变计算机界的存储解决方案:RAID,30岁生日快乐!
- staticmethod自己定制
- 云安全-Python实现凯撒密码和替换密码的加密解密与暴力破解
- 分不清ERP、SAP、MES?我来帮你搞定
- 干货 | 华为内部几近满分的项目管理PPT
- ubuntu16.04 lidar_align实现三维激光雷达和Imu联合标定
- 2020年12月程序员工资新出炉,和你相差多少?
- 【matplotlib】画图怎样将中文为宋体-英文为新罗马字体
- SJT生成排列(清华OJ)
- mysql中的浮点数和定点数
- python sql语句换行_python一行sql太长折成多行并且有多个参数的方法
- 多个lmg在盒子里在左浮动( float: left)时候出现横向图片中间有缝隙
热门文章
- Java对象内存结构
- Maven精选系列--发布jar包到Nexus私库
- 大二暑假工作三个月后辞职,总体感悟
- C++描述杭电OJ 2018.母牛的故事 ||
- 使用相对长度单位rem布局网页内容
- 可以代表学计算机的标志,桌面上的图标可以用来表示
- java面试设计模式
- android 按键消息,在android中模拟键盘消息(shell命令的方法)
- (转)web.xml 中的listener、 filter、servlet 加载顺序及其详解
- java虚拟机采用UTF-16编码格式对字符进行编码