原文地址:https://www.xilidou.com/2018/01/22/merge-request/

在高并发系统中,我们经常遇到这样的需求:系统产生大量的请求,但是这些请求实时性要求不高。我们就可以将这些请求合并,达到一定数量我们统一提交。最大化的利用系统性IO,提升系统的吞吐性能。

所以请求合并框架需要考虑以下两个需求:

  1. 当请求收集到一定数量时提交数据
  2. 一段时间后如果请求没有达到指定的数量也进行提交

我们就聊聊一如何实现这样一个需求。

阅读这篇文章你将会了解到:

  • ScheduledThreadPoolExecutor
  • 阻塞队列
  • 线程安全的参数
  • LockSuppor的使用

设计思路和实现

我们就聊一聊实现这个东西的具体思路是什么。希望大家能够学习到分析问题,设计模块的一些套路。

  1. 底层使用什么数据结构来持有需要合并的请求?

    • 既然我们的系统是在高并发的环境下使用,那我们肯定不能使用,普通的ArrayList来持有。我们可以使用阻塞队列来持有需要合并的请求。
    • 我们的数据结构需要提供一个 add() 的方法给外部,用于提交数据。当外部add数据以后,需要检查队列里面的数据的个数是否达到我们限额?达到数量提交数据,不达到继续等待。
    • 数据结构还需要提供一个timeOut()的方法,外部有一个计时器定时调用这个timeOut方法,如果方法被调用,则直接向远程提交数据。
    • 条件满足的时候线程执行提交动作,条件不满足的时候线程应当暂停,等待队列达到提交数据的条件。所以我们可以考虑使用 LockSuppor.park()LockSuppor.unpark 来暂停和激活操作线程。

经过上面的分析,我们就有了这样一个数据结构:

private static class FlushThread<Item> implements Runnable{private final String name;//队列大小private final int bufferSize;//操作间隔private int flushInterval;//上一次提交的时间。private volatile long lastFlushTime;private volatile Thread writer;//持有数据的阻塞队列private final BlockingQueue<Item> queue;//达成条件后具体执行的方法private final Processor<Item> processor;//构造函数public FlushThread(String name, int bufferSize, int flushInterval,int queueSize,Processor<Item> processor) {this.name = name;this.bufferSize = bufferSize;this.flushInterval = flushInterval;this.lastFlushTime = System.currentTimeMillis();this.processor = processor;this.queue = new ArrayBlockingQueue<>(queueSize);}//外部提交数据的方法public boolean add(Item item){boolean result = queue.offer(item);flushOnDemand();return result;}//提供给外部的超时方法public void timeOut(){//超过两次提交超过时间间隔if(System.currentTimeMillis() - lastFlushTime >= flushInterval){start();}}//解除线程的阻塞private void start(){LockSupport.unpark(writer);}//当前的数据是否大于提交的条件private void flushOnDemand(){if(queue.size() >= bufferSize){start();}}//执行提交数据的方法public void flush(){lastFlushTime = System.currentTimeMillis();List<Item> temp = new ArrayList<>(bufferSize);int size = queue.drainTo(temp,bufferSize);if(size > 0){try {processor.process(temp);}catch (Throwable e){log.error("process error",e);}}}//根据数据的尺寸和时间间隔判断是否提交private boolean canFlush(){return queue.size() > bufferSize || System.currentTimeMillis() - lastFlushTime > flushInterval;}@Overridepublic void run() {writer = Thread.currentThread();writer.setName(name);while (!writer.isInterrupted()){while (!canFlush()){//如果线程没有被打断,且不达到执行的条件,则阻塞线程LockSupport.park(this);}flush();}}}
  1. 如何实现定时提交呢?

通常我们遇到定时相关的需求,首先想到的应该是使用 ScheduledThreadPoolExecutor定时来调用FlushThread 的 timeOut 方法,如果你想到的是 Thread.sleep()...那需要再努力学习,多看源码了。

