在Java中,经典的生产者-消费者模式相对简单,因为我们有java.util.concurrent.BlockingQueue 。 为了避免繁忙的等待和容易出错的手动锁定,我们只需利用put()take() 。 如果队列已满或为空,它们都将阻塞。 我们需要的是一堆线程共享对同一队列的引用:一些正在生产而其他正在消耗。 当然,队列必须具有有限的容量,否则,如果生产者的表现优于消费者,我们很快就会用光内存。 格雷格·扬(Greg Young)在波兰Devoxx期间对这条规则的强调不够:

永远不要创建无限队列

使用

这是最简单的例子。 首先,我们需要一个将对象放在共享队列中的生产者:

import lombok.Value;
import lombok.extern.slf4j.Slf4j;@Slf4j
@Value
class Producer implements Runnable {private final BlockingQueue<User> queue;@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()) {final User user = new User("User " + System.currentTimeMillis());log.info("Producing {}", user);queue.put(user);TimeUnit.SECONDS.sleep(1);}} catch (Exception e) {log.error("Interrupted", e);}}
}

生产者只需每秒将User类的实例(无论它是什么)发布到给定队列。 显然,在现实生活中,将User在队列中是系统中某些操作(例如用户登录)的结果。 同样,消费者从队列中获取新项目并进行处理:

@Slf4j
@Value
class Consumer implements Runnable {private final BlockingQueue<User> queue;@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()) {final User user = queue.take();log.info("Consuming: {}", user);}} catch (Exception e) {log.error("Interrupted", e);}}
}

再次,在现实生活中,处理将意味着存储在数据库中或对用户运行某些欺诈检测。 我们使用队列将处理线程与消耗线程解耦,例如减少延迟。 为了运行一个简单的测试,让我们启动几个生产者和消费者线程:

BlockingQueue<User> queue = new ArrayBlockingQueue<>(1_000);
final List<Runnable> runnables = Arrays.asList(new Producer(queue),new Producer(queue),new Consumer(queue),new Consumer(queue),new Consumer(queue)
);final List<Thread> threads = runnables.stream().map(runnable -> new Thread(runnable, threadName(runnable))).peek(Thread::start).collect(toList());TimeUnit.SECONDS.sleep(5);
threads.forEach(Thread::interrupt);//...private static String threadName(Runnable runnable) {return runnable.getClass().getSimpleName() + "-" + System.identityHashCode(runnable);
}

我们有2个生产者和3个消费者,似乎一切正常。 在现实生活中,您可能会有一些隐式生产者线程,例如HTTP请求处理线程。 在使用者方面,您很可能会使用线程池。 这种模式效果很好,但是特别是在消费方面是很底层的。

介绍

本文的目的是介绍一种抽象,其行为类似于生产者方的队列,但表现为来自消费者方的RxJava的Observable 。 换句话说,我们可以将添加到队列中的对象视为可以在客户端映射,过滤,撰写等的流。 有趣的是,这不再是排在后面的队列。 ObservableQueue<T>仅将所有新对象直接转发给订阅的使用者,并且在没有人监听(“可观察到的” )的情况下不缓冲事件。 ObservableQueue<T>本身并不是队列,它只是一个API与另一个API之间的桥梁。 它类似于java.util.concurrent.SynchronousQueue ,但是如果没有人对使用感兴趣,则将对象简单地丢弃。

这是第一个实验性实现。 这只是一个玩具代码,不要认为它已准备就绪。 另外,我们稍后将对其进行简化:

public class ObservableQueue<T> implements BlockingQueue<T>, Closeable {private final Set<Subscriber<? super T>> subscribers = Collections.newSetFromMap(new ConcurrentHashMap<>());private final Observable<T> observable = Observable.create(subscriber -> {subscriber.add(new Subscription() {@Overridepublic void unsubscribe() {subscribers.remove(subscriber);}@Overridepublic boolean isUnsubscribed() {return false;}});subscribers.add(subscriber);});public Observable<T> observe() {return observable;}@Overridepublic boolean add(T t) {return offer(t);}@Overridepublic boolean offer(T t) {subscribers.forEach(subscriber -> subscriber.onNext(t));return true;}@Overridepublic T remove() {return noSuchElement();}@Overridepublic T poll() {return null;}@Overridepublic T element() {return noSuchElement();}private T noSuchElement() {throw new NoSuchElementException();}@Overridepublic T peek() {return null;}@Overridepublic void put(T t) throws InterruptedException {offer(t);}@Overridepublic boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {return offer(t);}@Overridepublic T take() throws InterruptedException {throw new UnsupportedOperationException("Use observe() instead");}@Overridepublic T poll(long timeout, TimeUnit unit) throws InterruptedException {return null;}@Overridepublic int remainingCapacity() {return 0;}@Overridepublic boolean remove(Object o) {return false;}@Overridepublic boolean containsAll(Collection<?> c) {return false;}@Overridepublic boolean addAll(Collection<? extends T> c) {c.forEach(this::offer);return true;}@Overridepublic boolean removeAll(Collection<?> c) {return false;}@Overridepublic boolean retainAll(Collection<?> c) {return false;}@Overridepublic void clear() {}@Overridepublic int size() {return 0;}@Overridepublic boolean isEmpty() {return true;}@Overridepublic boolean contains(Object o) {return false;}@Overridepublic Iterator<T> iterator() {return Collections.emptyIterator();}@Overridepublic Object[] toArray() {return new Object[0];}@Overridepublic <T> T[] toArray(T[] a) {return a;}@Overridepublic int drainTo(Collection<? super T> c) {return 0;}@Overridepublic int drainTo(Collection<? super T> c, int maxElements) {return 0;}@Overridepublic void close() throws IOException {subscribers.forEach(rx.Observer::onCompleted);}
}

