
业务: 1 2 3 5
消费: 1 5 2 3

1. 一个 topic 只用一个分区(3个):虽然保证全局有序,但是性能下降。生产环境中很多公司也在这样使用,或者没有太在意。

p0 p1 p2

id money
insert into t values(1,1)
update t set money= 200 where id =1
update t set money= 400 where id =1
update t set money= 1000000 where id =1
delete from t where id =1
producer send api (key,value)
key: erp.t.1 null
业务系统: 特征值
insert into t values(1,1) erp.t.1 hash(key)=5, 5 % 3 = 2(商 1 余 2) --> p2
update t set money= 200 where id =1 erp.t.1
update t set money= 400 where id =1 erp.t.1
update t set money= 1000000 where id =1 erp.t.1
delete from t where id =1 erp.t.1
这 5 条记录的 key 相同,因此都会按顺序发送到同一个分区,那么消费时也不会出现乱序。

/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at**    http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package org.apache.kafka.clients.producer.internals;import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;/*** The default partitioning strategy:* <ul>* <li>If a partition is specified in the record, use it* <li>If no partition is specified but a key is present choose a partition based on a hash of the key* <li>If no partition or key is present choose a partition in a round-robin fashion*/
public class DefaultPartitioner implements Partitioner {private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();public void configure(Map<String, ?> configs) {}/*** Compute the partition for the given record.** @param topic The topic name* @param key The key to partition on (or null if no key)* @param keyBytes serialized key to partition on (or null if no key)* @param value The value to partition on or null* @param valueBytes serialized value to partition on or null* @param cluster The current cluster metadata*/public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (keyBytes == null) {int nextValue = nextValue(topic);List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() > 0) {int part = Utils.toPositive(nextValue) % availablePartitions.size();return availablePartitions.get(part).partition();} else {// no partitions are available, give a non-available partitionreturn Utils.toPositive(nextValue) % numPartitions;}} else {// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}private int nextValue(String topic) {AtomicInteger counter = topicCounterMap.get(topic);if (null == counter) {counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);if (currentCounter != null) {counter = currentCounter;}}return counter.getAndIncrement();}public void close() {}}




private int partitionDefine(String keyToPartition) {if (keyToPartition == null) {return new Random().nextInt(numPartitions);} else {return Math.abs(keyToPartition.hashCode()) % numPartitions;}

传入的参数(keyToPartition):tableName + 主键


  1. Kafka 如何保证消息全局有序

