开发者可以使用kafka内置的客户端API开发kafka应用程序。除了内置的客户端之外,kafka还提供了二进制连接协议,也就是说,我们直接向kafka网络端口发送适当的字节序列,就可以实现从Kafka读取消息或往kafka写入消息。还有很多用其它语言实现的kafka客户端,比如C++、python等,都实现了kafka连接协议。这些客户端不属于kafka项目,但是kafka项目wiki上提供了一个清单,列出了所有可用的客户端。(所以意思就是说kafka内置的API只能用于java语言的开发咯)。

kafka生产者流程

  首先创建一个ProducerRecord对象,它需要包含目标主题和要发送的内容,还可以指定键或分区。在发送ProducerRecord对象时,生产者要先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。(生产者的消息先被放到缓存里,然后使用单独的线程发送到服务器)

  接下来,数据被传给分区器,如果之前在ProducerRecord对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。如果没有指定分区,那么分区器会根据ProducerRecord对象的键来选择一个分区。选好分区以后,生产者就知道该往哪个主题和分区发送这条记录了。紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息回被发送到相同的主题和分区上。有一个独立的线程负责把这些记录彼此发送到相应的broker上。

  服务器在收到这些消息时会返回一个响应,如果消息成功写入kafka,就返回一个RecordMetaData对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则返回一个错误。生趁着在收到错误之后会尝试重新发送消息,几次之后如果还是返回失败,那么就返回错误信息。

创建kafka生产者

  要往kafka里写入数据,首先要创建一个生产者对象,并设置一些属性。kafka生产者有3个必选的属性。

bootstrap.servers

  该属性指定了broker的地址清单,地址的格式为host:port,清单里不需要包含所有的broker地址,生产者会从给定的broker里查找其它broker的信息,不过最好是提供两个broker的信息,一旦其中一个宕机,生产者仍然能够连接到集群上。

key.serializer

  broker希望接收到的消息的键和值都是字节数组。生产者接口允许使用参数化类型。因此可以把java对心作为键和值发送给broker。但是生产者需要知道如何把这些java对象转换成字节数组。key.serializer必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer接口的类,生产者会使用这个类把键对象序列化成字节数组。kafka客户端默认提供了ByteArraySerializer、StringSerializer、IntegerSerializer,因此,如果你只使用常见的几种java类型对象,那么就没有必要实现自己的序列化器。要注意,key.serializer是必须设置的,就算你只打算发送值内容。

value.serializer

  value.serializer指定的类会将值序列化,如果键和值都是字符串,则可以使用和key.serializer一样的序列化器。如果键是整数类型而值是字符串,那么需要使用不同的序列化器。

  通过配置生产者的不同属性就可以很大程度地控制它的行为。Kafka的文档涵盖额所有的配置参数。

  kafka生产消息的3种方式:

  1.发送并忘记(fire-and-forget)

    把消息发送给服务器,但不关系它是否正常到达。大多是情况下,消息会正确到达,因为kafka是高可用的,而且生产者会自动尝试重发。不过使用这种方式有时会丢失一些消息。

  2.同步发送

    使用send()方法发送消息,它会返回一个Future对象,调用get方法进行等待,就可以知道消息是否发送成功。

  3.异步发送

    调用send()方法,并指定一个回调函数,服务器在返回响应时调用该函数。

  生产者发送消息时先把消息放在本地缓存,然后再发送到服务器,但是在发送消息之前还是可能会发生一些异常的,这些异常可能是SerializationException(说明序列化消息失败)、BufferExhaustedException或TimeoutException(说明缓冲区已满)、又或者是InterruptException(说明发送线程被中断)。

  KafkaProducer一般会发送两类错误,一类是可重试错误,这类错误可以通过重发消息来解决。比如对于连接错误,可以通过再次建立连接来解决,无主“no leader”错误则可以通过重新为分区选择首领来解决。KafkaProducer可以被配置成自动重试,如果在多次重试后仍无法解决问题,应用程序会收到一个重试异常。另一类错误无法通过重试解决,这类问题kafka直接抛出异常。

生产者的配置

  《kafka权威指南》里多次提到这个配置在kafka文档里有,这么重要的东西一定要下载下来看一下。如下几个重要配置:

