Kafka在弹性、容错性以及高吞吐量方面有着很大的优势。想要达到生产环境最优,发挥这些特性,需要我们进行一系列的配置。Kafka提供了非常多的配置属性,对于初学者而言,很容易陷入困惑。其实,多数的配置已经满足了大部分的使用场景,本文分享总结了几个比较重要的配置参数,主要是针对producer端的配置,希望对你有所帮助。本文所讨论的配置文件包括:

acksmin.insync.replicasreplica.lag.time.max.msretriesenable.idempotencemax.in.flight.requests.per.connection buffer.memorymax.block.mslinger.msbatch.sizecompression.type

acks

acks参数指定了必须要有多少个分区副本收到消息,生产者才认为该消息是写入成功的,这个参数对于消息是否丢失起着重要作用,该参数的配置具体如下:

  • acks=0,表示生产者在成功写入消息之前不会等待任何来自服务器的响应.  换句话说,一旦出现了问题导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了. 改配置由于不需要等到服务器的响应,所以可以以网络支持的最大速度发送消息,从而达到很高的吞吐量。

  • acks=1,表示只要集群的leader分区副本接收到了消息,就会向生产者发送一个成功响应的ack,此时生产者接收到ack之后就可以认为该消息是写入成功的. 一旦消息无法写入leader分区副本(比如网络原因、leader节点崩溃),生产者会收到一个错误响应,当生产者接收到该错误响应之后,为了避免数据丢失,会重新发送数据.这种方式的吞吐量取决于使用的是异步发送还是同步发送.

    尖叫提示:如果生产者收到了错误响应,即便是重新发消息,还是会有可能出现丢数据的现象. 比如,如果一个没有收到消息的节点成为了新的Leader,消息就会丢失.

  • acks =all,表示只有所有参与复制的节点(ISR列表的副本)全部收到消息时,生产者才会接收到来自服务器的响应. 这种模式是最高级别的,也是最安全的,可以确保不止一个Broker接收到了消息. 该模式的延迟会很高.

min.insync.replicas

上面提到,当acks=all时,需要所有的副本都同步了才会发送成功响应到生产者. 其实这里面存在一个问题:如果Leader副本是唯一的同步副本时会发生什么呢?此时相当于acks=1.所以是不安全的.

Kafka的Broker端提供了一个参数min.insync.replicas,该参数控制的是消息至少被写入到多少个副本才算是"真正写入",该值默认值为1,生产环境设定为一个大于1的值可以提升消息的持久性. 因为如果同步副本的数量低于该配置值,则生产者会收到错误响应,从而确保消息不丢失.

replica.lag.time.max.ms

In-sync replica(ISR)称之为同步副本,ISR中的副本都是与Leader进行同步的副本,所以不在该列表的follower会被认为与Leader是不同步的. 那么,ISR中存在是什么副本呢?首先可以明确的是:Leader副本总是存在于ISR中. 而follower副本是否在ISR中,取决于该follower副本是否与Leader副本保持了“同步”.

尖叫提示:对于"follower副本是否与Leader副本保持了同步"的理解如下:

(1)上面所说的同步不是指完全的同步,即并不是说一旦follower副本同步滞后与Leader副本,就会被踢出ISR列表.

(2)Kafka的broker端有一个参数**replica.lag.time.max.ms**, 该参数表示follower副本滞后与Leader副本的最长时间间隔,默认是10秒.  这就意味着,只要follower副本落后于leader副本的时间间隔不超过10秒,就可以认为该follower副本与leader副本是同步的,所以哪怕当前follower副本落后于Leader副本几条消息,只要在10秒之内赶上Leader副本,就不会被踢出出局.

(3)如果follower副本被踢出ISR列表,等到该副本追上了Leader副本的进度,该副本会被再次加入到ISR列表中,所以ISR是一个动态列表,并不是静态不变的。

retries

生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下, retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待100ms ,可以通过retry.backoff.ms 参数来配置时间间隔。

比如,设置了acks=all和min.insync.replicas=2。由于某种原因,所有follower都挂了,由于min.insync.replicas=2,所以生产者无法收到来自Broker端的ack。

此时我们会从Producer端收到一个错误消息:"Broker: Not enough in-sync replicas"。这就意味着Kafka不能在Broker上追加生产的消息(数据)了,因为此时的ISR的数量不够。此时在Broker端会有如下的错误消息:

org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 for partition

