rxjava 被观察者

大约4年前,我第一次在Matthew Podwysocki的博客上遇到了Reactive Extensions ,但是直到我几周前看到Matthew在Code Mesh上发表演讲后,我才对它有所了解。

最近它似乎越来越流行,我注意到Netflix编写了一个Java版本RxJava 。

我以为可以尝试通过更改在探索cypher的MERGE函数时暴露的Observable而不是Future的代码来尝试一下。

回顾一下,我们有50个线程,我们进行了100次迭代,在这些迭代中我们创建了随机(用户,事件)对。 我们最多创建10个用户和50个事件,目标是同时发送相同对的请求。

在另一篇文章的示例中,我丢弃了每个查询的结果,而在这里我返回了结果,因此我有一些要订阅的内容。

代码的轮廓如下所示:

public class MergeTimeRx
{public static void main( final String[] args ) throws InterruptedException, IOException{String pathToDb = "/tmp/foo";FileUtils.deleteRecursively( new File( pathToDb ) );GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase( pathToDb );final ExecutionEngine engine = new ExecutionEngine( db );int numberOfThreads = 50;int numberOfUsers = 10;int numberOfEvents = 50;int iterations = 100;Observable<ExecutionResult> events = processEvents( engine, numberOfUsers, numberOfEvents, numberOfThreads, iterations );events.subscribe( new Action1<ExecutionResult>(){@Overridepublic void call( ExecutionResult result ){for ( Map<String, Object> row : result ){}}} );....}}

使用RxJava的好处是,没有提到我们如何获取ExecutionResult的集合,这并不重要。 我们只有它们的流,并且通过在Observable上调用订阅函数,只要有另一个函数可用,我们就会得到通知。

我发现的大多数示例都显示了如何从单个线程生成事件,但是我想使用线程池,以便可以同时触发许多请求。 processEvents方法最终看起来像这样:

private static Observable<ExecutionResult> processEvents( final ExecutionEngine engine, final int numberOfUsers, final int numberOfEvents, final int numberOfThreads, final int iterations ){final Random random = new Random();final List<Integer> userIds = generateIds( numberOfUsers );final List<Integer> eventIds = generateIds( numberOfEvents );return Observable.create( new Observable.OnSubscribeFunc<ExecutionResult>(){@Overridepublic Subscription onSubscribe( final Observer<? super ExecutionResult> observer ){final ExecutorService executor = Executors.newFixedThreadPool( numberOfThreads );List<Future<ExecutionResult>> jobs = new ArrayList<>();for ( int i = 0; i < iterations; i++ ){Future<ExecutionResult> job = executor.submit( new Callable<ExecutionResult>(){@Overridepublic ExecutionResult call(){Integer userId = userIds.get( random.nextInt( numberOfUsers ) );Integer eventId = eventIds.get( random.nextInt( numberOfEvents ) );return engine.execute("MERGE (u:User {id: {userId}})\n" +"MERGE (e:Event {id: {eventId}})\n" +"MERGE (u)-[:HAS_EVENT]->(e)\n" +"RETURN u, e",MapUtil.map( "userId", userId, "eventId", eventId ) );}} );jobs.add( job );}for ( Future<ExecutionResult> future : jobs ){try{observer.onNext( future.get() );}catch ( InterruptedException | ExecutionException ignored ){}}observer.onCompleted();executor.shutdown();return Subscriptions.empty();}} );}

我不确定这是否是使用Observable的正确方法,因此如果我记错了,请在评论中让我知道。

我不确定处理错误的正确方法是什么。 我最初在catch块中调用了observer#onError ,但这意味着不会再产生不是我想要的事件。

如果您想使用它,该代码可以作为要点 。 我添加了以下依赖关系以获取RxJava库:

<dependency><groupId>com.netflix.rxjava</groupId><artifactId>rxjava-core</artifactId><version>0.15.1</version></dependency>
参考: RxJava 从未来到我们的JCG合作伙伴 Mark Needham在Mark Needham Blog博客上均可观察到。

翻译自: https://www.javacodegeeks.com/2014/01/rxjava-from-future-to-observable.html

rxjava 被观察者