1.acks

  该参数制定了必须要有多少个分区副本接收到消息,生产者才会认为消息写入是陈工的。这个参数对消息丢失的可能性有重要影响。该参数有如下选项:

  • acks=0,生产者在成功写入消息之前不会等待任何来自服务器的响应。也就是说,如果当中出现了问题,导致服务器没有接收到消息,生产者将无从得知,导致消息丢失,但是因为生产者不需要等待服务器返回的消息,所以它能以网络支持的最大速度发送消息,从而大都很高的吞吐量。
  • 如果acks=0,只要集群的首领 结点收到消息,生产者就会返回一个来自服务器的成功响应。如果消息无法到达生产者的首领(比如首领结点崩溃,新的首领结点还没有被选举出来),生产者将会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的结点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的同步发送还是异步发送。如果让客户端等待服务器响应(通过调用Future对象的get方法),显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用回调,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如,生产者在收到服务器响应之前可以发送多少个消息)。
  • 如果acks=all,只有当所有参与复制结点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器崩溃,整个集群依然可以运行。不过,它的延迟比acks=1时更高,因为我们要等待不止一个服务器结点接收消息。

2.buffer.memory

  该参数用来设置生产者内存缓冲区的大小,生产者用它来缓冲要发送到服务器的消息,如果应用程序发送消息的速度(向内存缓存数据的速度)超过发送到服务器的速度(通过网络发送数据的速度),会导致生产者内存空间不足。这个时候,send方法调用要么被阻塞,要么抛出异常,取决于如何设置block.on.buffer.full参数(0.9.0.0被替换成了max.block.ms,表示在抛出异常以前可以阻塞一段时间(注:想必更高的版本也换了啊))。

3.compression.type

  默认情况下,消息发送不会被压缩。该参数可以设置为snappy、gzip或lz4,它指定了消息被发送给broker之前使用哪一种压缩算法进行压缩。snappy压缩算法由Google发明,它占用较小的CPU,但可以提供较好的性能和相当可观的压缩比,如果比较关注性能和网络带宽,可以使用这种算法。gzip压缩算法一般会占用较多的CPU,但会提供更高的压缩比,所以,如果网络带宽有限,可以使用这种算法。使用压缩可以降低网络传输开销和存储开销,而这往往是向 kafka发送消息的瓶颈所在。

4.retries

  生产者从服务器接收到的错误可能是临时性的错误(比如分区找不到首领)。在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待100ms,不过可以通过retry.backoff.ms参数来改变这个时间间隔。建议在设置重试次数和重试时间间隔之前,先测试一下恢复一个崩溃结点需要多长时间,让总的重试时间比kafka集群从崩溃中恢复的时间长,否则,生产者会过早的放弃重试。但对于不是临时性的错误,就不能通过重启试来解决了。一般情况下生产者会自动重试,所以没必要在代码逻辑里处理那些可重试的错误。只需要处理那些不可重试的错误或重试次数超出上限的情况。

5.batch.size

  当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存的大小,按照字节数计算(而不是消息个数)。当批次被填满时,批次里所有消息会被发送出去。不过生产者并不一定都会等到批次被天马才发送,半满的批次,甚至只包含一个消息的批次也可能被发送。所以就算把批次大小设置的很大,也不会造成延迟,只是会占用更多的内存而已。但如果设置的太小,因为生产者需要更频繁的发送消息,会增加一些额外开销。

6.linger.ms

  该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer会在批次填满或linger.ms达到上限时把批次发送出去。默认情况下,只要有可用的线程,生产者就会把批次发送出去,就算批次里只有一个消息。把linger.ms设置成比0大的数,让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然这样会增加延迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了)。

7.client.id

  该参数可以是任意的字符串,服务器会用它来识别消息的来源,还可以用在日志和配额指标里。

8.max.in.flight.requests.per.connection

  该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用高越多的内存,不过也会提升吞吐量,把它设置为1可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。

9.timeout.ms、request.timeout.ms、metadata.fetch.timeout.ms

  request.timeout.ms指定了生产者在发送数据时等待服务器返回响应的时间,metadata.fetch.time.ms指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)。timeout.ms指定了broker等待同步副本返回消息确认的时间,与acks的配置相匹配——如果在指定时间内没有收到同步副本的确认,那么broker就会返回一个错误。

10.max.block.ms

  该参数指定了在调用send()方法或使用partitionsFor()方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到max.block.ms时,生产者就会抛出超时异常。

11.max.request.size

  该参数用于控制生产者发送的请求的大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为1MB,那么可以发送的单个消最大消息为1MB,或者生产者可以在单个请求里发送一个批次,该批次包含了1000个消息,每个消息的大小是1KB,灵位,broker对可接收的消息最大值也有自己的限制(message.max.bytes),所以两边的配置最好可以匹配,避免生产者发送的消息被broker拒绝。

