徒手撸框架--高并发环境下的请求合并
原文地址:https://www.xilidou.com/2018/01/22/merge-request/
在高并发系统中,我们经常遇到这样的需求:系统产生大量的请求,但是这些请求实时性要求不高。我们就可以将这些请求合并,达到一定数量我们统一提交。最大化的利用系统性IO,提升系统的吞吐性能。
所以请求合并框架需要考虑以下两个需求:
- 当请求收集到一定数量时提交数据
- 一段时间后如果请求没有达到指定的数量也进行提交
我们就聊聊一如何实现这样一个需求。
阅读这篇文章你将会了解到:
- ScheduledThreadPoolExecutor
- 阻塞队列
- 线程安全的参数
- LockSuppor的使用
设计思路和实现
我们就聊一聊实现这个东西的具体思路是什么。希望大家能够学习到分析问题,设计模块的一些套路。
底层使用什么数据结构来持有需要合并的请求?
- 既然我们的系统是在高并发的环境下使用,那我们肯定不能使用,普通的
ArrayList
来持有。我们可以使用阻塞队列来持有需要合并的请求。 - 我们的数据结构需要提供一个 add() 的方法给外部,用于提交数据。当外部add数据以后,需要检查队列里面的数据的个数是否达到我们限额?达到数量提交数据,不达到继续等待。
- 数据结构还需要提供一个timeOut()的方法,外部有一个计时器定时调用这个timeOut方法,如果方法被调用,则直接向远程提交数据。
- 条件满足的时候线程执行提交动作,条件不满足的时候线程应当暂停,等待队列达到提交数据的条件。所以我们可以考虑使用
LockSuppor.park()
和LockSuppor.unpark
来暂停和激活操作线程。
- 既然我们的系统是在高并发的环境下使用,那我们肯定不能使用,普通的
经过上面的分析,我们就有了这样一个数据结构:
private static class FlushThread<Item> implements Runnable{private final String name;//队列大小private final int bufferSize;//操作间隔private int flushInterval;//上一次提交的时间。private volatile long lastFlushTime;private volatile Thread writer;//持有数据的阻塞队列private final BlockingQueue<Item> queue;//达成条件后具体执行的方法private final Processor<Item> processor;//构造函数public FlushThread(String name, int bufferSize, int flushInterval,int queueSize,Processor<Item> processor) {this.name = name;this.bufferSize = bufferSize;this.flushInterval = flushInterval;this.lastFlushTime = System.currentTimeMillis();this.processor = processor;this.queue = new ArrayBlockingQueue<>(queueSize);}//外部提交数据的方法public boolean add(Item item){boolean result = queue.offer(item);flushOnDemand();return result;}//提供给外部的超时方法public void timeOut(){//超过两次提交超过时间间隔if(System.currentTimeMillis() - lastFlushTime >= flushInterval){start();}}//解除线程的阻塞private void start(){LockSupport.unpark(writer);}//当前的数据是否大于提交的条件private void flushOnDemand(){if(queue.size() >= bufferSize){start();}}//执行提交数据的方法public void flush(){lastFlushTime = System.currentTimeMillis();List<Item> temp = new ArrayList<>(bufferSize);int size = queue.drainTo(temp,bufferSize);if(size > 0){try {processor.process(temp);}catch (Throwable e){log.error("process error",e);}}}//根据数据的尺寸和时间间隔判断是否提交private boolean canFlush(){return queue.size() > bufferSize || System.currentTimeMillis() - lastFlushTime > flushInterval;}@Overridepublic void run() {writer = Thread.currentThread();writer.setName(name);while (!writer.isInterrupted()){while (!canFlush()){//如果线程没有被打断,且不达到执行的条件,则阻塞线程LockSupport.park(this);}flush();}}}
- 如何实现定时提交呢?
通常我们遇到定时相关的需求,首先想到的应该是使用 ScheduledThreadPoolExecutor
定时来调用FlushThread 的 timeOut 方法,如果你想到的是 Thread.sleep()
...那需要再努力学习,多看源码了。
- 怎样进一步的提升系统的吞吐量?
我们使用的FlushThread
实现了 Runnable
所以我们可以考虑使用线程池来持有多个FlushThread
。
所以我们就有这样的代码:
public class Flusher<Item> {private final FlushThread<Item>[] flushThreads;private AtomicInteger index;//防止多个线程同时执行。增加一个随机数间隔private static final Random r = new Random();private static final int delta = 50;private static ScheduledExecutorService TIMER = new ScheduledThreadPoolExecutor(1);private static ExecutorService POOL = Executors.newCachedThreadPool();public Flusher(String name,int bufferSiz,int flushInterval,int queueSize,int threads,Processor<Item> processor) {this.flushThreads = new FlushThread[threads];if(threads > 1){index = new AtomicInteger();}for (int i = 0; i < threads; i++) {final FlushThread<Item> flushThread = new FlushThread<Item>(name+ "-" + i,bufferSiz,flushInterval,queueSize,processor);flushThreads[i] = flushThread;POOL.submit(flushThread);//定时调用 timeOut()方法。TIMER.scheduleAtFixedRate(flushThread::timeOut, r.nextInt(delta), flushInterval, TimeUnit.MILLISECONDS);}}// 对 index 取模,保证多线程都能被addpublic boolean add(Item item){int len = flushThreads.length;if(len == 1){return flushThreads[0].add(item);}int mod = index.incrementAndGet() % len;return flushThreads[mod].add(item);}//上文已经描述private static class FlushThread<Item> implements Runnable{...省略}
}
- 面向接口编程,提升系统扩展性:
public interface Processor<T> {void process(List<T> list);
}
使用
我们写个测试方法测试一下:
//实现 Processor 将 String 全部输出
public class PrintOutProcessor implements Processor<String>{@Overridepublic void process(List<String> list) {System.out.println("start flush");list.forEach(System.out::println);System.out.println("end flush");}
}
public class Test {public static void main(String[] args) throws InterruptedException {Flusher<String> stringFlusher = new Flusher<>("test",5,1000,30,1,new PrintOutProcessor());int index = 1;while (true){stringFlusher.add(String.valueOf(index++));Thread.sleep(1000);}}
}
执行的结果:
start flush
1
2
3
end flush
start flush
4
5
6
7
end flush
我们发现并没有达到10个数字就触发了flush。因为出发了超时提交,虽然还没有达到规定的5
个数据,但还是执行了 flush。
如果我们去除 Thread.sleep(1000);
再看看结果:
start flush
1
2
3
4
5
end flush
start flush
6
7
8
9
10
end flush
每5个数一次提交。完美。。。。
总结
一个比较生动的例子给大家讲解了一些多线程的具体运用。学习多线程应该多思考多动手,才会有比较好的效果。希望这篇文章大家读完以后有所收获,欢迎交流。
github地址:https://github.com/diaozxin007/framework
徒手撸框架系列文章地址:
徒手撸框架--实现IoC
徒手撸框架--实现Aop
徒手撸框架--高并发环境下的请求合并相关推荐
- 【高并发】高并发环境下构建缓存服务需要注意哪些问题?我和阿里P9聊了很久!...
写在前面 周末,跟阿里的一个朋友(去年晋升为P9了)聊了很久,聊的内容几乎全是技术,当然了,两个技术男聊得最多的话题当然就是技术了.从基础到架构,从算法到AI,无所不谈.中间又穿插着不少天马行空的想象 ...
- 高并发环境下,6个构建缓存服务需要注意的问题
摘要:高并发环境下如何构建缓存服务,你知道吗? 本文分享自华为云社区<[高并发]高并发环境下构建缓存服务需要注意哪些问题?>,作者:冰 河. 缓存特征 (1)命中率:命中数/(命中数+没有 ...
- 【高并发】高并发环境下构建缓存服务需要注意哪些问题?我和阿里P9聊了很久!
写在前面 周末,跟阿里的一个朋友(去年晋升为P9了)聊了很久,聊的内容几乎全是技术,当然了,两个技术男聊得最多的话题当然就是技术了.从基础到架构,从算法到AI,无所不谈.中间又穿插着不少天马行空的想象 ...
- 高并发环境下如何优化Tomcat性能?看完我懂了!
来自:冰河技术 写在前面 Tomcat作为最常用的Java Web服务器,随着并发量越来越高,Tomcat的性能会急剧下降,那有没有什么方法来优化Tomcat在高并发环境下的性能呢? Tomcat运行 ...
- 【高并发】在高并发环境下该如何构建应用级缓存?
来自:冰河技术 写在前面 随着我们的系统负载越来越高,系统的性能就会有所下降,此时,我们可以很自然地想到使用缓存来解决数据读写性能低下的问题.但是,立志成为资深架构师的你,是否能够在高并发环境下合理并 ...
- oom 如何避免 高并发_【高并发】高并发环境下如何防止Tomcat内存溢出?看完我懂了!!...
[高并发]高并发环境下如何防止Tomcat内存溢出?看完我懂了!! 发布时间:2020-04-19 00:47, 浏览次数:126 , 标签: Tomcat 写在前面 随着系统并发量越来越高,Tomc ...
- 在高并发环境下该如何构建应用级缓存
摘要:立志成为资深架构师的你,是否能够在高并发环境下合理并且高效的构建应用级缓存呢? 本文分享自华为云社区<[高并发]在高并发环境下该如何构建应用级缓存?>,作者:冰 河. 随着我们的系统 ...
- tomcat 请求超时_高并发环境下如何优化Tomcat性能?看完我懂了!
来自:冰河技术 写在前面 Tomcat作为最常用的Java Web服务器,随着并发量越来越高,Tomcat的性能会急剧下降,那有没有什么方法来优化Tomcat在高并发环境下的性能呢? Tomcat运行 ...
- java支付宝支付_Java 高并发环境下的性能优化,揭秘支付宝技术内幕
前言 高并发经常会发生在有大活跃用户量,用户高聚集的业务场景中,如:秒杀活动,定时领取红包等. 为了让业务可以流畅的运行并且给用户一个好的交互体验,我们需要根据业务场景预估达到的并发量等因素,来设计适 ...
最新文章
- swift Sequence 和 SubSequence
- PHP SOAP 扩展的使用
- JAVA-初步认识-第五章-数组-常见操作-最值
- img.width一直是0的问题--记录(二)
- C++中关于配置文件的问题
- python解决https私密连接警告信息
- 【初探移动前端开发03】jQuery Mobile(上)
- LeetCode 22. 括号生成(回溯/DP)
- YOLO-LITE:专门面向CPU的实时目标检测
- Redis 实现用户积分排行榜
- js排序算法详解-快速排序
- JS 获得FileUpload1 的完整路径
- php导出页面为pdf文件大小,PHP HTML 生成 PDF|php快速导出pdf文件
- Greedy search 和 beam search
- 微信小程序针对iphoneX以上机型,获取底部高度
- 易共享android工具下载,EasyShare app
- 自然语言处理--模仿莎士比亚风格自动生成诗歌
- linux如何重新分区
- 原子核的奥秘:核力的发现
- 电商横幅BANNER素材PSD分层模板|多品类,都能借鉴!
热门文章
- SpringCloud学习指南【更新】
- python3,进程间的通信
- Ubuntu下好的PDF阅读器介绍
- zz 传苹果平板电脑的UI界面将具备“快速学习”功能
- 关于“抵制”易语言的通告
- A deep learning model integrating FCNNs and CRFs for brain tumor segmentation
- python程序写诗_pytorch下使用LSTM神经网络写诗实例
- 闪迪u盘量产工具万能版_我身边的“闪迪色”闪迪彩色手机U盘系列| 大家测573...
- android volley设置编码,Volley 概览 | Android 开发者 | Android Developers
- pvr转png工具_pngtosvg 一个神奇的在线工具