本次内容我们有两个目标:第一个初探Producer发送消息的流程第二个我们学习一下Kafka是如何构造异常体系的一、代码分析Producer核心流程初探

//因为生产中开发使用的是异步的方式发送的消息,所以我这儿直接贴的代码//就是异步发送的代码,大家注意这个代码里面传进去了两个参数//一个是消息//一个是回调函数,这个回调函数很重要,每个消息发送完成以后这个回调函数都会被//执行,我们可以根据这个回调函数返回来的信息知道消息是否发送成功,//做相对应的应对处理。这种传递回调函数的代码设计方式也值得我们积累,这样可以增加用户开发代码时候的灵活性。 producer.send(new ProducerRecord<>(topic,                    messageNo,                    messageStr), new DemoCallBack(startTime, messageNo, messageStr));

点击过去就会看到如下核心代码

private Future doSend(ProducerRecord record, Callback callback) {        TopicPartition tp = null;try {// first make sure the metadata for the topic is available//第一步:阻塞等待获取集群元数据//maxBlockTimeMs 获取元数据最多等待的时间            ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);//最多等待的时间减去等待元数据花的时间等于还可以在等待的时间            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);//集群元数据信息            Cluster cluster = clusterAndWaitTime.cluster;//第二步:对key和value进行序列化            byte[] serializedKey;try {                serializedKey = keySerializer.serialize(record.topic(), record.key());            } catch (ClassCastException cce) {throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +" specified in key.serializer");            }            byte[] serializedValue;try {                serializedValue = valueSerializer.serialize(record.topic(), record.value());            } catch (ClassCastException cce) {throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +" specified in value.serializer");            }//第三步:根据分区器选择合适的分区            int partition = partition(record, serializedKey, serializedValue, cluster);//第四步:计算消息的大小            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);//确认消息是否超出限制            ensureValidRecordSize(serializedSize);//第五步:根据元数据获取到topic信息,封装分区对象            tp = new TopicPartition(record.topic(), partition);            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);// producer callback will make sure to call both 'callback' and interceptor callback//第六步:设置回调对象            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);//第七步:把消息追加到accumulator对象中            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);//消息存入accumulator中,如果一个批次满了,或者是创建了一个新的批次//那么唤醒sender线程,让sender线程开始干活,至于干什么活,我们后面//再去分析if (result.batchIsFull || result.newBatchCreated) {                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);//第八步:唤醒sender线程this.sender.wakeup();            }return result.future;// handling exceptions and record the errors;// for API exceptions return them in the future,// for other exceptions throw directly        } catch (ApiException e) {            log.debug("Exception occurred during message send:", e);if (callback != null)                callback.onCompletion(null, e);this.errors.record();if (this.interceptors != null)this.interceptors.onSendError(record, tp, e);return new FutureFailure(e);        } catch (InterruptedException e) {this.errors.record();if (this.interceptors != null)this.interceptors.onSendError(record, tp, e);throw new InterruptException(e);        } catch (BufferExhaustedException e) {this.errors.record();this.metrics.sensor("buffer-exhausted-records").record();if (this.interceptors != null)this.interceptors.onSendError(record, tp, e);throw e;        } catch (KafkaException e) {this.errors.record();if (this.interceptors != null)this.interceptors.onSendError(record, tp, e);throw e;        } catch (Exception e) {// we notify interceptor about all exceptions, since onSend is called before anything else in this methodif (this.interceptors != null)this.interceptors.onSendError(record, tp, e);throw e;        }    }

看完上面的代码,我们也就大概完成了本次的第一个目标:初探Producer的核心流程初探。代码调用的时序图如下:Producer发送数据流程分析二、Kafka异常体系一直跟着分析源码的同学能感觉得到上面的代码就是KafkaProducer的核心流程。这也是我们为什么在挑这个时候讲Kafka是如何构造异常体系的原因,一般在项目的核心流程里面去观察这个项目的异常体系会看得比较清晰,大家发现这个流程里面捕获了很多异常:

 } catch (ApiException e) {            log.debug("Exception occurred during message send:", e);if (callback != null)                callback.onCompletion(null, e);this.errors.record();if (this.interceptors != null)this.interceptors.onSendError(record, tp, e);return new FutureFailure(e);        } catch (InterruptedException e) {this.errors.record();if (this.interceptors != null)this.interceptors.onSendError(record, tp, e);throw new InterruptException(e);        } catch (BufferExhaustedException e) {this.errors.record();this.metrics.sensor("buffer-exhausted-records").record();if (this.interceptors != null)this.interceptors.onSendError(record, tp, e);throw e;        } catch (KafkaException e) {this.errors.record();if (this.interceptors != null)this.interceptors.onSendError(record, tp, e);throw e;        } catch (Exception e) {// we notify interceptor about all exceptions, since onSend is called before anything else in this methodif (this.interceptors != null)this.interceptors.onSendError(record, tp, e);throw e;        }

通过看这段代码,我们可以学习到如下3个点:1. 核心流程捕获各种异常(上面的这段代码就是核心代码)2. 底层异常直接往上抛,比如:ensureValidRecordSize方法3. 自定义各种异常,力求出了问题,方便精准定位问题 比如:ensureValidRecordSize方法注意:核心流程捕获异常的时候我们也可以考虑把异常封装成为各种状态码。Kafka自定义各种异常。举个例子,比如我们分析初探核心流程里面有段代码是:

//检查要发送的这个消息大小, 检查是否超过了请求大小和内存缓冲大小。            ensureValidRecordSize(serializedSize);

点击过去

private void ensureValidRecordSize(int size) {//默认值1M,如果超过1M抛异常if (size > this.maxRequestSize)throw new RecordTooLargeException("The message is " + size +" bytes when serialized which is larger than the maximum request size you have configured with the " +                                              ProducerConfig.MAX_REQUEST_SIZE_CONFIG +" configuration.");//不能超过内存缓冲的大小,如果超过内存大小抛异常if (size > this.totalMemorySize)throw new RecordTooLargeException("The message is " + size +" bytes when serialized which is larger than the total memory buffer you have configured with the " +                                              ProducerConfig.BUFFER_MEMORY_CONFIG +" configuration.");    }

RecordTooLargeException 就是自定义的异常,Kafka选择把这种底层代码的异常往上抛,在核心流程里统一处理。如果没有太多工业项目设计经验的同学,可以学习Kafka的异常体系的设计,Kafka使用的这种异常处理方式是大多数大数据项目处理异常使用的方式。三、总结本小节主要分析了KafkaProducer发送消息的大致步骤,另外此小节还有一个重点就是我们学习了Kafka是如何构建自己的异常体系的。系列更新第八期啦,后续还有更精彩的内容!喜欢的同学可以点赞,关注-   关注“大数据观止”   -

producer send源码_Kafka源码深度剖析系列(七)——Producer核心流程初探相关推荐

  1. hibernate 并发获取session失败 空指针_高并发之|通过ThreadPoolExecutor类的源码深度解析线程池执行任务的核心流程...

    核心逻辑概述 ThreadPoolExecutor是Java线程池中最核心的类之一,它能够保证线程池按照正常的业务逻辑执行任务,并通过原子方式更新线程池每个阶段的状态. ThreadPoolExecu ...

  2. μC/OS-II 源码阅读笔记 —— 内核深度剖析

    一个程序猿郁结十年的青苹果 Bush 2014-4-24 前言 此文发表在此,由于正吃菜的我才疏学浅,文中难免有错误的地方,欢迎看官和过客指正批评,痛骂也无妨,我虚心接受所有的鄙视. 目录 概述 缩略 ...

  3. python web开发项目 源码_Python + Flask 项目开发实践系列七

    对于 Python + Flask 这种灵活的web开发框架,在前面的六个系列文章中详细的进行了说明,主要讲到了页面的首页加载时的页面渲染,增加功能,删除功能,修改功能,查询功能,查询详情功能等一些页 ...

  4. 深度学习系列之CNN核心内容

    导读 怎么样来理解最近异常火热的深度学习网络?深度学习有什么亮点呢?答案其实很简答,今年十月份有幸参加了深圳高交会的中科院院士论坛,IEEE fellow汤晓欧做了一场精彩的报告,这个问题被汤大神一语 ...

  5. 【深度相机系列七】深度相机应用全面梳理:多点开花,定点爆破

    前面我们对深度相机的基本原理有了一定了解,本文相对全面的梳理一下深度相机的应用领域.深度相机的应用在智能人机交互.人脸技术.三维重建.机器人.AR等领域全面开花,目前商用深度相机最成熟的应用就是移动终 ...

  6. 【深度相机系列七】深度相机的应用领域

    说明:文中所举例的产品比较早,读者把重点放在学习原理上就好. 前面我们对深度相机的基本原理有了一定了解,本文相对全面的梳理一下深度相机的应用领域.深度相机的应用在智能人机交互.人脸技术.三维重建.机器 ...

  7. 深度剖析RPC框架的核心设计

    做过分布式服务端的Java工程师,随着对技术底层的认知的加深,都会或多或少的会去想: 一个RPC框架需要考虑的问题有哪些,如何来解决? 下面我们围绕RPC通信框架,从如何实现这个角度做一个剖析,以及每 ...

  8. MySQL深度剖析之SQL语句更新流程(2021)

    #1 线程开始执行SQL更新请求之前,会创建事务,并且为当前线程分配一块内存空间叫做binlog cache 注意:binlog cache 是每一个线程分配一个:binlog cache大小受bin ...

  9. 【LED子系统深度剖析】七、触发器实现

    个人主页:董哥聊技术 我是董哥,高级嵌入式软件开发工程师,从事嵌入式Linux驱动开发和系统开发,曾就职于世界500强公司! 创作理念:专注分享高质量嵌入式文章,让大家读有所得! 文章目录 1.前言 ...

最新文章

  1. 惠普企业第三财季净利润23亿美元 同比增长914%
  2. 奖励名单表格模板_员工出勤工薪记算表(行政人事模板)
  3. 图像重建算法_降噪重建技术路在何方?
  4. scala调用java代码_scala调用java代码
  5. 很少有人能把CDN说的这么有趣了
  6. 200万存银行理财,年利率5%,未来30年够养老了吗?
  7. Spring面试问题与解答
  8. android反编译工具 ApkDec-Release-0.1
  9. matlab对主动悬架的仿真分析,运用MatlabSimulink对主动悬架力学仿真与分析.doc
  10. thinkphp vue后台管理系统模板_careyshop-admin 后台管理模板
  11. Mac 终端使用自动补全时忽略大小写
  12. 构造体中变量后面的冒号_flow中问号在参数后面和在冒号有什么区别?declare type的作用是?看英文文档有点一知半解...
  13. 什么是CRM客户管理系统?
  14. 中科院计算所是怎样的存在?院校详情大盘点!
  15. 什么是信息技术外包?
  16. css3 3d头像,CSS3做的头像效果,CSS3学习实例
  17. P2P下载技术-BT协议与Magnet磁力链接
  18. 一种快速生成边界交通场景数据的新方法
  19. 24小时制时间转换成am、pm (或 上午、下午)
  20. 硕盟type-c转接头HDMI+VGA+USB3.0+PD3.0四合一多功能扩展坞

热门文章

  1. 卷积神经网络(二):卷积神经网络CNN的BP算法
  2. face recognition[翻译][深度人脸识别:综述]
  3. Web前端小白入门指迷
  4. 还原JavaScript的真实历史~
  5. C#之获取mp3文件信息
  6. Openstack平台搭建(先电版)
  7. “互联网+”时代下 银行信息如何保证安全?
  8. 科沃斯旺宝与阿里云联合参加通信展
  9. jQuery滑动效果实例
  10. 4.mysql数据库创建,表中创建模具模板脚本,mysql_SQL99标准连接查询(恩,外部连接,全外连接,交叉连接)...