1. RxJava 并行操作

被观察者( Observable/Flowable/Single/Completable/May )发射的数据流可以经历各种线程切换,但是数据流的各个元素之间不会产生并行执行的效果。井行不是并发,也不是同步,更不是异步。

并发( concurrency )是指一个处理器同时处理多个任务。并行( parallelism )是多个处理器

或者是多核的处理器同时处理多个不同的任务。井行是同时发生的多个并发事件,具有井发的含义,而并发则不一定是并行。

1.1 借助 flatMap 实现并行

在 RxJava 中可以借助 flatMap 操作符来实现

Observable.range(1, 100)

.flatMap(new Function>() {

@Override

public ObservableSource apply(Integer integer) throws Exception {

return Observable.just(integer)

.subscribeOn(Schedulers.computation())

.map(new Function() {

@Override

public String apply(Integer integer) throws Exception {

return integer.toString();

}

});

}

})

.subscribe(new Consumer() {

@Override

public void accept(String s) throws Exception {

Log.d(TAG, "Next: " + s);

}

});

flatMap 操作符的原理是将这个 Observable 转化为为多个以原 Observable 发射的数据作为源数据 Observable,然后再将这多个 Observable 发射的数据整合发射出来。最后的顺序可能会交错地发射出来。

flatMap 会对原始 Observable 发射的每一项数据执行变换操作。在这里,生成的每个 Observable 使用线程池(指定了 computation 作为 Scheduler )并发地执行。

还可以使用 ExecutorService 来创建一个 Scheduler 对刚才的代码稍微做一些改动。

int threadNum = Runtime.getRuntime().availableProcessors() + 1;

final ExecutorService executorService = Executors.newFixedThreadPool(threadNum);

final Scheduler scheduler = Schedulers.from(executorService);

Observable.range(1, 100)

.flatMap(new Function>() {

@Override

public ObservableSource apply(Integer integer) throws Exception {

return Observable.just(integer)

.subscribeOn(scheduler)

.map(new Function() {

@Override

public String apply(Integer integer) throws Exception {

return integer.toString();

}

});

}

})

.doFinally(new Action() {

@Override

public void run() throws Exception {

Log.d(TAG, "Finally.");

executorService.shutdown();

}

})

.subscribe(new Consumer() {

@Override

public void accept(String s) throws Exception {

Log.d(TAG, "Next: " + s);

}

}, new Consumer() {

@Override

public void accept(Throwable throwable) throws Exception {

Log.d(TAG, "Error.");

}

}, new Action() {

@Override

public void run() throws Exception {

Log.d(TAG, "Complete.");

}

});

当完成所有的操作之后, executorService 需要执行 shutdown() 来关闭 ExecutorService。我们可以使用 doFinally 操作符来执行 shutdown()。

doFinally 操作符可以在 onError 或者 onComplete 之后调用指定的操作,或由下游处理。

1.2 通过 Round-Robin 算法实现并行

Round-Robin 算法是最简单的一种负载均衡算法。它的原理是把来自用户的请求轮流分配给内部的服务器:从服务器 1 开始,直到服务器 N、,然后重新开始循环,也被称为啥希取模法,是非常常用的数据分片方法。 Round-Robin 算法的优点是简洁,它无须记录当前所有连接的状态,所以是一种无状态调度。

通过 Round-Robin 算法把数据按线程数分组,例如分成 5 组,每组个数相同,一起发送处理。这样做的目的是可以减少 Observable 的创建 ,从而节省系统资源,但是会增加处理时间。Round-Robin 算法可以看成是对时间和空间的综合考虑。

final AtomicInteger batch = new AtomicInteger(0);

Observable.range(1, 100)

.groupBy(new Function() {

@Override

public Integer apply(Integer integer) throws Exception {

return batch.getAndIncrement() % 5;

}

})

.flatMap(new Function, ObservableSource>>() {

@Override

public ObservableSource> apply(GroupedObservable integerIntegerGroupedObservable) throws Exception {

return integerIntegerGroupedObservable.observeOn(Schedulers.io())

.map(new Function() {

@Override

public String apply(Integer integer) throws Exception {

return integer.toString();

}

});

}

})

.subscribe(new Consumer() {

@Override

public void accept(Object o) throws Exception {

Log.d(TAG, "Next: " + o);

}

});

2. ParallelFlowable

2.1 ParallelFlowable 介绍

RxJava 2.0.5 版本新增了 ParallelFlowable API ,它允许并行地执行一些操作符,例如 map、filter、concatMap、flatMap、collect、reduce 等。

public abstract class ParallelFlowable { }

ParallelFlowable 是并行的 Flowable 版本,并不是新增的被观察者类型。在 ParallelFlowable 中,很多典型的操作符( take、skip 等)是不可用的。

