一.Kafka核心API模块-producer API讲解实战

1.1 封装配置属性

public static Properties getProperties(){Properties props = new Properties();
​props.put("bootstrap.servers", "ip:9092");//props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:9092");
​// 当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别,分别是0, 1,all。props.put("acks", "all");//props.put(ProducerConfig.ACKS_CONFIG, "all");
​// 请求失败,生产者会自动重试,指定是0次,如果启用重试,则会有重复消息的可能性props.put("retries", 0);//props.put(ProducerConfig.RETRIES_CONFIG, 0);
​
​// 生产者缓存每个分区未发送的消息,缓存的大小是通过 batch.size 配置指定的,默认值是16KBprops.put("batch.size", 16384);
​
​/*** 默认值就是0,消息是立刻发送的,即便batch.size缓冲空间还没有满* 如果想减少请求的数量,可以设置 linger.ms 大于0,即消息在缓冲区保留的时间,超过设置的值就会被提交到          服务端* 通俗解释是,本该早就发出去的消息被迫至少等待了linger.ms时间,相对于这时间内积累了更多消息,批量发送           减少请求* 如果batch被填满或者linger.ms达到上限,满足其中一个就会被发送*/props.put("linger.ms", 1);
​/*** buffer.memory的用来约束Kafka Producer能够使用的内存缓冲的大小的,默认值32MB。* 如果buffer.memory设置的太小,可能导致消息快速的写入内存缓冲里,但Sender线程来不及把消息发送到             Kafka服务器* 会造成内存缓冲很快就被写满,而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了* buffer.memory要大于batch.size,否则会报申请内存不#足的错误,不要超过物理内存,根据实际情况调整* 需要结合实际业务情况压测进行配置*/props.put("buffer.memory", 33554432);
​
​/*** key的序列化器,将用户提供的 key和value对象ProducerRecord 进行序列化处理,key.serializer必须被          设置,* 即使消息中没有指定key,序列化器必须是一个实org.apache.kafka.common.serialization.Serializer接口的类,* 将key序列化成字节数组。*/props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
​return props;}

1.2 生产者投递消息API实战(同步发送)

 /*** send()方法是异步的,添加消息到缓冲区等待发送,并立即返回* 生产者将单个的消息批量在一起发送来提高效率,即 batch.size和linger.ms结合** 实现同步发送:一条消息发送之后,会阻塞当前线程,直至返回 ack* 发送消息后返回的一个 Future 对象,调用get即可** 消息发送主要是两个线程:一个是Main用户主线程,一个是Sender线程*  1)main线程发送消息到RecordAccumulator即返回*  2)sender线程从RecordAccumulator拉取信息发送到broker*  3) batch.size和linger.ms两个参数可以影响 sender 线程发送次数***/@Testpublic void testSend(){
​Properties props = getProperties();Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 1; i < 3; i++){Future<RecordMetadata>  future = producer.send(new ProducerRecord<>("my-topic", "xdclass-key"+i, "xdclass-value"+i));try {RecordMetadata recordMetadata = future.get();//不关心是否发送成功,则不需要这行System.out.println("发送状态:"+recordMetadata.toString());
​} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}System.out.println(i+"发送:"+LocalDateTime.now().toString());}producer.close();}

1.3 【面试】 ProducerRecord介绍和key的作用

ProducerRecord(简称PR)
发送给Kafka Broker的key/value 值对, 封装基础数据信息

-- Topic (名字)
-- PartitionID (可选)
-- Key(可选)
-- Value



key默认是null,大多数应用程序会用到key

如果key为空,kafka使用默认的partitioner,使用RoundRobin算法将消息均衡地分布在各个partition上如果key不为空,kafka使用自己实现的hash方法对key进行散列,决定消息该被写到Topic的哪个partition,拥有相同key的消息会被写到同一个partition,实现顺序消息

注意:单纯的投递到一个分区不能百分百保证有序,需要考虑异常重试的问题。

