目录

  • 0. 目录
  • 1. 生产消息步骤
  • 2. ProducerRecord的属性
  • 3. 必要的参数
  • 4. 发送消息
  • 5. 分区器
  • 6. 拦截器
  • 8. 几个重要生产者参数
  • 9. 生产者是线程安全的

0. 目录

1. 生产消息步骤

  • 配置生产者客户端参数,创建生产者实例
  • 构建待发送的消息
  • 发送消息
  • 关闭生产者

2. ProducerRecord的属性

public class ProducerRecord<K, V> {private final String topic;//必填项private final Integer partition;private final Headers headers;private final K key;private final V value;//必填项private final Long timestamp;
}

key用来计算分区号,确定发送到指定分区

3. 必要的参数

  • bootstrap.servers:指定kafka集群的broker的地址,默认为空,虽然可以从给定的broker找到其他broker,但是为防止某一节点宕机导致消息发送失败,建议填写至少2个broker地址。
  • key.serializer、value.serializer
  • client.id:客户端ID,若不设置,会自动生成非空字符串。
  • 设置技巧:
    属性名不好记,可以使用ProducerConfig类中的常量。

4. 发送消息

  • 发后即忘(fire-and-foget)
    性能最高,可靠性最差
try {producer.send(record);
} catch (Exception e) {e.printStackTrace();
}
  • 同步(sync)
    同步发送的性能差,需要阻塞等待一条消息发送完成之后,再去发送下一条。
    send方法本身是异步的,通过使用get方法返回的Future对象实现同步。
try {producer.send(record).get();
} catch (Exception e | InterruptedException e) {e.printStackTrace();
}

如果需要Future.get()的返回值,可以采用以下方式。返回值中包含消息的主题、分区号、偏移量、时间戳等元数据信息。

try {Future<RecordMetadata> future = producer.send(record);RecordMetadata metadata = future.get();System.out.println(metadata.topic() + "-" + metadata.partition() + "-" + metadata.offset());
} catch (Exception e | InterruptedException e) {e.printStackTrace();
}

由于send()方法返回的是Future对象,可以使用java并发的相关方法丰富实现。
同步发送可以配置客户端参数的重试次数,如果超过重试次数,则必须处理异常。

  • 异步(async)
    Future send(ProducerRecord<K, V> record, Callback callback);
producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {exception.printStackTrace();} else {System.out.println(metadata.topic() + "-" + metadata.partition() + "-" + metadata.offset());}}
})

kafka消息发送保证分区有序的情况下,回调函数也能保证有序。

producer.close();
public void close(long timeout, TimeUnit timeUnit); //等待超时时间之后,强行退出。

close()方法会阻塞等待所有的消息都发送完成再关闭。

5. 分区器

  • 消息体重指定partition字段,根据该字段发送到对应分区
  • 没有指定分区字段,根据消息体中key字段,由分区器分配分区
    kafa使用的分区器即客户端的DefaultPartitioner类
DefaultPartitioner implements Partitioner {public int partition(...);  //计算分区号public void close();    //关闭分区器时释放相应资源,空方法
}
  • 默认分区器:
    (1)如果key不为null,对key进行哈希得到分区号(所有分区中的任意一个,不论是否可用)。
    (2)key为null,轮询发送到各个可用分区,如果没有可用分区,则轮询到各个分区。
  • 如何自定义分区器?
    (1)实现Partitioner接口,重写partition()方法
    (2)配置客户端的配置参数
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DemoPartitinoer.class.getName());

6. 拦截器

  • 作用:
    在发送之前做准备工作(eg:过滤消息,修改部分内容)
    在发送的回调逻辑之前,做一些定制化的需求(eg:统计工作)
  • 自定义拦截器
    (1)实现ProducerInterceptor<String, String>接口,重写onsend()方法,重写onAcknowledgement()方法
    (2)配置客户端的配置参数:ProducerConfig.INTECERPTOR_CLASSES_CONFIG
    可以指定多个,类名中间以“,”连接,多个拦截器时,某一个失败,会从上一个成功的开始接着执行下一个拦截器

    7. 整体架构

    主线程:KafkaProducer-->拦截器(非必须,一般不)-->序列号器(必须)-->分区器(根据key可选)-->消息累加器-->Sender线程
    Sender线程:Sender-->创建Request-->提交给Selector发送-->未收到响应的请求根据节点分别放在节点对应的InFlightRequests中。
    Sender线程,先将分区信息转换为节点信息,做应用逻辑层到网络IO层的转换,然后将消息转换成满足kafka协议的Request。

可以通过比较InFlightRequests的大小的配置参数max.in.flight.requests.per.connection与在InFlightRequests中的消息多少,来判断各个节点的负载情况。

元数据(主题分区的数量、leader副本所在节点的地址、端口等信息)的更新由Sender线程,选择负载最小的节点进行发送MetaDataRequest获取。默认每5分钟更新一次。
主线程也需要使用Sender更新过的元数据。

8. 几个重要生产者参数

  • acks --> 什么情况下算发送成功
    默认值:1,只要leader写入成功即成功。是可靠性和吞吐量之间的折中方案。
    0,发送即成功,不管服务端是否返回相应。吞吐量最大。
    -1或all,所有ISR中的副本都成功。可靠性最高,但不保证一定可靠,因为ISR中可能只有leader,此时的效果与1相同。
  • max.request.size
    生产者能发送的消息的最大值,默认1MB