12.receive.buffer.bytes和send.buffer.bytes

  这两个参数分贝指定了TCP socket接收和发送数据包的缓冲区大小。如果它们被设为-1,就使用当前操作系统的默认值。如果生产者或消费者与broker处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

  

  顺序保证:

  kafka可以保证同一分区的里的消息是有序的,也就是说,如果生产者按照一定的顺序发送消息,broker就会按照这个顺序把它们写入分区,消费者也会按照同样的顺序读取它们。在某些情况下,顺序是非常重要的。

  如果把retries设为非零数,同时把max.in.flight.requests.per.connection设置为比1大的数,那么,如果第一个批次写入消息失败,而第二个批次写入成功,broker会重试写入第一个批次。如果此时第一个批次也写入成功,那么两个批次的顺序就反过来了。

  一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也很关键,所以不建议把retries设置为0,可以把max.in.flight.requests.per.connection设为1,这样在生产者尝试发送第一批消息时,就不会有其它的消息发送给broker。不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求时才能这么做。 

序列化器

  创建一个生产者必须要指定序列化器。

自定义序列化器

  如果发送到Kafka对象的不是简单的字符串或整型,那么可以使用序列化框架来创建消息记录,例如Avro、Thrift、Protobuf、Json等,或者使用自定义的序列化器。但最好不要使用自定义的序列化器,因为没有通用性,而且扩展性上也不太好。

Avro序列化器

  Apache Avro(简称Avr)是一种与编程语言无关的序列化格式。这是一种共享数据文件的方式,可以将它和kafka搭配起来使用。Avro数据通过与语言无关的schema来定义,schema通过JSON来描述,数据被序列化成二进制或JSON文件,不过一般使用二进制文件。Avro读写文件时需要用到schmea,schema一般会被内嵌在数据文件里。

  Avro的一个很好地特性是:当负责写消息的应用程序使用了新的schema,负责读消息的应用程序可以继续处理消息而无需做任何改动。例如,在写消息时最初的格式是:

{"name" : "","sex",:""},系统运行一段时间后,将消息格式更改为了{"name":"","age":""}。如果读消息的客户端没有进行更新,那么在读到新格式时,再调用类似getSex的方法时将会返回null,而如果读消息的客户端进行了升级,那么在读就消息时,使用了getAge方法时,将返回null,因为,老的消息格式没有age这个字段。

  需要注意:

  1.用于写入数据和读取数据的schema必须是相互兼容的。Avro文档提到了一些兼容原则。

  2.反序列化器需要用到用于写入数据的schema,即使它可能与用于读取数据的schema不一样。

Kafka里使用Avro

  Avro的数据文件里包含了整个schema。但如果每条记录都要包含schema那记录的大小将成倍增加,但是在读取数据时又必须要用到schema,这时候就需要用到“schema注册表”来解决这个问题。schema注册表不属于kafka项目,有一些已经开源的实现可以使用。把所有写入数据需要用到的schema保存在注册表里,然后在记录里引用schema的标识符。负责读取数据的应用程序使用标识符从注册表里拉取schema来反序列化记录。序列化器和反序列化器分别负责处理schema的注册和拉取。

  图 Avro记录的序列化和反序列化流程图

分区

  ProducerRecord对象(把其理解为要生产的对象)包含了目标主题、键和值。Kafka的消息是一个个的键值对,ProducerRecord对象可以只包含目标主题和值,键可以设置为默认的null,不过大多数应用程序会用到键。键有两个用途:可以作为消息的附加信息,也可以用来决定消息该被写到主题的哪个分区。拥有相同键的消息将被写入到同一个分区。也就是说,如果一个进程只从一个主题的分区读取数据,那么具有相同键的记录都会被该进程读取。

  ProducerRecord<Integer,String> record = new ProducerRecord<>("CustomerCountry","Laboratory Equipment","USA")  //指定了键的写法

  ProdecerRecord<Integer,String> record = new ProducerRecord<>("CustomerCountry","USA")   //不指定键的写法,此时键默认为null

  如果键默认为null,则使用默认的分区器,记录将被随机的发送到主题内各个可用的分区上。分区器使用轮询(Round Robin)算法将消息均衡的分布到各个分区上。

  如果键不为空,并且使用了默认的分区器,那么Kafka将会对键进行散列(这是kafka自己的散列算法),然后根据散列值把消息映射到特定的分区上。关键在于,同一个键总是被映射到同一个分区上,所以在进行映射时,使用主题的所有分区而不是可用分区。那么当写入分区不可用时,就会发生错误。只有在不改变主题分区数量的情况下,键和分区之间的映射才能保持不变。如果要使用键来映射分区,那么最好再创建主题的时候就把分区规划好,并且永远不要增加新分区。

定义自定义分区策略

  就是说你不一定总是需要使用默认分区策略,也可以根据需要定义自己的分区器。(Partitioner)

  摘自《kafka权威指南》

转载于:https://www.cnblogs.com/ToBeExpert/p/9827769.html

