执行以下代码:

KafkaProducer<String,String> producer = new KafkaProducer<String,String>(props);

走构造:

public KafkaProducer(Properties properties) {this(new ProducerConfig(properties), null, null);
}

先把我们的配置拿出来:

//拿到我们设置的参数
Map<String, Object> userProvidedConfigs = config.originals();

然后获取clientId,如果我们未配置,它就给我们拼一个出来:

//命名,跟自增长数字:producer-1
clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();

如果搞多个KafkaProducer,每个都会有各自的clientId。

接下来会构造核心组件,Partitioner 分区器

//构造了分区器,用来决定你发送的消息路由到topic的哪个分区中
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);

分区器是决定发送的每条消息是路由到Topic的哪个分区里去的

构造核心组件,Metadata,它是用来从broker集群拉取元数据的

//从broker集群拉取元数据的 Topics(topic-->partitions(leader + followers,ISR列表和leader保持同步))
//每隔一段时间就会更新元数据,默认5min
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));

后面写消息到Topic,才知道Topic对应有哪些partitions、Partition Leader所在的Broker,后面肯定会每隔一段时间就刷新元数据,metadata.max.age.ms,默认5min
如果某个Topic对应的元数据不在本地,就会通过Metadata来向Broker发送请求,拉取对应的元数据。

接下来定义核心参数:

发送失败后的重试间隔时间

//发送失败后,重试的时间间隔,默认100ms
//props.put("retry.backoff.ms",500)
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);

Request的最大的字节数(请求包含多个batch,每个batch中包含多个消息)

//发送的请求最大多少字节:props.put("max.request.size",xx) 默认1MB
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);

内存缓冲区大小

//缓冲区大小,props.put("buffer.memory",xx),默认32MB
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);

如果消息发送到缓冲区的速度,远大于缓冲区到Broker的速度,即缓冲区被填满了,Producer就会把发送的请求block住,不让你继续发消息了。(MAX_BLOCK_MS_CONFIG,默认会block 1min)1min后如果还不行,就抛异常。注释提示:不要把所有的JVM内存都给到这个缓冲区,还得留下额外的内存给消息压缩、in-flight请求(消息已发出,但未收到回应)

每个request的超时时间,默认30s

//请求的超时时间,默认30s
this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);

定义核心组件,RecordAccumulator缓冲区,负责消息的缓冲机制

//缓冲区,负责消息的缓冲机制
//消息打包成batch,如果我的消息要发送给同一个分区(Broker下有很多分区),就把同一个分区的请求打包成1个batch。
//1个Borker上的多个分区,对应的多个batch,会被打包成1个Request。1次网络请求就都搞定了
this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), //batch大小,16kthis.totalMemorySize, //缓冲区大小,32MBthis.compressionType,config.getLong(ProducerConfig.LINGER_MS_CONFIG), //消息如果没凑够batch,超过该时间后,也得立刻发送出去retryBackoffMs, //发送失败的重试间隔,100msmetrics,time);

消息打包成batch,发送request的原理:

1个Broker对应多个分区,1个分区对应1个batch。将消息打包为一个个的batch,再将batch打包成1个Request,1次网络请求就搞定了。如果batch太小,就会频繁Request,造成网络请求此时变多,吞吐量就会变小。如果batch搞太大,就会让缓冲区缓存 太多,会浪费内存资源。BATCH_SIZE_CONFIG batch默认就16k
足够多的消息凑够1个batch,才能request到Broker上。

但是如果发送了1条消息,经历了 LINGER_MS_CONFIG(默认:0) 后,即使没有凑够1个batch,也要立刻把消息发送出去。就好像坐大巴一样,凑够了人数就发车。但是今天就1个乘客,到了发车时间,大巴也得发车。

获取我们的Broker地址:

//broker地址 props.put("bootstrap.servers","localhost:9092")
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));

核心行为,去Broker拉取元数据

//核心行为:初始化时,直接调用Metadata组件的方法,从Broker上拉取1次集群元数据,每隔5min刷新一次
//在发送消息过程中,如果没找到某个Broker的元数据,也得去拉取一次
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());

核心组件,网络通信组件NetworkClient

//核心组件,网络通信组件
NetworkClient client = new NetworkClient(//connections.max.idle.ms 网络连接的最长空闲时间,超过就得回收,默认9minnew Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),this.metadata,clientId,//max.in.flight.requests.per.connection,每个Broker最多允许几个request未收到响应,默认5config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),//和Broker建立连接失败后的重试间隔,默认50msconfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),//Socket发送缓冲区大小,默认128kconfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),//Socket接收缓冲区大小,默认32kconfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),this.requestTimeoutMs, time);

核心组件,Sender线程

//核心组件,发送线程,从缓冲区获取消息,发送到Broker上去
this.sender = new Sender(client, //网络通信组件NetworkClientthis.metadata,this.accumulator,//每个连接没收到请求的数量是否==1config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,//发送请求的最大大小,1MBconfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),//默认1,只要leader写入成功就认为成功了(short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),//重试次数,默认0config.getInt(ProducerConfig.RETRIES_CONFIG),this.metrics,new SystemTime(),clientId,//请求超时时间,30sthis.requestTimeoutMs);
//线程名
String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
//真正的发送线程
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);

Sender是一个Runnable,线程是KafkaThread,线程名为:kafka-producer-network-thread。此时线程直接就启动了,就会将Sender(是个Runnable任务)跑起来,这就将NetworkClient网络通信组件给run起来了。

接着会准备好核心组件:序列化组件、拦截器组件

//序列化组件
this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,Serializer.class);
//拦截器组件
List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