9. 生产者是线程安全的

end :)

转载于:https://www.cnblogs.com/suyeSean/p/11241900.html

【Kafka】01 生产者相关推荐

  1. Kafka的生产者优秀架构设计

    来自:架构之美 前言 Kafka 是一个高吞吐量的分布式的发布订阅消息系统,在全世界都很流行,在大数据项目里面使用尤其频繁.笔者看过多个大数据开源产品的源码,感觉 Kafka 的源码是其中质量比较上乘 ...

  2. Kafka 的生产者优秀架构设计

    作者 | 孙玄 责编 | 郭芮 来源 | 架构之美(ID:beautyArch) Kafka 是一个高吞吐量的分布式的发布订阅消息系统,在全世界都很流行,在大数据项目里面使用尤其频繁.笔者看过多个大数 ...

  3. kafka中生产者和消费者的分区问题

    本文来书说下kafka中生产者和消费者的分区问题 文章目录 概述 主题的分区数设置 分区与生产者 分区与消费者 range roundrobin(轮询) 本文参考 本文小结 概述 我们知道,生产者发送 ...

  4. 从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例

    从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例 前言 加依赖 生产者 加配置 生产者代码示例 消费者 加配置 消费者监听器示例 调用 关于 Serializer ...

  5. 手撸kafka producer生产者的分区器(partition)API

    简介:本篇博客是对kafka produce 生产者分区器的API(Java) 包含以下内容:分区使用原则,分区器使用原则,分区器相关代码编写及pom.xml配置文件编写,到最后的运行结果. 目录标题 ...

  6. 详细讲解如何使用Java连接Kafka构建生产者和消费者(带测试样例)

    1 缘起 学习消息队列的过程中,先补习了RabbitMQ相关知识, 接着又重温了Kafka相关的知识, 发现,我并没有积累Java原生操作Kafka的文章, 只使用SpringBoot集成过Kafka ...

  7. kafka中生产者是如何把消息投递到哪个分区的?消费者又是怎么选择分区的?...

    作者 | 废物大师兄 来源 | https://www.cnblogs.com/cjsblog/p/9664536.html 1. 前言 我们知道,生产者发送消息到主题,消费者订阅主题(以消费者组的名 ...

  8. Apache Kafka-初体验Kafka(01)-入门整体认识kafka

    文章目录 kafka官方文档 使用场景 Kafka基本概念 消息( Message )相关术语 主题Topic & 消息日志Log 分布式Distribution Producers Cons ...

  9. Kafka开发指南之 如何Kafka 事务型生产者,保证生产者exactly once

    目录 至少一次(at least once) 最多一次(at most once) 精确一次(exactly once) 幂等性 幂等性作用范围 实现方法 代码 事务 事务作用范围 实现方法 代码 我 ...

  10. 【kafka系列】kafka之生产者发送消息实践

    目录 一.准备工作 二.终端命令 生产者命令 消费者命令 三.Java实践 搭建项目 异步发送-无回调 异步发送-有回调 同步发送 一.准备工作 进入实战之前先熟悉一下topic的相关命令,使用终端命 ...

最新文章

  1. tf.get_variable
  2. e300氛围灯哪里调节_奥迪Q5L安装原厂32色20灯源氛围灯
  3. 外刊晨读 2018 年 年 5 月 月 15 日
  4. 砰的一声,实验室又炸鸡了
  5. 微服务架构的常见问题
  6. dw如何写php代码提示,DW CS5 jquery代码提示插件
  7. 自学UI设计,应当具备的基础技能(软件)
  8. 静默授权获取unionid_Asp.Net Core 中IdentityServer4 授权中心之自定义授权
  9. asp.net core刷新css缓存
  10. Java转C#的最佳工具
  11. graphicsmagick常用命令
  12. pdflib textflow
  13. 魔兽支持宽屏--怎样让宽屏支持更多游戏?
  14. java程序员电脑内存配置_学习JAVA对电脑配置有要求吗
  15. java山海经之轩辕_山海经之情剑轩辕 炼化任务详细攻略
  16. linux运行directory,我在linux里用命令出来is a directory是怎么回事
  17. Julia1.4文档 —— 5. Julia 字符串
  18. Android USB tethering相关代码
  19. 腾讯新版 Windows QQ 首个公测版发布,采用全新 QQ NT 架构
  20. win10下VMware15安装centos7详细步骤及遇到的问题

热门文章

  1. 二分查找的代码实现--go语言
  2. jlabel 不能连续两次set_请问一个JAVA中JLabel的setFont()问题?
  3. 华硕笔记本返厂维修流程_Intel EVO严苛认证!14款极品笔记本上市:秒光|英特尔|笔记本|华硕|宏碁|惠普|微星...
  4. oracle系统AP对应的凭证编号,AP主要账户及会计分录
  5. mysql实现主从复制的方式_mysql实现主从复制、读写分离的配置方法(二)
  6. 建模步骤_数学建模的基本步骤
  7. 码云推出企业 Git 和项目管理现场培训服务
  8. docker设置不同网络和迁移到指定网络
  9. JSONObject获取的值有时候不是String类型,而有时候又是String类型,怎么办呐
  10. [ASP.NET AJAX]Function对象及Type类的方法介绍