最近项目开发过程使用kafka作为项目模块间负载转发器,实现实时接收不同产品线消息,分布式准实时消费产品线消息。通过kafka作为模块间的转换器,不仅有MQ的几大好处:异步、 解耦、 削峰等几大好处,而且开始考虑最大的好处,可以实现架构的水平扩展,下游系统出现性能瓶颈,容器平台伸缩增加一些实例消费能力很快就提上来了,整体系统架构上不用任何变动。理论上,我们项目数据量再大整体架构上高可用都没有问题。在使用kafka过程中也遇到一些问题:

1. 消息逐渐积压,消费能力跟不上;

2.某个消费者实例因为某些异常原因挂掉,造成少量数据丢失的问题。

针对消费积压的问题,通过研究kafka多线程消费的原理,解决了消费积压的问题。所以,理解多线程的Consumer模型是非常有必要,对于我们正确处理kafka多线程消费很重要。

kafka多线程消费模式
说kafka多线程消费模式前,我们先来说下kafka本身设计的线程模型和ConcurrentmodificationException异常的原因。见官方文档:

The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException.

ConcurrentmodificationException异常的出处见以下代码:

/**
     * Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking
     * when the lock is not available, however, we just throw an exception (since multi-threaded usage is not
     * supported).
     * @throws IllegalStateException if the consumer has been closed
     * @throws ConcurrentModificationException if another thread already has the lock
     */
    private void acquire() {
        ensureNotClosed();
        long threadId = Thread.currentThread().getId();
        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
        refcount.incrementAndGet();
    }
该方法acquire 会在KafkaConsumer的大部分公有方法调用第一句就判断是否正在同一个KafkaConsumer被多个线程调用。

"正在"怎么理解呢?我们顺便看下KafkaConsumer的commitAsync 这个方法就知道了。

@Override
    public void commitAsync(OffsetCommitCallback callback) {
        acquire(); // 引用开始
        try {
            commitAsync(subscriptions.allConsumed(), callback);
        } finally {
            release(); //引用释放
        }
    }
我们看KafkaConsumer的release方法就是释放正在操作KafkaConsumer实例的引用。

/**
     * Release the light lock protecting the consumer from multi-threaded access.
     */
    private void release() {
        if (refcount.decrementAndGet() == 0)
            currentThread.set(NO_CURRENT_THREAD);
    }
通过以上的代码理解,我们可以总结出来kafka多线程的要点: kafka的KafkaConsumer必须保证只能被一个线程操作。

下面就来说说,我理解的Kafka能支持的两种多线程模型,首先,我们必须保证操作KafkaConsumer实例的只能是一个线程,那我们要想多线程只能用在消费ConsumerRecord List上动心思了。下面列举我理解的kafka多线程消费模式。

模式一  1个Consumer模型对应一个线程消费,最多可以有topic对应的partition个线程同时消费Topic。

模式二 1个Consumer和多个线程消费模型,保证只有一个线程操作KafkaConsumer,其它线程消费ConsumerRecord列表。

注意 第二种模式其实也可以支持多个Consumer,用户最多可以启用partition总数个Consumer实例,然后,模式二跟模式一唯一的差别就是模式二在单个Consuemr里面是多线程消费,而模式一单个Consumer里面是单线程消费。

以上两种kafka多线程消费模式优缺点对比:

kafka多线程消费模式实现    
关于多线程消费模式具体实现都是选择基于spring-kafka实现,毕竟站在巨人肩膀上,站的高望的远少加班???,以下就是模式二的具体实现,模式一的话就是对模式二的简化,具体实现如下。

@Configuration
@EnableKafka
public class KafkaConfig {
 
    @Value("${kafka.bootstrap-servers}")
    private String servers;
 
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch-size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
 
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
 
    @Value("${msg.consumer.max.poll.records}")
    private int maxPollRecords;
 
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
 
    public ProducerFactory producerFactory() {
        return new DefaultKafkaProducerFactory(producerConfigs());
    }
 
    @Bean
    public KafkaTemplate kafkaTemplate() {
        return new KafkaTemplate(producerFactory());
    }
 
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        // 此处并发度设置的都是Consumer个数,可以设置1到partition总数,
        // 但是,所有机器实例上总的并发度之和必须小于等于partition总数
        // 如果,总的并发度小于partition总数,有一个Consumer实例会消费超过一个以上partition
        factory.setConcurrency(2);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
 
    public ConsumerFactory<String, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
 
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        return propsMap;
    }
 
}
具体业务代码在BaseConsumer:

public abstract class BaseConsumer implements ApplicationListener<ConsumerStoppedEvent> {
 
    private static final Logger LOG = LoggerFactory.getLogger(BaseConsumer.class);
 
    @Value("${kafka.consumer.thread.min}")
    private int consumerThreadMin;
 
