本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark商业应用实战指导,请持续关注本套博客。版权声明:本套Spark商业应用实战归作者(秦凯新)所有,禁止转载,欢迎学习。

  • 秦凯新的技术社区-大数据商业实战系列全集目录
  • kafka 商业环境实战-kafka生产环境规划
  • kafka 商业环境实战-kafka生产者和消费者吞吐量测试
  • kafka 商业环境实战-kafka生产者Producer参数设置及参数调优建议
  • kafka 商业环境实战-kafka集群Broker端参数设置及调优准则建议
  • kafka 商业环境实战-kafka之Producer同步与异步消息发送及事务幂等性案例应用实战
  • kafka 商业环境实战-kafka之Consumer多种消费模式官方案例应用实战

1 我很安全

为何惊出此言?内心惶恐。kafka的Producer是线程安全的,用户可以非常非常放心的在多线程中使用。

但是官方建议:通常情况下,一个线程维护一个kafka 的producer的效率会更高。

2 Producer 消息发送流程

  • 第一步:封装ProducerRecord
  • 第二步:分区器Partioner进行数据路由,选择某一个Topic分区。如果没有指定key,消息会被均匀的分配到所有分区。
  • 第三步:确定好分区,就会找分区对应的leader,接下来就是副本同步机制。

3 Producer官方实例

3.1 Fire and Fogret案例 (无所谓心态)

  • 发送之后便不再理会发送结果
      Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 100; i++)producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));producer.close();
    复制代码

3.2 异步回调官方案例 (不阻塞)

  • JavaProducer的send方法会返回一个JavaFuture对象供用户稍后获取发送结果。这就是回调机制。
  • Fully non-blocking usage can make use of the Callback parameter to provide a callback that will be invoked when the request is complete.

  • RecordMetadata 和 Exception 不可能同时为空,消息发送成功时,Exception为null,消息发送失败时,metadata为空。

       ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);producer.send(myRecord,new Callback() {public void onCompletion(RecordMetadata metadata, Exception e) {if(e != null) {e.printStackTrace();} else {System.out.println("The offset of the record we just sent is: " + metadata.offset());}}});
    复制代码

3.3 同步发送官方案例 (阻塞)

  • 通过 producer.send(record)返回Future对象,通过调用Future.get()进行无限等待结果返回。

     producer.send(record).get()
    复制代码

3.4 基于事务发送官方案例 (原子性和幂等性)

  • From Kafka 0.11, the KafkaProducer supports two additional modes: the idempotent producer and the transactional producer. The idempotent producer strengthens Kafka's delivery semantics from at least once to exactly once delivery. In particular producer retries will no longer introduce duplicates. The transactional producer allows an application to send messages to multiple partitions (and topics!) atomically.

  • To enable idempotence, the enable.idempotence configuration must be set to true. If set, the retries config will default to Integer.MAX_VALUE and the acks config will default to all. There are no API changes for the idempotent producer, so existing applications will not need to be modified to take advantage of this feature.

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("transactional.id", "my-transactional-id");
    Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());producer.initTransactions();try {producer.beginTransaction();for (int i = 0; i < 100; i++)producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));producer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {// We can't recover from these exceptions, so our only option is to close the producer and exit.producer.close();
    } catch (KafkaException e) {// For all other exceptions, just abort the transaction and try again.producer.abortTransaction();
    }
    producer.close();
    复制代码
  • As is hinted at in the example, there can be only one open transaction per producer. All messages sent between the beginTransaction() and commitTransaction() calls will be part of a single transaction. When the transactional.id is specified, all messages sent by the producer must be part of a transaction.

3.5 可重试异常(继承RetriableException)

  • LeaderNotAvailableException :分区的Leader副本不可用,这可能是换届选举导致的瞬时的异常,重试几次就可以恢复
  • NotControllerException:Controller主要是用来选择分区副本和每一个分区leader的副本信息,主要负责统一管理分区信息等,也可能是选举所致。
  • NetWorkerException :瞬时网络故障异常所致。

3.6 不可重试异常

  • SerializationException:序列化失败异常
  • RecordToolLargeException:消息尺寸过大导致。

