来自 ImportNew,作者:唐尤华

https://dzone.com/articles/a-birds-eye-view-on-java-concurrency-frameworks-1

1. 为什么要写这篇文章

几年前 NoSQL 开始流行的时候,像其他团队一样,我们的团队也热衷于令人兴奋的新东西,并且计划替换一个应用程序的数据库。 但是,当深入实现细节时,我们想起了一位智者曾经说过的话:“细节决定成败”。最终我们意识到 NoSQL 不是解决所有问题的银弹,而 NoSQL vs RDMS 的答案是:“视情况而定”。

类似地,去年RxJava 和 Spring Reactor 这样的并发库加入了让人充满激情的语句,如异步非阻塞方法等。为了避免再犯同样的错误,我们尝试评估诸如 ExecutorService、 RxJava、Disruptor 和 Akka 这些并发框架彼此之间的差异,以及如何确定各自框架的正确用法。

本文中用到的术语在这里有更详细的描述。

2. 分析并发框架的示例用例

3. 快速更新线程配置

在开始比较并发框架的之前,让我们快速复习一下如何配置最佳线程数以提高并行任务的性能。 这个理论适用于所有框架,并且在所有框架中使用相同的线程配置来度量性能。

  • 对于内存任务,线程的数量大约等于具有最佳性能的内核的数量,尽管它可以根据各自处理器中的超线程特性进行一些更改。

    • 例如,在8核机器中,如果对应用程序的每个请求都必须在内存中并行执行4个任务,那么这台机器上的负载应该保持为 @2 req/sec,在 ThreadPool 中保持8个线程。

  • 对于 I/O 任务,ExecutorService 中配置的线程数应该取决于外部服务的延迟。

    • 与内存中的任务不同,I/O 任务中涉及的线程将被阻塞,并处于等待状态,直到外部服务响应或超时。 因此,当涉及 I/O 任务线程被阻塞时,应该增加线程的数量,以处理来自并发请求的额外负载。

    • I/O 任务的线程数应该以保守的方式增加,因为处于活动状态的许多线程带来了上下文切换的成本,这将影响应用程序的性能。 为了避免这种情况,应该根据 I/O 任务中涉及的线程的等待时间按比例增加此机器的线程的确切数量以及负载。

参考: http://baddotrobot.com/blog/2013/06/01/optimum-number-of-threads/

4. 性能测试结果

性能测试配置 GCP -> 处理器:Intel(R) Xeon(R) CPU @ 2.30GHz;架构:x86_64;CPU 内核:8个(注意: 这些结果仅对该配置有意义,并不表示一个框架比另一个框架更好)。

5. 使用执行器服务并行化 IO 任务

5.1 何时使用?

如果一个应用程序部署在多个节点上,并且每个节点的 req/sec 小于可用的核心数量,那么 ExecutorService 可用于并行化任务,更快地执行代码。

5.2 什么时候适用?

如果一个应用程序部署在多个节点上,并且每个节点的 req/sec 远远高于可用的核心数量,那么使用 ExecutorService 进一步并行化只会使情况变得更糟。

当外部服务延迟增加到 400ms 时,性能测试结果如下(请求速率 @50 req/sec,8核)。

5.3 所有任务按顺序执行示例

// I/O 任务:调用外部服务
String posts = JsonService.getPosts();
String comments = JsonService.getComments();
String albums = JsonService.getAlbums();
String photos = JsonService.getPhotos();// 合并来自外部服务的响应
// (内存中的任务将作为此操作的一部分执行)
int userId = new Random().nextInt(10) + 1;
String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);// 构建最终响应并将其发送回客户端
String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
return response;

5.4 I/O 任务与 ExecutorService 并行执行代码示例