  1. 怎样进一步的提升系统的吞吐量?

我们使用的FlushThread 实现了 Runnable 所以我们可以考虑使用线程池来持有多个FlushThread

所以我们就有这样的代码:


public class Flusher<Item> {private final FlushThread<Item>[] flushThreads;private AtomicInteger index;//防止多个线程同时执行。增加一个随机数间隔private static final Random r = new Random();private static final int delta = 50;private static ScheduledExecutorService TIMER = new ScheduledThreadPoolExecutor(1);private static ExecutorService POOL = Executors.newCachedThreadPool();public Flusher(String name,int bufferSiz,int flushInterval,int queueSize,int threads,Processor<Item> processor) {this.flushThreads = new FlushThread[threads];if(threads > 1){index = new AtomicInteger();}for (int i = 0; i < threads; i++) {final FlushThread<Item> flushThread = new FlushThread<Item>(name+ "-" + i,bufferSiz,flushInterval,queueSize,processor);flushThreads[i] = flushThread;POOL.submit(flushThread);//定时调用 timeOut()方法。TIMER.scheduleAtFixedRate(flushThread::timeOut, r.nextInt(delta), flushInterval, TimeUnit.MILLISECONDS);}}// 对 index 取模,保证多线程都能被addpublic boolean add(Item item){int len = flushThreads.length;if(len == 1){return flushThreads[0].add(item);}int mod = index.incrementAndGet() % len;return flushThreads[mod].add(item);}//上文已经描述private static class FlushThread<Item> implements Runnable{...省略}
}
  1. 面向接口编程,提升系统扩展性:
public interface Processor<T> {void process(List<T> list);
}

使用

我们写个测试方法测试一下:

//实现 Processor 将 String 全部输出
public class PrintOutProcessor implements Processor<String>{@Overridepublic void process(List<String> list) {System.out.println("start flush");list.forEach(System.out::println);System.out.println("end flush");}
}

public class Test {public static void main(String[] args) throws InterruptedException {Flusher<String> stringFlusher = new Flusher<>("test",5,1000,30,1,new PrintOutProcessor());int index = 1;while (true){stringFlusher.add(String.valueOf(index++));Thread.sleep(1000);}}
}

执行的结果:


start flush
1
2
3
end flush
start flush
4
5
6
7
end flush

我们发现并没有达到10个数字就触发了flush。因为出发了超时提交,虽然还没有达到规定的5
个数据,但还是执行了 flush。

如果我们去除 Thread.sleep(1000); 再看看结果:

start flush
1
2
3
4
5
end flush
start flush
6
7
8
9
10
end flush

每5个数一次提交。完美。。。。

总结

一个比较生动的例子给大家讲解了一些多线程的具体运用。学习多线程应该多思考多动手,才会有比较好的效果。希望这篇文章大家读完以后有所收获,欢迎交流。

github地址:https://github.com/diaozxin007/framework

徒手撸框架系列文章地址:

徒手撸框架--实现IoC

徒手撸框架--实现Aop

徒手撸框架--高并发环境下的请求合并相关推荐

  1. 【高并发】高并发环境下构建缓存服务需要注意哪些问题?我和阿里P9聊了很久!...

    写在前面 周末,跟阿里的一个朋友(去年晋升为P9了)聊了很久,聊的内容几乎全是技术,当然了,两个技术男聊得最多的话题当然就是技术了.从基础到架构,从算法到AI,无所不谈.中间又穿插着不少天马行空的想象 ...

  2. 高并发环境下,6个构建缓存服务需要注意的问题

