Kafka生产者

Kafka 发送消息的主要步骤:

ProducerRecord 对象包括目标主题和发送的内容,还可以制定键或分区。如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。

Propertites prop = new Propertites();
prop.put("bootstrap.servers", "broker1:9092,borker2:9092");
prop.put("key.deserializer", "org.apache.kafka.common.serialiation.StringDeserializer");
prop.put("value.deserializer", "org.apache.kafka.common.serialiation.StringDeserializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);

Kafka 发送消息主要有以下3种方式:

  • 发送并忘记:把消息发送给服务器,但并不关心它是否正常到达。
ProducerRecord<String, String> record = new ProducerRecord<>("CustoerContry", "Precision Products", "France");try {producer.send(record);} catch(Exception e) {e.printStackTrce();}
}
  • 同步发送:使用 send() 方法发送消息,它会返回一个 Future 对象,调用 get() 方法进行等待,就可以知道消息是否发送成功。
ProducerRecord<String, String> record = new ProducerRecord<>("CustoerContry", "Precision Products", "France");try {producer.send(record).get();} catch(Exception e) {e.printStackTrce();}
}
  • 异步发送:调用 send() 方法, 并指定一个回调函数,服务器在返回响应时调用该函数。
private class DemoProducerCallback impleents Callback {@Overridepublic void onCopletion(Recordetadata recordMetadata, Exception e) {if (e != null) {e.printStackTrace();}}
}
ProducerRecord<String, String> record = new ProducerRecord<>("CustoerContry", "Precision Products", "France");try {producer.send(record, new DemoProducerCallback());} catch(Exception e) {e.printStackTrce();}
}

生产者的配置

acks

acks 指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。

  • acks = 0,生产者在成功写入消息之前不会等待任何来自服务器的响应。
  • acks = 1,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。
  • acks = all,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的响应。

buffer.memory

该参数用来设置生产者内存缓冲区的大小,生产者用它来缓冲发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候,send() 方法调用要么被阻塞,要么抛出异常。

copression.type

默认情况下,消息发送时不会被压缩。该参数可以设置为 snappy、gzip 或 lz4 等。

retries

该参数决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在诶次重试之间等到100ms,不过可以通过 retry.backoff.ms 参数来改变这个时间间隔。

batch.size

当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。当批次被填满,批次里的所有消息会被发送处理。

linger.ms

该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer 会在批次被填满或 linger.ms 达到上限时把批次发送出去。

max.in.flight.requests.per.connection

该参数指定了生产者在接收到服务器响应之前可以发送多少个消息。

timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms

request.timeout.ms 指定了生产者在发送数据时等待服务器响应的时间,metadata.fetch.timeout.ms 指定了生产者在获取元数据时等待服务器返回的响应时间。如果等待响应超时,要么重试发送数据,要么返回一个错误。timeout.ms 指定了 broker 等待同步副本返回消息确认的时间,与 asks 的配置相匹配。

max.block.ms

该参数指定了在调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的阻塞时间。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常。

max.request.size

该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。

receive.buffer.bytes 和 send.buffer.bytes

这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小。

序列化

创建一个生产者对象必须指定序列化器。

分区

Kafka 的消息是一个个键值对。拥有相同键的消息将被写到统一分区。如果使用默认的分区器,那么记录将被随机地发送到主题内各个可用的分区上。分区器使用轮训(Round Robin)算法将消息均衡地分布到各个分区上。

只有在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变。

获取以上Java高级架构最新视频,欢迎

加入Java进阶架构交流群:142019080。直接点击链接加群。https://jq.qq.com/?_wv=1027&k=5lXBNZ7

