flink和kafka区别

介绍

Apache Flink是用于分布式流和批处理数据处理的开源平台。 Flink是具有多个API的流数据流引擎,用于创建面向数据流的应用程序。

Flink应用程序通常使用Apache Kafka进行数据输入和输出。 本文将指导您逐步使用Apache Flink和Kafka。

先决条件

  • Apache Kafka 0.9.x
  • 吉特
  • Maven 3.x或更高版本

创建您的Flink流项目

第一步是创建Java应用程序,最简单的方法是使用flink-quickstart-java原型,其中包含核心依赖关系和打包任务。 本文与Apache Flink快速入门示例相似,重点明确介绍了MapR Streams的数据输入和输出。

在此应用程序中,我们将创建两个作业:

  • WriteToKafka :生成随机字符串,然后使用Kafka Flink连接器及其Producer API将其发布到MapR Streams主题。
  • ReadFromKafka :读取相同的主题,并使用Kafka Flink连接器及其使用者在标准输出中打印消息。 API。

完整项目可在GitHub上找到:

  • Flink和Kakfa应用

让我们使用Apache Maven创建项目:

mvn archetype:generate \-DarchetypeGroupId=org.apache.flink\-DarchetypeArtifactId=flink-quickstart-java \-DarchetypeVersion=1.1.2 \-DgroupId=com.grallandco.demos \-DartifactId=kafka-flink-101 \-Dversion=1.0-SNAPSHOT \-DinteractiveMode=false

Maven将创建以下结构:

tree kafka-flink-101/
kafka-flink-101/
├── pom.xml
└── src└── main├── java│   └── com│       └── grallandco│           └── demos│               ├── BatchJob.java│               ├── SocketTextStreamWordCount.java│               ├── StreamingJob.java│               └── WordCount.java└── resources└── log4j.properties7 directories, 6 files

此项目配置为创建一个Jar文件,该文件包含您的flink项目代码,还包括运行它所需的所有依赖关系。

该项目包含其他一些示例工作,本文不需要它们,您可以将其用于教育目的,也可以将其从项目中删除。

添加Kafka连接器

打开pom.xml并将以下依赖项添加到您的项目中:

第一步,我们必须添加Flink Kafka连接器作为依赖项,以便我们可以使用Kafka接收器。 将此添加到“依赖项”部分的pom.xml文件中:

您现在必须添加Flink Kafka Connector依赖项才能使用Kafka接收器。 在<dependencies>元素中添加以下条目:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.9_2.10</artifactId><version>${flink.version}</version></dependency>

Flink项目现在准备通过Kafka连接器使用DataStream,因此您可以从Apache Kafka发送和接收消息。

安装并启动Kafka

下载Kafka,在终端中输入以下命令:

curl -O http://www.us.apache.org/dist/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
tar -xzf kafka_2.11-0.9.0.0.tgz
cd kafka_2.11-0.9.0.0

Kafka使用ZooKeeper,如果您没有运行Zookeeper,则可以使用以下命令启动它:

./bin/zookeeper-server-start.sh config/zookeeper.properties

通过在新终端中运行以下命令来启动Kafka代理:

./bin/kafka-server-start.sh config/server.properties

在另一个终端中,运行以下命令来创建一个名为flink-demo的Kafka主题:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-demo

使用Kafka工具将消息发布和使用到flink-demo主题。

制片人

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-demo

消费者

./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic flink-demo --from-beginning

在生产者窗口中,您可以发布一些消息,并在消费者窗口中查看它们。 我们将使用这些工具来跟踪Kafka和Flink之间的交互。

编写您的Flink应用程序

现在让我们使用Flink Kafka Connector将消息发送到Kafka并使用它们。

制片人

生产者使用SimpleStringGenerator()类生成消息,然后将字符串发送到flink-demo主题。

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", “localhost:9092"); DataStream<String> stream = env.addSource(new SimpleStringGenerator());stream.addSink(new FlinkKafkaProducer09<>("flink-demo", new SimpleStringSchema(), properties));env.execute();}

SimpleStringGenerator()方法代码在此处提供 。

主要步骤是:

  • 在任何Flink应用程序的基础上创建一个新的StreamExecutionEnvironment
  • 在应用程序环境中创建一个新的DataStream时, SimpleStringGenerator类将Flink中所有流数据源的Source接口实现SourceFunction 。
  • FlinkKafkaProducer09器添加到主题。

消费者

使用者只需从flink-demo主题中读取消息,然后将它们打印到控制台中即可。