rxjava 被观察者_RxJava:从未来到可观察相关推荐

  1. RxJava:从未来到可观察

    大约4年前,我第一次在Matthew Podwysocki的博客上遇到了Reactive Extensions ,但是直到我几周前看到Matthew在Code Mesh上发表演讲之后,我才对它有所了解 ...

  2. java中退订程序怎么写_如何在RxJava中的自定义Observable中获得观察者的退订操作的通知...

    订阅者抽象类实际上有一个方法add,可以添加将取消订阅订阅的订阅. def myObservable = Observable.create({ aSubscriber -> val liste ...

  3. 数据结构链表例程_如何掌握RxJava例程的四个结构

    数据结构链表例程 by Ayusch Jain 通过Ayusch Jain 如何掌握RxJava例程的四个结构 (How to get a grip on the four constructs of ...

  4. 10章 RxJava源码分析

    本篇文章已授权微信公众号 YYGeeker 独家发布转载请标明出处 CSDN学院课程地址 RxJava2从入门到精通-初级篇:https://edu.csdn.net/course/detail/10 ...

  5. Rxjava使用入门

    什么是 ReactiveX?    ReactiveX 是一个专注于异步编程与控制可观察数据(或者事件)流的API.它组合了观察者模式,迭代器模式和函数式编程的优秀思想.    实时数据处理是一件普通 ...

  6. RxJava和RxAndroid学习记录

    目录 1 概念和说明 1.1 响应式编程 1.2 RxJava 1.3 关于RxJava和RxAndroid 1.4 关于响应式编程和普通编程 2. 基本使用 2.1 基本元素关系图 2.2 代码示例 ...

  7. 1. RxJava概述

    零.函数响应式编程概念 如果你曾经使用过java,那一定知道面向对象(OOP)的编程思想,也听说过AOP(面向切面编程)的编程思想; 1> 响应式编程(简称RP) 在计算机中,响应式编程是一种面 ...

  8. Android内容观察者

    内容观察者是做什么的? 内容观察者主要用来观察数据库是否被操作了. 以查询数据库为例: 首先注册一个内容观察者(App1): 1 //false 观察的Uri必须是一个确切的Uri 如果是true,只 ...

  9. 行为设计模式 - 观察者设计模式

    行为设计模式 - 观察者设计模式 观察者模式是行为设计模式之一.当您对对象的状态感兴趣并希望在有任何更改时收到通知时,观察者设计模式非常有用.在观察者模式中,监视另一个对象状态的对象称为Observe ...

最新文章

  1. 查缺补漏 | Python自定义函数
  2. ueditor1.4.3配置过程(包含单独上传文件以及图片的使用)
  3. 郭明錤:苹果将在明年第二季度推出增强现实头盔
  4. EF CodeFirst 学习 1 - 用fluent API设置元数据,
  5. platform设备的添加
  6. OEL修改字符集失败 -bash: /root: is a directory
  7. 如何调用CAD命令行命令?
  8. Android FFMpeg 播放器
  9. 让幽默态度在工作上发挥蝴蝶效应
  10. Android开发_ARN是什么
  11. FTP:vsftpd中的local_umask和anon_umask
  12. 分椰子c语言csdn,水手分椰子——迭代法、递归解题
  13. 控制台忘记mysql登录密码_许久没登录的MYSQL忘记了密码,这样操作不用重装
  14. 计算机打开页面恢复默认大小,win10怎样恢复Windows窗口默认的大小?
  15. 苹果发布全新旗舰,对国内手机市场将造成什么影响?
  16. macos say批量将文本转换为语音
  17. php 将百分数处理成小数,php将百分数如何转小数
  18. 火狐浏览器httpRequest插件
  19. 杰力科创-- 单片机-- DLT8P64SC--人体感应三档调光调色灯方案
  20. 公用计算机互联网工程验收规范,公用计算机互联网工程验收规范.doc

热门文章

  1. UVALive 8518 - Sum of xor sum
  2. Spark SQL(四)之DataSet与RDD转换
  3. MyBatis】MyBatis一级缓存和二级缓存
  4. TCP 三次握手原理,你真的理解吗
  5. 分享10道常考Java面试题及答案
  6. Tomcat 服务自动启动
  7. Promise解决异步操作问题
  8. 《四世同堂》金句摘抄(十四)
  9. MyBatis中增删改操作
  10. 2020蓝桥杯省赛---java---B---4( 合并检测)