在 RxJava 并没有 ParallelObservable ,因为在 RxJava 2.x 之后, Observable 不再支持背压。

然而在并行处理中背压是必不可少的, 否则会淹没在并行操作符的内部队列中。

也不存在 ParallelSingle、ParallelCompletable、ParallelMaybe

2.1.1 ParallelFlowable 实现并行

在相应的操作符上调用 Flowable 的 parallel() 就会返回 ParallelFlowable

ParallelFlowable parallelFlowable = Flowable.range(1, 100).parallel();

parallelFlowable

.runOn(Schedulers.io())

.map(new Function() {

@Override

public String apply(Integer integer) throws Exception {

return integer.toString();

}

})

.sequential()

.subscribe(new Consumer() {

@Override

public void accept(String s) throws Exception {

Log.d(TAG, "Next: " + s);

}

});

其中 parallel() 调用了 ParallelFlowable.from

public final ParallelFlowable parallel() {

return ParallelFlowable.from(this);

}

public static ParallelFlowable from(@NonNull Publisher extends T> source) {

return from(source, Runtime.getRuntime().availableProcessors(), Flowable.bufferSize());

}

Paralle!ParallelFlowable 的 from() 方法是通过 Publisher 并以循环的方式在多个“轨道” (CPU 数)上消费它的。

默认情况下,并行级别被设置为可用 CPU 的数量 ( Runtime.getRuntime().availableProcessors() ),井且顺序源的预取量设置为 Flowable.bufferSize()。两者都可以通过重载 parallel() 方法来指定。

public final ParallelFlowable parallel(int parallelism) {

ObjectHelper.verifyPositive(parallelism, "parallelism");

return ParallelFlowable.from(this, parallelism);

}

public final ParallelFlowable parallel(int parallelism, int prefetch) {

ObjectHelper.verifyPositive(parallelism, "parallelism");

ObjectHelper.verifyPositive(prefetch, "prefetch");

return ParallelFlowable.from(this, parallelism, prefetch);

}

如果己经使用了必要的井行操作,则可以通过 ParallelFlowable.sequential() 操作符返回到顺序流。

2.1.2 ParallelFlowable 与 Scheduler

ParallelFlowable 遵循与 Flowable 相同的异步原理,因此 parallel() 本身并不引入顺序源的异

步消耗,只准备并行流,但是可以通过 runOn(Scheduler) 操作符定义异步。这点与 Flowable

很大不同, Flowable 使用 subscribeOn, observeOn 操作符。

runOn() 可以指定 prefetch 的数量。

public final ParallelFlowable runOn(@NonNull Scheduler scheduler) {

return runOn(scheduler, Flowable.bufferSize());

}

public final ParallelFlowable runOn(@NonNull Scheduler scheduler, int prefetch) {

ObjectHelper.requireNonNull(scheduler, "scheduler");

ObjectHelper.verifyPositive(prefetch, "prefetch");

return RxJavaPlugins.onAssembly(new ParallelRunOn(this, scheduler, prefetch));

}

2.2 ParallelFlowable 的操作符

并非所有的顺序操作在并行世界中都是有意义的。目前 ParallelFlowable 只支持如下操作

map, filter, flatMap, concatMap, reduce, collect, sorted, toSortedList, compose,fromArray, doOnCancel, doOnError, doOnComplete, doOnNext, doAfterNext, doOnSubscribe, doAfterTerminated, doOnRequest

这些 ParallelFlowable 可用的操作符,使用方法与 Flowable 中的用法

2.3 parallelFlowable 和 Flowable.flatMap 比较

Observable.flatMap 来实现并行, Flowable.flatMap 实现井行的原理和 Observable.flatMap 实现并行的原理相同。

那么什么时候使用 flatMap 进行并行处理比较好,什么时候使用 ParallelFlowable 比较好呢?

RxJava 本质上是连续的,借助 flatMap 操作符进行分离和加入一个序列可能会变得很复杂,

并引起一定的开销 。是如果使用 ParallelF!owable ,则开销会更小。

然而, parallelFlowable!Flowable 的操作符很有限,如果有一些特殊的操作需要并行执行,而这些操

作不能用 parallelFlowable 所支持的操作符来表达,那么就应该使用基于 Flowable.flatMap 来实

现井行。

因此,优先推荐使用 parallelFlowable ,对于无法使用 parallelFlowable 的操作符,则可以使

flatMap 来实现井行。

如果我的文章对您有帮助,不妨点个赞鼓励一下(^_^)

