全局有序?

业务: 1 2 3 5
消费: 1 5 2 3

1.一个topic 一个分区 3个 虽然保证全局有序,但是性能下降 生产(很多公司也在使用,或则没有吧太在意)
2.单分区有序,那么我们想方法把同一个特征数据写到一个分区

p0 p1 p2

id money
业务系统:
insert into t values(1,1)
update t set age= 200 where id =1
update t set age= 400 where id =1
update t set age= 1000000 where id =1
delete from t where id =1
最终的结果的是0条
如何把特征的数据写一个分区里面
producer send api (key,value)
key: erp.t.1 null
value:SQL数据
业务系统: 特征值
insert into t values(1,1) erp.t.1 hash 5 %3=1…2 -->p2
update t set age= 200 where id =1 erp.t.1
update t set age= 400 where id =1 erp.t.1
update t set age= 1000000 where id =1 erp.t.1
delete from t where id =1 erp.t.1
源码读取:https://github.com/apache/kafka/blob/3cdc78e6bb1f83973a14ce1550fe3874f7348b05/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
这5条记录都发送到一个分区 有序的发送 ,那么消费时也不会出现紊乱

/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at**    http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package org.apache.kafka.clients.producer.internals;import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;/*** The default partitioning strategy:* <ul>* <li>If a partition is specified in the record, use it* <li>If no partition is specified but a key is present choose a partition based on a hash of the key* <li>If no partition or key is present choose a partition in a round-robin fashion*/
public class DefaultPartitioner implements Partitioner {private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();public void configure(Map<String, ?> configs) {}/*** Compute the partition for the given record.** @param topic The topic name* @param key The key to partition on (or null if no key)* @param keyBytes serialized key to partition on (or null if no key)* @param value The value to partition on or null* @param valueBytes serialized value to partition on or null* @param cluster The current cluster metadata*/public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (keyBytes == null) {int nextValue = nextValue(topic);List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() > 0) {int part = Utils.toPositive(nextValue) % availablePartitions.size();return availablePartitions.get(part).partition();} else {// no partitions are available, give a non-available partitionreturn Utils.toPositive(nextValue) % numPartitions;}} else {// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}private int nextValue(String topic) {AtomicInteger counter = topicCounterMap.get(topic);if (null == counter) {counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);if (currentCounter != null) {counter = currentCounter;}}return counter.getAndIncrement();}public void close() {}}

重点:

只要把同一个表的同一个主键的数据发到同一个分区即可(如果多数据库得加入数据库名)

分区定义如下:

private int partitionDefine(String keyToPartition) {if (keyToPartition == null) {return new Random().nextInt(numPartitions);} else {return Math.abs(keyToPartition.hashCode()) % numPartitions;}
}

传入的参数 tableName+主键
这样,消费到的数据就是有序的。不同的场景灵活运用即可。

Kafka如何做到全局有序相关推荐

  1. Kafka 如何保证消息全局有序

    全局顺序就目前的应用范围来讲,可以列举出来的也就限于binlog日志传输,如mysql binlog日志传输要求全局的顺序,不能有任何的乱序. 方法1 全局使用一个生产者 全局使用一个消费者(并严格到 ...

  2. 大数据面试重点之kafka(三)

    Kafka如何保证全局有序? 可回答:1)Kafka消费者怎么保证有序性?2)Kafka生产者写入数据怎么保证有序?3)Kafka可以保证 数据的局部有序,如何保证数据的全局有序?4)Kafka消息的 ...

  3. kafka架构与原理 ,消息的可靠性与一致性幂等性,数据存储、zookeeper、使用场景

    一.Kafka概述 Kafka作为一个商业级消息中间件 ,发布和订阅记录流,它类似于一个消息队列 先了解下Kafka的基本原理,然后通过对kakfa的存储机制.复制原理.同步原理.可靠性和持久性保证等 ...

  4. 携程实时计算平台架构与实践丨DataPipeline

    文 | 潘国庆 携程大数据平台实时计算平台负责人 本文主要从携程大数据平台概况.架构设计及实现.在实现当中踩坑及填坑的过程.实时计算领域详细的应用场景,以及未来规划五个方面阐述携程实时计算平台架构与实 ...

  5. 大数据面试题V2.0,641页,39w字

    大家好,我是蓦然 原文链接如下: 大数据面试题V2.0,641页,39w字面试题来源:牛客网大数据面经,从约500篇面经选取.https://mp.weixin.qq.com/s?__biz=MzI3 ...

  6. kafka虞兮叹三(消息有序消费)

    两种方案: 方案一,kafka topic 只设置一个partition分区   方案二,producer将消息发送到指定partition分区 解析: 方案一:kafka默认保证同一个partiti ...

  7. 原来这才是 Kafka!(多图+深入)

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 作者:xybaby cnblogs.com/xybaby/p/90 ...

  8. 再见,Kafka!再见,RocketMQ!

    在开源的业界已经有这么多消息队列中间件了,Pulsar 作为一个新势力到底有什么优点呢? Pulsar 自从出身就不断的再和其他的消息队列(Kafka,RocketMQ 等等)做比较. 但是 Puls ...

  9. kafka 学习总结

    本文转自: https://blog.csdn.net/iverson2010112228/article/details/82631554 主要内容 kafka系统架构概要介绍 kafka重要组件 ...

最新文章

  1. html5下拉智能,HTML5新增标签 + 智能表单
  2. 一些计算机知识的总结(转)
  3. 从30岁到35岁:为你的生命多积累一些厚度(转)
  4. k8s部署rook-ceph
  5. 高并发系统处理之——限流
  6. 如何自动保存邮件草稿
  7. android 环信消息红点,环信3.0获取会话消息列表
  8. hibernate异常之--count查询异常
  9. React开发(278):ant design message res保证正确信息提示
  10. java formfile_基于Struts文件上传(FormFile)详解
  11. linux so文件统一目录,linux加载指定目录的so文件
  12. tp5中php正则怎么写,详解tp5中phpmailer的使用
  13. android功耗(9)----MTK功耗问题需要哪种log
  14. SpringBoot之RabbitMQ的使用
  15. 2021扬州市高考成绩查询,2021高考成绩查询系统登录官网入口
  16. 来自微信团队的6个开源项目
  17. D-Bus 性能分析
  18. Xshell免费版安装 常用连接linux工具
  19. 硬盘服务器哪个好用吗,服务器用固态硬盘好还是机械硬盘好
  20. 初创公司几个投资人,各占多少股份合适

热门文章

  1. 移动端适配之二:visual viewport、layout viewport和ideal viewport介绍 1
  2. 生鲜连锁品牌天鲜配获千万级PreA轮战略投资,2018年销售额将达1亿元
  3. R语言中GCC编译的问题(续)
  4. mysqldump简单解析
  5. [实战]MVC5+EF6+MySql企业网盘实战(24)——视频列表
  6. 指向结构体变量的指针变量
  7. 误码率matlab怎么计算,Matlab 仿真(7,4)汉明码 传输误码率
  8. css颜色 333是什么颜色,纯css实现的颜色扇附图
  9. 三维点云学习(4)5-DBSCNA python 复现-2-kd-_tree加速
  10. Web Components系列(七) ——自定义组件的生命周期