问题描述

最近公司有个转发服务,业务逻辑是从kafka消费到大量的数据,然后放入一个队列中。之后用一个线程池,去消费这个队列。

但是发现这四个线程消费队列的地方又严重的延迟。特此想解决此问题。

贴代码

  • 往队列里push数据
void KafkaConsumer::msgConsume(RdKafka::Message* message, void* opaque)
{KafkaConsumer::Data cData;int errcode = message->err();if (errcode == RdKafka::ERR__TIMED_OUT){return;}else if (errcode == RdKafka::ERR_NO_ERROR)  //消费数据,放入队列{Data *pData=new Data;pData->buffer.writeBlock(static_cast<const char*>(message->payload()),static_cast<int>(message->len())); // payload 装载,载荷;这里就是里面的内容//pData->topic = message->topic()->name();  pData->topic = message->topic_name();   // 注意这里pData->ipartition = message->partition();_cMutex.lock();_cDataQue.push(pData); // 放入队列_cMutex.unlock();}else if (RdKafka::ERR__PARTITION_EOF){if (_exit_eof) _run = false;}else{LOG(INFO) << "kafkaConsumer--error: Consumer failed:" << message->errstr();}
}
  • 取队列数据,处理篇
void KafkaConsumer::run(void* param)
{int tag;memcpy(&tag,&param,sizeof(int));while (1){if (tag == CDATA){if(_cDataQue.size() == 0) {usleep(2000);continue;}_cMutex.lock();while(_cDataQue.size()>0) // 处理一次就都得处理完?!!{Data *pData = _cDataQue.pop(); // 队列中取出HandleMsg(pData);     // 取数据和处理数据放一起?都在锁里?!!SAFE_DELETE(pData);}_cMutex.unlock();} else {break;}}
}

代码错误分析

_cMutex.lock();
            while(_cDataQue.size()>0) // 处理一次就都得处理完?!!
            {
                Data *pData = _cDataQue.pop(); // 队列中取出
                HandleMsg(pData);     // 取数据和处理数据放一起?都在锁里?!!
                SAFE_DELETE(pData);
            }
            _cMutex.unlock();

线程在数据队列_cDataQue中的数据时,先上锁,然后不断的循环取出队列中的数据并处理。(取出数据 和处理数据在一起)

处理完每条数据之后delete.

当锁定时的整个队列中的数据处理完毕之后,解锁。

定义几个变量:

N : 锁时队列的长度

T1: pop 一条数据的时间

T2:HandleMsg 函数执行的时间

T3:push 一条数据的时间

此活动中的动作:

1. kafka消费到数据,锁队列,写队列,解锁队列。

2.数据解析线程,锁队列,读数据,解锁队列,处理数据。

此时的处理方式,几乎没有发挥多线程的优势,每次都是把锁时的队列的全部内容处理完。其他三个线程和生产数据的线程干等

t = N * (T1+T2) 的时间。 若此时是程序刚启动。kafka瞬间消费到很多数据成万条的数据。 那么t 将是一个很大的时间。且kafka消费到的数据还不能及时的存放如队列中。于是就造成了延迟。

隐患就是:

1.根本没发挥多线程的优势和能力

2.若数据量大,取数据和处理数据放一起,导致锁态占用的时间很长,影响其他线程(往queue里放数据的线程)干活

3.其他线程竞争不到,干等,浪费CPU时间。一个线程死干活,处理不完,数据堆积。延迟。

改进方法

1. 将取数据的地方放在锁的里面,处理数据的地方放在锁的外面。

2.每次取固定数量的nCount 个数据,放在一个容器里。然后出锁后慢慢处理。

同时,每次取固定数量的来处理,锁占用的时间是固定的,t = nCount * T1 .也就是说,其他3个处理线程和1个往queue里塞数据的线程。最多只等 3 * t 的时间就能拿到 queue的控制权,并对其进行操作。

而数据处理的时间 T2 与queue的操作(加锁,读取,塞入)没有关系。

不过要控制nCount的值,太小。锁的次数很频繁; 太大,t 的时间会变大。

这样多线程就用其来了。队列应用也灵活了。处理能力大大提升。

void KafkaConsumer::run(void* param)
{int tag;memcpy(&tag,&param,sizeof(int));while (1){if(_cDataQue.size() == 0) {usleep(2000);continue;}std::vector<Data*> vDatas;_cMutex.lock();while(_cDataQue.size()>0) {//上锁的时间尽量短,为其他线程争取到和写入线程腾出时间Data *pData = _cDataQue.pop(); // 队列中取出vDatas.push_back(pData);if(vDatas.size() > 10){ //这里能限制这个长度 ,最多弄10条。处理快,节省时间。break;}}_cMutex.unlock();// 将处理移除在锁之外,慢慢处理这些数据,处理完释放for(vector<Data*>::iterator iter = vDatas.begin(); iter != vDatas.end(); ++iter){Data *pData = *iter;HandleMsg(pData);SAFE_DELETE(pData);}    }
}

用生活实例来解释描述:

1.角色 : 大厨 (生产者) , 取餐台/口(queue),包子(数据),顾客(消费处理线程)

2.动作:生产数据(push进queue),取出数据(pop出queue),占住取餐台(Lock),放开取餐台(UNLock),吃包子(HandleMsg)

方案一

大厨们生产包子,锁住取餐口,放下包子。然后顾客1 占住取餐口,假如这里有10个包子,他就取一个吃了,再去一个吃了,直到10个取完吃完才离开取餐口。此时,大厨没法往里放包子,其他三个顾客都干等着。

方案二

大厨们生产包子,占住取餐口,放下包子。顾客1,占住取餐口,取了10个包子,去一边吃去。顾客2 ,马上来也取10个,然后一遍吃去。同理顾客3,4 也一样。当然这里只是理想情况,顾客1去完之后,也可能大厨又占住取餐口,放了1w个包子。

关键是,每次取餐口被占用的时间,之后顾客们取包子的时间。非常短。而且每个顾客取完之后就去一边吃包子。同时大家可能都在吃包子,实现了多线程处理。


哈哈。就酱紫。


多线程消费一个队列问题相关推荐

  1. redis list 实现消息队列 多线程消费

    redis list 实现消息队列 多线程消费 redis list 实现消息队列 多线程消费 redis list 实现消息队列 多线程消费 利用redis list 命令: Redis Brpop ...

  2. python 多进程——使用进程池,多进程消费的数据)是一个队列的时候,他会自动去队列里依次取数据...

    我的mac 4核,因此每次执行的时候同时开启4个线程处理: # coding: utf-8import time from multiprocessing import Pooldef long_ti ...

  3. 正确处理kafka多线程消费的姿势

    最近项目开发过程使用kafka作为项目模块间负载转发器,实现实时接收不同产品线消息,分布式准实时消费产品线消息.通过kafka作为模块间的转换器,不仅有MQ的几大好处:异步. 解耦. 削峰等几大好处, ...

  4. java kafka 多线程消费

    我们先来看下简单的kafka生产者和消费者模式代码: 生产者KafkaProducer /** * @author xiaofeng * @version V1.0 * @title: KafkaPr ...

  5. rdkafka线程过多_Kafka/RocketMQ 多线程消费时如何保证消费顺序?

    上两篇文章都在讨论顺序消息的一些知识,看到有个读者的留言如下: 这个问题问得非常棒,由于在之前的文章中并没有提及到,因此我在这篇文章中单独讲解,本文将从消费顺序性这个问题出发,深度剖析 Kafka/R ...

  6. 【kafka】浅谈Kafka的多线程消费的设计

    1.概述 转载:浅谈Kafka的多线程消费的设计 看原文去... 一.前言 跟RabbitMQ相比,Kafka的分区机制(Partition)使其支持对同一个"队列"分片并行读取, ...

  7. 【Kafka笔记】5.Kafka 多线程消费消息

    Kafka多线程消费理解 Kafka Java Consumer设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心 ...

  8. kafka Java客户端之 consumer API 多线程消费消息

    kafka consumer 线程设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心跳线程. 用户主线程,指的是启 ...

  9. 【多线程】阻塞队列的C++多线程 实现 BlockingQueue

    阻塞队列在存放和获取队列中的数据时需要使用多线程,一个线程专门负责向队列中存放元素,另一个线程专门从队列中获取元素.也可多开辟跟多的线程进行存取. 规范的方法也正是存放和获取队列元素分别在不同的线程中 ...

最新文章

  1. 领域驱动设计(DDD)架构演进和DDD的几种典型架构介绍(图文详解)
  2. MIT人工智能独立设系!拆分EECS为EE、CS、AI+决策三部分,直接归学院管理
  3. 用python编写一个高效搜索代码工具
  4. SAP UI5 jQuery.sap.setObject
  5. SQL注入-SQL注入的WAF绕过(十六)
  6. SQL Server查询中特殊字符的处理方法
  7. fastJson null字符串转空 null数字转0
  8. 软考信息安全工程师学习笔记三(1.3 信息安全管理基础)
  9. 枚举算法:求解不等式
  10. 大数据可视化有什么优点
  11. 团队作业——四则运算网页版
  12. in the java search_Java SearchRequest.indices方法代碼示例
  13. 我在 B 站学习深度学习(生动形象,跃然纸上)
  14. matlab fft函数画幅度谱,如何在FFT幅度谱上绘制掩模线
  15. 计算机转换外界信息原理,高级文秘及办公自动化教程-计算机基础
  16. http post muti form
  17. 获取付费网站图标图片的方法
  18. EXCEL——自定义单元格格式
  19. java 线程锁Lock
  20. 打印机无法获取IP地址备忘录

热门文章

  1. 上课笔记-台大哲学概论(一)
  2. 高中生计算机竞赛培训,奥林匹克信息竞赛培训
  3. 西北工业大学计算机专业课考什么,2020西北工业大学计算机考研初试科目、参考书目、招生人数...
  4. CSS---(字体图标,插画,背景图)阿里Iconfont在线图标使用
  5. 易语言自绘系列教程(美易自绘)(第一部分)
  6. 《Linux/UNIX OpenLDAP实战指南》——2.3 Linux平台安装
  7. 解决动画库Animate.css在谷歌浏览器中没有效果的问题
  8. 等几何分析编程记录 --- 未完待续
  9. 使用while 循环1234568910
  10. CAD(布置厨洁具)(尺寸标注)5.12