1.Flink读取kafka策略

读取kafka策略有

  • org.apache.kafka.clients.consumer.RangeAssignor
  • org.apache.kafka.clients.consumer.RoundRobinAssignor
  • org.apache.kafka.clients.consumer.StickyAssignor
  • org.apache.kafka.clients.consumer.CooperativeStickyAssignor

默认为 RangeAssignor,flinksql中可以如下调整:

'properties.partition.assignment.strategy' = 'org.apache.kafka.clients.consumer.RoundRobinAssignor'

kafka 消费者的消费策略以及再平衡_健康平安的活着的博客-CSDN博客_kafka 消费策略https://blog.csdn.net/u011066470/article/details/124090278

2.Flink写入Kafka策略

2.1默认构造器

一般情况下使用

FlinkKafkaProducer(String topicId,SerializationSchema<IN> serializationSchema,Properties producerConfig)

构造器,默认为 FlinkFixedPartitioner分区器。公式为parallelInstanceId % partitions.length。即按照分区轮询,如 flink sink有5个subtask分区,kafka 有3个分区。则 1 -> 1,2 -> 2, 3 -> 3,4 -> 1,5 -> 2 。如此

代码如下

// targetTopic为flink task sink的 parallelism,partitions 为kafka的分区,parallelInstanceId 为 当前task 在 parallelism中的编号
@Overridepublic int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {Preconditions.checkArgument(partitions != null && partitions.length > 0,"Partitions of the target topic is empty.");return partitions[parallelInstanceId % partitions.length];}

思考:按照该策略,如果sink subtask数比topic的partition数少,会不会有partition没有数据?

待研究

2.2.自定义Kafka schema

如自定义 KafkaSerializationSchema。且调用

FlinkKafkaProducer(String defaultTopic,KafkaSerializationSchema<IN> serializationSchema,Properties producerConfig,FlinkKafkaProducer.Semantic semantic) 

构造器。此时分区器为null。

此时依次调用了 
record = kafkaSchema.serialize(next, context.timestamp());
transaction.producer.send(record, callback);

方法。 send方法为kafka的发送方法DefaultPartitioner 代码如下

/*** The default partitioning strategy:* <ul>* <li>If a partition is specified in the record, use it* <li>If no partition is specified but a key is present choose a partition based on a hash of the key* <li>If no partition or key is present choose the sticky partition that changes when the batch is full.* * See KIP-480 for details about sticky partitioning.*/
public class DefaultPartitioner implements Partitioner {private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();public void configure(Map<String, ?> configs) {}/*** Compute the partition for the given record.** @param topic The topic name* @param key The key to partition on (or null if no key)* @param keyBytes serialized key to partition on (or null if no key)* @param value The value to partition on or null* @param valueBytes serialized value to partition on or null* @param cluster The current cluster metadata*/public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {if (keyBytes == null) {return stickyPartitionCache.partition(topic, cluster);} List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}public void close() {}/*** If a batch completed for the current sticky partition, change the sticky partition. * Alternately, if no sticky partition has been determined, set one.*/public void onNewBatch(String topic, Cluster cluster, int prevPartition) {stickyPartitionCache.nextPartition(topic, cluster, prevPartition);}
}

即:

  1. 如果指定了分区,则写入指定分区
  2. 如果则定了key,则按照key进行hash计算分区
  3. 如果没有指定key则采用粘性分区,即分批随机写入,保证负载均衡

关于 StickyPartitionCache.nextPartition 代码如下:

public int nextPartition(String topic, Cluster cluster, int prevPartition) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);Integer oldPart = indexCache.get(topic);Integer newPart = oldPart;// Check that the current sticky partition for the topic is either not set or that the partition that // triggered the new batch matches the sticky partition that needs to be changed.if (oldPart == null || oldPart == prevPartition) {List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() < 1) {Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());newPart = random % partitions.size();} else if (availablePartitions.size() == 1) {newPart = availablePartitions.get(0).partition();} else {while (newPart == null || newPart.equals(oldPart)) {Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());newPart = availablePartitions.get(random % availablePartitions.size()).partition();}}// Only change the sticky partition if it is null or prevPartition matches the current sticky partition.if (oldPart == null) {indexCache.putIfAbsent(topic, newPart);} else {indexCache.replace(topic, prevPartition, newPart);}return indexCache.get(topic);}return indexCache.get(topic);}