public static void main(String[] args) throws Exception {// create execution environmentStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", “localhost:9092");properties.setProperty("group.id", "flink_consumer");DataStream<String> stream = env.addSource(new FlinkKafkaConsumer09<>("flink-demo", new SimpleStringSchema(), properties) );stream.map(new MapFunction<String, String>() {private static final long serialVersionUID = -6867736771747690202L;@Overridepublic String map(String value) throws Exception {return "Stream Value: " + value;}}).print();env.execute();}

主要步骤是:

  • 在任何Flink应用程序的基础上创建一个新的StreamExecutionEnvironment
  • 使用消费者信息创建一组属性,在此应用程序中,我们只能设置消费者group.id
  • 使用FlinkKafkaConsumer09从主题flink-demo获取消息

生成并运行应用程序

让我们直接从Maven(或从您最喜欢的IDE)运行应用程序。

1-建立专案:

$ mvn clean package

2-运行Flink生产者作业

$ mvn exec:java -Dexec.mainClass=com.mapr.demos.WriteToKafka

3-运行Flink消费者工作

$ mvn exec:java -Dexec.mainClass=com.mapr.demos.ReadFromKafka

在终端中,您应该看到生产者生成的消息

现在,您可以在Flink群集上部署并执行此作业。

结论

在本文中,您学习了如何将Flink与kafka结合使用来写入和读取数据流。

翻译自: https://www.javacodegeeks.com/2016/10/getting-started-apache-flink-kafka.html

flink和kafka区别

flink和kafka区别_Apache Flink和Kafka入门相关推荐

  1. kafka window 启动_Apache Flink结合Kafka构建端到端的Exactly-Once处理

    Apache Flink自2017年12月发布的1.4.0版本开始,为流计算引入了一个重要的里程碑特性:TwoPhaseCommitSinkFunction(相关的Jira).它提取了两阶段提交协议的 ...

  2. flink checkpoint 恢复_干货:Flink+Kafka 0.11端到端精确一次处理语义实现

    2017年12月Apache Flink社区发布了1.4版本.该版本正式引入了一个里程碑式的功能:两阶段提交Sink,即TwoPhaseCommitSinkFunction.该SinkFunction ...

  3. 【Flink】kafka FlinkKafkaException send data to Kafka old epoch newer producer same transactionalId

    文章目录 1.场景1 1.1 概述 2.场景2 M.参考 1.场景1 1.1 概述 重复问题:[Flink]kafka INVALID_PRODUCER_EPO send data to Kafka ...

  4. 【Flink】kafka INVALID_PRODUCER_EPO send data to Kafka old epoch newer producer same transactionalId

    文章目录 1.场景1 1.1 原因 1.2 解决 1.3 源码 2.类似问题 1.场景1 问题重复:[Flink]kafka FlinkKafkaException send data to Kafk ...

  5. Flink(16):Flink之Connect Kafka API

    目录 ​​​​​​0. 相关文章链接 1. pom依赖 2. 参数设置 3. 参数说明 3.1. 序列化和反序列化器 3.2. 消费者起始位置 3.3. 动态分区检测 3.4. Connect Kaf ...

  6. 【Flink】 java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer

    自研flinsql提交平台,在服务器上新安装了一个flink客户端,准备提交flinksql到yarn,执行测试的时候报错 /data/flink/flink-1.13.3/bin/flink run ...

  7. flink checkpoint 恢复_Apache Flink 管理大型状态之增量 Checkpoint 详解

    邱从贤(山智),Apache Flink Contributor,中南大学硕士,2018 年加入阿里巴巴计算平台事业部,专注于 Flink 核心引擎开发,主要从事 Flink  State&C ...

  8. kafka版本_Apache Kafka 版本演进及特性介绍

    前段时间有一个同事问到:Kafka 0.8.2 只能使用Zookeeper连接吗?虽然仍有一部分Kafka的老用户在使用 0.8.x 版本,但 Kafka 0.8.x 确实是比较老的版本了.如果不是对 ...

  9. flink笔记1(初识 Flink)

    flink 一.初识 Flink 1.概念 2. Flink 的应用 (1)Flink 主要的应用场景 3.流式数据处理的发展和演变 (1)流处理和批处理 (2)传统事务处理 (3)有状态的流处理 ( ...

最新文章

  1. java数组语法_Java 基本语法----数组
  2. 美媒人工智能(AI)代表了计算的优点,没有人类推理的缺点
  3. 高密度(HD)电路的设计 (主指BGA封装的布线设计)
  4. Linux内核文件vmlinux 和压缩后的bzImage文件格式分析
  5. pccs色卡_NCS色彩体系与PCCS色彩体系如何关联使用
  6. 软考准考证打印详细步骤(打印一张纸上)
  7. ejb 2.1 jboss_JBoss AS 8中的Java EE 7和EJB 3.2支持
  8. 【Python实战】使用python批量生成发票
  9. Redis面试之传统五大数据类型的落地应用详解
  10. matlab cy11,matlab解线性回归方程,y=a0+a1*A+a2*B+a3*C+a4*D+a5*E+a6*F; 数据足够,求a0,a
  11. mysql扩展函数创建临时表_MySQL函数中创建临时表
  12. markdown 文档转 word
  13. 格雷码与二进制码之间的相互转换
  14. 采用RP2040 MCU的树莓派Pico迷你开发板介绍
  15. java面试常问问题及答案,附源代码
  16. 使用FFmpeg命令实现音视频转码的备忘录
  17. 基础SQL Server 操作问题——对象‘主键’依赖于列‘ID’/标识列‘ID’的数据类型必须是int,bigint,smallint等
  18. 数字电路实验一 组合逻辑电路的设计预实验报告
  19. Python实现获取汉字笔画数,根据汉字笔画数量排序
  20. 易语言c源码流程图怎么实现,基于易语言的弯管坐标转换加工程序研究

热门文章

  1. CF848E-Days of Floral Colours【dp,分治NTT】
  2. AT4378-[AGC027D]ModuloMatrix【构造】
  3. YbtOJ#20067-[NOIP2020模拟赛B组Day5]糖果分配【dp】
  4. 【期望】期望收益(金牌导航 期望-3)
  5. 【SPFA】重建道路(jzoj 1212)
  6. Sentinel(二)之Quick Start
  7. Java web文件下载断点续传
  8. Java 内存泄露总结
  9. array关于map,reduce,filter的用法
  10. 投资理财要趁早,基金风险是最小!