关于它有两个有趣的事实:

  1. 我们必须跟踪所有订户,即愿意接收新商品的消费者。 如果其中一个订阅者不再感兴趣,我们必须删除该订阅者,否则会发生内存泄漏(请继续阅读!)
  2. 此队列的行为就好像它始终为空。 它永远不会保存任何项目–当您将某些内容放入此队列时,它会自动传递给订阅者并被遗忘
  3. 从技术上讲,此队列是无界的(!),这意味着您可以根据需要放置任意数量的项目。 但是,由于将项目传递给所有订户(如果有)并立即丢弃,因此此队列实际上始终为空(请参见上文)
  4. 生产者可能仍会生成太多事件,而消费者可能无法跟上这一步– RxJava现在具有背压支持,本文未介绍。

假设我正确实现了队列协定,生产者可以像使用其他BlockingQueue<T>一样使用ObservableQueue<T> 。 但是,消费者看起来更轻巧,更聪明:

final ObservableQueue<User> users = new ObservableQueue<>();
final Observable<User> observable = users.observe();users.offer(new User("A"));
observable.subscribe(user -> log.info("User logged in: {}", user));
users.offer(new User("B"));
users.offer(new User("C"));

上面的代码仅打印"B""C" 。 由于ObservableQueue会在没有人监听的情况下丢弃项目,因此设计会丢失"A" 。 显然, Producer类现在使用users队列。 一切正常,您可以随时调用users.observe()并应用数十个Observable运算符之一。 但是有一个警告:默认情况下,RxJava不执行任何线程处理,因此消耗与产生线程在同一线程中发生! 我们失去了生产者-消费者模式的最重要特征,即线程去耦。 幸运的是,RxJava中的所有内容都是声明性的,线程调度也是如此:

users.observe().observeOn(Schedulers.computation()).forEach(user ->log.info("User logged in: {}", user));

现在让我们看一下RxJava的真正功能。 假设您要计算每秒登录的用户数,其中每个登录都作为事件放入队列中:

users.observe().map(User::getName).filter(name -> !name.isEmpty()).window(1, TimeUnit.SECONDS).flatMap(Observable::count).doOnCompleted(() -> log.info("System shuts down")).forEach(c -> log.info("Logins in last second: {}", c));

性能也是可以接受的,这样的队列每秒可以在我的一个订户的笔记本电脑上接受约300万个对象。 将此类视为使用队列到现代反应世界的旧系统的适配器。 可是等等! 使用ObservableQueue<T>很容易,但是使用subscribers同步集的实现似乎太底层了。 幸运的是有Subject<T, T>SubjectObservable “另一面” –您可以将事件推送到Subject但是它仍然实现Observable ,因此您可以轻松地创建任意Observable 。 使用Subject实现之一, ObservableQueue外观如何:

public class ObservableQueue<T> implements BlockingQueue<T>, Closeable {private final Subject<T, T> subject = PublishSubject.create();public Observable<T> observe() {return subject;}@Overridepublic boolean add(T t) {return offer(t);}@Overridepublic boolean offer(T t) {subject.onNext(t);return true;}@Overridepublic void close() throws IOException {subject.onCompleted();}@Overridepublic T remove() {return noSuchElement();}@Overridepublic T poll() {return null;}@Overridepublic T element() {return noSuchElement();}private T noSuchElement() {throw new NoSuchElementException();}@Overridepublic T peek() {return null;}@Overridepublic void put(T t) throws InterruptedException {offer(t);}@Overridepublic boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {return offer(t);}@Overridepublic T take() throws InterruptedException {throw new UnsupportedOperationException("Use observe() instead");}@Overridepublic T poll(long timeout, TimeUnit unit) throws InterruptedException {return null;}@Overridepublic int remainingCapacity() {return 0;}@Overridepublic boolean remove(Object o) {return false;}@Overridepublic boolean containsAll(Collection<?> c) {return false;}@Overridepublic boolean addAll(Collection<? extends T> c) {c.forEach(this::offer);return true;}@Overridepublic boolean removeAll(Collection<?> c) {return false;}@Overridepublic boolean retainAll(Collection<?> c) {return false;}@Overridepublic void clear() {}@Overridepublic int size() {return 0;}@Overridepublic boolean isEmpty() {return true;}@Overridepublic boolean contains(Object o) {return false;}@Overridepublic Iterator<T> iterator() {return Collections.emptyIterator();}@Overridepublic Object[] toArray() {return new Object[0];}@Overridepublic <T> T[] toArray(T[] a) {return a;}@Overridepublic int drainTo(Collection<? super T> c) {return 0;}@Overridepublic int drainTo(Collection<? super T> c, int maxElements) {return 0;}}

