本文主要介绍 Kafka 在 Apache Flink 中的使用,以一个简单的示例,向大家介绍在 Apache Flink 中如何使用 Kafka。

版本:

kafka_2.11-2.1.0.tgz

创建 Topic

Kafka 是消息订阅系统,首先创建可以被订阅的 Topic,我们创建一个名为 flink-tipic 的Topic,在一个新的 terminal 中,执行如下命令:

 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipicCreated topic "flink-tipic".

在 Kafka Server 的 terminal 中也会输出如下成功创建信息:

...
[2019-01-13 09:13:31,156] INFO Created log for partition flink-tipic-0 in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 2.1-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
...

上面显示了flink-topic的基本属性配置,如消息压缩方式,消息格式,备份数量等等。

除了看日志,我们可以用命令显示的查询我们是否成功的创建了 flink-topic,如下:

 bin/kafka-topics.sh --list --zookeeper localhost:2181flink-tipic

如果输出flink-tipic,那么说明我们的 Topic 成功创建了。

那么 Topic 是保存在哪里?Kafka 是怎样进行消息的发布和订阅的呢?为直观,我们看如下 Kafka 架构示意图简单理解一下:

简单介绍一下,Kafka 利用 ZooKeeper 来存储集群信息,也就是上面我们启动的 Kafka Server 实例,一个集群中可以有多个 Kafka Server 实例,Kafka Server 叫做 Broker,我们创建的 Topic可以在一个或多个 Broker 中。Kafka 利用 Push 模式发送消息,利用 Pull 方式拉取消息。

1.3 发送消息

如何向已经存在的 Topic 中发送消息呢,当然我们可以 API 的方式编写代码发送消息。同时,还可以利用命令方式来便捷的发送消息,如下:

 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-topic
>Kafka test msg
>Kafka connector

上面我们发送了两条消息Kafka test msg 和 Kafka connector 到 flink-topic Topic中。

1.4 读取消息

如果读取指定 Topic 的消息呢?同样可以 API 和命令两种方式都可以完成,我们以命令方式读取flink-topic的消息,如下:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink-topic --from-beginning
Kafka test msg
Kafka connector

其中--from-beginning 描述了我们从 Topic 开始位置读取消息。

2.Flink Kafka Connector

前面我们以最简单的方式安装了 Kafka 环境,那么我们以上面的环境介绍 Flink Kafka Connector 的使用。

Apache Flink 中提供了多个版本的 Kafka Connector,本篇以 Flink-1.7.0 版本为例进行介绍。

2.1 mvn 依赖

要使用 Kakfa Connector 需要在我们的 pom 中增加对 Kafka Connector 的依赖,如下:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.7.0</version>
</dependency>

Flink Kafka Consumer 需要知道如何将 Kafka 中的二进制数据转换为 Java / Scala 对象。 DeserializationSchema允许用户指定这样的模式。 为每个 Kafka 消息调用 T deserialize(byte [] message)方法,从 Kafka 传递值。

2.2 Examples

我们示例读取 Kafka 的数据,再将数据做简单处理之后写入到 Kafka 中。我们需要再创建一个用于写入的 Topic,如下:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic-output

所以示例中我们 Source 利用flink-topic,Sink用slink-topic-output。

2.2.1 Simple ETL

