flink和kafka区别_Apache Flink和Kafka入门
flink和kafka区别
介绍
Apache Flink是用于分布式流和批处理数据处理的开源平台。 Flink是具有多个API的流数据流引擎,用于创建面向数据流的应用程序。
Flink应用程序通常使用Apache Kafka进行数据输入和输出。 本文将指导您逐步使用Apache Flink和Kafka。
先决条件
- Apache Kafka 0.9.x
- 吉特
- Maven 3.x或更高版本
创建您的Flink流项目
第一步是创建Java应用程序,最简单的方法是使用flink-quickstart-java原型,其中包含核心依赖关系和打包任务。 本文与Apache Flink快速入门示例相似,重点明确介绍了MapR Streams的数据输入和输出。
在此应用程序中,我们将创建两个作业:
WriteToKafka
:生成随机字符串,然后使用Kafka Flink连接器及其Producer API将其发布到MapR Streams主题。ReadFromKafka
:读取相同的主题,并使用Kafka Flink连接器及其使用者在标准输出中打印消息。 API。
完整项目可在GitHub上找到:
- Flink和Kakfa应用
让我们使用Apache Maven创建项目:
mvn archetype:generate \-DarchetypeGroupId=org.apache.flink\-DarchetypeArtifactId=flink-quickstart-java \-DarchetypeVersion=1.1.2 \-DgroupId=com.grallandco.demos \-DartifactId=kafka-flink-101 \-Dversion=1.0-SNAPSHOT \-DinteractiveMode=false
Maven将创建以下结构:
tree kafka-flink-101/
kafka-flink-101/
├── pom.xml
└── src└── main├── java│ └── com│ └── grallandco│ └── demos│ ├── BatchJob.java│ ├── SocketTextStreamWordCount.java│ ├── StreamingJob.java│ └── WordCount.java└── resources└── log4j.properties7 directories, 6 files
此项目配置为创建一个Jar文件,该文件包含您的flink项目代码,还包括运行它所需的所有依赖关系。
该项目包含其他一些示例工作,本文不需要它们,您可以将其用于教育目的,也可以将其从项目中删除。
添加Kafka连接器
打开pom.xml
并将以下依赖项添加到您的项目中:
第一步,我们必须添加Flink Kafka连接器作为依赖项,以便我们可以使用Kafka接收器。 将此添加到“依赖项”部分的pom.xml文件中:
您现在必须添加Flink Kafka Connector依赖项才能使用Kafka接收器。 在<dependencies>
元素中添加以下条目:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.9_2.10</artifactId><version>${flink.version}</version></dependency>
Flink项目现在准备通过Kafka连接器使用DataStream,因此您可以从Apache Kafka发送和接收消息。
安装并启动Kafka
下载Kafka,在终端中输入以下命令:
curl -O http://www.us.apache.org/dist/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
tar -xzf kafka_2.11-0.9.0.0.tgz
cd kafka_2.11-0.9.0.0
Kafka使用ZooKeeper,如果您没有运行Zookeeper,则可以使用以下命令启动它:
./bin/zookeeper-server-start.sh config/zookeeper.properties
通过在新终端中运行以下命令来启动Kafka代理:
./bin/kafka-server-start.sh config/server.properties
在另一个终端中,运行以下命令来创建一个名为flink-demo
的Kafka主题:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-demo
使用Kafka工具将消息发布和使用到flink-demo
主题。
制片人
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-demo
消费者
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic flink-demo --from-beginning
在生产者窗口中,您可以发布一些消息,并在消费者窗口中查看它们。 我们将使用这些工具来跟踪Kafka和Flink之间的交互。
编写您的Flink应用程序
现在让我们使用Flink Kafka Connector将消息发送到Kafka并使用它们。
制片人
生产者使用SimpleStringGenerator()
类生成消息,然后将字符串发送到flink-demo
主题。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", “localhost:9092"); DataStream<String> stream = env.addSource(new SimpleStringGenerator());stream.addSink(new FlinkKafkaProducer09<>("flink-demo", new SimpleStringSchema(), properties));env.execute();}
SimpleStringGenerator()
方法代码在此处提供 。
主要步骤是:
- 在任何Flink应用程序的基础上创建一个新的
StreamExecutionEnvironment
- 在应用程序环境中创建一个新的
DataStream
时,SimpleStringGenerator
类将Flink中所有流数据源的Source接口实现SourceFunction 。 - 将
FlinkKafkaProducer09
器添加到主题。
消费者
使用者只需从flink-demo
主题中读取消息,然后将它们打印到控制台中即可。
public static void main(String[] args) throws Exception {// create execution environmentStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", “localhost:9092");properties.setProperty("group.id", "flink_consumer");DataStream<String> stream = env.addSource(new FlinkKafkaConsumer09<>("flink-demo", new SimpleStringSchema(), properties) );stream.map(new MapFunction<String, String>() {private static final long serialVersionUID = -6867736771747690202L;@Overridepublic String map(String value) throws Exception {return "Stream Value: " + value;}}).print();env.execute();}
主要步骤是:
- 在任何Flink应用程序的基础上创建一个新的
StreamExecutionEnvironment
- 使用消费者信息创建一组属性,在此应用程序中,我们只能设置消费者
group.id
。 - 使用
FlinkKafkaConsumer09
从主题flink-demo
获取消息
生成并运行应用程序
让我们直接从Maven(或从您最喜欢的IDE)运行应用程序。
1-建立专案:
$ mvn clean package
2-运行Flink生产者作业
$ mvn exec:java -Dexec.mainClass=com.mapr.demos.WriteToKafka
3-运行Flink消费者工作
$ mvn exec:java -Dexec.mainClass=com.mapr.demos.ReadFromKafka
在终端中,您应该看到生产者生成的消息
现在,您可以在Flink群集上部署并执行此作业。
结论
在本文中,您学习了如何将Flink与kafka结合使用来写入和读取数据流。
翻译自: https://www.javacodegeeks.com/2016/10/getting-started-apache-flink-kafka.html
flink和kafka区别
flink和kafka区别_Apache Flink和Kafka入门相关推荐
- kafka window 启动_Apache Flink结合Kafka构建端到端的Exactly-Once处理
Apache Flink自2017年12月发布的1.4.0版本开始,为流计算引入了一个重要的里程碑特性:TwoPhaseCommitSinkFunction(相关的Jira).它提取了两阶段提交协议的 ...
- flink checkpoint 恢复_干货:Flink+Kafka 0.11端到端精确一次处理语义实现
2017年12月Apache Flink社区发布了1.4版本.该版本正式引入了一个里程碑式的功能:两阶段提交Sink,即TwoPhaseCommitSinkFunction.该SinkFunction ...
- 【Flink】kafka FlinkKafkaException send data to Kafka old epoch newer producer same transactionalId
文章目录 1.场景1 1.1 概述 2.场景2 M.参考 1.场景1 1.1 概述 重复问题:[Flink]kafka INVALID_PRODUCER_EPO send data to Kafka ...
- 【Flink】kafka INVALID_PRODUCER_EPO send data to Kafka old epoch newer producer same transactionalId
文章目录 1.场景1 1.1 原因 1.2 解决 1.3 源码 2.类似问题 1.场景1 问题重复:[Flink]kafka FlinkKafkaException send data to Kafk ...
- Flink(16):Flink之Connect Kafka API
目录 0. 相关文章链接 1. pom依赖 2. 参数设置 3. 参数说明 3.1. 序列化和反序列化器 3.2. 消费者起始位置 3.3. 动态分区检测 3.4. Connect Kaf ...
- 【Flink】 java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
自研flinsql提交平台,在服务器上新安装了一个flink客户端,准备提交flinksql到yarn,执行测试的时候报错 /data/flink/flink-1.13.3/bin/flink run ...
- flink checkpoint 恢复_Apache Flink 管理大型状态之增量 Checkpoint 详解
邱从贤(山智),Apache Flink Contributor,中南大学硕士,2018 年加入阿里巴巴计算平台事业部,专注于 Flink 核心引擎开发,主要从事 Flink State&C ...
- kafka版本_Apache Kafka 版本演进及特性介绍
前段时间有一个同事问到:Kafka 0.8.2 只能使用Zookeeper连接吗?虽然仍有一部分Kafka的老用户在使用 0.8.x 版本,但 Kafka 0.8.x 确实是比较老的版本了.如果不是对 ...
- flink笔记1(初识 Flink)
flink 一.初识 Flink 1.概念 2. Flink 的应用 (1)Flink 主要的应用场景 3.流式数据处理的发展和演变 (1)流处理和批处理 (2)传统事务处理 (3)有状态的流处理 ( ...
最新文章
- java数组语法_Java 基本语法----数组
- 美媒人工智能(AI)代表了计算的优点,没有人类推理的缺点
- 高密度(HD)电路的设计 (主指BGA封装的布线设计)
- Linux内核文件vmlinux 和压缩后的bzImage文件格式分析
- pccs色卡_NCS色彩体系与PCCS色彩体系如何关联使用
- 软考准考证打印详细步骤(打印一张纸上)
- ejb 2.1 jboss_JBoss AS 8中的Java EE 7和EJB 3.2支持
- 【Python实战】使用python批量生成发票
- Redis面试之传统五大数据类型的落地应用详解
- matlab cy11,matlab解线性回归方程,y=a0+a1*A+a2*B+a3*C+a4*D+a5*E+a6*F; 数据足够,求a0,a
- mysql扩展函数创建临时表_MySQL函数中创建临时表
- markdown 文档转 word
- 格雷码与二进制码之间的相互转换
- 采用RP2040 MCU的树莓派Pico迷你开发板介绍
- java面试常问问题及答案,附源代码
- 使用FFmpeg命令实现音视频转码的备忘录
- 基础SQL Server 操作问题——对象‘主键’依赖于列‘ID’/标识列‘ID’的数据类型必须是int,bigint,smallint等
- 数字电路实验一 组合逻辑电路的设计预实验报告
- Python实现获取汉字笔画数,根据汉字笔画数量排序
- 易语言c源码流程图怎么实现,基于易语言的弯管坐标转换加工程序研究