rxjava 被观察者_RxJava:从未来到可观察
rxjava 被观察者
大约4年前,我第一次在Matthew Podwysocki的博客上遇到了Reactive Extensions ,但是直到我几周前看到Matthew在Code Mesh上发表演讲后,我才对它有所了解。
最近它似乎越来越流行,我注意到Netflix编写了一个Java版本RxJava 。
我以为可以尝试通过更改在探索cypher的MERGE函数时暴露的Observable而不是Future的代码来尝试一下。
回顾一下,我们有50个线程,我们进行了100次迭代,在这些迭代中我们创建了随机(用户,事件)对。 我们最多创建10个用户和50个事件,目标是同时发送相同对的请求。
在另一篇文章的示例中,我丢弃了每个查询的结果,而在这里我返回了结果,因此我有一些要订阅的内容。
代码的轮廓如下所示:
public class MergeTimeRx
{public static void main( final String[] args ) throws InterruptedException, IOException{String pathToDb = "/tmp/foo";FileUtils.deleteRecursively( new File( pathToDb ) );GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase( pathToDb );final ExecutionEngine engine = new ExecutionEngine( db );int numberOfThreads = 50;int numberOfUsers = 10;int numberOfEvents = 50;int iterations = 100;Observable<ExecutionResult> events = processEvents( engine, numberOfUsers, numberOfEvents, numberOfThreads, iterations );events.subscribe( new Action1<ExecutionResult>(){@Overridepublic void call( ExecutionResult result ){for ( Map<String, Object> row : result ){}}} );....}}
使用RxJava的好处是,没有提到我们如何获取ExecutionResult的集合,这并不重要。 我们只有它们的流,并且通过在Observable上调用订阅函数,只要有另一个函数可用,我们就会得到通知。
我发现的大多数示例都显示了如何从单个线程生成事件,但是我想使用线程池,以便可以同时触发许多请求。 processEvents方法最终看起来像这样:
private static Observable<ExecutionResult> processEvents( final ExecutionEngine engine, final int numberOfUsers, final int numberOfEvents, final int numberOfThreads, final int iterations ){final Random random = new Random();final List<Integer> userIds = generateIds( numberOfUsers );final List<Integer> eventIds = generateIds( numberOfEvents );return Observable.create( new Observable.OnSubscribeFunc<ExecutionResult>(){@Overridepublic Subscription onSubscribe( final Observer<? super ExecutionResult> observer ){final ExecutorService executor = Executors.newFixedThreadPool( numberOfThreads );List<Future<ExecutionResult>> jobs = new ArrayList<>();for ( int i = 0; i < iterations; i++ ){Future<ExecutionResult> job = executor.submit( new Callable<ExecutionResult>(){@Overridepublic ExecutionResult call(){Integer userId = userIds.get( random.nextInt( numberOfUsers ) );Integer eventId = eventIds.get( random.nextInt( numberOfEvents ) );return engine.execute("MERGE (u:User {id: {userId}})\n" +"MERGE (e:Event {id: {eventId}})\n" +"MERGE (u)-[:HAS_EVENT]->(e)\n" +"RETURN u, e",MapUtil.map( "userId", userId, "eventId", eventId ) );}} );jobs.add( job );}for ( Future<ExecutionResult> future : jobs ){try{observer.onNext( future.get() );}catch ( InterruptedException | ExecutionException ignored ){}}observer.onCompleted();executor.shutdown();return Subscriptions.empty();}} );}
我不确定这是否是使用Observable的正确方法,因此如果我记错了,请在评论中让我知道。
我不确定处理错误的正确方法是什么。 我最初在catch块中调用了observer#onError ,但这意味着不会再产生不是我想要的事件。
如果您想使用它,该代码可以作为要点 。 我添加了以下依赖关系以获取RxJava库:
<dependency><groupId>com.netflix.rxjava</groupId><artifactId>rxjava-core</artifactId><version>0.15.1</version></dependency>
翻译自: https://www.javacodegeeks.com/2014/01/rxjava-from-future-to-observable.html
rxjava 被观察者
rxjava 被观察者_RxJava:从未来到可观察相关推荐
- RxJava:从未来到可观察
大约4年前,我第一次在Matthew Podwysocki的博客上遇到了Reactive Extensions ,但是直到我几周前看到Matthew在Code Mesh上发表演讲之后,我才对它有所了解 ...
- java中退订程序怎么写_如何在RxJava中的自定义Observable中获得观察者的退订操作的通知...
订阅者抽象类实际上有一个方法add,可以添加将取消订阅订阅的订阅. def myObservable = Observable.create({ aSubscriber -> val liste ...
- 数据结构链表例程_如何掌握RxJava例程的四个结构
数据结构链表例程 by Ayusch Jain 通过Ayusch Jain 如何掌握RxJava例程的四个结构 (How to get a grip on the four constructs of ...
- 10章 RxJava源码分析
本篇文章已授权微信公众号 YYGeeker 独家发布转载请标明出处 CSDN学院课程地址 RxJava2从入门到精通-初级篇:https://edu.csdn.net/course/detail/10 ...
- Rxjava使用入门
什么是 ReactiveX? ReactiveX 是一个专注于异步编程与控制可观察数据(或者事件)流的API.它组合了观察者模式,迭代器模式和函数式编程的优秀思想. 实时数据处理是一件普通 ...
- RxJava和RxAndroid学习记录
目录 1 概念和说明 1.1 响应式编程 1.2 RxJava 1.3 关于RxJava和RxAndroid 1.4 关于响应式编程和普通编程 2. 基本使用 2.1 基本元素关系图 2.2 代码示例 ...
- 1. RxJava概述
零.函数响应式编程概念 如果你曾经使用过java,那一定知道面向对象(OOP)的编程思想,也听说过AOP(面向切面编程)的编程思想; 1> 响应式编程(简称RP) 在计算机中,响应式编程是一种面 ...
- Android内容观察者
内容观察者是做什么的? 内容观察者主要用来观察数据库是否被操作了. 以查询数据库为例: 首先注册一个内容观察者(App1): 1 //false 观察的Uri必须是一个确切的Uri 如果是true,只 ...
- 行为设计模式 - 观察者设计模式
行为设计模式 - 观察者设计模式 观察者模式是行为设计模式之一.当您对对象的状态感兴趣并希望在有任何更改时收到通知时,观察者设计模式非常有用.在观察者模式中,监视另一个对象状态的对象称为Observe ...
最新文章
- 查缺补漏 | Python自定义函数
- ueditor1.4.3配置过程(包含单独上传文件以及图片的使用)
- 郭明錤:苹果将在明年第二季度推出增强现实头盔
- EF CodeFirst 学习 1 - 用fluent API设置元数据,
- platform设备的添加
- OEL修改字符集失败 -bash: /root: is a directory
- 如何调用CAD命令行命令?
- Android FFMpeg 播放器
- 让幽默态度在工作上发挥蝴蝶效应
- Android开发_ARN是什么
- FTP:vsftpd中的local_umask和anon_umask
- 分椰子c语言csdn,水手分椰子——迭代法、递归解题
- 控制台忘记mysql登录密码_许久没登录的MYSQL忘记了密码,这样操作不用重装
- 计算机打开页面恢复默认大小,win10怎样恢复Windows窗口默认的大小?
- 苹果发布全新旗舰,对国内手机市场将造成什么影响?
- macos say批量将文本转换为语音
- php 将百分数处理成小数,php将百分数如何转小数
- 火狐浏览器httpRequest插件
- 杰力科创-- 单片机-- DLT8P64SC--人体感应三档调光调色灯方案
- 公用计算机互联网工程验收规范,公用计算机互联网工程验收规范.doc