
1 Kafka Message Queue Overview

1.1 Basic Terminology

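  • Broker
    A Kafka server; a Kafka cluster consists of one or more brokers.
  • Topic
    A category of messages; every message published to Kafka belongs to a topic.
  • Partition
    A physical subdivision of a topic; each topic consists of one or more partitions, and each partition is an ordered log.
  • Producer
    Publishes messages to Kafka brokers.
  • Consumer
    The message consumer: a client that reads messages from Kafka brokers.
  • Consumer Group
    Every consumer belongs to a specific consumer group (a group name can be set for each consumer; a consumer without one belongs to the default group).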

1.2 The Message Queue

1.2.1 Basic Characteristics

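  1. Scalable
    • Capacity can be expanded without taking the cluster offline
    • Data streams are partitioned, and the partitions are stored across multiple machines
  2. High performance
    • A single broker can serve thousands of clients
    • A single broker can read/write hundreds of megabytes per second
    • A cluster of multiple brokers achieves very high throughput
    • Performance stays stable no matter how much data is stored
    • Under the hood, Kafka avoids caching data in the Java heap and relies on the operating system's page cache instead; it also turns random writes into sequential writes and, combined with zero-copy transfers, greatly improves I/O performance
  3. Durable storage
    • Data is stored on disk
    • Data is replicated to other servers to prevent loss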
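The zero-copy point above can be illustrated with the JDK primitive that exposes it. The following is a minimal sketch, not Kafka's own code: it sends a region of a file (think of a log segment) to a channel with FileChannel.transferTo. When the target is a socket channel, the JDK can hand the copy to the operating system (sendfile on Linux), so the bytes never pass through a Java heap buffer; for other targets, such as an in-memory channel used in a quick test, the JDK falls back to an ordinary buffered copy. ZeroCopySketch and sendRegion are hypothetical names.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Sketch: send part of a file (e.g. a log segment) to a channel using FileChannel.transferTo. */
class ZeroCopySketch {
    /** Sends count bytes starting at position to target; returns the number of bytes actually sent. */
    static long sendRegion(Path file, long position, long count, WritableByteChannel target) throws IOException {
        try (FileChannel source = FileChannel.open(file, StandardOpenOption.READ)) {
            long sent = 0;
            // transferTo may move fewer bytes than requested, so keep calling it until done
            while (sent < count) {
                long n = source.transferTo(position + sent, count - sent, target);
                if (n <= 0) {
                    break; // nothing left to send (end of file reached)
                }
                sent += n;
            }
            return sent;
        }
    }
}
```

In a broker the target would be the consumer's socket channel, which is the case where the zero-copy path applies.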

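1.2.2 Message Format

  1. Each topic corresponds to one kind of message format, so messages are categorized by topic
  2. The messages of a topic are stored in one or more partitions
  3. Within a partition:
    • A partition should be stored on one or more servers
      • With only one server there is no redundant copy: it is a single machine rather than a cluster
      • With multiple servers:
        • One server is the leader and the others are followers. The leader handles read and write requests; followers only keep redundant copies. If the leader fails, a follower is automatically elected as the new leader so that service is not interrupted. Each server may act as the leader for some partitions and as a follower for others, which balances the load across the whole cluster
    • Messages are stored in order, and that order is immutable
    • Messages can only be appended, not inserted
    • Every message has an offset that serves as its message ID and is unique within its partition
    • Offsets are stored and managed by the consumer, so the read order is entirely up to the consumer and need not be linear
    • Messages have a retention period and are deleted once it expires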
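To make the offset rules concrete, here is a toy in-memory model of a single partition. PartitionLog is a hypothetical class that ignores retention, replication and on-disk segments: messages are only ever appended, each append returns the next offset, and reads start at an offset chosen by the caller, which is what lets a consumer re-read or skip ahead.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy model of one partition: an append-only, ordered log where a message's offset is its position. */
class PartitionLog {
    private final List<String> messages = new ArrayList<>();

    /** Appends a message and returns its offset (unique within this partition only). */
    synchronized long append(String message) {
        messages.add(message);
        return messages.size() - 1;
    }

    /** Reads up to maxCount messages starting at offset; the caller (consumer) tracks its own position. */
    synchronized List<String> read(long offset, int maxCount) {
        if (offset < 0 || offset >= messages.size()) {
            return Collections.emptyList();
        }
        int from = (int) offset;
        int to = Math.min(messages.size(), from + maxCount);
        return new ArrayList<>(messages.subList(from, to));
    }
}
```

Because the log never records who has read what, rewinding is simply calling read with a smaller offset. In a real partition, retention deletes the oldest data, so the first valid offset moves forward over time instead of staying at 0.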

1.2.3 Producer

  • The producer writes messages into Kafka
  • Each write targets a topic and a partition
  • The algorithm that assigns messages to partitions is chosen by the producer
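A minimal sketch of the partition-selection rule that the producer example in 2.3.2 describes: a random partition when there is no key, otherwise hash(key) % partitionNum. KeyHashPartitioner is a hypothetical name, and String.hashCode() is only a stand-in; Kafka's own partitioners use their own hash functions (the newer Java producer uses murmur2), so the concrete partition numbers will not match a real cluster.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Sketch of key-based partition selection: no key -> random partition, else hash(key) % numPartitions. */
class KeyHashPartitioner {
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (key == null) {
            // no key: any partition will do
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        // floorMod keeps the result non-negative even when hashCode() is negative
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

The same key always lands in the same partition, which is what keeps messages with that key in order. Since the result depends on numPartitions, adding partitions to a topic (as alterTopic does in 2.3.2) changes where existing keys land, so per-key ordering only holds while the partition count stays the same.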

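1.2.4 Consumer

  • The consumer reads messages and processes them
  • Consumer group
    • The concept exists to support two scenarios: delivering each message to only one consumer, and broadcasting each message to every consumer group
    • When multiple consumer groups subscribe to the same topic, each message of that topic is broadcast to every one of those groups
    • Once a message is delivered to a consumer group, only one consumer in that group receives and processes it
    • Having each consumer in a group own its own partition brings the following benefits:
      • Processing can be parallelized up to the number of partitions
      • Each partition is read by only one consumer, so messages are processed in the order they are stored in that partition; note that this order depends on the producer's partitioning algorithm

  • A consumer can consume with multiple threads, and the number of threads should not exceed the number of partitions of the topic. Within a consumer group (whose consumers may run one or more consuming threads), each partition is assigned to exactly one consuming thread, and the assignment gives partitions to as many threads as possible (although which threads actually consume, and how many, is ultimately decided by the thread pool's scheduling). So if there are more threads than partitions, even a one-to-one assignment leaves surplus threads that receive no partition: they consume nothing and just idle, wasting resources.
  • When a consumer reads from multiple partitions, ordering across them is not guaranteed: Kafka only guarantees ordering within a single partition, and the interleaving across partitions depends on the order in which you read them
  • Adding or removing consumers, brokers, or partitions triggers a rebalance, after which the partitions assigned to each consumer may change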
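The thread-versus-partition rule above can be sketched as a range-style assignment, modeled on Kafka's "range" strategy. The class and method names are hypothetical and only a single topic is handled: partitions are split into contiguous blocks, the first (partitions % threads) threads get one extra partition, and every thread beyond the partition count gets an empty list.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of a range-style assignment of one topic's partitions to the consuming threads of a group. */
class RangeAssignmentSketch {
    /** Returns, for each thread index, the partition ids that thread owns. */
    static List<List<Integer>> assign(int numPartitions, int numThreads) {
        if (numThreads <= 0) {
            throw new IllegalArgumentException("need at least one thread");
        }
        List<List<Integer>> result = new ArrayList<>();
        int perThread = numPartitions / numThreads;   // every thread gets at least this many
        int withExtra = numPartitions % numThreads;   // this many threads get one more
        for (int t = 0; t < numThreads; t++) {
            int start = perThread * t + Math.min(t, withExtra);
            int length = perThread + (t < withExtra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                owned.add(p);
            }
            result.add(owned);
        }
        return result;
    }
}
```

For example, assign(4, 6) gives threads 0 to 3 one partition each and leaves threads 4 and 5 idle, which is exactly the waste described above; assign(5, 2) gives [0, 1, 2] and [3, 4].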

2 Installation and Usage



2.1 Starting ZooKeeper


Start: ./bin/zookeeper-server-start.sh config/zookeeper.properties &, where config/zookeeper.properties is the ZooKeeper configuration file.

Stop: ./bin/zookeeper-server-stop.sh


2.2 Starting the Kafka Server

2.2.1 Configuration File


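broker.id:               the unique identifier of each broker in the cluster; must be a non-negative integer
log.dirs:                where Kafka stores its data; separate multiple directories with commas. Spreading the directories across different disks improves read/write performance
log.retention.hours:     how long data files are kept; data older than this is cleaned up according to the log.cleanup.policy setting
zookeeper.connect:       the ZooKeeper connect string in hostname:port form; multiple servers are separated by commas, e.g. hostname1:port1,hostname2:port2,hostname3:port3. A path can also be appended, e.g. hostname1:port1,hostname2:port2,hostname3:port3/kafka; note that the /kafka node must be created in ZooKeeper beforehand, otherwise you get the error: java.lang.IllegalArgumentException: Path length must be > 0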
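A small sketch of the zookeeper.connect format described above, assuming the path (chroot) is written once after the last host:port pair. It splits the value into the host list and the chroot, and shows that every Kafka znode then ends up under that chroot, which is why the /kafka node has to exist first. ZkConnectString is a hypothetical helper, not ZooKeeper's own parser.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch: split a zookeeper.connect value into its host:port list and optional chroot path. */
final class ZkConnectString {
    final List<String> hosts;
    final String chroot; // empty string when no chroot is given

    private ZkConnectString(List<String> hosts, String chroot) {
        this.hosts = hosts;
        this.chroot = chroot;
    }

    static ZkConnectString parse(String connect) {
        int slash = connect.indexOf('/');           // the chroot starts at the first '/'
        String hostPart = slash >= 0 ? connect.substring(0, slash) : connect;
        String chroot = slash >= 0 ? connect.substring(slash) : "";
        return new ZkConnectString(Arrays.asList(hostPart.split(",")), chroot);
    }

    /** Prefixes a Kafka znode path (e.g. "/brokers/ids") with the chroot. */
    String resolve(String path) {
        return chroot + path;
    }
}
```

For example, parse("h1:2181,h2:2181,h3:2181/kafka") yields three hosts and the chroot /kafka, so the broker registry lives at /kafka/brokers/ids instead of /brokers/ids.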



```properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = security_protocol://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/usr/local/kafka/kafka_2.11-

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2
auto.create.topics.enable=false

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=4

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. ",,".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=,,

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
```
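The retention comments in this file (a segment is deleted when either the age limit or the size limit is hit, always starting from the oldest segment) can be expressed as a short, simplified sketch. RetentionSketch and its Segment type are hypothetical, and the broker's real logic has extra cases that are ignored here. With log.retention.hours=4 as above, retentionMs would be 4 * 3600 * 1000 = 14400000; passing a negative retentionBytes disables the size rule, which matches leaving log.retention.bytes unset (its default is -1).

```java
import java.util.List;

/**
 * Simplified sketch of time- and size-based retention: a segment may be deleted when it is older than
 * retentionMs OR when deleting it still leaves at least retentionBytes in the log. Deletion proceeds
 * from the oldest segment and stops at the first segment that must be kept.
 */
class RetentionSketch {
    static final class Segment {
        final long lastModifiedMs; // timestamp used to judge the segment's age
        final long sizeBytes;

        Segment(long lastModifiedMs, long sizeBytes) {
            this.lastModifiedMs = lastModifiedMs;
            this.sizeBytes = sizeBytes;
        }
    }

    /** Returns how many of the oldest segments would be deleted; segments must be ordered oldest first. */
    static int deletableCount(List<Segment> oldestFirst, long nowMs, long retentionMs, long retentionBytes) {
        long remaining = 0;
        for (Segment s : oldestFirst) {
            remaining += s.sizeBytes;
        }
        int count = 0;
        for (Segment s : oldestFirst) {
            boolean tooOld = nowMs - s.lastModifiedMs > retentionMs;
            boolean overSize = retentionBytes >= 0 && remaining - s.sizeBytes >= retentionBytes;
            if (!tooOld && !overSize) {
                break; // keep this segment and everything newer
            }
            remaining -= s.sizeBytes;
            count++;
        }
        return count;
    }
}
```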

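Note the auto.create.topics.enable setting: if it is true, writing to a topic that does not exist yet makes Kafka create it automatically; if it is false, the topic must be created beforehand, otherwise the producer fails with: failed after 3 retries.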

2.2.2 Commands

Start: bin/kafka-server-start.sh config/server.properties. In production it is best to run it as a daemon: nohup  &

Stop: bin/kafka-server-stop.sh

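2.2.3 Kafka's Storage Structure in ZooKeeper

If the zookeeper.connect value above has no path, Kafka uses the ZooKeeper root. After starting ZooKeeper and Kafka, connect to ZooKeeper with the command-line client and run ls /; you will see the nodes consumers, config, controller, admin, brokers, zookeeper, and controller_epoch.

For the detailed structure, see "apache kafka系列之在zookeeper中存储结构" (Apache Kafka series: storage structure in ZooKeeper).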
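A few of the znodes under these directories can be written as simple path builders. The layout below is recalled from the ZooKeeper-based (0.8-era) Kafka code and may differ between versions, so treat it as an assumption and confirm it with ls in the ZooKeeper CLI; KafkaZkPaths is a hypothetical helper. If zookeeper.connect has a chroot, every path gets that prefix.

```java
/** Sketch of where 0.8-era Kafka keeps some of its state in ZooKeeper (assumed layout, verify per version). */
final class KafkaZkPaths {
    /** One child per live broker, named by broker.id. */
    static String brokerIds() {
        return "/brokers/ids";
    }

    /** Leader and in-sync replica information for one partition. */
    static String partitionState(String topic, int partition) {
        return "/brokers/topics/" + topic + "/partitions/" + partition + "/state";
    }

    /** Root of everything stored for one consumer group. */
    static String consumerGroup(String group) {
        return "/consumers/" + group;
    }

    /** Offset committed by the group for one partition. */
    static String consumerOffset(String group, String topic, int partition) {
        return consumerGroup(group) + "/offsets/" + topic + "/" + partition;
    }

    /** Which consumer thread of the group currently owns the partition. */
    static String partitionOwner(String group, String topic, int partition) {
        return consumerGroup(group) + "/owners/" + topic + "/" + partition;
    }
}
```

For instance, the offsets committed by the high-level consumer for group JSI live under /consumers/JSI/offsets, so rmr /consumers/JSI (see 2.3.1) removes the group together with its committed offsets.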

2.3 Usage


2.3.1 Command-Line Clients

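Create a topic: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

List all topics: bin/kafka-topics.sh --list --zookeeper localhost:2181

Show topic details (partitions, replicas, etc.): bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic. It lists the partition count, the replication factor, the leader node of each partition, its replica nodes, and its live (in-sync) replica nodes

Produce messages to a topic: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Consume messages from a topic: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning (by default a single thread consumes all partitions of the topic)

Delete a Kafka consumer group (group id): connect to ZooKeeper and use the rmr command; for example, to delete the consumer group named JSI: rmr /consumers/JSI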


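Check the consumption status of a consumer group:

./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group test-mirror-consumer-zsm --zkconnect ec2-12345.cn-north-1.compute.amazonaws.com.cn:2181/kafka/blink/0822 --topic GPS2

Parameters:
  • --group: the group.id that MirrorMaker used when consuming from the source cluster
  • --zkconnect: the ZooKeeper address of the source cluster
  • --topic: the topic to check; if omitted, the consumption status of all topics is returned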
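The per-partition consumption status comes down to two numbers: the offset the group has committed and the partition's log end offset (the 0.8-era offset checker prints them as Offset and logSize). Their difference is the lag, i.e. how many messages are still waiting to be consumed. A minimal sketch with hypothetical names, not the tool's own code:

```java
/** Sketch: consumer lag per partition and in total, from log end offsets and committed offsets. */
final class ConsumerLag {
    /** Messages already in the partition that the group has not committed yet. */
    static long lag(long logEndOffset, long committedOffset) {
        return Math.max(0, logEndOffset - committedOffset);
    }

    /** Sums the lag over all partitions; both arrays are indexed by partition id. */
    static long totalLag(long[] logEndOffsets, long[] committedOffsets) {
        if (logEndOffsets.length != committedOffsets.length) {
            throw new IllegalArgumentException("one committed offset per partition is required");
        }
        long total = 0;
        for (int p = 0; p < logEndOffsets.length; p++) {
            total += lag(logEndOffsets[p], committedOffsets[p]);
        }
        return total;
    }
}
```

A lag that keeps growing means the group (for example MirrorMaker's consumers) is falling behind the producers on that partition.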

2.3.2 Java Client


```java
package com.zsm.kfkdemo; // same package as the other demo classes, so ConfigureAPI resolves

import kafka.admin.DeleteTopicCommand;
import kafka.admin.TopicCommand;

/**
 * @author zsm
 * @date 2016-09-27 10:26:42
 * @version 1.0
 */
public class JTopic {
    public static void createTopic(String zkAddr, String topicName, int partition, int replication) {
        String[] options = new String[] { "--create", "--zookeeper", zkAddr, "--topic", topicName, "--partitions",
                partition + "", "--replication-factor", replication + "" };
        TopicCommand.main(options);
    }

    public static void listTopic(String zkAddr) {
        String[] options = new String[] { "--list", "--zookeeper", zkAddr };
        TopicCommand.main(options);
    }

    public static void describeTopic(String zkAddr, String topicName) {
        String[] options = new String[] { "--describe", "--zookeeper", zkAddr, "--topic", topicName };
        TopicCommand.main(options);
    }

    // Raises the topic's partition count to 5 (partitions can only be added, never removed)
    public static void alterTopic(String zkAddr, String topicName) {
        String[] options = new String[] { "--alter", "--zookeeper", zkAddr, "--topic", topicName, "--partitions", "5" };
        TopicCommand.main(options);
    }

    // Deletes the topic by removing its path in ZooKeeper; only the ZooKeeper metadata is removed,
    // the actual data on the Kafka brokers is not deleted
    public static void deleteTopic(String zkAddr, String topicName) {
        String[] options = new String[] { "--zookeeper", zkAddr, "--topic", topicName };
        DeleteTopicCommand.main(options);
    }

    public static void main(String[] args) {
        String myTestTopic = "ZsmTestTopic";
        int myPartition = 4;
        int myreplication = 1;

        // createTopic(ConfigureAPI.KafkaProperties.ZK, myTestTopic, myPartition, myreplication);
        // listTopic(ConfigureAPI.KafkaProperties.ZK);
        describeTopic(ConfigureAPI.KafkaProperties.ZK, myTestTopic);
        // alterTopic(ConfigureAPI.KafkaProperties.ZK, myTestTopic);
        // deleteTopic(ConfigureAPI.KafkaProperties.ZK, myTestTopic);
    }
}
```

```java
package com.zsm.kfkdemo;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import com.zsm.kfkdemo.ConfigureAPI.KafkaProperties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Rules (the message key and a partitioning function) can be specified so that a message is written to a particular partition:
 * <p>
 * 1. If the message has no key, Kafka picks a partition at random
 * </p>
 * <p>
 * 2. Otherwise, if a partitioning function is configured (via partitioner.class), it takes the key and decides the partition
 * </p>
 * <p>
 * 3. Otherwise, Kafka picks the partition as hash(key) % partitionNum
 * </p>
 *
 * @author zsm
 * @date 2016-09-27 10:26:42
 * @version 1.0
 */
public class JProducer extends Thread {
    private Producer<String, String> producer;
    private String topic;
    private final int SLEEP = 10;
    private final int msgNum = 1000;

    public JProducer(String topic) {
        Properties props = new Properties();
        props.put("metadata.broker.list", KafkaProperties.BROKER_LIST); // e.g. 192.168.6.127:9092
        // request.required.acks:
        // 0: the producer never waits for an acknowledgement from the broker (the same behavior as 0.7).
        //    Lowest latency but the weakest durability guarantees (some data will be lost when a server fails).
        // 1: the producer gets an acknowledgement after the leader replica has received the data. Better durability,
        //    since the client waits until the server acknowledges the request as successful (only messages that were
        //    written to the now-dead leader but not yet replicated will be lost).
        // -1: the producer gets an acknowledgement after all in-sync replicas have received the data. Best durability:
        //    no messages will be lost as long as at least one in-sync replica remains.
        props.put("request.required.acks", "-1");
        // serializer class for the message value
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // serializer class for the message key
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // custom partitioning function; if not set, Kafka picks the partition as hash(messageKey) % partitionNum
        props.put("partitioner.class", "com.zsm.kfkdemo.MyPartitioner");
        producer = new Producer<String, String>(new ProducerConfig(props));
        this.topic = topic;
    }

    @Override
    public void run() {
        boolean isBatchWriteMode = true;
        System.out.println("isBatchWriteMode: " + isBatchWriteMode);
        if (isBatchWriteMode) {
            // send in batches
            int batchSize = 100;
            List<KeyedMessage<String, String>> msgList = new ArrayList<KeyedMessage<String, String>>(batchSize);
            for (int i = 0; i < msgNum; i++) {
                String msg = "Message_" + i;
                msgList.add(new KeyedMessage<String, String>(topic, i + "", msg));
                // msgList.add(new KeyedMessage<String, String>(topic, msg)); // no key: Kafka picks a partition itself
                if ((i + 1) % batchSize == 0) { // a full batch has been collected
                    producer.send(msgList);
                    System.out.println("Send->[" + msgList + "]");
                    msgList.clear();
                    try {
                        sleep(SLEEP);
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
            }
            if (!msgList.isEmpty()) { // send the last, partially filled batch
                producer.send(msgList);
            }
        } else {
            // send messages one at a time
            for (int i = 0; i < msgNum; i++) {
                KeyedMessage<String, String> msg = new KeyedMessage<String, String>(topic, i + "", "Message_" + i);
                // KeyedMessage<String, String> msg = new KeyedMessage<String, String>(topic, "Message_" + i); // no key: Kafka picks a partition itself
                producer.send(msg);
                System.out.println("Send->[" + msg + "]");
                try {
                    sleep(SLEEP);
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }
        }

        producer.close(); // release the producer's network resources
        System.out.println("send done");
    }

    public static void main(String[] args) {
        JProducer pro = new JProducer(KafkaProperties.TOPIC);
        pro.start();
    }
}
```
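The batching rule is easy to get off by one: a send condition written as i % batchSize == 0 fires at i = 0 and ships a batch containing a single message. Pulling the grouping into a small helper makes it testable on its own. Batches.split is a hypothetical generic helper, a sketch of the grouping the batch mode aims for rather than anything in the Kafka API.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: split items into consecutive batches of at most batchSize, keeping their order. */
final class Batches {
    static <T> List<List<T>> split(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>(batchSize);
        for (T item : items) {
            current.add(item);
            if (current.size() == batchSize) { // batch is full: hand it off and start a new one
                batches.add(current);
                current = new ArrayList<>(batchSize);
            }
        }
        if (!current.isEmpty()) {
            batches.add(current); // trailing partial batch
        }
        return batches;
    }
}
```

With the producer's settings (1000 messages, batch size 100) this yields exactly 10 full batches, each of which could be passed to producer.send(List).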

3. Read (for the consumer, pay attention to the two settings auto.commit.enable and auto.offset.reset):

```java
package com.zsm.kfkdemo;

import java.text.MessageFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.zsm.kfkdemo.ConfigureAPI.KafkaProperties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 * Multithreaded consumption within the same consumer group can be implemented in two ways:
 * <p>
 * 1. Write a single-threaded client and start several instances of it
 * </p>
 * <p>
 * 2. Request more than one thread for the topic in the client's createMessageStreams call, then start one thread per stream
 * </p>
 *
 * @author zsm
 * @date 2016-09-27 10:26:42
 * @version 1.0
 */
public class JConsumer extends Thread {

    private ConsumerConnector consumer;
    private String topic;
    private final int SLEEP = 20;

    public JConsumer(String topic) {
        consumer = Consumer.createJavaConsumerConnector(this.consumerConfig());
        this.topic = topic;
    }

    private ConsumerConfig consumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.ZK);
        props.put("group.id", KafkaProperties.GROUP_ID);
        // Defaults to true: the consumer commits its offsets periodically and ZooKeeper persists them.
        // Otherwise offsets live only in memory; after a failure, consumption resumes from the last saved offset
        props.put("auto.commit.enable", "true");
        props.put("auto.commit.interval.ms", KafkaProperties.INTERVAL + ""); // commit offsets every INTERVAL ms
        // What to do when there is no initial offset in ZooKeeper or if an offset is out of range:
        // "largest" jumps to the newest message, "smallest" starts from the oldest one still stored
        props.put("auto.offset.reset", "largest");
        props.put("zookeeper.session.timeout.ms", KafkaProperties.TIMEOUT + "");
        props.put("zookeeper.sync.time.ms", "200");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1); // number of threads (streams) for this topic
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicCountMap);
        // If more than one thread was requested above, start one thread per stream and run the loop below in each
        KafkaStream<byte[], byte[]> stream = streams.get(topic).get(0);

        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        MessageAndMetadata<byte[], byte[]> messageAndMetaData = null;
        while (it.hasNext()) {
            messageAndMetaData = it.next();
            byte[] key = messageAndMetaData.key(); // null when the producer sent no key
            System.out.println(MessageFormat.format("Receive->[ message:{0} , key:{1} , partition:{2} , offset:{3} ]",
                    new String(messageAndMetaData.message()), key == null ? "null" : new String(key),
                    messageAndMetaData.partition() + "", messageAndMetaData.offset() + ""));
            try {
                sleep(SLEEP);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        JConsumer con = new JConsumer(KafkaProperties.TOPIC);
        con.start();
    }
}
```
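The effect of auto.offset.reset can be sketched as a small decision function: use the committed offset if there is one and it still lies inside the partition's log; otherwise reset to the oldest stored offset ("smallest") or to the end of the log ("largest"). StartOffset.resolve and its parameters are hypothetical names, a sketch of the rule described in the consumer comments rather than the client's implementation.

```java
/** Sketch: choose where a consumer starts reading one partition, following auto.offset.reset. */
final class StartOffset {
    /**
     * @param committed       offset committed by the group, or null if none exists
     * @param logStartOffset  oldest offset still stored (older data was removed by retention)
     * @param logEndOffset    offset the next produced message will get
     */
    static long resolve(Long committed, long logStartOffset, long logEndOffset, String autoOffsetReset) {
        boolean inRange = committed != null && committed >= logStartOffset && committed <= logEndOffset;
        if (inRange) {
            return committed; // normal case: resume where the group left off
        }
        switch (autoOffsetReset) {
            case "smallest":
                return logStartOffset; // re-read everything still stored
            case "largest":
                return logEndOffset;   // only messages produced from now on
            default:
                throw new IllegalArgumentException("unknown auto.offset.reset: " + autoOffsetReset);
        }
    }
}
```

This is why a brand-new group configured with "largest", as in JConsumer, sees only messages produced after it starts, while "smallest" replays whatever retention has kept.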

Maven dependency:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version></version>
    <exclusions>
        <exclusion>
            <groupId>com.sun.jmx</groupId>
            <artifactId>jmxri</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.sun.jdmk</groupId>
            <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.jms</groupId>
            <artifactId>jms</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```

3 MirrorMaker



3.1 Usage

Run ./kafka-run-class.sh kafka.tools.MirrorMaker --help to see the usage instructions:

```text
Option                                  Description
------                                  -----------
--blacklist <Java regex (String)>       Blacklist of topics to mirror.
--consumer.config <config file>         Consumer config to consume from a
                                          source cluster. You may specify
                                          multiple of these.
--help                                  Print this message.
--num.producers <Integer: Number of     Number of producer instances (default:
  producers>                              1)
--num.streams <Integer: Number of       Number of consumption streams.
  threads>                                (default: 1)
--producer.config <config file>         Embedded producer config.
--queue.size <Integer: Queue size in    Number of messages that are buffered
  terms of number of messages>            between the consumer and producer
                                          (default: 10000)
--whitelist <Java regex (String)>       Whitelist of topics to mirror.
```

3.2 Starting

./bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config zsmSourceClusterConsumer.config --num.streams 2 --producer.config zsmTargetClusterProducer.config --whitelist="ds*"

  • The file given by --consumer.config must contain at least the zookeeper.connect and group.id fields
  • The file given by --producer.config must contain at least the metadata.broker.list field, which lists the brokers of the target cluster
  • --whitelist specifies the topics to mirror
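The help text says --whitelist takes a Java regex, not a shell glob, so "ds*" means "d followed by zero or more s". The sketch below shows which topic names a pattern selects, assuming the whitelist must match the whole topic name (full-match semantics, as String.matches uses); check this against your Kafka version. WhitelistSketch is a hypothetical helper.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Sketch: topics selected by a whitelist regex, assuming the regex must match the entire topic name. */
final class WhitelistSketch {
    static List<String> selected(String whitelistRegex, List<String> topics) {
        Pattern pattern = Pattern.compile(whitelistRegex);
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches()) // matches() requires a full match
                .collect(Collectors.toList());
    }
}
```

Under that assumption, "ds*" selects only topics named like ds or dss, while "ds.*" selects every topic whose name starts with ds (for example ds_gps).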


4 Kafka Monitoring Tool (KafkaOffsetMonitor)



```shell
java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.0.jar \
     com.quantifind.kafka.offsetapp.OffsetGetterWeb \
     --zk ,, \
     --port 8087 \
     --refresh 10.seconds \
     --retain 1.days 1>./zsm-logs/stdout.log 2>./zsm-logs/stderr.log &
```

Here --zk takes the ZooKeeper addresses in the form host1:port1,host2:port2…, --port is the port of the web UI, --refresh is the refresh interval, and --retain is how long data is kept (units: seconds, minutes, hours, days).

5 Kafka Cluster Management Tool (Kafka Manager)





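5.1 Installation

Building from source requires downloading the code from GitHub and installing the sbt tool to build the package; the build takes a long time and kept failing for unknown reasons, so a package pre-built by another user is used here (backup link).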



unzip kafka-manager-1.0-SNAPSHOT.zip




./bin/kafka-manager -Dconfig.file=conf/application.conf (after it starts, a kafka-manager node appears under the ZooKeeper root)


./bin/kafka-manager -Dhttp.port=9001 -Dkafka-manager.zkhosts=",,"

5.2 Usage

Open the web UI, go to Cluster -> Add Cluster, and enter the ZooKeeper address of the Kafka cluster you want to monitor.

6 Advanced Topics

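  • In the current Kafka implementation, all ZooKeeper operations are performed by the Kafka controller (serially).
  • Offset management: Kafka records offsets in ZooKeeper, but frequent writes through the ZooKeeper client API are inefficient. Kafka 0.8.2 introduced native offset storage, which moves offset management out of ZooKeeper and scales horizontally. It is built on Kafka's compacted topics: each offset is committed directly to a compacted topic, keyed by the combination of consumer group, topic, and partition. Kafka also keeps this triple and its latest offset in memory, so a consumer fetching its latest offset is served straight from memory. Kafka can also quickly checkpoint the latest offsets to disk.
  • Choosing the number of partitions: the right count depends on hardware, software, load, and so on, and has to be decided case by case, but the following procedure gives a starting point. Create a topic with a single partition and measure its producer throughput and its consumer throughput; call them Tp and Tc (in MB/s). If the total target throughput is Tt, the producers need Tt / Tp partitions and the consumers need Tt / Tc, so the topic needs at least max(Tt / Tp, Tt / Tc) = Tt / min(Tp, Tc) partitions, rounded up.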
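The estimate above as a short calculation. PartitionEstimate is a hypothetical helper, and the numbers in the example are made up for illustration: with a target of Tt = 100 MB/s and a single partition measured at Tp = 20 MB/s for producing and Tc = 50 MB/s for consuming, the producer side needs 100 / 20 = 5 partitions and the consumer side 100 / 50 = 2, so the topic needs max(5, 2) = 5.

```java
/** Sketch: enough partitions for both producers and consumers to reach the target throughput. */
final class PartitionEstimate {
    static int partitions(double targetMBps, double producerMBpsPerPartition, double consumerMBpsPerPartition) {
        if (targetMBps <= 0 || producerMBpsPerPartition <= 0 || consumerMBpsPerPartition <= 0) {
            throw new IllegalArgumentException("throughputs must be positive");
        }
        // each side needs Tt / T(side) partitions; the topic must satisfy the more demanding side
        double needed = Math.max(targetMBps / producerMBpsPerPartition, targetMBps / consumerMBpsPerPartition);
        return (int) Math.ceil(needed); // partitions come in whole numbers, so round up
    }
}
```

Rounding up matters when the ratio is fractional: Tt = 100, Tp = 30, Tc = 40 gives max(3.33, 2.5), i.e. 4 partitions.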
