kata

在第1部分:线程池中,我们设计并实现了相对简单的系统,用于实时处理事件。 确保您阅读了上一部分,因为它包含一些我们将重用的类。 以防万一这是要求:

一个系统每秒发送大约一千个事件。 每个Event至少具有两个属性:

  • clientId –我们期望一个客户端每秒最多可以处理几个事件
  • UUID –全球唯一

消耗一个事件大约需要10毫秒。 设计此类流的使用者:

  1. 允许实时处理事件
  2. 与一个客户端有关的事件应按顺序进行处理,即,您不能并行处理同一clientId事件
  3. 如果10秒钟内出现重复的UUID ,请将其删除。 假设10秒钟后不会出现重复

到目前为止,我们提出的是线程池和共享缓存的组合。 这次我们将使用RxJava实现解决方案。 首先,我没有透露EventStream的实现方式,仅提供了API:

interface EventStream {void consume(EventConsumer consumer);}

实际上,对于手动测试,我构建了一个简单的RxJava流,其行为与系统的要求类似:

@Slf4j
class EventStream {void consume(EventConsumer consumer) {observe().subscribe(consumer::consume,e -> log.error("Error emitting event", e));}Observable<Event> observe() {return Observable.interval(1, TimeUnit.MILLISECONDS).delay(x -> Observable.timer(RandomUtils.nextInt(0, 1_000), TimeUnit.MICROSECONDS)).map(x -> new Event(RandomUtils.nextInt(1_000, 1_100), UUID.randomUUID())).flatMap(this::occasionallyDuplicate, 100).observeOn(Schedulers.io());}private Observable<Event> occasionallyDuplicate(Event x) {final Observable<Event> event = Observable.just(x);if (Math.random() >= 0.01) {return event;}final Observable<Event> duplicated =event.delay(RandomUtils.nextInt(10, 5_000), TimeUnit.MILLISECONDS);return event.concatWith(duplicated);}}

了解此模拟器的工作原理不是必不可少的,但很有趣。 首先,我们产生的源源不断的Long值( 012 ...)每毫秒使用(每秒千个事件) interval()操作。 然后,我们使用delay()运算符将每个事件延迟01_000微秒之间的随机时间。 这样,事件将在难以预测的时刻出现,而情况会更加现实。 最后,我们将每个Long值映射(使用ekhem, map()运算符) map()到一个随机Event其中clientId介于1_0001_100 (包含在内)之间。

最后一点很有趣。 我们想模拟偶尔的重复。 为此,我们将每个事件(使用flatMap() )映射到自身(在99%的情况下)。 但是,在1%的情况下,我们两次返回此事件,第二次发生在10毫秒至5秒后。 在实践中,该事件的重复实例将在其他数百个事件之后出现,这使流的行为逼真。

EventStream交互的方式有两种-通过consume()回调和通过observe()流。 我们可以利用Observable<Event>来快速建立功能与第1部分非常相似但更简单的处理管道。

缺少背压

利用RxJava的第一个幼稚方法很快就失败了:

EventStream es = new EventStream();
EventConsumer clientProjection = new ClientProjection(new ProjectionMetrics(new MetricRegistry()));es.observe().subscribe(clientProjection::consume,e -> log.error("Fatal error", e));

ClientProjectionProjectionMetrics等人来自第1部分 )。 我们几乎立即获得MissingBackpressureException ,这是预期的。 还记得我们的第一个解决方案是如何通过处理越来越多的延迟来滞后吗? RxJava尝试避免这种情况,并避免队列溢出。 由于使用者( ClientProjection )无法实时处理事件,因此抛出MissingBackpressureException 。 这是快速失败的行为。 最快的解决方案是像以前一样使用RxJava的功能将消耗转移到单独的线程池中:

EventStream es = new EventStream();
EventConsumer clientProjection = new FailOnConcurrentModification(new ClientProjection(new ProjectionMetrics(new MetricRegistry())));es.observe().flatMap(e -> clientProjection.consume(e, Schedulers.io())).window(1, TimeUnit.SECONDS).flatMap(Observable::count).subscribe(c -> log.info("Processed {} events/s", c),e -> log.error("Fatal error", e));