Kafka生产者----向kafka写入数据相关推荐

  1. kafka生产者和消费者端的数据不一致

    撸了今年阿里.头条和美团的面试,我有一个重要发现.......>>> kafka生产者生产30条数据,而消费者却不一定消费了30条数据,经过探索发现了main线程执行完成了而kafk ...

  2. Kafka(生产者)

    Kafka 1.概述 1.1 消息队列 1.1.1 传统消息队列的应用场景 1.1.2 消息队列的两种模式 1.2 kafka基础结构 2.kafka的快速入门 2.1 集群部署 2.1.1 安装ja ...

  3. kafka系列之kafka生产者与分区(3)

    概要 当我们发送消息之前,先问几个问题:每条消息都是很关键且不能容忍丢失么?偶尔重复消息可以么?我们关注的是消息延迟还是写入消息的吞吐量? 举个例子,有一个信用卡交易处理系统,当交易发生时会发送一条消 ...

  4. kafka(三):kafka broker

    文章目录 1. broker总体工作流程 2. Broker重要参数 3. broker节点的服役和退役 3.1 服役新节点 3.2 退役旧节点 4. kafka副本与leader选举 4.1 副本基 ...

  5. logstash读取Elasticsearch数据保存为json,logstash接收log数据写入kafka生产者

    [提前声明] 文章由作者:张耀峰 结合自己生产中的使用经验整理,最终形成简单易懂的文章 写作不易,转载请注明,谢谢! 代码案例地址: ?https://github.com/Mydreamandrea ...

  6. 7.1.5 智慧物流【车辆监控Structured Streaming、整合kafka、Redis、Mysql、HBASE 写入数据】

    车辆监控 文章目录 车辆监控 第一节 Structured Streaming 1.1 Structured Streaming发展历史 1.1.1 Spark Streaming 1.1.2 Dat ...

  7. 2021年大数据Kafka(十):kafka生产者数据分发策略

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 生产者数据分发策略 策略一:用户指定了partition 策 ...

  8. discard connection丢失数据_python kafka 生产者发送数据的三种方式

    python kafka 生产者发送数据的三种方式 发送方式 同步发送 发送数据耗时最长 有发送数据的状态,不会丢失数据,数据可靠性高 以同步的方式发送消息时,一条一条的发送,对每条消息返回的结果判断 ...

  9. flink 写入到es_《从0到1学习Flink》—— Flink 写入数据到 Kafka

    前言 之前文章 <从0到1学习Flink>-- Flink 写入数据到 ElasticSearch 写了如何将 Kafka 中的数据存储到 ElasticSearch 中,里面其实就已经用 ...

最新文章

  1. 非洲儿童(南阳oj1036)(馋)
  2. 18_clickhouse副本同步与高可用功能验证,分布式表与集群配置,数据副本与复制表,ZooKeeper整合,创建复制表,副本同步机制,数据原子写入与去重,负载平衡策略,案例(学习笔记)
  3. QueryDSL中包含通配符的字符串的精确匹配
  4. oracle -12169,很奇怪的错误ORA-12169
  5. 前端开发 什么是网页 什么是html
  6. 前方高能!IT 程序员、软件工程师值得考的证书原来有这么多! | 原力计划
  7. Shell脚本学习-阶段六-密钥的批量分发与执行
  8. Android框架之AsyncHttpClient
  9. 油炸锅EN/IEC60335 CE认证标准介绍
  10. 缠中说禅电子书_缠中说禅的资源下载?
  11. java框架难吗_java框架难学吗?怎样才能学好java框架?
  12. 【华为二面】2020/3/25_华为第二次技术面试_45分钟
  13. cpp中string类
  14. Intel Edison 基础开发之配置第一个小程序
  15. 如何阻止手机虚拟键盘弹起
  16. UE4 Slate二 用UMG思想去理解Slate+Slate编码
  17. 锁定计算机后游戏掉线,Win7旗舰版系统下玩游戏挂机总是掉线的解决方法
  18. 计算机显示在屏幕上怎么取消,电脑显示屏显示的九宫格怎样取消掉
  19. SpringMVC的视图和视图解析器
  20. 关于调用百度AI接口进行图片识别的实现(C#)

热门文章

  1. 【Skywalking】二、分布式链路跟踪-Skywalking页面介绍
  2. 我的世界服务器时装不显示,我的世界时装工坊在游戏内怎么用 | 手游网游页游攻略大全...
  3. pc 端 TIM双击无法启动解决办法
  4. Gox语言中进行屏幕截图并显示在Sciter图形界面中-GX38.2
  5. 微信空白昵称设置,瞬间变全透明
  6. 在html5中关键帧属性的描述,CSS动画属性关键帧keyframes全解析
  7. 点不到的按钮表白代码_DNF心动表白季第二季上线 表白也需要浪漫+运气
  8. 小哥哥,这边有个恋爱建议你谈一下
  9. 带你全面了解积分商城,逐步完善积分商城运营
  10. java 36 进制_数学-在Java中如何将十进制数转换为以36为底的数字?