介绍

Apache Flink是用于分布式流和批处理数据处理的开源平台。 Flink是具有多个API的流数据流引擎,用于创建面向数据流的应用程序。

Flink应用程序通常使用Apache Kafka进行数据输入和输出。 本文将指导您逐步使用Apache Flink和Kafka。

先决条件

  • Apache Kafka 0.9.x
  • 吉特
  • Maven 3.x或更高版本

创建您的Flink流项目

第一步是创建Java应用程序,最简单的方法是使用flink-quickstart-java原型,该原型包含核心依赖关系和打包任务。 本文与Apache Flink快速入门示例相似,重点明确介绍了MapR Streams的数据输入和输出。

在此应用程序中,我们将创建两个作业:

  • WriteToKafka :生成随机字符串,然后使用Kafka Flink连接器及其Producer API将其发布到MapR Streams主题。
  • ReadFromKafka :读取相同的主题,并使用Kafka Flink连接器及其使用方在标准输出中显示消息。 API。

完整项目可在GitHub上找到:

  • Flink和Kakfa应用

让我们使用Apache Maven创建项目:

mvn archetype:generate \-DarchetypeGroupId=org.apache.flink\-DarchetypeArtifactId=flink-quickstart-java \-DarchetypeVersion=1.1.2 \-DgroupId=com.grallandco.demos \-DartifactId=kafka-flink-101 \-Dversion=1.0-SNAPSHOT \-DinteractiveMode=false

Maven将创建以下结构:

tree kafka-flink-101/
kafka-flink-101/
├── pom.xml
└── src└── main├── java│   └── com│       └── grallandco│           └── demos│               ├── BatchJob.java│               ├── SocketTextStreamWordCount.java│               ├── StreamingJob.java│               └── WordCount.java└── resources└── log4j.properties7 directories, 6 files

该项目被配置为创建一个Jar文件,该文件包含您的flink项目代码,还包括运行该文件所需的所有依赖项。

该项目包含其他一些示例工作,本文不需要它们,您可以将其用于教育目的,也可以将其从项目中删除。

添加Kafka连接器

打开pom.xml并将以下依赖项添加到您的项目中:

第一步,我们必须添加Flink Kafka连接器作为依赖项,以便我们可以使用Kafka接收器。 将此添加到“依赖项”部分的pom.xml文件中:

您现在必须添加Flink Kafka Connector依赖项才能使用Kafka接收器。 在<dependencies>元素中添加以下条目:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.9_2.10</artifactId><version>${flink.version}</version></dependency>

现在,Flink项目已准备就绪,可以通过Kafka连接器使用DataStream,因此您可以从Apache Kafka发送和接收消息。

安装并启动Kafka

下载Kafka,在终端中输入以下命令:

curl -O http://www.us.apache.org/dist/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
tar -xzf kafka_2.11-0.9.0.0.tgz
cd kafka_2.11-0.9.0.0

Kafka使用ZooKeeper,如果您没有运行Zookeeper,则可以使用以下命令启动它:

./bin/zookeeper-server-start.sh config/zookeeper.properties

通过在新终端中运行以下命令来启动Kafka代理:

./bin/kafka-server-start.sh config/server.properties

在另一个终端中,运行以下命令来创建一个名为flink-demo的Kafka主题:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-demo

使用Kafka工具将消息发布和使用到flink-demo主题。

制片人

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-demo

消费者

./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic flink-demo --from-beginning

在生产者窗口中,您可以发布一些消息,并在消费者窗口中查看它们。 我们将使用这些工具来跟踪Kafka和Flink之间的交互。

编写您的Flink应用程序

现在让我们使用Flink Kafka Connector将消息发送到Kafka并使用它们。

制片人

生产者使用SimpleStringGenerator()类生成消息,并将该字符串发送到flink-demo主题。

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", “localhost:9092"); DataStream<String> stream = env.addSource(new SimpleStringGenerator());stream.addSink(new FlinkKafkaProducer09<>("flink-demo", new SimpleStringSchema(), properties));env.execute();}

SimpleStringGenerator()方法代码在此处可用。

主要步骤是:

  • 在任何Flink应用程序的基础上创建一个新的StreamExecutionEnvironment
  • 在应用程序环境中创建一个新的DataStream时, SimpleStringGenerator类将Flink中所有流数据源的Source接口实现SourceFunction 。
  • FlinkKafkaProducer09器添加到主题。

消费者

使用者只需从flink-demo主题中读取消息,然后将它们打印到控制台中即可。