默认情况下,Producer不会对此错误进行处理,这就会造成消息丢失,即**at-most-once **语义。我们可以通过配置重试次数来让生产者重新发送消息。比如配置retries=3,默认为0

enable.idempotence

在某些情况下,实际上已将消息提交给了所有同步副本,但是由于网络问题,Broker无法向Producer发送确认ack。由于我们设置retries=3,所以producer将重新发送消息3次,这可能会导致topic中消息重复。

比如有一个producer向该topic发送1M消息,并且在提交消息之后但在生产者收到所有确认ack之前,broker失败了。在这种情况下,由于重试机制,最终可能在该topic上收到超过1M的消息,这也称为at-lease-once语义。

当然,我们想要实现的是exactly-once语义,即:即便生产者重新发送消息,消费者也应该只收到一次相同的消息

此时需要进行幂等操作,所谓幂等,即指一次执行一个操作或多次执行一个操作具有相同的效果。配置幂等很简单,通过配置enable.idempotence=true即可,默认为false。

那么,幂等是如何实现的呢?由于消息是分batch(批次)发送的,每个batch都有一个序列号。在Broker端,会追踪每个分区的最大序列号。如果出现序列号较小或相等的batch(批次),broker将不会将该batch写入topic。这样,除了保证了幂等性,还可以确保batch的顺序。

max.in.flight.requests.per.connection

该参数指定了生产者在收到服务器晌应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为1可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。

因为如果将两个批次发送到单个分区,并且第一个批次失败并被重试,但是,接着第二个批次写入成功,则第二个批次中的记录可能会首先出现,这样就会发生乱序。

如果没有启用幂等功能,但仍然希望按顺序发送消息,则应将此设置配置为1。但是,如果已经启用了幂等,则无需显式定义此配置。

buffer.memory

该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候,send()方法调用要么被阻塞,要么抛出异常,取决于如何设置max.block.ms。

当生产者调用时send(),消息并不会立即发送,而是会添加到内部缓冲区中。默认buffer.memory值为32MB。如果生产者发送消息的速度超过了将消息发送到broker的速度,或者存在网络问题,send()方法调用会被阻塞max.block.ms参数配置的时常,默认1分钟。

max.block.ms

该参数指定了在调用send()方法或使用partitionsFor()方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会被阻塞。在阻塞时间达到max.block.ms时,生产者会抛出超时异常。

linger.ms

该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。kafka生产者会在批次填满或linger.ms达到上限时把批次发送出去。默认情况下,只要有可用的线程,生产者就会把消息发送出去,就算批次里只有一个消息。把linger.ms设置成比0大的数,让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然这样会增加延迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了)。

batch.size

当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。当批次被填满,批次里的所有消息会被发送出去。不过生产者井不一定都会等到批次被填满才发送,这取决于linger.ms的配置,比如如果linger.ms时间到了,即便批次只包含一个消息,也会被立即发送。所以就算把批次大小设置得很大,也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销。

可以使用配置使用linger.msbatch.sizelinger.ms是准备好发送批次之前的延迟时间,默认值为0。这意味着即使批次中只有1条消息,批次也会立即发送。有时,会增加linger.ms以减少请求数量并提高吞吐量。但这将导致更多消息保留在内存中。batch.size是单个批次的最大大小,当满足这两个要求中的任何一个时,将发送批次。

compression.type

默认情况下,消息发送时不会被压缩。该参数可以设置为snappy 、gzip 或lz4 ,它指定了消息被发送给broker 之前使用哪一种压缩算也进行压缩。使用压缩可以降低网络传输开销和存储开销,而这往往是向Kafka 发送消息的瓶颈所在。

总结

本文主要分享了Kafka几个比较重要的配置参数,并对每个参数进行了详细解释,通过配置这些参数,可以充分发挥Kafka的优良特性。希望本文对你有所帮助。