上面的实现更加简洁,我们完全不必担心线程同步。

翻译自: https://www.javacodegeeks.com/2015/07/consuming-java-util-concurrent-blockingqueue-as-rx-observable.html

将java.util.concurrent.BlockingQueue用作rx.Observable相关推荐

  1. java.util.concurrent BlockingQueue详解

    什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用.阻塞 ...

  2. java.util.concurrent.BlockingQueue指南

    2. 概述 在本文中,我们将介绍一个最有用的java.util.concurrent.BlockingQueue来解决并发生产者 - 消费者问题.我们可以看一下BlockingQueue 接口的API ...

  3. java.util.concurrent包API学习笔记

    newFixedThreadPool 创建一个固定大小的线程池. shutdown():用于关闭启动线程,如果不调用该语句,jvm不会关闭. awaitTermination():用于等待子线程结束, ...

  4. java.util.concurrent 包下面的所有类

    java.util.concurrent 包下面的所有类 原子操作数类: java.util.concurrent.atomic.AtomicBoolean.class java.util.concu ...

  5. java多线程学习-java.util.concurrent详解

    http://janeky.iteye.com/category/124727 java多线程学习-java.util.concurrent详解(一) Latch/Barrier 博客分类: java ...

  6. java.util.concurrent 编程范例

    http://kanglecjr.iteye.com/blog/1070934 java.util.concurrent 范例之一 博客分类: Java JavaTomcatthread java.u ...

  7. 使用java.util.concurrent包处理多线程

    出处:http://blog.csdn.net/hjl_168562/article/details/8158023 1.使用拥有固定的线程数的线程池执行线程任务 package com.justin ...

  8. java.util.concurrent 多线程框架

    JDK5中的一个亮点就是将Doug Lea的并发库引入到Java标准库中.Doug Lea确实是一个牛人,能教书,能出书,能编码,不过这在国外还是比较普遍的,而国内的教授们就相差太远了. 一般的服务器 ...

  9. Java并发包-java.util.concurrent详解

    转载自https://blog.csdn.net/axi295309066/article/details/65665090 一.阻塞队列BlockingQueue BlockingQueue通常用于 ...

最新文章

  1. 前端 重构时需要注意的事项_驾驶式扫地车的功能特点和使用时需要注意事项...
  2. IAR J-Link下载程序出现错误提示:Failed to get CPU status after 4 retries Retry?
  3. selenium 示例_Selenium测试中所需的功能和示例
  4. JSF – PrimeFaces和休眠集成项目
  5. 递归获取字符串内的所有图片src地址
  6. 百度吉利成立的汽车公司名称曝光,百度持股55%
  7. matlab中rgb转hsv,matlab实现RGB与HSV(HSB)、HSL和HSI的色彩空间互转
  8. MathType requires a newer version of MT Extra等MathType问题的不兼容性解决方案
  9. oracle sql抓取1
  10. 能在学生用计算机的面上画的图画,小学生电脑绘画作品欣赏
  11. eclipse启动报错,显示找不到指定路径的JRE
  12. Android打开第三方地图使用方法
  13. 百度刚刚晋升的29岁最年轻副总裁李明远
  14. 关注我的微信公众帐号小q机器人(xiaoqrobot)
  15. Smss.exe 进程分析--NT 源码--当机方法
  16. qt中添加背景图片(stylesheet)
  17. 虚拟机Centos网络配置
  18. POJ,ZOJ题目分类(多篇整合版,分类很细致,全面)
  19. 杰瑞学Perl之Hello World
  20. 侃谈移动端音视频发展与现状

热门文章

  1. 新闻发布项目——实体类(newsTb)
  2. 互换性与技术测量教材pdf_【检验】临床生物化学检验技术(第6版)人民卫生出版社【电子教材PDF】【人卫教材电子版】...
  3. ConcurrentHashMap--自用,非教学
  4. serv-u 数据备份_如何使用用户数据脚本在EC2实例上安装Apache Web Server
  5. spring vaadin_在Spring Boot中使用Vaadin的简介
  6. jdk 1.8 不可变常量_在JDK 10中不可变与不可修改
  7. apache.camel_Apache Camel 2.20发布–新增功能
  8. neo4j 嵌入式_在嵌入式Neo4j中使用Neo4j浏览器
  9. servlets_jQuery Ajax – Servlets集成:构建完整的应用程序
  10. Quarkus的其他(非标准)CDI功能