public static void main(String[] args) throws Exception {// create execution environmentStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", “localhost:9092");properties.setProperty("group.id", "flink_consumer");DataStream<String> stream = env.addSource(new FlinkKafkaConsumer09<>("flink-demo", new SimpleStringSchema(), properties) );stream.map(new MapFunction<String, String>() {private static final long serialVersionUID = -6867736771747690202L;@Overridepublic String map(String value) throws Exception {return "Stream Value: " + value;}}).print();env.execute();}

主要步骤是:

  • 在任何Flink应用程序的基础上创建一个新的StreamExecutionEnvironment
  • 使用消费者信息创建一组属性,在此应用程序中,我们只能设置消费者group.id
  • 使用FlinkKafkaConsumer09从主题flink-demo获取消息

生成并运行应用程序

让我们直接从Maven(或从您最喜欢的IDE)运行应用程序。

1-建立专案:

$ mvn clean package

2-运行Flink生产者作业

$ mvn exec:java -Dexec.mainClass=com.mapr.demos.WriteToKafka

3-运行Flink消费者工作

$ mvn exec:java -Dexec.mainClass=com.mapr.demos.ReadFromKafka

在终端中,您应该看到生产者生成的消息

现在,您可以在Flink群集上部署并执行此作业。

结论

在本文中,您学习了如何将Flink与kafka结合使用来写入和读取数据流。

翻译自: https://www.javacodegeeks.com/2016/10/getting-started-apache-flink-kafka.html

Apache Flink和Kafka入门相关推荐

  1. flink和kafka区别_Apache Flink和Kafka入门

    flink和kafka区别 介绍 Apache Flink是用于分布式流和批处理数据处理的开源平台. Flink是具有多个API的流数据流引擎,用于创建面向数据流的应用程序. Flink应用程序通常使 ...

  2. Apache Flink 零基础入门【转】

    Apache Flink 零基础入门(一):基础概念解析 Apache Flink 零基础入门(二):DataStream API 编程 转载于:https://www.cnblogs.com/dav ...

  3. Apache Flink 零基础入门(二十)Flink kafka connector

    内置source和sink 内置source包括从文件读取,从文件夹读取,从socket中读取.从集合或者迭代器中读取.内置的sink包括写文件.控制台输出.socket 内置connectors A ...

  4. Apache Flink 零基础入门(十五)Flink DataStream编程(如何自定义DataSource)

    数据源可以通过StreamExecutionEnvironment.addSource(sourceFunction)方式来创建,Flink也提供了一些内置的数据源方便使用,例如readTextFil ...

  5. Apache Flink 零基础入门(一):基础概念解析

    Apache Flink 的定义.架构及原理 Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行 ...

  6. Apache Flink 结合 Kafka 构建端到端的 Exactly-Once 处理

    文章目录: Apache Flink 应用程序中的 Exactly-Once 语义 Flink 应用程序端到端的 Exactly-Once 语义 示例 Flink 应用程序启动预提交阶段 在 Flin ...

  7. flink checkpoint 恢复_Flink解析 | Apache Flink结合Kafka构建端到端的ExactlyOnce处理

    周凯波(宝牛) 阿里巴巴技术专家,四川大学硕士,2010年毕业后加入阿里搜索事业部,从事搜索离线平台的研发工作,参与将搜索后台数据处理架构从MapReduce到Flink的重构.目前在阿里计算平台事业 ...

  8. 5.Flink对接Kafka入门

    Flink Connector Kafka 1. Kafka 1.1. [Kafka官网](http://kafka.apache.org/) 1.2. Kafka 简述 1.3. Kafka特性 1 ...

  9. Apache Flink 零基础入门(二十)Flink部署与作业的提交

    之前我们都是基于Idea在本地进行开发,这种方式很适合开发以及测试,但是开发完之后,如何提交到服务器中运行? Flink单机部署方式 本地开发和测试过程中非常有用,只要把代码放到服务器直接运行. 前置 ...

最新文章

  1. linux 内核位置无关,Linux内核启动阶段虚实地址映射
  2. 强制html元素不随窗口缩小而换行
  3. Pycharm使用black作为Python代码格式化外部工具
  4. Spring 实战-第一章-基本概念
  5. Qt文档阅读笔记-Label QML Type官方解析及实例
  6. 分页池内存持续增长_鸿蒙内核源码分析(从进程/线程视角看内存)
  7. php制作的ios端 跳转url,ThinkPHP 简易开发思路 MVC和URL跳转
  8. 四部门联合约谈马云等蚂蚁集团有关人员,蚂蚁集团回应...
  9. 改变计算机界的存储解决方案:RAID,30岁生日快乐!
  10. staticmethod自己定制
  11. 云安全-Python实现凯撒密码和替换密码的加密解密与暴力破解
  12. 分不清ERP、SAP、MES?我来帮你搞定
  13. 干货 | 华为内部几近满分的项目管理PPT
  14. ubuntu16.04 lidar_align实现三维激光雷达和Imu联合标定
  15. 2020年12月程序员工资新出炉,和你相差多少?
  16. 【matplotlib】画图怎样将中文为宋体-英文为新罗马字体
  17. SJT生成排列(清华OJ)
  18. mysql中的浮点数和定点数
  19. python sql语句换行_python一行sql太长折成多行并且有多个参数的方法
  20. 多个lmg在盒子里在左浮动( float: left)时候出现横向图片中间有缝隙

热门文章

  1. Java对象内存结构
  2. Maven精选系列--发布jar包到Nexus私库
  3. 大二暑假工作三个月后辞职,总体感悟
  4. C++描述杭电OJ 2018.母牛的故事 ||
  5. 使用相对长度单位rem布局网页内容
  6. 可以代表学计算机的标志,桌面上的图标可以用来表示
  7. java面试设计模式
  8. android 按键消息,在android中模拟键盘消息(shell命令的方法)
  9. (转)web.xml 中的listener、 filter、servlet 加载顺序及其详解
  10. java虚拟机采用UTF-16编码格式对字符进行编码