EventConsumer接口具有一个辅助方法,该方法可以在提供的Scheduler上异步使用事件:

@FunctionalInterface
interface EventConsumer {Event consume(Event event);default Observable<Event> consume(Event event, Scheduler scheduler) {return Observable.fromCallable(() -> this.consume(event)).subscribeOn(scheduler);}}

通过在单独的Scheduler.io()使用flatMap()使用事件,可以异步调用每个使用。 这次事件几乎是实时处理的,但是存在更大的问题。 由于某种原因,我用FailOnConcurrentModification装饰了ClientProjection 。 事件彼此独立使用,因此可能会同时处理同一clientId两个事件。 不好。 幸运的是,在RxJava中解决此问题比使用普通线程要容易得多:

es.observe().groupBy(Event::getClientId).flatMap(byClient -> byClient.observeOn(Schedulers.io()).map(clientProjection::consume)).window(1, TimeUnit.SECONDS).flatMap(Observable::count).subscribe(c -> log.info("Processed {} events/s", c),e -> log.error("Fatal error", e));

有点改变了。 首先,我们将事件按clientId分组。 这将单个Observable流拆分为流 。 每个名为byClient子流代表与同一clientId相关的所有事件。 现在,如果我们映射到此子流,我们可以确保与同一个clientId相关的事件不会同时处理。 外部流是惰性的,因此我们必须订阅它。 与其单独订阅每个事件,我们不每秒收集事件并进行计数。 这样,我们每秒就会收到一个Integer类型的单个事件,该事件表示每秒消耗的事件数。

使用全局状态的不纯,非惯常,容易出错,不安全的重复数据删除解决方案

现在我们必须删除重复的UUID 。 丢弃重复项的最简单但非常愚蠢的方法是利用全局状态。 我们可以通过在filter()运算符之外可用的缓存中查找重复项来简单地过滤掉重复项:

final Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();es.observe().filter(e -> seenUuids.getIfPresent(e.getUuid()) == null).doOnNext(e -> seenUuids.put(e.getUuid(), e.getUuid())).subscribe(clientProjection::consume,e -> log.error("Fatal error", e));

如果要监视此机制的使用,只需添加指标:

Meter duplicates = metricRegistry.meter("duplicates");es.observe().filter(e -> {if (seenUuids.getIfPresent(e.getUuid()) != null) {duplicates.mark();return false;} else {return true;}})

从操作员内部访问全局状态,尤其是可变状态是非常危险的,并且破坏了RxJava的唯一目的–简化并发。 显然,我们使用了Guava的线程安全Cache ,但是在许多情况下,很容易错过从多个线程访问共享全局可变状态的地方。 如果您发现自己在运算符链之外对某些变量进行了变异,请非常小心。

RxJava 1.x中的自定义

RxJava 1.x有一个distinct()运算符,大概可以完成此工作:

es.observe().distinct(Event::getUuid).groupBy(Event::getClientId)

