将一个简单远程调用的方式例子改为异步调用
第一版:https://www.cnblogs.com/nxzblogs/p/12766025.html
第二版:使用RxJava :(RxJava:https://github.com/ReactiveX/RxJava)

package com.xsxy.asynctest.test04;import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** 广播调用RPC*/
public class Test02RxJavaAsyncRpcCallTest {public static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();public static ThreadPoolExecutor BIZ_EXECUTOR = new ThreadPoolExecutor(AVAILABLE_PROCESSORS, AVAILABLE_PROCESSORS, 10, TimeUnit.SECONDS,new LinkedBlockingQueue<>(10), new ThreadPoolExecutor.CallerRunsPolicy());public static void main(String[] args) throws InterruptedException {// rxJavaRpcCall();// aSyncRxJavaRpcCall();// aSyncRpcCall2();aSync();// aSyncUserBizExecutor();}/*** rxjava 同步执行* 消耗时间大概为20s,因为rpcCall方法是同步调用的,调用线程就是main线程*/public static void rxJavaRpcCall() {long start = System.currentTimeMillis();Flowable.fromArray(genIpList().toArray(new String[0])).map(ip -> rpcCall(ip, ip)).subscribe(System.out::println);// 大概10sSystem.out.println("sync execute rxjavaRpcCall consume:" + (System.currentTimeMillis() - start));}/*** 异步调用* <p>* 在RxJava中,操作运算符不能直接使用Threads或ExecutorServices进行异步处理,而需要使用Schedulers来抽象统一API背后* 的并发调度线程池。RxJava提供了几个可通过Schedulers访问的标准调度执行器。* ● Schedulers.computation():在后台运行固定数量的专用线程来计算密集型工作。大多数异步操作符使用它作为其默认调度线程池。* ● Schedulers.io():在动态变化的线程集合上运行类I/ O或阻塞操作。* ● Schedulers.single():以顺序和FIFO方式在单个线程上运行。* ● Schedulers.trampoline():在其中一个参与线程中以顺序和FIFO方式运行,通常用于测试目的。* <p>* RxJava还可以让我们通过Schedulers.from(Executor)将现有的Executor(及其子类型,如ExecutorService)包装到Scheduler中。* 例如,可以将其用于具有更大但仍然固定的线程池(与calculate()和io()不同)*/public static void aSyncRxJavaRpcCall() throws InterruptedException {long start = System.currentTimeMillis();// 使用 observeOn 让 rpcCall 的执行由main函数所在线程切换到IO线程// 顺序调用Flowable.fromArray(genIpList().toArray(new String[0]))// 切换到io线程执行.observeOn(Schedulers.io())// 映射结果.map(ip -> rpcCall(ip, ip))// 订阅消费者.subscribe(System.out::println);// main函数不会等rpcCall调用完毕System.out.println("sync execute rxjavaRpcCall consume:" + (System.currentTimeMillis() - start));// 上边代码在没有执行完10调用,main函数就结束了,因为IO线程时Deamon线程,而JVM退出的时机时没有用户线程// 所以需要将main函数挂起Thread.currentThread().join();// ##########################################################################################/*上代码我们挂起了main函数所在线程,上面的代码运行时main函数所在线程会马上返回,然后执行sout输出打印,并挂起自己;具体的10次rpc调用是在IO线程内执行的,到这里我们释放了main函数所在线程来执行rpc调用,但是IO线程内的10个rpc调用还是顺序执行的*/}/*** 让10个rpc调用顺序执行转换为异步并发执行前,我们先看看另外一个操作符subscribeOn是如何在发射元素的线程执行比较耗时* 的操作时切换为异步执行的*/public static void aSyncRpcCall2() throws InterruptedException {long start = System.currentTimeMillis();Flowable.fromCallable(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (Exception e) {e.printStackTrace();}return "Done";})// 发射元素异步执行// .subscribeOn(Schedulers.io())// 切换到io线程执行.observeOn(Schedulers.io())// 订阅消费者.subscribe(System.out::println);// 该输出语句按理说时会很快输出的,但是事实并不是,还是会消耗2s左右,这个是因为,虽然消费时异步// 使用observeOn方法让接收元素和处理元素的逻辑从main函数所在线程切换为其他线程,但是发射元素还是同步执行的// 所以我们还需要让发射元素的逻辑异步化,而subscribeOn就是做这个事情的// (放开上边的subscribeOn方法,这样使用subscribeOn元素发射与observeOn接收操作全部都异步化)System.out.println("consume:" + (System.currentTimeMillis() - start));Thread.currentThread().join();// ##########################################################################################/*默认情况下被观察对象与其上施加的操作符链的运行以及把运行结果通知给观察者对象使用的是调用subscribe方法所在的线程,SubscribeOn操作符可以通过设置Scheduler来改变这个行为,让上面的操作切换到其他线程来执行。ObserveOn操作符可以指定一个不同的Scheduler让被观察者对象使用其他线程来把结果通知给观察者对象,并执行观察者的回调函数。所以如果流发射元素时有耗时的计算或者阻塞IO,则可以通过使用subscribeOn操作来把阻塞的操作异步化(切换到其他线程来执行)。另外如果一旦数据就绪(数据发射出来),则可以通过使用observeOn来切换使用其他线程(比如前台或者GUI线程)来对数据进行处理。需要注意SubscribeOn这个操作符指定的是被观察者对象(发布者)本身在哪个调度器上执行,而且和在流上的操作链中SubscribeOn的位置无关,并且整个调用链上调用多次时,只有第一次才有效。而ObservableOn则是指定观察者对象(订阅者)在哪个调度器上接收被观察者发来的通知,在操作符链上每当调用了ObservableOn这个操作符时都会进行线程的切换*/}/*** 回到10次rpc调用,如何使用flatmap和subscribeOn将同步转为异步*/public static void aSync() {long start = System.currentTimeMillis();Flowable.fromArray(genIpList().toArray(new String[0]))// flatMap 将所有的ip转换为 flowAble对象.flatMap(ip ->// 将每个ip作为数据源获取一个流对象Flowable.just(ip)// 讲发射逻辑改为异步.subscribeOn(Schedulers.io())// 使用map将ip对象转为rpc调用结果  以上ipList所有的数据都是并发调用的.map(v -> rpcCall(v, v)))// 阻塞所有的rpc并发调用结束 阻塞的是main线程.blockingSubscribe(System.out::println);// 因为rpc调用是并发进行的,所以耗时大概为2.5秒System.out.println("async consume: " + (System.currentTimeMillis() - start));}/*** 回到10次rpc调用,如何使用flatmap和subscribeOn将同步转为异步* <p>* 使用自定义线程池*/public static void aSyncUserBizExecutor() {long start = System.currentTimeMillis();Flowable.fromArray(genIpList().toArray(new String[0]))// flatMap 将所有的ip转换为 flowAble对象.flatMap(ip ->// 将每个ip作为数据源获取一个流对象Flowable.just(ip)// 讲发射逻辑改为异步.subscribeOn(Schedulers.from(BIZ_EXECUTOR))// 使用map将ip对象转为rpc调用结果  以上ipList所有的数据都是并发调用的.map(v -> rpcCall(v, v)))// 阻塞所有的rpc并发调用结束 阻塞的是main线程.blockingSubscribe(System.out::println);// 因为rpc调用是并发进行的,所以耗时大概为6.6秒System.out.println("async consume: " + (System.currentTimeMillis() - start));}/*** 简单的rpcCall** @param ip* @param params*/public static String rpcCall(String ip, String params) {try {TimeUnit.SECONDS.sleep(2);} catch (Exception e) {e.printStackTrace();}return params;}/*** 生成ipList** @return*/public static List<String> genIpList() {List<String> list = new ArrayList<>();for (int i = 0; i < 10; i++) {list.add("192.168.0." + i);}return list;}
}

将一个简单远程调用的方式例子改为异步调用 -- 2相关推荐

  1. 将一个简单远程调用的方式例子改为异步调用

    将一个简单远程调用的方式例子改为异步调用 package com.xsxy.asynctest.test03;import java.util.ArrayList; import java.util. ...

  2. delphi 异步 调用 带参数_Dubbo 关于同步/异步调用的几种方式

    我们知道,Dubbo 缺省协议采用单一长连接,底层实现是 Netty 的 NIO 异步通讯机制:基于这种机制,Dubbo 实现了以下几种调用方式: 同步调用 异步调用 参数回调 事件通知 同步调用 同 ...

  3. java 异步调用方法_java异步调用方法有哪些?如何实现异步调用?

    你知道java异步调用方法都有哪些吗?下面的文章内容,就对这方面的问题做了一下整理,一起来看看java异步调用的方法吧! 1.利用Spring的异步方法去执行 注:没有返回值 在启动类又或者是配置类加 ...

  4. 异步调用WebService方式!

    WebService方法是不需要作任何修改的,只是在调用时采用异步的方式,这样在循环中,速度会显得快一点. 原来的方式: HotelMagWeb.com.china_sms.www.MainServi ...

  5. 关于webservice的异步调用简单实例

    于webservice的异步调用简单实例 无论在任何情况下,被调用方的代码无论是被异步调用还是同步调用的情况下,被调用方的代码都是一样的, 下面,我们就以异步调用一个webservice 为例作说明. ...

  6. 如何从异步调用返回响应?

    我有一个函数foo ,它发出Ajax请求. 如何返回foo的响应? 我尝试从success回调中返回值,以及将响应分配给函数内部的局部变量并返回该局部变量,但这些方法均未真正返回响应. functio ...

  7. 限时购校验小工具dubbo异步调用实现限

    本文来自网易云社区 作者:张伟 背景 限时购是网易考拉目前比较常用的促销形式,但是前期创建一个限时购活动时需要各个BU按照指定的Excel格式进行选品提报,为了保证提报数据准确,运营需要人肉校验很多信 ...

  8. LabVIEW异步调用VI

    LabVIEW异步调用VI 如通过子VI节点或通过引用调用节点以标准调用方法调用一个VI,数据流在被调用节点处暂停直到子VI返回结果.然后数据流从节点的输出端继续. 与上述方式不同,异步调用VI时,子 ...

  9. 第11章 异步消息与异步调用

    开心一笑 [老爸斗地主把豆豆输光了,叫我给他充值.我说他不要在游戏里花钱,打发时间玩玩得了.老爸一下火了:小时候你要哪个玩具老子不给你买了,现在让你给我买点豆豆你都不肯,看来老了是指望不上你了... ...

最新文章

  1. CentOS7配置防火墙
  2. oozie的作业调度
  3. 【Groovy】闭包 Closure ( 闭包类 Closure 简介 | 闭包 parameterTypes 和 maximumNumberOfParameters 成员用法 )
  4. 40岁了,还要跟小年青一样埋头敲代码吗?
  5. 浅谈电磁学——基本电现象
  6. How product extension field is involved in search scenario
  7. asp.net core监控—引入Prometheus(一)
  8. 怎么清空topic数据_20.Roscpp/Rospy:Topic_demo
  9. Python 数据结构与算法——归并排序
  10. 我的LINUX学习之路之十三之用脚本通过PXE安装LINUX
  11. 11 Steps Attackers Took to Crack Target
  12. 泛函分析 04.03 有界线性算子 - 一致有界原则
  13. 【ZYNQ】从入门到秃头02 ZYNQ硬件介绍和Vivado开发流程
  14. ArcGIS动态表格扩展模块Mapping and Charting Solutions使用教程及下载地址
  15. Apache Tomcat 文件包含漏洞(CNVD-2020-10487)修复方法
  16. linux安装gcc5.4教程,arm-linux-gcc-5.4.0安装方法
  17. 菜鸟爬取中关村手机详情页参数及报价
  18. 一篇好文,以在迷茫时阅读(文章转载自CSDN)
  19. 春招进来的新人23岁Java开发上来秀了波操作,真是扮猪吃老虎
  20. APP热潮来临 图解九种商业模式

热门文章

  1. JavaScript复杂判断的更优雅写法
  2. MySQL in语句排序
  3. pci总线定时协议_汽车总线测试的“解忧杂货店”
  4. openwrt 需要高级浏览器_OpenWrt的新(shi)奇(yong)玩法
  5. laravel项目运行 php artisan cache:clear 命令报错
  6. php 单元测试 静态类,可选的PHP类型提示/检查单元测试或静态分析?
  7. rds基于什么开发_元王RDS--让H公司的10多年的设计经验重获新生!
  8. java自动关闭吗_JAVA问题--浏览器老是自动关闭
  9. php表单确认密码,jQuery表单验证之密码确认实例详解
  10. keil4如何设置自动缩进_如何设置私聊自动回复?