Kafka权威指南,Kafka生产者相关推荐

  1. kafka权威指南_Kafka-分区、片段、偏移量

    [分区.片段.偏移量] 1. 每个分区是由多个Segment组成,当Kafka要写数据到一个partition时,它会写入到状态为active的segment中.如果该segment被写满,则一个新的 ...

  2. 送5本《Kafka权威指南》第二版

    文末送书 科学家们每一次发生分歧都是因为掌握的数据不够充分.所以,我们可以先就获取哪一类数据达成一致,只要获取了数据,问题也就迎刃而解了.要么我是对的,要么你是对的,要么我们都是错的,然后继续. -- ...

  3. 如何使用Kafka可靠地发送消息-《Kafka权威指南(第二版)》阅读笔记

    可靠性是系统而不是某个独立组件的一个属性,所以,在讨论Kafka的可靠性保证时,需要从系统的整体出发.说到可靠性,那些与Kafka集成的系统与Kafka本身一样重要.正因为可靠性是系统层面的概念,所以 ...

  4. kafka权威指南中文翻译之一

    kafka 初见 (Meet Kafka) 在讨论Kafka 细节之前,有必要先来了解下消息发布/ 订阅的概念,这个概念非常重要. kafka 中的数据单位是message .对比数据库来说,可以把消 ...

  5. 《Kafka权威指南》——问题1——onParitionsAssigned

    四.Kafka消费者--从Kafka读取数据 4.8 从特定偏移量处开始处理数据 4.7节中说到,在调用subcribe()方法时传进去一个ConsumerRebalanceListener实例,可以 ...

  6. 《Kafka权威指南》读书笔记3 Kafka生产者

    日常业务开发很重要.很常用的一章 提纲:如何使用Kafka生产者:如何创建KafkaProducer.ProducerRecords:如何将记录发给Kafka:如何处理从Kafka返回的错误:一些配置 ...

  7. 【Kafka】《Kafka权威指南》入门

    发布与订阅消息系统 在正式讨论Apache Kafka (以下简称Kafka)之前,先来了解发布与订阅消息系统的概念, 并认识这个系统的重要性.数据(消息)的发送者(发布者)不会直接把消息发送给接收 ...

  8. 《Kafka权威指南》记录

    生产者 生产流程 32页 生产者创建 Kafka生产者需要三个必须参数:broker地址清单,key和value的序列化方式 (如StringSerializer) 生产者发送 ACKS acks 参 ...

  9. 《kafka权威指南》学习记录1

    本博客只作为自己学习的一个记录. 一.kafka生产者 1.kafka生产者组件 main线程 send线程 producerrecord对象 序列化器 分区器. Producerrecord对象格式 ...

  10. 《kafka权威指南》之可靠的数据传输

    文章目录 可靠性保证 Kafka做出的四个保证 kafka可靠性保证的核心 kafka的复制机制 不恰当的垃圾回收配置(**) broker配置 复制系数1 不完全的首领选举2 最少同步副本3 可靠的 ...

最新文章

  1. 给一个表单提交绑定一个点击事件
  2. wayos利用easyradius实现WEB认证页面的记住密码及到期提醒功能
  3. 【Java】5.2 方法详解
  4. Bitmap之位图采样和内存计算详解
  5. Entity Framewrok 7beta7中不同版本sql server自动生成分页sql语句的问题
  6. Axure RP Pro 6.0 原型设计工具(产品经理必备)
  7. 用 Python+openpose 实现抖音尬舞机
  8. 项目合作| 视频监控解决隧道洗车线的安全问题
  9. es6 实例:使用Proxy实现观察者模式
  10. 载 Kubernetes和OpenStack到底是什么关系?先搞清楚,再系列学习
  11. 惠普c7000服务器装系统,HP BladeSystem c7000 安装配置手册
  12. npm -save 和-save-dev秒懂
  13. WNM6002 N通道增强功能MOS场效应晶体管
  14. 新疆计算机二级vb 试题,2013新疆维吾尔自治区WORD全国计算机等级考试二级VB试题及答案...
  15. 测量脉冲宽度仿真proteus
  16. bzoj3668: [Noi2014]起床困难综合症
  17. Matlab主成分分析法
  18. View inflate方法和LayoutInflater inflate方法的区别
  19. Android 生成二维码
  20. Android仿微信图片编辑库,你想要的功能这里都有

热门文章

  1. C语言编写游戏的程序教程,如何运用C语言编写搬山游戏
  2. Python游戏编程快速上手
  3. 数据库系统概论知识点总结(附期末考试题库)
  4. 使用ActivityGroup类显示多个Activity
  5. 计算机维修英语情景对话大全,实用英语短对话:修电脑
  6. 如何利用极致业务基础平台做一个通用企业ERP之十一销售出库单设计
  7. Blender源代码编译(VS2019、win64_vc15)
  8. 【python】用PyQt5教你制作简单的水果抽奖机
  9. java poi jar包下载_poi.jar包下载
  10. 尚硅谷vue基础笔记