// 添加 I/O 任务
List<Callable<String>> ioCallableTasks = new ArrayList<>();
ioCallableTasks.add(JsonService::getPosts);
ioCallableTasks.add(JsonService::getComments);
ioCallableTasks.add(JsonService::getAlbums);
ioCallableTasks.add(JsonService::getPhotos);// 调用所有并行任务
ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
List<Future<String>> futuresOfIOTasks = ioExecutorService.invokeAll(ioCallableTasks);// 获取 I/O  操作(阻塞调用)结果
String posts = futuresOfIOTasks.get(0).get();
String comments = futuresOfIOTasks.get(1).get();
String albums = futuresOfIOTasks.get(2).get();
String photos = futuresOfIOTasks.get(3).get();// 合并响应(内存中的任务是此操作的一部分)
String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);// 构建最终响应并将其发送回客户端
return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;

6. 使用执行器服务并行化 IO 任务(CompletableFuture)

与上述情况类似:处理传入请求的 HTTP 线程被阻塞,而 CompletableFuture 用于处理并行任务

6.1 何时使用?

如果没有 AsyncResponse,性能与 ExecutorService 相同。 如果多个 API 调用必须异步并且链接起来,那么这种方法更好(类似 Node 中的 Promises)。

ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);// I/O 任务
CompletableFuture<String> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService);
CompletableFuture<String> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments,ioExecutorService);
CompletableFuture<String> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums,ioExecutorService);
CompletableFuture<String> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos,ioExecutorService);
CompletableFuture.allOf(postsFuture, commentsFuture, albumsFuture, photosFuture).get();// 从 I/O 任务(阻塞调用)获得响应
String posts = postsFuture.get();
String comments = commentsFuture.get();
String albums = albumsFuture.get();
String photos = photosFuture.get();// 合并响应(内存中的任务将是此操作的一部分)
String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);// 构建最终响应并将其发送回客户端
return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;

7. 使用 ExecutorService 并行处理所有任务

使用 ExecutorService 并行处理所有任务,并使用 @suspended AsyncResponse response 以非阻塞方式发送响应。

图片来自 http://tutorials.jenkov.com/java-nio/nio-vs-io.html
  • HTTP 线程处理传入请求的连接,并将处理传递给 Executor Pool,当所有任务完成后,另一个 HTTP 线程将把响应发送回客户端(异步非阻塞)。

  • 性能下降原因:

    • 在同步通信中,尽管 I/O 任务中涉及的线程被阻塞,但是只要进程有额外的线程来承担并发请求负载,它仍然处于运行状态。

    • 因此,以非阻塞方式保持线程所带来的好处非常少,而且在此模式中处理请求所涉及的成本似乎很高。

    • 通常,对这里讨论采用的例子使用异步非阻塞方法会降低应用程序的性能。

7.1 何时使用?

如果用例类似于服务器端聊天应用程序,在客户端响应之前,线程不需要保持连接,那么异步、非阻塞方法比同步通信更受欢迎。在这些用例中,系统资源可以通过异步、非阻塞方法得到更好的利用,而不仅仅是等待。

// 为异步执行提交并行任务
ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
CompletableFuture<String> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService);
CompletableFuture<String> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments,
ioExecutorService);
CompletableFuture<String> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums,
ioExecutorService);
CompletableFuture<String> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos,
ioExecutorService);// 当 /posts API 返回响应时,它将与来自 /comments API 的响应结合在一起
// 作为这个操作的一部分,将执行内存中的一些任务
CompletableFuture<String> postsAndCommentsFuture = postsFuture.thenCombineAsync(commentsFuture,
(posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments),
ioExecutorService);// 当 /albums API 返回响应时,它将与来自 /photos API 的响应结合在一起
// 作为这个操作的一部分,将执行内存中的一些任务
CompletableFuture<String> albumsAndPhotosFuture = albumsFuture.thenCombineAsync(photosFuture,
(albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos),
ioExecutorService);// 构建最终响应并恢复 http 连接,把响应发送回客户端
postsAndCommentsFuture.thenAcceptBothAsync(albumsAndPhotosFuture, (s1, s2) -> {
LOG.info("Building Async Response in Thread " + Thread.currentThread().getName());
String response = s1 + s2;
asyncHttpResponse.resume(response);
}, ioExecutorService);