不幸的是, distinct()在内部将所有密钥( UUID distinct()存储在不断增长的HashSet 。 但是我们只关心最近10秒钟内的重复! 通过复制粘贴DistinctOperator的实现,我创建了DistinctEvent运算符,该运算符利用Guava的缓存仅存储了最后10秒钟的UUID值。 我故意在此运算符中对Event进行硬编码,而不是使其通用性更强,以使代码更易于理解:

class DistinctEvent implements Observable.Operator<Event, Event> {private final Duration duration;DistinctEvent(Duration duration) {this.duration = duration;}@Overridepublic Subscriber<? super Event> call(Subscriber<? super Event> child) {return new Subscriber<Event>(child) {final Map<UUID, Boolean> keyMemory = CacheBuilder.newBuilder().expireAfterWrite(duration.toMillis(), TimeUnit.MILLISECONDS).<UUID, Boolean>build().asMap();@Overridepublic void onNext(Event event) {if (keyMemory.put(event.getUuid(), true) == null) {child.onNext(event);} else {request(1);}}@Overridepublic void onError(Throwable e) {child.onError(e);}@Overridepublic void onCompleted() {child.onCompleted();}};}
}

用法非常简单,整个实现(加上自定义运算符)如下:

es.observe().lift(new DistinctEvent(Duration.ofSeconds(10))).groupBy(Event::getClientId).flatMap(byClient -> byClient.observeOn(Schedulers.io()).map(clientProjection::consume)).window(1, TimeUnit.SECONDS).flatMap(Observable::count).subscribe(c -> log.info("Processed {} events/s", c),e -> log.error("Fatal error", e));

实际上,如果您跳过每秒的日志记录,它甚至可以更短:

es.observe().lift(new DistinctEvent(Duration.ofSeconds(10))).groupBy(Event::getClientId).flatMap(byClient -> byClient.observeOn(Schedulers.io()).map(clientProjection::consume)).subscribe(e -> {},e -> log.error("Fatal error", e));

该解决方案比以前的基于线程池和装饰器的解决方案要短得多。 唯一尴尬的部分是自定义运算符,它在存储太多历史UUID时避免内存泄漏。 幸运的是RxJava 2得以解救!

RxJava 2.x和更强大的内置

实际上,我是从提交公关RxJava具有更强大的执行这种紧密distinct()操作。 但是在我检查2.x分支之前,它是: distinct()允许提供自定义Collection ,而不是硬编码的HashSet 。 信不信由你,依赖倒置不仅涉及Spring框架或Java EE。 当库允许您提供其内部数据结构的自定义实现时,这也是DI。 首先,我创建一个辅助方法,该方法可以构建由Map<UUID, Boolean>支持,由Cache<UUID, Boolean>支持的Set<UUID> Cache<UUID, Boolean> 。 我们一定喜欢代表团!

private Set<UUID> recentUuids() {return Collections.newSetFromMap(CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).<UUID, Boolean>build().asMap());
}

有了这种方法,我们可以使用以下表达式实现整个任务:

es.observe().distinct(Event::getUuid, this::recentUuids).groupBy(Event::getClientId).flatMap(byClient -> byClient.observeOn(Schedulers.io()).map(clientProjection::consume)).subscribe(e -> {},e -> log.error("Fatal error", e));

优雅,简洁,清晰! 它看起来像是一个问题:

  • 观察事件流
  • 仅考虑不同的UUID
  • 客户分组活动
  • 为每个客户消耗(顺序)

希望您喜欢所有这些解决方案,并发现它们对您的日常工作很有用。

也可以看看:

  • 小规模流处理kata。 第1部分:线程池
  • 小规模流处理kata。 第2部分:RxJava 1.x / 2.x

翻译自: https://www.javacodegeeks.com/2016/10/small-scale-stream-processing-kata-part-2-rxjava-1-x2-x.html

kata