KafkaProducer初始化时涉及到哪些核心组件?相关推荐

  1. KafkaProducer初始化时,是否会拉取集群元数据?

    debug看一下: //核心行为:初始化时,直接调用Metadata组件的方法,从Broker上拉取1次集群元数据,每隔5min刷新一次 //在发送消息过程中,如果没找到某个Broker的元数据,也得 ...

  2. 涉及到整型参数时,没有初始化时,Debug下可以运行,但是Release下不行

    涉及到整型参数时,没有初始化时,Debug下可以运行,但是Release下不行 是因为Debug给它默认赋值是0了,而Release下不会默认

  3. 【Stateflow】Chart初始化时访问data store memory警告

    应用Stateflow的时候,涉及到Data Store Memory作为全局变量的时候的一个错误,记录下来. 1. 问题描述 变量xxx是用Data Store Memory在外层定义的全局变量,S ...

  4. 关于STM32使用LWIP协议栈二次初始化时无法成功初始化TCP服务器----内存碎片化问题以及解决方法

    关于STM32使用LWIP协议栈二次初始化时无法成功初始化TCP服务器----内存碎片化问题以及解决方法 关于LWIP协议栈的话后期再出一个相关的系列文章吧,关于使用LAN8720芯片断网线重连的问题 ...

  5. vue 项目初始化时,npm run dev报错解决方法

    vue 项目初始化时,npm run dev报错解决方法 参考文章: (1)vue 项目初始化时,npm run dev报错解决方法 (2)https://www.cnblogs.com/ruilin ...

  6. LPC单片机IO口默认状态、复位状态、未初始化时输出高电平处理

    众所周知,STM32当复位以后,gpio默认是高阻状态,也就是浮空输入. 由STM32切换到LPC1788,发现LPC的IO未初始化时输出高电平,初始化后才能拉低,这样和STM32的设计就不兼容了. ...

  7. CString初始化时提示字符串太大

    CString初始化时提示字符串太大 char*也会报一样的错,原因是转义字符 char* aaaa="d:\asdf\sadf"; 解决方法:改为右斜杠就行.

  8. Android在初始化时弹出popwindow的方法

    Android中在onCreate()时弹出popwindow,很多人都有过类似的需求吧,但是直接在onCreate()中调用popwindow的showAtLocation()方法是会报异常的,原因 ...

  9. 系统初始化时kernel_init在内核态创建和运行应用程序以完成系统初始化

    系统初始化时kernel_init在内核态创建和运行应用程序以完成系统初始化.  内核刚刚启动时,只有内核态的代码,后来在init过程中,在内核态运行了一些初始化系统的程序,才产生了工作在用户空间的进 ...

最新文章

  1. 深度学习核心技术精讲100篇(十七)-多标准中文分词( Multi-Criteria-CWS)
  2. Java中这7个方法,一不小心就用错了!
  3. 【Python实例第19讲】图像分割的谱聚类
  4. 正则表达式之位置匹配
  5. ubuntu下安装ros出现“无法下载-package.ros.org中某个包-校验和不符”的解决方法...
  6. Mysql(二)Mysql SQL练习题
  7. Unity使用Aspose.Words创建表格和UI截图一起插入到Word中并保存到本地的一种解决方案
  8. appium自动化测试
  9. mysql无法创建partition_mysql的partition分区
  10. 【老生谈算法】matlabAP近邻传播聚类算法源码——聚类算法
  11. android的权限一览表和RGB颜色对照表
  12. 小视频如何消重 视频剪切后md5值变了吗
  13. a记录 mysql_[a]-和[a]相关的内容-阿里云开发者社区
  14. 时空大数据与众包计算学习总结
  15. 重庆赛区ACM热身赛 8526. 小埋的烦恼
  16. 怎样把音乐存到计算机里,如何把CD光盘中的歌曲复制到电脑
  17. 计算机专业英语复习第七天
  18. 加了@CrossOrigin ,仍然报跨域错误
  19. 弹性力学偏微分方程组及其边界条件
  20. 前端分享到微信及微信朋友圈之后链接变化

热门文章

  1. labview波形图两个游标,LabVIEW数据可视化:使用波形图表控件逐点显示曲线的方法...
  2. 4种事务特性,5种隔离级别,7种传播行为
  3. 基于上下文的访问控制——CBAC的配置
  4. stm32与arm7比较(经典)
  5. 解决ERROR 2003 (HY000): Can't connect to MySQL server on
  6. 【原创翻译】The Case for the Reduced Instruction Set Computer
  7. 学习类中的const和static类型
  8. 修复思维导图mindmanager移动文件位置后打开崩溃
  9. RH033 Unit 13 Finding and Processing Files
  10. IT精英们!一路走好!