kafka配置_Kafka生产环境的几个重要配置参数相关推荐

  1. db2去除字段内容空格_Vue CLI3.x 配置指南生产环境去除console

    天赋决定上限,努力决定下限 每天一个小技巧,每天一个优化点.慢慢积累慢慢总结. 生产环境去除console babel-plugin-transform-remove-console  官方地址 ya ...

  2. Tomcat与Jre绿色环境配置(生产环境)

    Tomcat与Jre绿色环境配置(生产环境) 博客分类: Apache Java jreapachetomcat  Tomcat运行时需要jre的支持,一般有两种方式,一种是用jdk带的jre,另一种 ...

  3. linux ubuntu安装 mono,在 Ubuntu Server 上安装配置 Mono 生产环境

    在 Ubuntu Server 上安装配置 Mono 生产环境 在 Ubuntu Server 上安装和配置 Apache2 + Mono 生产环境的记录. 服务器环境是 Ubuntu Server ...

  4. linux ntp时间立即同步命令_记一次生产环境部署NTP服务及配置时间同步

    概述 linux服务器在提供服务时,要和其他机器进行请求的交互,实际生产环境中,可能因为时间不同步,导致了服务异常. 下面介绍下怎么部署NTP服务来解决这个问题. ps:强烈吐槽下头条这个新排版功能, ...

  5. Vue Nginx反向代理配置 解决生产环境跨域

    Vue本地代理举例: module.exports = {publicPath: './',devServer: {proxy: {'/api': {target: 'https://movie.do ...

  6. node代理请求 vue_vue-cli项目开发/生产环境代理实现跨域请求+webpack配置开发/生产环境的接口地址...

    使用 Vue-cli 创建的项目,开发地址是 localhost:8080,需要访问非本机上的接口http://10.1.0.34:8000/queryRole.不同域名之间的访问,需要跨域才能正确请 ...

  7. java jdk 1.8 配置_Java开发环境jdk 1.8安装配置方法(Win7 64位系统/windows server 2008)...

    什么是jdk jdk是什么呢?jdk的是java development kit的缩写,意思是java程序开发的工具包.也可以说jdk是java的sdk. 目前的JDK大致分三个大版本: Java S ...

  8. linux 网卡配置不一致,linux环境下,双网卡配置不同网段后,路由问题

    最近,新上线了10几台服务器,系统是centos 6.0 按照以前的惯例,配置服务器生产网络和维护网络.每台服务器的eth0 配置ip为生产网络,eth3配置为维护网络,生产网络和维护网络,物理上隔离 ...

  9. Windows10 配置深度学习环境(使用本地GPU,配置Visual Studio 2017 + CUDA 10.1 + TensorFlow-GPU 2.3.0)

    目录 配置环境 1. 安装Anaconda 2. 安装 Visual Studio 3. 安装 CUDA 4. 安装 CUDNN 5. 安装 TensorFlow-GPU 最近需要使用TensorFl ...

最新文章

  1. Project Chameleon Work In Progress 10
  2. 报错解决:ERROR: Failed to format E:; DiskPart errorlevel -2147212243.
  3. 鸿蒙系统系列教程5-鸿蒙开发环境的搭建
  4. 小公司出来的产品经理被大厂鄙视?
  5. 1949:【10NOIP普及组】数字统计
  6. java 练习6 交通工具
  7. oracle查询一张表的主键,Oracle查询表主键、外键
  8. MySQL学习笔记15:触发器
  9. IDEA的常用配置一键导入及优化内存
  10. 勒索老黄未果!黑客公布英伟达核心源代码,超40万个文件、75GB机密数据
  11. python--django基础篇(创建项目,模型类,迁移,测试数据库操作)
  12. 用gulp-imageisux智图api压缩图片
  13. 游戏设计的100个原理(11-15)
  14. navicat mysql 数据库备份_怎么用navicat自动备份mysql数据库
  15. 关于Ajax原理与使用方式,收藏这一篇文章就够了!!
  16. 1001 害死人不偿命的(3n+1)猜想 (15 分) (MyFirstCSDNBlog~)
  17. xss.haozi.me靶机
  18. PHP生成压缩包 (并下载)【解决压缩包下载,提示压缩包损坏】
  19. java小球挡板游戏_多线程的一个小球游戏,就是以前的那个Pong游戏
  20. 互联网毒瘤——内容农场

热门文章

  1. public/private/protected/默认 的各种理论上的区别
  2. 一代杰出科学家--钱学森去世了
  3. 博客订阅代码de制作
  4. python中count()方法
  5. MyEclipse使用总结——MyEclipse去除网上复制下来的来代码带有的行号
  6. 博客园首页博问闪存新随笔联系订阅管理 随笔- 252 文章- 0 评论- 45 HashPasswordForStoringInConfigFile中的Md5算法并非常用的Md5算法...
  7. WCF传输大数据的设置
  8. redis没有加密码导致服务器被当做矿机了
  9. ngx_lua与go高并发性能对比
  10. 解决nginx 504 Gateway Time-out的一些方法