java并行编程_RxJava(十一): 并行编程相关推荐

  1. 分解和合并:Java 也擅长轻松的并行编程!

    多核处理器现在已广泛应用于服务器.台式机和便携机硬件.它们还扩展到到更小的设备,如智能电话和平板电脑.由于进程的线程可以在多个内核上并行执行,因此多核处理器为并发编程打开了一扇扇新的大门.为实现应用程 ...

  2. 分解和合并:Java 也擅长轻松的并行编程! 作者:Julien Ponge

    文中的程序我也测试过了,  注意下面的红字部分,在测试的时候我们需要保护测试环境尽可能等价,要么 分成2次测试  一次输出串行的时间  一次输出并行的时间,如果想在一个方法中比较,那么两者的先后顺序就 ...

  3. C#并发编程之初识并行编程

    写在前面 之前微信公众号里有一位叫sara的朋友建议我写一下Parallel的相关内容,因为手中商城的重构工作量较大,一时之间无法抽出时间.近日,这套系统已有阶段性成果,所以准备写一下Parallel ...

  4. MPI并行程序开发设计----------------------------------并行编程模型和算法等介绍

    ---------------------------------------------------------------------------------------------------- ...

  5. 速学堂(java)第十一章编程题答案(自写)

    速学堂(java)第十一章编程题答案(自写) 1.设计一个多线程的程序如下:设计一个火车售票模拟程序.假如火车站要有100张火车票要卖出,现在有5个售票点同时售票,用5个线程模拟这5个售票点的售票情况 ...

  6. c语言mpi并行程序,高性能计算之并行编程技术MPI并行程序设计(完整版).pdf

    高性能计算之并行编程技术MPI并行程序设计(完整版) 高性能计算之并行编程技术 -- MPI并行程序设计 都志辉 编著 李三立 审阅 陈渝 刘鹏 校对 I 内容提要 本书介绍目前最常见的并行程序- M ...

  7. python并发处理同一个文件_python并发编程(并发与并行,同步和异步,阻塞与非阻塞)...

    最近在学python的网络编程,学会了socket通信,并利用socket实现了一个具有用户验证功能,可以上传下载文件.可以实现命令行功能,创建和删除文件夹,可以实现的断点续传等功能的FTP服务器.但 ...

  8. Java(十一) 网络编程

    网络编程 第一章 网络编程入门 1.1软件结构 C/S结构 :全称为Client/Server结构,是指客户端和服务器结构.常见程序有QQ.迅雷等软件. B/S结构 :全称为Browser/Serve ...

  9. java里函数式表达式_Java8函数式编程 (一) 数据流和lambda表达式

    JDK 1.8中引入了函数式编程(functional programming,FP),如果您已习惯OOP,一定会感到困惑:什么是函数式编程?这样的编程模式有什么好处? 本文将通过简单的实例令读者对函 ...

最新文章

  1. 解决getOutputStream() has already been called for this response[java io流]
  2. 四、Android学习第四天——JAVA基础回顾(转)
  3. find命令查找某些文件并将其拷贝到指定目录
  4. 【GitHub加速工具,让你的GitHub、StackOverflow网站流畅度快到飞起,建议收藏~】
  5. MySql实现远程连接
  6. 程序员因太过耿直,致苹果官网出现bug......
  7. 中兴软件笔试 c语言,【中兴通讯员工笔试试题及答案】 - 面试网
  8. mysql cast numeric_(转载)mysql decimal、numeric数据类型
  9. 284、超详细的光纤熔纤、盘纤教程,值得收藏
  10. [dpdk] SDK编译配置
  11. Hadoop搭建之Centos 7.0系统安装
  12. WordPress 网站怎么做会员中心功能【会员中心】
  13. codeforces1153F Serval and Bonus Proble【期望DP】
  14. NLP入门--Word2Vec(CBOW)实战
  15. 【单片机】STM32 最小板 学习笔记
  16. 高中计算机备课教案模板,高中信息技术教案:《程序设计基础》教案模板
  17. 关于HttpClient绕过SSL认证以及NTLM认证
  18. weiit-saas第六篇《如何快速制作与搭建微信公众号商城》
  19. 2021.05.11丨COG分析柱状图绘制
  20. 电子对抗(雷达)发展现状

热门文章

  1. 安装vmtools之后任然不能在虚拟机和主机之间复制粘贴的问题
  2. PyTorch 深度学习实践 第4讲
  3. Webpack4 配置TS Loader
  4. mysql登陆策略_教你mysql mssql服务器安全设置策略
  5. GK Summay算法(ϵ−approximate ϕ−quantile)
  6. 螺旋矩阵---易懂系列
  7. java期末考试复习题_java期末考试复习题库 试题题库.doc
  8. Mysql出现问题:mysqld: error while loading shared libraries: libaio.so.1: cannot open shared object解决方案
  9. ​predis操作大全​
  10. 萃余液P507/P204除油工艺