我们假设 Kafka 中存储的就是一个简单的字符串,所以我们需要一个用于对字符串进行serialize和deserialize的实现,也就是我们要定义一个实现DeserializationSchema和SerializationSchema 的序列化和反序列化的类。因为我们示例中是字符串,所以我们自定义一个KafkaMsgSchema实现类,然后在编写 Flink 主程序。

  • KafkaMsgSchema - 完整代码
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Preconditions;import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;public class KafkaMsgSchema implements DeserializationSchema<String>, SerializationSchema<String> {private static final long serialVersionUID = 1L;private transient Charset charset;public KafkaMsgSchema() {// 默认UTF-8编码this(Charset.forName("UTF-8"));}public KafkaMsgSchema(Charset charset) {this.charset = Preconditions.checkNotNull(charset);}public Charset getCharset() {return this.charset;}public String deserialize(byte[] message) {// 将Kafka的消息反序列化为java对象return new String(message, charset);}public boolean isEndOfStream(String nextElement) {// 流永远不结束return false;}public byte[] serialize(String element) {// 将java对象序列化为Kafka的消息return element.getBytes(this.charset);}public TypeInformation<String> getProducedType() {// 定义产生的数据Typeinforeturn BasicTypeInfo.STRING_TYPE_INFO;}private void writeObject(ObjectOutputStream out) throws IOException {out.defaultWriteObject();out.writeUTF(this.charset.name());}private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {in.defaultReadObject();String charsetName = in.readUTF();this.charset = Charset.forName(charsetName);}
}
  • 主程序 - 完整代码
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;import java.util.Properties;public class KafkaExample {public static void main(String[] args) throws Exception {// 用户参数获取final ParameterTool parameterTool = ParameterTool.fromArgs(args);// Stream 环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// Source的topicString sourceTopic = "flink-topic";// Sink的topicString sinkTopic = "flink-topic-output";// broker 地址String broker = "localhost:9092";// 属性参数 - 实际投产可以在命令行传入Properties p = parameterTool.getProperties();p.putAll(parameterTool.getProperties());p.put("bootstrap.servers", broker);env.getConfig().setGlobalJobParameters(parameterTool);// 创建消费者FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(sourceTopic,new KafkaMsgSchema(),p);// 设置读取最早的数据
//        consumer.setStartFromEarliest();// 读取Kafka消息DataStream<String> input = env.addSource(consumer);// 数据处理DataStream<String> result = input.map(new MapFunction<String, String>() {public String map(String s) throws Exception {String msg = "Flink study ".concat(s);System.out.println(msg);return msg;}});// 创建生产者FlinkKafkaProducer producer = new FlinkKafkaProducer<String>(sinkTopic,new KeyedSerializationSchemaWrapper<String>(new KafkaMsgSchema()),p,FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);// 将数据写入Kafka指定Topic中result.addSink(producer);// 执行jobenv.execute("Kafka Example");}
}

运行主程序如下:

我测试操作的过程如下:

  • 启动flink-topic和flink-topic-output的消费拉取;
  • 通过命令向flink-topic中添加测试消息only for test;
  • 通过命令打印验证添加的测试消息 only for test;
  • 最简单的 FlinkJob source->map->sink 对测试消息进行 map 处理:""Flink study ".concat(s);
  • 通过命令打印 sink 的数据;

2.2.2 内置 Schemas

Apache Flink 内部提供了如下 3 种内置的常用消息格式的 Schemas:

  • TypeInformationSerializationSchema (and TypeInformationKeyValueSerializationSchema)它基于 Flink 的 TypeInformation 创建模式。 如果数据由 Flink 写入和读取,这将非常有用。
  • JsonDeserializationSchema (and JSONKeyValueDeserializationSchema) 它将序列化的JSON转换为 ObjectNode 对象,可以使用 objectNode.get(“field”)作为(Int / String / ...)()从中访问字段。 KeyValue objectNode 包含“key”和“value”字段,其中包含所有字段以及可选的"metadata"字段,该字段公开此消息的偏移量/分区/主题。
  • AvroDeserializationSchema  它使用静态提供的模式读取使用 Avro 格式序列化的数据。 它可以从 Avro 生成的类(AvroDeserializationSchema.forSpecific(...))推断出模式,或者它可以与 GenericRecords 一起使用手动提供的模式(使用AvroDeserializationSchema.forGeneric(...))

要使用内置的 Schemas 需要添加如下依赖:

 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-avro</artifactId><version>1.7.0</version>
</dependency>

2.2.3 读取位置配置

我们在消费 Kafka 数据时候,可能需要指定消费的位置,FlinkKafkaConsumer提供很多便利的位置设置,如下:

  • consumer.setStartFromEarliest() - 从最早的记录开始;
  • consumer.setStartFromLatest() - 从最新记录开始;
  • consumer.setStartFromTimestamp(...); // 从指定的 epoch 时间戳(毫秒)开始;
  • consumer.setStartFromGroupOffsets(); // 默认行为,从上次消费的偏移量进行继续消费。

上面的位置指定可以精确到每个分区,比如如下代码:

Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L); // 第一个分区从23L开始
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);// 第二个分区从31L开始
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);// 第三个分区从43L开始consumer.setStartFromSpecificOffsets(specificStartOffsets);

