1 初始化配置

  Kafka 通过 KafkaProducer 构造器初始化生产者客户端的配置。
  常用的重要配置,详见官网。

  • bootstrap.servers:Kafka 集群地址(host1:post,host2:post),Kafka 客户端初始化时会自动发现地址,所以可以不填写所有地址。
  • key.serializer:实现了 Kafka 序列化接口的类,用来序列化 key。
  • value.serializer:实现了 Kafka 序列化接口的类,用来序列化 value。
  • acks:leader 接收到的 follower 确认的数量需要满足 acks 的配置。
     0:生产者把消息发送出去就认为发送完成了。
     1:leader 接收到消息后,不用等 follower 的确认,就表示发送完成了。
     all/-1:leader 接收到消息后,需要所有在 ISR 集合的 follower 确认后,才表示完成了。
  • retries:消息发送失败后的重试次数。如果允许重试,而 max.in.flight.requests.per.connection>1,则可能导致消息乱序,因为如果把两批消息发送到同一个分区,第一批失败并重试,而第二批成功了,则第二批消息可能先生成了。
  • retry.backoff.ms:消息重试发送的间隔。
  • client.id:标识客户端的 id。
  • compression.type:压缩类型。可选:none、gzip、snappy、lz4。
  • buffer.memory:记录累加器可以使用的最大内存缓冲池大小。
  • batch.size:内存缓冲池的缓冲列表大小。当 batch 的大小超过 batch.size 或者时间达到 linger.ms 就会发送 batch。
  • transactional.id:事务 ID。
// 基础配置
Map<String, Object> configs = new HashMap<>();
// Kafka broker 集群
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
// key 序列化
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// value 序列化
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

2 构造消息

  Kafka 提供了6种构造器来构造消息。

  • topic:消息主题,必填;
  • partition:分区号,非必填。如果为空,会计算 key 的 hash 值,再和该主题的分区总数取余得到分区号;如果 key 也为空,客户端会生成递增的随机整数,再和该主题的分区总数区域得到分区号。
  • timestamp:时间戳,非必填。如果为空,默认为 KafkaProducer 构造器初始化的时间。
  • key:消息 key,非必填。关系到分区分配,broker 会对带 key 的消息进行日志压缩。
  • value:消息内容,必填。
  • headers:消息头,非必填。
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers);
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value);
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers);
public ProducerRecord(String topic, Integer partition, K key, V value);
public ProducerRecord(String topic, K key, V value);
public ProducerRecord(String topic, V value);

3 发送消息

  支持同步发送和异步发送消息。

  同步发送

producer.send(record).get();

  异步发送

producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {// 回调处理流程}
});

转载于:https://www.cnblogs.com/bigshark/p/11182403.html

Kafka2.0生产者客户端使用相关推荐

  1. 带你100% 地了解 Redis 6.0 的客户端缓存

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 近日 Redis 6.0.0 GA 版本发布,这是 Redis 历 ...

  2. 十九、Redis 6.0 的客户端缓存

    一.为什么需要客户端缓存? 我们都知道,使用 Redis 进行数据的缓存的主要目的是减少对 MySQL 等数据库的访问,提供更快的访问速度,毕竟 <Redis in Action> 中提到 ...

  3. Redis 6.0 的客户端缓存是怎么肥事?一文带你了解!

    来源 | 程序员历小冰 责编 | Carol 封图 | CSDN 付费下载于视觉中国 近日 Redis 6.0.0 GA 版本发布,这是 Redis 历史上最大的一次版本更新,包括了客户端缓存 (Cl ...

  4. 客户端更新功能实现_exlive1.0|监控客户端功能更新

    为了给用户提供更友好的使用体验.更时效性的管理,exlive1.0监控客户端本次新增及优化以下功能: ① 交通部JT808增加"预警命令" ② 交通部JT808车机命令分类,快速查 ...

  5. jms.jar 2.0_JMS API 2.0生产者和使用者

    jms.jar 2.0 Please go through my previous post at "JMS API 1.1 Producer and Consumer" to g ...

  6. linux安装telnet客户端_Redis 6.0 的客户端缓存是怎么肥事?一文带你了解!

    来源 | 程序员历小冰责编 | Carol封图 | CSDN 付费下载于视觉中国近日 Redis 6.0.0 GA 版本发布,这是 Redis 历史上最大的一次版本更新,包括了客户端缓存 (Clien ...

  7. 【U8V13.0】客户端登录提示4522问题

    U8V13.0,客户端安装时选的是传统客户端(智能客户端也有此问题),安装完登录提示如下图: 问题原因:产品BUG. 解决办法:打补丁 U8服务器端: 1.将ISD升级至最新版 2.安装补丁更新工具 ...

  8. JAX-RS 2.0 REST 客户端

    JAX-RS 2.0对客户端API进行了标准化.客户端API通过HTTP请求Web资源,同样符合统一接口和REST架构风格.与Apache HTTP Client和HttpURLConnection相 ...

  9. 客户端华为p9调用摄像头出现空指针_带你 100% 了解 Redis 6.0 的客户端缓存

    文章来源:https://mp.weixin.qq.com/s/DFgygoDcXeJXjbjPt7BZiQ 原文作者:历小冰 近日 Redis 6.0.0 GA 版本发布,这是 Redis 历史上最 ...

最新文章

  1. Eureka 注册中心/服务发现框架
  2. 如何确定敏捷是否适合你的团队?
  3. 【转载】Unix编程艺术——Unix哲学
  4. svn 不支持http 客户端_Xversion for mac(SVN客户端)
  5. Java中Volatile关键字详解
  6. python赋值方式
  7. 案例学习BlazeDS+Spring之八InSync06“松耦合”UI同步事件通知
  8. FFMPEG结构体分析:AVPacket
  9. flush table mysql_MySQL flush table 导致的锁问题
  10. 兰花草c语言编码蜂鸣器,蜂鸣器奏乐-多种音乐分享
  11. STM32G0系列的启动配置与程序下载
  12. 新手初识安信可ESP8266 12f机智云开发板微信直连云
  13. python理财基金数据分析可视化系统
  14. Spring boot集成Spring-data-Jpa中遇到的问题
  15. kafka如何选择分区数及kafka性能测试
  16. c语言关键字大全(32个)
  17. C#静默打印 PDF
  18. 图的逆拓扑排序(回路识别)
  19. linux 视频无法播放视频教程,Ubuntu7.10下无法正常播放网页上Flash视频的解决
  20. Android studio 动画---补间动画

热门文章

  1. Yet Another Array Partitioning Task CodeForces - 1114B(思维)
  2. A Simple Math Problem(矩阵快速幂)
  3. mysql数据库导入导出
  4. access denied for_abm怎么样?ACCESS集团携8大国际品牌在进博会首秀,展示abmr 硬核实力!...
  5. pic10f220 c语言,PIC10F200 LED流水灯程序
  6. mysql单单写join_MySQL系列之Join大法
  7. Mask R-CNN详解
  8. μC/OS-Ⅱ的移植
  9. pycharm的console输入如何换行
  10. 房贷是不是越多越久越好?