    @Value("${kafka.consumer.thread.max}")
    private int consumerThreadMax;
 
    private ThreadPoolExecutor consumeExecutor;
 
    private volatile boolean isClosePoolExecutor = false;
 
    @PostConstruct
    public void init() {
 
        this.consumeExecutor = new ThreadPoolExecutor(
                getConsumeThreadMin(),
                getConsumeThreadMax(),
                // 此处最大最小不一样没啥大的意义,因为消息队列需要达到 Integer.MAX_VALUE 才有点作用,
                // 矛盾来了,我每次批量拉下来不可能设置Integer.MAX_VALUE这么多,
                // 个人觉得每次批量下拉的原则 觉得消费可控就行,
                // 不然,如果出现异常情况下,整个服务示例突然挂了,拉下来太多,这些消息会被重复消费一次。
                1000 * 60,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
    }
 
    /**
     * 收到spring-kafka 关闭Consumer的通知
     * @param event 关闭Consumer 事件
     */
    @Override
    public void onApplicationEvent(ConsumerStoppedEvent event) {
 
        isClosePoolExecutor = true;
        closeConsumeExecutorService();
 
    }
 
    private void closeConsumeExecutorService() {
 
        if (!consumeExecutor.isShutdown()) {
 
            ThreadUtil.shutdownGracefully(consumeExecutor, 120, TimeUnit.SECONDS);
            LOG.info("consumeExecutor stopped");
 
        }
 
    }
 
    @PreDestroy
    public void doClose() {
        if (!isClosePoolExecutor) {
            closeConsumeExecutorService();
        }
    }
 
    @KafkaListener(topics = "${msg.consumer.topic}", containerFactory = "kafkaListenerContainerFactory")
    public void onMessage(List<String> msgList, Acknowledgment ack) {
 
        CountDownLatch countDownLatch = new CountDownLatch(msgList.size());
 
        for (String message : msgList) {
            submitConsumeTask(message, countDownLatch);
        }
 
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            LOG.error("countDownLatch exception ", e);
        }
 
        // 本次批量消费完,手动提交
        ack.acknowledge();
        LOG.info("finish commit offset");
 
    }
 
    private void submitConsumeTask(String message, CountDownLatch countDownLatch) {
        consumeExecutor.submit(() -> {
            try {
                onDealMessage(message);
            } catch (Exception ex) {
                LOG.error("on DealMessage exception:", ex);
            } finally {
                countDownLatch.countDown();
            }
        });
    }
 
    /**
     * 子类实现该抽象方法处理具体消息的业务逻辑
     * @param message kafka的消息
     */
    protected abstract void onDealMessage(String message);
 
    private int getConsumeThreadMax() {
        return consumerThreadMax;
    }
 
    private int getConsumeThreadMin() {
        return consumerThreadMin;
    }
 
    public void setConsumerThreadMax(int consumerThreadMax) {
        this.consumerThreadMax = consumerThreadMax;
    }
 
    public void setConsumerThreadMin(int consumerThreadMin) {
        this.consumerThreadMin = consumerThreadMin;
    }
}
其中,closeConsumeExecutorService方法就是为了服务实例异常退出或者多机房上线kill的情况下,尽最大可能保证本次拉下来的任务被消费掉。最后,附上closeConsumeExecutorService实现,觉得RocketMQ源码这个实现的不错,就借用过来了,在此表示感谢。

public static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit timeUnit) {
        // Disable new tasks from being submitted.
        executor.shutdown();
        try {
            // Wait a while for existing tasks to terminate.
            if (!executor.awaitTermination(timeout, timeUnit)) {
                executor.shutdownNow();
                // Wait a while for tasks to respond to being cancelled.
                if (!executor.awaitTermination(timeout, timeUnit)) {
                    LOGGER.warn(String.format("%s didn't terminate!", executor));
                }
            }
        } catch (InterruptedException ie) {
            // (Re-)Cancel if current thread also interrupted.
            executor.shutdownNow();
            // Preserve interrupt status.
            Thread.currentThread().interrupt();
        }
    }
下面回到使用kafka遇到的第二个问题,怎么解决消费者实例因为某些原因挂掉,造成少量数据丢失的问题。其实,通过我们上面的写法,已经不会出现因为某些原因服务实例(docker、物理机)挂掉,丢数据的情况。因为我们是先拉取后消费,消费完才手动提交kafka确认offset。实在还存在万一退出时候调用的closeConsumeExecutorService方法还没有消费完数据,表示这个时候offset肯定没有手动提交,这一部分数据也不会丢失,会在服务实例恢复了重新拉取消费。

以上的代码存在极小的可能瑕疵,比如,我们双机房切换上线,某机房实例有一部分数据没有消费,下次会重复消费的问题。其实,这个问题我们在业务上通过在配置中心配置一个标识符来控制,当改变标识符控制某些机房停止拉取kafka消息,这个时候我们就可以安全操作,不担心kafka没有消费完,下次重复消费的问题了。
————————————————
版权声明:本文为CSDN博主「Johnniecsdn」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/Johnnyz1234/article/details/98318528