8. RxJava

  • 这与上面的情况类似,唯一的区别是 RxJava 提供了更好的 DSL 可以进行流式编程,下面的例子中没有体现这一点。

  • 性能优于 CompletableFuture 处理并行任务。

8.1 何时使用?

如果编码的场景适合异步非阻塞方式,那么可以首选 RxJava 或任何响应式开发库。 还具有诸如 back-pressure 之类的附加功能,可以在生产者和消费者之间平衡负载。

int userId = new Random().nextInt(10) + 1;
ExecutorService executor = CustomThreads.getExecutorService(8);// I/O 任务
Observable<String> postsObservable = Observable.just(userId).map(o -> JsonService.getPosts())
.subscribeOn(Schedulers.from(executor));
Observable<String> commentsObservable = Observable.just(userId).map(o -> JsonService.getComments())
.subscribeOn(Schedulers.from(executor));
Observable<String> albumsObservable = Observable.just(userId).map(o -> JsonService.getAlbums())
.subscribeOn(Schedulers.from(executor));
Observable<String> photosObservable = Observable.just(userId).map(o -> JsonService.getPhotos())
.subscribeOn(Schedulers.from(executor));// 合并来自 /posts 和 /comments API 的响应
// 作为这个操作的一部分,将执行内存中的一些任务
Observable<String> postsAndCommentsObservable = Observable
.zip(postsObservable, commentsObservable,
(posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments))
.subscribeOn(Schedulers.from(executor));// 合并来自 /albums 和 /photos API 的响应
// 作为这个操作的一部分,将执行内存中的一些任务
Observable<String> albumsAndPhotosObservable = Observable
.zip(albumsObservable, photosObservable,
(albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos))
.subscribeOn(Schedulers.from(executor));// 构建最终响应
Observable.zip(postsAndCommentsObservable, albumsAndPhotosObservable, (r1, r2) -> r1 + r2)
.subscribeOn(Schedulers.from(executor))
.subscribe((response) -> asyncResponse.resume(response), e -> asyncResponse.resume("error"));

9. Disruptor

[Queue vs RingBuffer]

图片1: http://tutorials.jenkov.com/java-concurrency/blocking-queues.html

图片2: https://www.baeldung.com/lmax-disruptor-concurrency

  • 在本例中,HTTP 线程将被阻塞,直到 disruptor 完成任务,并且使用 countdowlatch 将 HTTP 线程与 ExecutorService 中的线程同步。

  • 这个框架的主要特点是在没有任何锁的情况下处理线程间通信。在 ExecutorService 中,生产者和消费者之间的数据将通过 Queue传递,在生产者和消费者之间的数据传输过程中涉及到一个锁。 Disruptor 框架通过一个名为 Ring Buffer 的数据结构(它是循环数组队列的扩展版本)来处理这种生产者-消费者通信,并且不需要任何锁。

  • 这个库不适用于我们在这里讨论的这种用例。仅出于好奇而添加。

9.1 何时使用?

Disruptor 框架在下列场合性能更好:与事件驱动的体系结构一起使用,或主要关注内存任务的单个生产者和多个消费者。

static {int userId = new Random().nextInt(10) + 1;// 示例 Event-Handler; count down latch 用于使线程与 http 线程同步EventHandler<Event> postsApiHandler = (event, sequence, endOfBatch) -> {event.posts = JsonService.getPosts();event.countDownLatch.countDown();};// 配置 Disputor 用于处理事件DISRUPTOR.handleEventsWith(postsApiHandler, commentsApiHandler, albumsApiHandler).handleEventsWithWorkerPool(photosApiHandler1, photosApiHandler2).thenHandleEventsWithWorkerPool(postsAndCommentsResponseHandler1, postsAndCommentsResponseHandler2).handleEventsWithWorkerPool(albumsAndPhotosResponseHandler1, albumsAndPhotosResponseHandler2);DISRUPTOR.start();
}// 对于每个请求,在 RingBuffer 中发布一个事件:
Event event = null;
RingBuffer<Event> ringBuffer = DISRUPTOR.getRingBuffer();
long sequence = ringBuffer.next();
CountDownLatch countDownLatch = new CountDownLatch(6);
try {event = ringBuffer.get(sequence);event.countDownLatch = countDownLatch;event.startTime = System.currentTimeMillis();
} finally {ringBuffer.publish(sequence);
}
try {event.countDownLatch.await();
} catch (InterruptedException e) {e.printStackTrace();
}