kafka学习(六)相关推荐

  1. 大数据 -- kafka学习笔记:知识点整理(部分转载)

    一 为什么需要消息系统 1.解耦 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 2.冗余 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险.许多 ...

  2. Kafka学习笔记(一):什么是消息队列?什么是Kafka?

    目录 一.消息队列的概述 (一)前置知识点 1.集群和分布式 2.队列(Queue)的含义 3.同步与异步的含义 (二)消息队列的含义与特点 二.Kafka (一) 概述 (二) 常用名词含义 导航栏 ...

  3. 2021年大数据Kafka(六):❤️安装Kafka-Eagle❤️

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 安装Kafka-Eagle 一.Kafka-eagle基本介 ...

  4. [Big Data - Kafka] kafka学习笔记:知识点整理

    一.为什么需要消息系统 1.解耦: 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 2.冗余:消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险.许 ...

  5. Apollo代码学习(六)—模型预测控制(MPC)_follow轻尘的博客-CSDN博客_mpc代码

    Apollo代码学习(六)-模型预测控制(MPC)_follow轻尘的博客-CSDN博客_mpc代码

  6. 艾伟:C#多线程学习(六) 互斥对象

    本系列文章导航 C#多线程学习(一) 多线程的相关概念 C#多线程学习(二) 如何操纵一个线程 C#多线程学习(三) 生产者和消费者 C#多线程学习(四) 多线程的自动管理(线程池) C#多线程学习( ...

  7. kafka学习笔记:知识点整理

    一.为什么需要消息系统 1.解耦: 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 2.冗余: 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险. ...

  8. Kafka学习-入门

    在上一篇kafka简介的基础之上,本篇主要介绍如何快速的运行kafka. 在进行如下配置前,首先要启动Zookeeper. 配置单机kafka 1.进入kafka解压目录 2.启动kafka bin\ ...

  9. Kafka学习笔记(3)----Kafka的数据复制(Replica)与Failover

    1. CAP理论 1.1 Cosistency(一致性) 通过某个节点的写操作结果对后面通过其他节点的读操作可见. 如果更新数据后,并发访问的情况下可立即感知该更新,称为强一致性 如果允许之后部分或全 ...

最新文章

  1. Javascript 中 null、NaN和undefined的区别
  2. magento二次开发的基本步骤分享
  3. Teams Bot如何判断用户所在的时区
  4. js iframe 出现跨越问题
  5. 2021年宣城市高考成绩查询,宣城高考成绩查询入口
  6. VS(Visual Studio2017)快速入门基础操作(运行结果一闪而过,找不到解决方案资源管理器)
  7. 请熟悉SQL server的高手赐教。
  8. 代理服务器的工作原理是什么?
  9. Mocking and Stubbing
  10. 计算机怎样安装硬盘,固态硬盘怎么安装?小编教你怎么安装固态硬盘详图
  11. jq+css3树叶飘散特效
  12. BCIduino转载|3D打印机使用的日常问题汇总
  13. 如何制作手机海报?手把手教你在线自制手机海报
  14. C语言小黄鸭-->函数()
  15. 重装系统后安装的软件
  16. android 手电筒开关,Android实现手电筒电源键关闭功能
  17. 斐波那契数列 python 高阶解法
  18. 渐进式复杂度分析-学习笔记
  19. i.MX6ULL驱动开发 | 02-字符设备驱动框架
  20. JS兼容所有浏览器获取浏览器高度和宽度

热门文章

  1. 计算机键盘的tab键是哪个,电脑键盘中的Tab键都有哪些妙用
  2. Paper:可解释性之VI/PFI《All Models are Wrong, but Many are Useful: Learning a Variable’s Importance》翻译与解读
  3. FileWriter和FileReader的基本使用
  4. 【资源】OpenCV3编程入门_毛星云
  5. 台式机和笔记本属于什么计算机,pc机属于什么型计算机
  6. input的各种事件
  7. 可靠、稳定、安全,龙蜥云原生容器镜像正式发布!
  8. Win7 64位系统不能使用农业银行网银
  9. 微计算机最新科技应用论文,微计算机应用
  10. 创基MIFI扩展坞自带4G上网功能扩展坞