生产者流程

发送流程

  1. 创建ProducerRecord对象,包含主题和发送内容,可选指定键和分区
  2. 键值对象序列化成字节数组
  3. 数据发送给分区器,没有指定分区情况下,根据键选择一个分区
  4. 分区选择完毕,这条消息添加到一个批次当中,这个批次的所有记录归属于相同主题和分区
  5. 独立线程将记录批次发送到相应的broker上
  6. broker返回响应
    1. 消息写入broker成功,返回RecordMetaData对象,包含主题和分区,以及记录在分区的偏移量
    2. 消息写入broker失败,返回错误,生产者根据配置进行重试,重试次数达到仍然失败,返回错误信息

发送消息的方式

  • 发送忘记:消息发送给服务器,但并不关心它是否正常到达,错误自动重试
  • 同步发送:send() 方法发送消息,它会返回一个Future 对象,调用get() 方法进行等待,判断是否发送成功
  • 异步发送:send() 方法,并指定一个回调函数,服务器在返回响应时调用该函数

生产者发送消息-Coding

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class MessageProducer {public static void main(String[] args) {// 生产者配置Properties producerConfig = new Properties();// bootstrap.serversproducerConfig.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-node01:9092,kafka-node02:9092,kafka-node03:9092,");// 设置key的序列化器producerConfig.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 设置value的序列化器producerConfig.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 创建kafka生产者KafkaProducer<String, String> producer = new KafkaProducer<String, String>(producerConfig);String topic = "test-vip";ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, "messageKey", "messageValue");// 同步发送
//        try {//            // 如果服务器返回错误,get()抛出异常,发送成功返回RecordMetadata 对象
//            producer.send(producerRecord).get();
//        } catch (Exception e) {//            e.printStackTrace();
//        }// 异步发送try {// 如果服务器返回错误,get()抛出异常,发送成功返回RecordMetadata 对象producer.send(producerRecord, new ProducerCallBack());} catch (Exception e) {e.printStackTrace();}}
}class ProducerCallBack implements Callback {public void onCompletion(RecordMetadata recordMetadata, Exception e) {// 如果服务器返回错误,会抛出非空异常if(e != null){e.printStackTrace();}}
}

生产者配置参数

  • acks: 指定必须要有多少个分区副本收到消息,生产者才会认为消息写入成功

    • acks = 0: 生产者在成功写入消息之前不会等待任何来自服务器的响应
    • acks = 1: 集群首领节点收到消息,返回相应给生产者确认消息发送成功,否则收到错误响应,生产者重新发送消息
    • acks = all: 所有复制节点全部收到消息,集群响应生产者
  • buffer.memory:设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息,如果生产者发送消息速度超过发送到服务器的速度,导致生产者空间不足。send() 方法调用要么被阻塞,要么抛出异常取决于max.block.ms:阻塞时长

  • compression.type:默认情况下,消息发送时不会被压缩。该参数可以设置为snappy、gzip 或lz4,它指定了
    消息被发送给broker 之前使用哪一种压缩算法进行压缩。可选值:snappy,gazip等

  • retries: 生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误;。默认情况下,生产者会在每次重试之间等待100ms,取决于retry.backoff.ms 参数

  • batch.size: 指定一个批次使用的内存大小 字节数

  • linger.ms: 参数指定了生产者在发送批次之前等待更多消息加入批次的时间

  • client.id: 任意的字符串,服务器会用它来识别消息的来源

  • max.in.flight.requests.per.connection: 指定了生产者在收到服务器响应之前可以发送多少个消息,值越高,内存占据越多设为1 可以保证消息是按照发送的顺序写入服务器的,即使生了重试。

  • timeout.ms: 指定broker 等待同步副本返回消息确认的时间,与asks 的配置相匹配——如果在指定时间内没有收到同步副本的确认,那么broker 就会返回一个错误

  • request.timeout.ms: 生产者在发送数据时等待服务器返回响应的时间

  • metadata.fetch.timeout.ms: 指定了生产者在获取元数据(时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误抛出异常或执行回调

  • max.block.ms: 调用send() 方法或使用partitionsFor() 方法获取元数据时生产者的阻塞时间,超出该时间,生产者抛出异常

  • max.request.size: 控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指
    单个请求里所有消息总的大小

  • message.max.bytes: 该参数由broker设置,与max.request.size要匹配

  • receive.buffer.bytes;send.buffer.bytes:TCP socket 接收和发送数据包的缓冲区大小,设为-1,就使用操作系统的默认值

发送消息的顺序问题

kafka保证单个分区内有序,但如果retries 设为非零整数max.in.flight.requests.per.connection设为比1 大的数,那么就由可能发生顺序相反。保证严格顺序下,可以设置:retries 设为0;max.in.flight.requests.per.connection = 1

序列化器

  • Avro序列化(常用)
  • json(常用)
  • 自定义(不推荐)

分区

默认分区器

  1. 消息键可以设置为null, 如果使用默认的分区器,记录将随机的发送到主题内各个可用的分区。分区使用**轮询(Round Robin)**算法将消息均衡地分布到各个分区上
  2. 键不为null,根据散列值将消息映射到特定的分区,同一个键总是被映射到同一个分区

自定义分区器

public class CustomPartitioner implements Partitioner {public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {Integer numPartitions = cluster.partitionCountForTopic(topic);// 分区逻辑if ((keyBytes == null) || (!(key instanceof String))){throw new InvalidRecordException("expect the record has the key");}if("svip".equals(key.toString())){// 返回分区编号return numPartitions - 1;}return (Math.abs(Utils.murmur2(keyBytes) % (numPartitions - 2)));}public void close() {}public void configure(Map<String, ?> map) {}
}

配置生产者调用:

kafka-生产者使用相关推荐

  1. 2021年大数据Kafka(十):kafka生产者数据分发策略

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 生产者数据分发策略 策略一:用户指定了partition 策 ...

  2. discard connection丢失数据_python kafka 生产者发送数据的三种方式

    python kafka 生产者发送数据的三种方式 发送方式 同步发送 发送数据耗时最长 有发送数据的状态,不会丢失数据,数据可靠性高 以同步的方式发送消息时,一条一条的发送,对每条消息返回的结果判断 ...

  3. java实现Kafka生产者示例

    使用java实现Kafka的生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 3 ...

  4. kafka生产者和消费者端的数据不一致

    撸了今年阿里.头条和美团的面试,我有一个重要发现.......>>> kafka生产者生产30条数据,而消费者却不一定消费了30条数据,经过探索发现了main线程执行完成了而kafk ...

  5. mysql作为kafka生产者_Kafka之生产者

    [TOC] 从编程的角度而言,生产者就是负责向 Kafka 发送消息的应用程序.在 Kafka 的历史变迁 中, 一共有两个大版本的生产者客户端: 第-个是于 Kafka开源之初使用 Scala语言编 ...

  6. kafka生产者开发方式

    [README] 本文记录了 kafka生产者开发方式: [1]生产者概览 [1.1]kafka发送消息过程 [1.2]创建kafka生产者 1)创建kafka生产者, 有3个必选属性: bootst ...

  7. spring-kafka整合:DefaultKafkaProducerFactory默认kafka生产者工厂介绍

    [README] 0,为啥要看  DefaultKafkaProducerFactory? 最近在基于 springboot 开发kafka模块,发现 kafakTemplate构造器传入了 Defa ...

  8. java客户端作为kafka生产者测试

    [README] 1.本文主要对 java客户端作为kafka 生产者进行测试, 消费者由 centos的kafka命令行线程扮演: 2.消息发送: kafka的生产者采用异步发送消息的方式,在消息发 ...

  9. java作为kafka生产者实验及Expiring超时问题解决

    [README] java作为生产者,centos 作为消费者: [1]生产者代码 -- pom.xml <!-- 依赖 --> <dependencies><depen ...

  10. java调用kafka接口发送数据_Java调用Kafka生产者,消费者Api及相关配置说明

    本次的记录内容包括: 1.Java调用生产者APi流程 2.Kafka生产者Api的使用及说明 3.Kafka消费者Api的使用及说明 4.Kafka消费者自动提交Offset和手动提交Offset ...

最新文章

  1. ddl mysql_mysql 5.6 在线 DDL
  2. 初识前端——个人总结
  3. 68.connect-flash 用法详解 req,flash()
  4. poj 3131 双向搜索+hash判重
  5. 关于hibernate中提示can not create table ******
  6. Oracle中创建、修改、删除序列
  7. Spring Bean范围
  8. 计算机应用基础试模块5ACCSE,2015年计算机二级《Access》上机最后冲刺卷(1)
  9. linux pwm 调屏_Linux驱动学习之:PWM驱动
  10. 高效精准分析定位系统BUG,让你无所不能
  11. Javascript实现计数器,定时警告和停止
  12. Diverse Team(CF-988A)
  13. 【linux系统编程】基础开发工具:gcc/g++
  14. java执行cmd命令并获取返回结果字符串
  15. bash之sed与awk初步
  16. Android:日常学习笔记(7)———探究UI开发(4)
  17. 《四海小记c++学习之路》队列/银行叫号系统
  18. 数据结构之树(3)——二叉平衡树(AVL)
  19. MxNet创建ILSVRC2012.rec文件
  20. 10个优秀的图标搜索引擎

热门文章

  1. Excel - 字符串处理函数:LEFT, RIGHT, MID, LEN 和 FIND
  2. android 方向传感器,10.11 传感器专题(2)——方向传感器
  3. PositiveUnlabeled Data Learning——第四弹(Semi-Supervised Classification/AUC Optimization)
  4. 发布订阅模式(一):tiny-emitter
  5. 大文件CSV导入MYSQL_将大csv文件导入cp网站出租搭建mysql数据库
  6. cortana连不上网络_Alexa,为什么Cortana仍在我的计算机上?
  7. 失去了翅膀,却依然坚信只要拼搏就可以飞翔
  8. JMeter插件之PerfMon监控服务器性能
  9. 决策表(决策树)[软件工程]
  10. 在命令行使用 Pandoc 进行文件转换