关于Sticky Partitioner ,具体参考 Apache Kafka Producer Improvements: Sticky Partitionerhttps://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/

待整理 负载均衡:kafka的Rebalance问题分析_大叶子不小的博客-CSDN博客_kafka的rebalance

3.Flink 提交 Kafka offset

3.1 提交offset规则

Flink Kafka Consumer 允许有配置如何将 offset 提交回 Kafka broker 的行为。Flink Kafka Consumer 不依赖于提交的 offset 来实现容错保证。提交的 offset 只是一种方法,用于公开 consumer 的进度以便进行监控。

配置 offset 提交行为的方法是否相同,取决于是否为 job 启用了 checkpointing。

  • 禁用 Checkpointing: 如果禁用了 checkpointing,则 Flink Kafka Consumer 依赖于内部使用的 Kafka client 自动定期 offset 提交功能。需设置 enable.auto.commit 或者 auto.commit.interval.ms值

    enable.auto.commit默认值为 true,auto.commit.interval.ms 默认值为5000。具体可查看kafka官网或者  org.apache.kafka.clients.consumer.ConsumerConfig 类

  • 启用 Checkpointing: 如果启用了 checkpointing,那么当 checkpointing 完成时,Flink Kafka Consumer 将提交的 offset 存储在 checkpoint 状态中。确保 Kafka broker 中提交的 offset 与 checkpoint 状态中的 offset 一致。 用户可以通过调用 consumer 上的 setCommitOffsetsOnCheckpoints(boolean) 方法来禁用或启用 offset 的提交(默认情况下,这个值是 true )。 注意,在这个场景中,Properties 中的自动定期 offset 提交设置会被完全忽略

Kafka | Apache FlinkApache Kafka Connector # Flink provides an Apache Kafka connector for reading data from and writing data to Kafka topics with exactly-once guarantees.Dependency # Apache Flink ships with a universal Kafka connector which attempts to track the latest version of the Kafka client. The version of the client it uses may change between Flink releases. Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later. For details on Kafka compatibility, please refer to the official Kafka documentation.https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#consumer-offset-committing

3.4 Last Committed Offset

Last Committed Offset表示consumer Group 已经提交的offset。记录当前消费点位,用于下次消费时定位offset

3.3 lag监控

如果开启了checkpoint,且时间周期为10min(10min提交一次),此情况下,通过Last Committed Offset来监控kafka lag显然是不对的。kafka提供了相关指标来进行监控,如 records-lag-max。该指标为当前partition的Log End Offset(LEO) - Current Position Offset,在flink中可以将该指标上报给Prometheus进行监控,另外flink也有一些指标可供监控使用

Apache Kafka

Kafka | Apache Flink