kata_小规模流处理kata。 第2部分:RxJava 1.x / 2.x相关推荐

  1. kata_小规模流处理kata。 第1部分:线程池

    kata 我再次为我的公司在GeeCON 2016上举办了编程竞赛. 这次分配需要设计并根据以下要求选择实施系统: 一个系统每秒发送大约一千个事件. 每个Event至少具有两个属性: clientId ...

  2. 小规模流处理kata。 第2部分:RxJava 1.x / 2.x

    在第1部分:线程池中,我们设计并实现了相对简单的系统,用于实时处理事件. 确保您已阅读上一部分,因为它包含一些我们将重用的类. 以防万一这是要求: 一个系统每秒传送约一千个事件. 每个Event至少具 ...

  3. 小规模流处理kata。 第1部分:线程池

    我再次为我的公司在GeeCON 2016上举办了编程竞赛. 这次分配需要设计并根据以下要求选择实施系统: 一个系统每秒传送约一千个事件. 每个Event至少具有两个属性: clientId –我们期望 ...

  4. 迈克尔 杰克逊mv_用杰克逊流式传输大型JSON文件– RxJava常见问题解答

    迈克尔 杰克逊mv 在上一篇文章中,我们学习了如何解析过大的XML文件并将其转换为RxJava流. 这次让我们看一个大的JSON文件. 我们的示例将基于微小的colors.json,其中包含将近150 ...

  5. 用杰克逊流式传输大型JSON文件– RxJava常见问题解答

    在上一篇文章中,我们学习了如何解析过大的XML文件并将其转换为RxJava流. 这次让我们看一个大的JSON文件. 我们的示例将基于微小的colors.json,其中包含将近150种这种格式的记录: ...

  6. Spring,Reactor和ElasticSearch:从回调到React流

    Spring 5(以及Boot 2,将在数周内到货)是一次革命. 不是" XML上的注释 "或" Java上的注释类 "的革命. 这是一个真正的革命性框架,可以 ...

  7. Spring,Reactor和ElasticSearch:从回调到反应流

    Spring 5(以及Boot 2,在数周之内到货)是一次革命. 不是" XML上的注释 "或" Java上的注释类 "的革命. 这是一个真正的革命性框架,可以 ...

  8. 这一周,我们迁移学习 | 内有福利

    全体员工 发自 亚龙湾 量子位 出品 | 公众号 QbitAI 海边的周一的感受,是和别处不同的.都是沙滩摆着一把躺椅,头顶上预备着遮阳伞,可以随时刷手机. 写稿的人,傍午傍晚散了工,每每花四文铜钱, ...

  9. Reactor实例解析

    \ 要点 \\ Reactor是一个运行在Java8之上的响应式流框架,它提供了一组响应式风格的API\\t 除了个别API上的区别,它的原理跟RxJava很相似\\t 它是第四代响应式框架,支持操作 ...

最新文章

  1. python跟java 效率_对比平台--Java与Python之间的性能差异
  2. 图论-最短路径--3、SPFA算法O(kE)
  3. “小霸王学习机”再现?树莓派400正式发布,售价70美元
  4. 有空时深入阅读这两篇文章
  5. 如何手工删除残留的DFS NAMESPACE
  6. linux C 中的volatile使用【转】
  7. Oracle 数据库升级
  8. 收文处理和发文处理的环节_液相色谱使用中样品预处理需要注意的几个环节汇总!...
  9. Spring Boot 设置允许跨域的两种配置方式
  10. seaborn无法下载数据的问题
  11. 人脸识别相似度计算方法
  12. 团队任务5:事后诸葛亮会议
  13. 【转】推荐系统入门实践:世纪佳缘会员推荐(完整版)
  14. 拉格朗日乘数法 —— 通俗理解
  15. Android studio语音识别集成科大讯飞语音转文字
  16. php出生年月日怎么写,出生年月日五行查询表,五行属性查询表
  17. 正在考虑写一本书《中国有所没有围墙的大学,影响了世界几千年》第一章请给个反响...
  18. 【动态规划】买卖股票
  19. 文件服务器资源管理器的作用,文件服务器资源管理器 (FSRM) 概述
  20. dnf鹰犬boss机器人_[DNF鹰犬boss怎么打] 兰蒂卢斯的鹰犬boss

热门文章

  1. P1251-餐巾计划问题【费用流】
  2. 数学基础知识(高精、快速幂、龟速乘……)
  3. #6029. 「雅礼集训 2017 Day1」市场(势能,区间除)
  4. 【高斯消元】球形空间产生器(luogu 4035/金牌导航 高斯消元-1)
  5. 【期望】彩色圆环(金牌导航 期望-5)
  6. YL杯超级篮球赛(jzoj 1325)
  7. 近似乘积(jzoj 3925)
  8. React绑定this的三种方式
  9. ByteBuffer的使用
  10. Oracle入门(十二G)之序列