    摘要:高并发环境下如何构建缓存服务,你知道吗? 本文分享自华为云社区<[高并发]高并发环境下构建缓存服务需要注意哪些问题?>,作者:冰 河. 缓存特征 (1)命中率:命中数/(命中数+没有 ...

  3. 【高并发】高并发环境下构建缓存服务需要注意哪些问题?我和阿里P9聊了很久!

    写在前面 周末,跟阿里的一个朋友(去年晋升为P9了)聊了很久,聊的内容几乎全是技术,当然了,两个技术男聊得最多的话题当然就是技术了.从基础到架构,从算法到AI,无所不谈.中间又穿插着不少天马行空的想象 ...

  4. 高并发环境下如何优化Tomcat性能?看完我懂了!

    来自:冰河技术 写在前面 Tomcat作为最常用的Java Web服务器,随着并发量越来越高,Tomcat的性能会急剧下降,那有没有什么方法来优化Tomcat在高并发环境下的性能呢? Tomcat运行 ...

  5. 【高并发】在高并发环境下该如何构建应用级缓存?

    来自:冰河技术 写在前面 随着我们的系统负载越来越高,系统的性能就会有所下降,此时,我们可以很自然地想到使用缓存来解决数据读写性能低下的问题.但是,立志成为资深架构师的你,是否能够在高并发环境下合理并 ...

  6. oom 如何避免 高并发_【高并发】高并发环境下如何防止Tomcat内存溢出?看完我懂了!!...

    [高并发]高并发环境下如何防止Tomcat内存溢出?看完我懂了!! 发布时间:2020-04-19 00:47, 浏览次数:126 , 标签: Tomcat 写在前面 随着系统并发量越来越高,Tomc ...

  7. 在高并发环境下该如何构建应用级缓存

    摘要:立志成为资深架构师的你,是否能够在高并发环境下合理并且高效的构建应用级缓存呢? 本文分享自华为云社区<[高并发]在高并发环境下该如何构建应用级缓存?>,作者:冰 河. 随着我们的系统 ...

  8. tomcat 请求超时_高并发环境下如何优化Tomcat性能?看完我懂了!

    来自:冰河技术 写在前面 Tomcat作为最常用的Java Web服务器,随着并发量越来越高,Tomcat的性能会急剧下降,那有没有什么方法来优化Tomcat在高并发环境下的性能呢? Tomcat运行 ...

  9. java支付宝支付_Java 高并发环境下的性能优化,揭秘支付宝技术内幕

    前言 高并发经常会发生在有大活跃用户量,用户高聚集的业务场景中,如:秒杀活动,定时领取红包等. 为了让业务可以流畅的运行并且给用户一个好的交互体验,我们需要根据业务场景预估达到的并发量等因素,来设计适 ...

最新文章

  1. swift Sequence 和 SubSequence
  2. PHP SOAP 扩展的使用
  3. JAVA-初步认识-第五章-数组-常见操作-最值
  4. img.width一直是0的问题--记录(二)
  5. C++中关于配置文件的问题
  6. python解决https私密连接警告信息
  7. 【初探移动前端开发03】jQuery Mobile(上)
  8. LeetCode 22. 括号生成(回溯/DP)
  9. YOLO-LITE:专门面向CPU的实时目标检测
  10. Redis 实现用户积分排行榜
  11. js排序算法详解-快速排序
  12. JS 获得FileUpload1 的完整路径
  13. php导出页面为pdf文件大小,PHP HTML 生成 PDF|php快速导出pdf文件
  14. Greedy search 和 beam search
  15. 微信小程序针对iphoneX以上机型,获取底部高度
  16. 易共享android工具下载,EasyShare app
  17. 自然语言处理--模仿莎士比亚风格自动生成诗歌
  18. linux如何重新分区
  19. 原子核的奥秘:核力的发现
  20. 电商横幅BANNER素材PSD分层模板|多品类,都能借鉴!

热门文章

  1. SpringCloud学习指南【更新】
  2. python3,进程间的通信
  3. Ubuntu下好的PDF阅读器介绍
  4. zz 传苹果平板电脑的UI界面将具备“快速学习”功能
  5. 关于“抵制”易语言的通告
  6. A deep learning model integrating FCNNs and CRFs for brain tumor segmentation
  7. python程序写诗_pytorch下使用LSTM神经网络写诗实例
  8. 闪迪u盘量产工具万能版_我身边的“闪迪色”闪迪彩色手机U盘系列| 大家测573...
  9. android volley设置编码,Volley 概览  |  Android 开发者  |  Android Developers
  10. pvr转png工具_pngtosvg 一个神奇的在线工具