Flink Kafka相关推荐

  1. Apache Flink 零基础入门(二十)Flink kafka connector

    内置source和sink 内置source包括从文件读取,从文件夹读取,从socket中读取.从集合或者迭代器中读取.内置的sink包括写文件.控制台输出.socket 内置connectors A ...

  2. 网易云音乐基于 Flink + Kafka 的实时数仓建设实践

    简介:本文由网易云音乐实时计算平台研发工程师岳猛分享,主要从以下四个部分将为大家介绍 Flink + Kafka 在网易云音乐的应用实战: 背景.Flink + Kafka 平台化设计.Kafka 在 ...

  3. flink 写kafka_网易云音乐基于 Flink + Kafka 的实时数仓建设实践

    简介:本文由网易云音乐实时计算平台研发工程师岳猛分享,主要从以下四个部分将为大家介绍 Flink + Kafka 在网易云音乐的应用实战: 背景 Flink + Kafka 平台化设计 Kafka 在 ...

  4. flink checkpoint 恢复_干货:Flink+Kafka 0.11端到端精确一次处理语义实现

    2017年12月Apache Flink社区发布了1.4版本.该版本正式引入了一个里程碑式的功能:两阶段提交Sink,即TwoPhaseCommitSinkFunction.该SinkFunction ...

  5. 【Kafka】Flink kafka TimeoutException Timeout expired while fetching topic metadata

    1.背景 一个服务突然不能工作了,数据有输入但是没有数据,好看数据输入进去就丢了,但是正常情况下,应该是最起码正确和错误都应该输出. 下面开始查看日志 docker exec -it xxxx bas ...

  6. flink kafka addSource(comsumer ) 源码学习笔记

    addsource 其中function存的是FlinkKafkaConsumer对象 public <OUT> DataStreamSource<OUT> addSource ...

  7. 【Flink实战系列】Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/consumer/ConsumerRecord;)Ljava/

    java.lang.AbstractMethodError: Method flink/stream/deserialization/PoJoDeserializationSchema.deseria ...

  8. 实时数仓 大数据 Hadoop flink kafka

    ⼀.实时数仓建设背景 实时需求⽇趋迫切 ⽬前各⼤公司的产品需求和内部决策对于数据实时性的要求越来越迫切,需要实时数仓的能⼒来赋能.传统离 线数仓的数据时效性是 T+1,调度频率以天为单位,⽆法⽀撑实时 ...

  9. Flink Kafka Doris实战demo

    Flink Kafka Doris实战demo 环境: Flink 1.12 Doris 0.12 Kafka 1.0.1+kafka3.1.1 一:编译doris 参考官网Docker编译:http ...

  10. 吐血之作 | 流系统Spark/Flink/Kafka/DataFlow端到端一致性实现对比

    长文预警, 全文两万五千多字, 37页word文档的长度 (略有杂乱,有些非常复杂的地方可能需要更多的例子来说明,使得初学者也能很容易看懂,但是实在花的时间已经太多太多了,留待后边利用起碎片时间一点点 ...

最新文章

  1. andriod 接入mqtt_Android 连接阿里云 mqtt失败
  2. linux socket 时间,Socket编程获取服务器时间
  3. 遍历boost::fibers::unbuffered_channel< unsigned int >的测试程序
  4. 安装phpssdb扩展:
  5. c语言around用法,KET基础语法:介词among和around的用法及例句
  6. 用户登录自动注销问题
  7. 展讯DTS路径及编译
  8. JavaScript中数组高级编程实践
  9. VS2010/MFC编程入门之三十三(常用控件:标签控件Tab Control 下)
  10. 电子书(文学,计算机)搜索与下载网站推荐
  11. vb.net 实现编辑某列并回车后不换行,查询数据进行相关处理
  12. Java零基础学习-API每日单词(日更)
  13. 连续时间 Markov 链从某一状态 i 转移到其他状态之前在 i 逗留的时间服从指数分布
  14. 进制转换--2进制转16进制
  15. 本地的html文件怎么运行在本地服务器上
  16. 李开复:未来最重要的不是操作系统 而是浏览器
  17. Word文档翻译成中文的方法
  18. 宏碁传奇Go评测 怎么样
  19. 服务器存储的作用和用途,存储服务器有四大作用你都清楚吗?
  20. 力扣热门题目简单部分合集(共23道)

热门文章

  1. 纯干货!一个白帽子挖漏洞经验细致分享
  2. Autojs4.1.0实战教程---中青看点签到
  3. 颠覆者的颠覆,DeFi是什么?
  4. 数据挖掘r语言和python知乎_Hellobi Live |R语言爬虫实战案例分享:网易云课堂、知乎live、今日头条、B站视频...
  5. python怎么画两张图_python中如何用subplot画多个子图?
  6. matlab画雷克子波
  7. python-docx模块表格部分单元格格式调整
  8. ffmpeg-音频淡入淡出混合滤镜(二十三)
  9. 解决一个金蝶KIS云专业版客户端不能打印的问题
  10. 烟卷配送库房温湿度监控系统中应用的温湿度传感器