正确处理kafka多线程消费的姿势相关推荐

  1. kafka多线程消费

    1.zookeeper集群搭建:zookeeper安装以及使用_燕少༒江湖的博客-CSDN博客_zookeeper 2.kafka集群搭建:kafka集群搭建以及遇到的异常_燕少༒江湖的博客-CSDN ...

  2. 【Kafka笔记】5.Kafka 多线程消费消息

    Kafka多线程消费理解 Kafka Java Consumer设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心 ...

  3. java kafka 多线程消费

    我们先来看下简单的kafka生产者和消费者模式代码: 生产者KafkaProducer /** * @author xiaofeng * @version V1.0 * @title: KafkaPr ...

  4. kafka 多线程消费

    一. 1.Kafka的消费并行度依赖Topic配置的分区数,如分区数为10,那么最多10台机器来并行消费(每台机器只能开启一个线程),或者一台机器消费(10个线程并行消费).即消费并行度和分区数一致. ...

  5. 几种kafka多线程消费方式

    kafka API   https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/Kafka ...

  6. 一看就会的kafka多线程顺序消费【内附Demo哦】

    Hello,这里是爱 Coding,爱 Hiphop,爱喝点小酒的 AKA 柏炎. Kafka是一个分布式的,支持多分区.多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于 ...

  7. kafka Java客户端之 consumer API 多线程消费消息

    kafka consumer 线程设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心跳线程. 用户主线程,指的是启 ...

  8. Kafka Consumer多线程消费

    概述 OrdinaryConsumer类 ConsumerWorker.java MultiThreadedConsumer.java MultiThreadedRebalanceListener.j ...

  9. 【kafka】浅谈Kafka的多线程消费的设计

    1.概述 转载:浅谈Kafka的多线程消费的设计 看原文去... 一.前言 跟RabbitMQ相比,Kafka的分区机制(Partition)使其支持对同一个"队列"分片并行读取, ...

最新文章

  1. jQuery 1.9使用$.support替代$.browser的使用方法
  2. 为什么计算机系统安全具有整体性质,操作系统全局性质的形式化描述和验证
  3. java string 后几位_java中String占几个位元组
  4. java的知识点29——join:合并线程 插队线程、线程的状态
  5. AI:2020年6月22日北京智源大会演讲分享之09:00-09:50 全体大会《AI精度与隐私的博弈》
  6. 【图像处理】——实现二值图像的轮廓边界跟踪以及轮廓面积周长的求解(connectedComponentsWithStats()函数和connectedComponents()函数)
  7. html文字依次显示,利用定时器和css3动画制作文字依次渐变显示的效果
  8. 王者荣耀8月15日服务器维护,王者荣耀8月15日更新维护到什么时候 王者荣耀8月15日更新时间分享...
  9. java https post get请求_JAVA利用HttpClient进行POST和GET请求(HTTPS)
  10. matlab常用操作 随手记录
  11. vsftp.conf 配置文件详解
  12. python center用法_【语言学习】python——字符串
  13. OSChina 周六乱弹 —— 程序猿到底有多少个不解风情的瞬间?
  14. Elasticsearch入门四:Elasticsearch-搜索、过滤、聚合
  15. 【微信小程序】性能分析Trace工具
  16. 标准ASCII编码表
  17. 家庭记账的最简单方法
  18. Ubuntu出现“dpkg: 依赖关系问题使得libbsd0:i386的配置工作不能继续”错误
  19. 崩坏3服务器维护2月8号,《崩坏3》2月8日更新内容 符华月轮正式上线
  20. 错误:Error: Could not open client transport with JDBC Uri: jdbc:hive2://ducking:10000: java.net.Connec

热门文章

  1. 软件工程领域相关的技术标准_女生是否适合学习软件工程专业,以及是否能够有好的就业机会...
  2. Shell的sort、uniq、tr、cut、命令和 正则表达式
  3. 给刚开始学习Linux的小白们的福利——资源已经分享,可随时下载
  4. mysql将大表定时转储_mysql数据库数据定时封装转储
  5. ast java_Java代码分析器(一): JDT入门
  6. android 定制ui,AndroidSDK-UI定制
  7. linux高级运维笔试简答题及答案,企业linux初级和高级运维面试常问题目问答总结技巧讲解(2020年录制)...
  8. jquery parsley ajax,用户体验超棒且功能强大使用简单的javascript表单验证 - Parsley.js...
  9. 程序员吐槽_阿里程序员回老家被哥们吐槽,破IT就破IT,还阿里巴巴
  10. 计算机技术大神,2017考研:计算机科学与技术学科大神给你的套路