对于没有指定的分区还是默认的 setStartFromGroupOffsets 方式。

2.2.4 Topic 发现

Kafka 支持 Topic 自动发现,也就是用正则的方式创建FlinkKafkaConsumer,比如:

// 创建消费者
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(            java.util.regex.Pattern.compile(sourceTopic.concat("-[0-9]")),
new KafkaMsgSchema(),
p);

在上面的示例中,当作业开始运行时,消费者将订阅名称与指定正则表达式匹配的所有Topic(以sourceTopic的值开头并以单个数字结尾)。

2.3 定义 Watermark(Window)

对 Kafka Connector 的应用不仅限于上面的简单数据提取,我们更多时候是期望对 Kafka 数据进行 Event-time 的窗口操作,那么就需要在 Flink Kafka Source 中定义 Watermark。

要定义 Event-time,首先是 Kafka 数据里面携带时间属性,假设我们数据是String#Long的格式,如only for test#1000。那么我们将Long作为时间列。

  • KafkaWithTsMsgSchema - 完整代码

要想解析上面的 Kafka 的数据格式,我们需要开发一个自定义的 Schema,比如叫KafkaWithTsMsgSchema,将String#Long解析为一个 Java 的Tuple2<String, Long>,完整代码如下:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.util.Preconditions;import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;public class KafkaWithTsMsgSchema implements DeserializationSchema<Tuple2<String, Long>>, SerializationSchema<Tuple2<String, Long>> {private static final long serialVersionUID = 1L;private transient Charset charset;public KafkaWithTsMsgSchema() {this(Charset.forName("UTF-8"));}public KafkaWithTsMsgSchema(Charset charset) {this.charset = Preconditions.checkNotNull(charset);}public Charset getCharset() {return this.charset;}public Tuple2<String, Long> deserialize(byte[] message) {String msg = new String(message, charset);String[] dataAndTs = msg.split("#");if(dataAndTs.length == 2){return new Tuple2<String, Long>(dataAndTs[0], Long.parseLong(dataAndTs[1].trim()));}else{// 实际生产上需要抛出runtime异常System.out.println("Fail due to invalid msg format.. ["+msg+"]");return new Tuple2<String, Long>(msg, 0L);}}@Overridepublic boolean isEndOfStream(Tuple2<String, Long> stringLongTuple2) {return false;}public byte[] serialize(Tuple2<String, Long> element) {return "MAX - ".concat(element.f0).concat("#").concat(String.valueOf(element.f1)).getBytes(this.charset);}private void writeObject(ObjectOutputStream out) throws IOException {out.defaultWriteObject();out.writeUTF(this.charset.name());}private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {in.defaultReadObject();String charsetName = in.readUTF();this.charset = Charset.forName(charsetName);}@Overridepublic TypeInformation<Tuple2<String, Long>> getProducedType() {return new TupleTypeInfo<Tuple2<String, Long>>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);}
}
  • Watermark 生成

提取时间戳和创建 Watermark,需要实现一个自定义的时间提取和 Watermark 生成器。在Apache Flink 内部有 2 种方式如下:

  • AssignerWithPunctuatedWatermarks - 每条记录都产生Watermark。
  • AssignerWithPeriodicWatermarks - 周期性的生成Watermark。