10. Akka

图片来自: https://blog.codecentric.de/en/2015/08/introduction-to-akka-actors/

  • Akka 库的主要优势在于它拥有构建分布式系统的本地支持。

  • 它运行在一个叫做 Actor System 的系统上。这个系统抽象了线程的概念,Actor System 中的 Actor 通过异步消息进行通信,这类似于生产者和消费者之间的通信。

  • 这种额外的抽象级别有助于 Actor System 提供诸如容错、位置透明等特性。

  • 使用正确的 Actor-to-Thread 策略,可以对该框架进行优化,使其性能优于上表所示的结果。 虽然它不能在单个节点上与传统方法的性能匹敌,但是由于其构建分布式和弹性系统的能力,仍然是首选。

10.1 示例代码

// 来自 controller :
Actors.masterActor.tell(new Master.Request("Get Response", event, Actors.workerActor), ActorRef.noSender());// handler :
public Receive createReceive() {return receiveBuilder().match(Request.class, request -> {Event event = request.event; // Ideally, immutable data structures should be used here.request.worker.tell(new JsonServiceWorker.Request("posts", event), getSelf());request.worker.tell(new JsonServiceWorker.Request("comments", event), getSelf());request.worker.tell(new JsonServiceWorker.Request("albums", event), getSelf());request.worker.tell(new JsonServiceWorker.Request("photos", event), getSelf());}).match(Event.class, e -> {if (e.posts != null && e.comments != null & e.albums != null & e.photos != null) {int userId = new Random().nextInt(10) + 1;String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, e.posts,e.comments);String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, e.albums,e.photos);String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;e.response = response;e.countDownLatch.countDown();}}).build();
}

11. 总结

  • 根据机器的负载决定 Executor 框架的配置,并检查是否可以根据应用程序中并行任务的数量进行负载平衡。

  • 对于大多数传统应用程序来说,使用响应式开发库或任何异步库都会降低性能。只有当用例类似于服务器端聊天应用程序时,这个模式才有用,其中线程在客户机响应之前不需要保留连接。

  • Disruptor 框架在与事件驱动的架构模式一起使用时性能很好; 但是当 Disruptor 模式与传统架构混合使用时,就我们在这里讨论的用例而言,它并不符合标准。 这里需要注意的是,Akka 和 Disruptor 库值得单独写一篇文章,介绍如何使用它们来实现事件驱动的架构模式。

  • 这篇文章的源代码可以在 GitHub 上找到。

来,带你鸟瞰 Java 中的并发框架!相关推荐

  1. 互联网java常用框架_来,带你鸟瞰 Java 中4款常用的并发框架!

    1. 为什么要写这篇文章 几年前 NoSQL 开始流行的时候,像其他团队一样,我们的团队也热衷于令人兴奋的新东西,并且计划替换一个应用程序的数据库. 但是,当深入实现细节时,我们想起了一位智者曾经说过 ...

  2. 《Java并发编程的艺术》——Java中的并发工具类、线程池、Execute框架(笔记)

    文章目录 八.Java中的并发工具类 8.1 等待多线程完成的CountDownLatch 8.2 同步屏障CyclicBarrier 8.2.1 CyclicBarrier简介 8.2.2 Cycl ...

  3. 《Java并发编程的艺术》读后笔记-Java中的并发工具类(第八章)

    文章目录 <Java并发编程的艺术>读后笔记-Java中的并发工具类(第八章) 1.等待多线程完成的CountDownLatch 2.同步屏障CyclicBarrier 2.1 Cycli ...

  4. 一文带你理解Java中Lock的实现原理

    转载自   一文带你理解Java中Lock的实现原理 当多个线程需要访问某个公共资源的时候,我们知道需要通过加锁来保证资源的访问不会出问题.java提供了两种方式来加锁,一种是关键字:synchron ...

  5. java异常处理怎么加_带你了解Java中的异常处理(上)

    当当当当当当,各位看官,好久不见,甚是想念. 今天我们来聊聊Java里的一个小妖精,那就是异常. 什么是异常?什么是异常处理? 异常嘛,顾名思义就是不正常,(逃),是Java程序运行时,发生的预料之外 ...

  6. Java 中的 Swing 框架现在是不是被淘汰了?

    关于java中的Swing框架,我先说下如下的观点. 1 只要是用java开发的商业项目,就指着来挣钱的项目,都不会用Swing框架. 2 所以对java初学者来说,根本没必要学swing,甚至连类似 ...

  7. java中的集合框架_JAVA中的集合框架(上)List

    第一节 JAVA中的集合框架概述 集合的概念,现实生活中:很多事物凑在一起就是一个集合:数学中的集合:具有相同属性事物的总体:JAVA中的集合:是一种工具类,就像是容器,储存任意数量的具有共同属性的对 ...

  8. 【Log】(二)Java 中的日志框架 JCL、SLF

    [Log](一)Java 中的日志框架 JUL.Log4j [Log](二)Java 中的日志框架 JCL.SLF [Log](三)Java 中的日志框架 logback.log4j2 前言 JUL ...

  9. 一篇blog带你了解java中的锁

    前言 最近在复习锁这一块,对java中的锁进行整理,本文介绍各种锁,希望给大家带来帮助. Java的锁 乐观锁 乐观锁是一种乐观思想,即认为读多写少,遇到并发写的可能性低,每次去拿数据的时候都认为别人 ...

最新文章

  1. Nodejs.热部署方法
  2. Android Service的思考(3)
  3. R使用neuralnet包构建神经网络回归模型并与线性回归模型对比实战
  4. 苹果为何加强云计算布局 汤换药也换?
  5. java中集合的区别_Java中的集合与集合之间的区别
  6. VS Code的golang开发配置 之 代码提示
  7. ros(2) 发布者publisher的编程实现
  8. 1024告诉身边的程序员,今天他过节日
  9. dlut-KFQ概率上机2
  10. java was datasource_mybatis默认的数据源连接池(PooledDataSource和UnPooledDataSource)
  11. TensorFlow中的通信机制——Rendezvous(二)gRPC传输
  12. 国际标准UTC时间转化北京时间
  13. java mschart_vb之mschart控件小结
  14. 计算机EV录屏培训体会,停课不停学19|好用的EV录屏软件助力线上教学
  15. 如何服务器备份到移动硬盘,数据安全第一!威联通如何外接硬盘备份和同步
  16. Flash C++编译器
  17. FLV格式的视频歌曲地址600首,复制地址可插入外链播放器专用
  18. ios13 微信提示音插件_ios13怎么设置微信提示音
  19. 老人与科技:解决老人的“数字鸿沟”,全世界都在努力!
  20. Centos7- wget未找到命令,there are no enabled repos 解决办法

热门文章

  1. Java 文件及文件夹复制
  2. 我是如何用机器学习技术帮助 HR 省时间的
  3. docker对aufs触发的bug
  4. 海量存储之十八–一致性和高可用专题
  5. Yik-Chung Wu ---Time synchronization for wireless sensor networks
  6. OBS集成WebRTC
  7. Flask实战1-轻博客
  8. caffe学习(二):利用mnist数据集训练并进行手写数字识别(windows)
  9. 数字图像处理实验(5):PROJECT 04-01 [Multiple Uses],Two-Dimensional Fast Fourier Transform
  10. 可以ping通但远程桌面不行_【第1624期】HTML5:lt;agt;的ping属性之死亡ping与隐私追踪...