3.7 异常的区别对待

     producer.send(myRecord,new Callback() {public void onCompletion(RecordMetadata metadata, Exception e) {if(e ==null){//正常处理逻辑System.out.println("The offset of the record we just sent is: " + metadata.offset()); }else{if(e instanceof RetriableException) {//处理可重试异常......} else {//处理不可重试异常......}}}});
复制代码

3.8 Producer的绅士关闭

  • producer.close():优先把消息处理完毕,优雅退出。
  • producer.close(timeout): 超时时,强制关闭。

4 总结

为了能够证明技术就是一层窗户纸,我会把kafka剖析的体无完肤。

秦凯新 于深圳 2018

kafka之Producer同步与异步消息发送及事务幂等性案例应用实战相关推荐

  1. RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的?

    RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的? 文章目录 RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的? 前言 项目 ...

  2. 利用Spring框架封装的JavaMail现实同步或异步邮件发送

    利用Spring框架封装的JavaMail现实同步或异步邮件发送 作者:张纪豪 J2EE简单地讲是在JDK上扩展了各类应用的标准规范,邮件处理便是其中一个重要的应用.它既然是规范,那么我们就可以通过J ...

  3. KAFKA 同步和异步消息的发送(开发实战)

    文章目录 一.消费者监听 1. 启动zk 2. 启动kafka 3. 创建主题 4. 消费者监听消息 二.生产者工程 2.1. 依赖 2.2. 生产者代码(同步) 2.3. 生产者代码(异步) 2.4 ...

  4. RocketMQ:Producer启动流程与消息发送源码分析

    文章目录 Producer 1.方法和属性 2.启动流程 3.消息发送 3.1验证消息 3.2查找路由 3.3选择队列 3.4发送消息 3.5发送批量消息 Producer 在RocketMQ中,消息 ...

  5. 利用Spring框架封装的JavaMail实现同步或异步邮件发送

    J2EE简单地讲是在JDK上扩展了各类应用的标准规范,邮件处理便是其中一个重要的应用.它既然是规范,那么我们就可以通过JDK遵照邮件协议编写一个邮件处理系统,但事实上已经有很多厂商和开源组织这样做了. ...

  6. 同步和异步消息机制的区别

    消息通信的基本方式有两种: 1.同步通信 总结:发送方发完消息后会一直等待接收方响应,只有接收方响应后才能进行下一步工作. 两个通信应用服务之间必须要进行同步,两个服务之间必须都是正常运行的.发送程序 ...

  7. RocketMQ源码解析-Producer消息发送

    首先以默认的异步消息发送模式作为例子.DefaultMQProducer中的send()方法会直接调用DefaultMQProducerImpl的send()方法,在DefaultMQProducer ...

  8. Kafka(Go)教程(九)---如何避免消息丢失?

    来自:指月 https://www.lixueduan.com 原文:https://www.lixueduan.com/post/kafka/09-avoid-msg-lost/ 本文主要从 Pro ...

  9. mq发送消息到两个服务器问题,RocketMQ消息发送常见错误与解决方案

    本文将结合本身使用RocketMQ的经验,对消息发送常见的问题进行分享,基本会遵循出现问题,分析问题.解决问题.web 一.No route info of this topic 没法找到路由信息,其 ...

最新文章

  1. 牛客小白月赛25 补题+题解[A-J]
  2. 201621123028《Java程序设计》第一周学习总结
  3. C#LeetCode刷题之#387-字符串中的第一个唯一字符(First Unique Character in a String)
  4. == 与 equals 的区别
  5. Cocos2d-x Touch事件处理机制(better)
  6. 3.微服务:从设计到部署 --- 进程间通信
  7. 1018 锤子剪刀布
  8. ios 输入法扩展_iOS8、iOS9都可用的原生输入法扩展词库(搜狗词库)
  9. Visio_Premium_2010_VOL 和Project_Pro_2010。
  10. vba字典的key属性、item属性和keys方法、items方法、add方法
  11. 微服务网关之Springcloud GateWay
  12. linux文件系统知识总结、SD卡挂载问题总结
  13. excel怎么能把字竖着打出来_excel表格怎么把字竖着 怎么把excel表格里的字变成竖的?...
  14. 读取excel文件数据,封装成hashmap
  15. C++函数模板与类模板的区别
  16. 浏览器直接编辑html文件,HTML文件怎么打开 .html如何编辑
  17. 程序员的520表白代码,你给你对象整过几个?
  18. 前端面试题(带文字+代码解析),我不相信你看不懂(2022.11.04)
  19. http 4xx,5xx Server error
  20. GaRy-Liang的linux成长日记3-自动化安装

热门文章

  1. python画图代码彩虹-python绘制简单彩虹图
  2. python 网站文件下载-python实现下载文件的三种方法
  3. python创建打开文件-Python文件处理:创建、打开、追加、
  4. 自学python的书籍-Python学习可以用到的书籍有哪些?
  5. python可以用来做什么-Python 学会之后可以用来干嘛的?
  6. NVIDIA Jetson 系列产品开发相关文档,TAO、TLT、NGC
  7. flood fill算法
  8. Tree命令安装和使用
  9. 模块化与nodeJs
  10. 三.Hystrix资源隔离