我们以AssignerWithPunctuatedWatermarks为例写一个自定义的时间提取和 Watermark 生成器。代码如下:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;import javax.annotation.Nullable;public class KafkaAssignerWithPunctuatedWatermarksimplements AssignerWithPunctuatedWatermarks<Tuple2<String, Long>> {@Nullable@Overridepublic Watermark checkAndGetNextWatermark(Tuple2<String, Long> o, long l) {// 利用提取的时间戳创建Watermarkreturn new Watermark(l);}@Overridepublic long extractTimestamp(Tuple2<String, Long> o, long l) {// 提取时间戳return o.f1;}
}
  • 主程序 - 完整程序

我们计算一个大小为 1 秒的 Tumble 窗口,计算窗口内最大的值。完整的程序如下

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;import java.util.Properties;public class KafkaWithEventTimeExample {public static void main(String[] args) throws Exception {// 用户参数获取final ParameterTool parameterTool = ParameterTool.fromArgs(args);// Stream 环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置 Event-timeenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// Source的topicString sourceTopic = "flink-topic";// Sink的topicString sinkTopic = "flink-topic-output";// broker 地址String broker = "localhost:9092";// 属性参数 - 实际投产可以在命令行传入Properties p = parameterTool.getProperties();p.putAll(parameterTool.getProperties());p.put("bootstrap.servers", broker);env.getConfig().setGlobalJobParameters(parameterTool);// 创建消费者FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<Tuple2<String, Long>>(sourceTopic,new KafkaWithTsMsgSchema(),p);// 读取Kafka消息TypeInformation<Tuple2<String, Long>> typeInformation = new TupleTypeInfo<Tuple2<String, Long>>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);DataStream<Tuple2<String, Long>> input = env.addSource(consumer).returns(typeInformation)// 提取时间戳,并生产Watermark.assignTimestampsAndWatermarks(new KafkaAssignerWithPunctuatedWatermarks());// 数据处理DataStream<Tuple2<String, Long>> result = input.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).max(0);// 创建生产者FlinkKafkaProducer producer = new FlinkKafkaProducer<Tuple2<String, Long>>(sinkTopic,new KeyedSerializationSchemaWrapper<Tuple2<String, Long>>(new KafkaWithTsMsgSchema()),p,FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);// 将数据写入Kafka指定Topic中result.addSink(producer);// 执行jobenv.execute("Kafka With Event-time Example");}
}

测试运行如下:

简单解释一下,我们输入数如下:

我们看的5000000~7000000之间的数据,其中B#5000000, C#5000100和E#5000120是同一个窗口的内容。计算 MAX 值,按字符串比较,最大的消息就是输出的E#5000120。

2.4 Kafka 携带 Timestamps

在 Kafka-0.10+ 消息可以携带 timestamps,也就是说不用单独的在 msg 中显示添加一个数据列作为 timestamps。只有在写入和读取都用 Flink 时候简单一些。一般情况用上面的示例方式已经足够了。

3.小结

本篇重点是向大家介绍 Kafka 如何在 Flink 中进行应用,开篇介绍了 Kafka 的简单安装和收发消息的命令演示,然后以一个简单的数据提取和一个 Event-time 的窗口示例让大家直观的感受如何在 Apache Flink 中使用 Kafka。

Flink:DataStream Connectors 之 Kafka相关推荐

  1. Flink DataStream Connectors 之 Apache Kafka 连接器

    文章目录 依赖 Kafka Source 使用方法 Topic / Partition 订阅 消息解析 起始消费位点 有界 / 无界模式 其他属性 动态分区检查 事件时间和水印 空闲 消费位点提交 监 ...

  2. Flink 系例 之 Connectors 连接 Kafka

    通过使用 Flink DataStream Connectors 数据流连接器连接到 ElasticSearch 搜索引擎的文档数据库 Index,并提供数据流输入与输出操作: 示例环境 java.v ...

  3. Flink DataStream读写Kafka

    Flink提供了Kafka连接器,用于从或向Kafka读写数据. 本文总结Flink与Kafka集成中的问题,并对一些疑点进行总结和梳理. 问题一: 读Kafka的方式 ## 读取一个Topic Fl ...

  4. Flink Caused by:org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException

    Flink程序从kafka中读取数据进行计算,FLink程序一启动就报以下错误,看到错误很懵逼.加班到9点没解决,第二天提前来半小时,把如下错误信息又看了一遍.具体错误如下: 错误信息1. 20/12 ...

  5. flink 写kafka_flink消费kafka的offset与checkpoint

    生产环境有个作业,逻辑很简单,读取kafka的数据,然后使用hive catalog,实时写入hbase,hive,redis.使用的flink版本为1.11.1. 为了防止写入hive的文件数量过多 ...

  6. Flink的并行度和Kafka的partition的结合

    flink kafka实现反序列化: package Flink_Kafka;import com.alibaba.fastjson.JSON; import org.apache.flink.api ...

  7. flink入门3-Flink连接Kafka、Redis,实现Kafka Source/Redis Sink

    本篇文章将会一步步实现如何使用Flink对接Kafka和Redis,并将Kafka中的数据存储到Redis中,这种场景也是真实项目会遇到的. 1.单机部署Kafka 1.1 下载Kafka压缩包,解压 ...

  8. Flink使用KafkaSource从Kafka消息队列中读取数据

    Flink使用KafkaSource从Kafka消息队列中读取数据 使用KafkaSource从Kafka消息队列中读取数据 1.KafkaSource创建的DataStream是一个并行的DataS ...

  9. flink DataStream API使用及原理

    传统的大数据处理方式一般是批处理式的,也就是说,今天所收集的数据,我们明天再把今天收集到的数据算出来,以供大家使用,但是在很多情况下,数据的时效性对于业务的成败是非常关键的. Spark 和 Flin ...

最新文章

  1. Ajax系列之JSON数据格式
  2. Shell脚本的调试技术
  3. 速领:1024大礼包
  4. 7、Reverse Integer(python)
  5. 全球及中国手持式吸尘器行业供应需求及未来投资潜力预测报告2022-2027年
  6. STM32F4 HAL库开发 -- DMA
  7. Flink流计算编程--在WindowedStream中体会EventTime与ProcessingTime
  8. usb连接不上 艾德克斯电源_艾德克斯双范围可编程直流电源IT6800A/B系列
  9. matlab 自再现模,平行平面腔自再现模FoxLi数值迭代解法及MATLAB实现
  10. 如何对 string 进行Base64编码,解码?
  11. linux基础知识——环境变量
  12. java基于文件的map实现_Mybatis中返回Map的实现
  13. phpexcel中文教程-设置表格字体颜色背景样式、数据格式、对齐方式、添加图片、批注、文字块、合并拆分单元格、单元格密码保护...
  14. 备战数学建模12-模糊综合评价模型
  15. matlab求数组转置,数组与矩阵运算 - MATLAB Simulink - MathWorks 中国
  16. MySQL的字符集和校对规则,你都会了吗?
  17. 陆羽茶交所严建红:陆羽茶产业互联网三螺旋模式
  18. Sketch 插件开发指南
  19. Microsoft Edge 离线安装包下载
  20. 树莓派Zero 2W python3.7 安装tensorflow2.2

热门文章

  1. oracle不足位数补零的实现sql语句
  2. 台球小技巧:高杆低杆中杆各种杆法效果线路图解
  3. 他趣产品总监张俊杰:从“爹爹框架”看内容平台运营的 4 大关键点
  4. 西门子S7-1500PLC程序 汽车焊装程序,有RFID 机器人精智面板,多种编程语言并存,FBD SCL STL 变频器控制 伺服控制FB285 FB284控制
  5. alternate端口什么意思_alternate是什么意思_alternate的翻译_音标_读音_用法_例句_爱词霸在线词典...
  6. 闵帆老师《论文写作》课程学习心得
  7. 标准型计算机窗口截图快捷键,如何快速打开电脑上的计算器
  8. 序列比对算法-计算生物学
  9. 如何制作表情包怎么制作gif动图
  10. 说说IOPS的重要指标