hadoop10---消息队列
java消息队列 BlockingQueue也是java.util.concurrent下的主要用来控制线程同步的工具。锁也是用来控制线程同步的,释放锁的时候谁拿到了锁是没有预测的,所以谁拿到了锁是不可控的。BlockingQueue消息队列就可以控制。队列是先进先出的。 阻塞:我往队列里面放任务的时候别人不能放,我取任务的时候别人不能取。队列满了就不能put了。 也支持不阻塞队列。
主要的方法是:put、take一对阻塞存取;add、poll一对非阻塞存取。插入:1)add(anObject):把任务Runnable加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则抛出异常,不好。2)offer(anObject):表示如果可能的话,将任务Runnable加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false。尝试放一下,放不进去就等一会再去放。3)put(anObject):把任务Runnable加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续, 有阻塞, 放不进去就等待读取:4)poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null; 取不到返回null5)take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止; 阻塞, 取不到就一直等其他 int remainingCapacity();返回队列剩余的容量,在队列插入和获取的时候,不要瞎搞,数 据可能不准, 不能保证数据的准确性,因为有可能队列正在加入或者移除。 boolean remove(Object o); 从队列移除元素,如果存在,即移除一个或者更多,队列改 变了返回true public boolean contains(Object o); 查看队列是否存在这个元素,存在返回true int drainTo(Collection<? super E> c); //移除此队列中所有可用的元素,并将它们添加到给定 collection 中。取出放到集合中 int drainTo(Collection<? super E> c, int maxElements); 和上面方法的区别在于,指定了移 动的数量; 取出指定个数放到集合
BlockingQueue有四个具体的实现类,常用的两种实现类为:1、ArrayBlockingQueue:一个由数组支持的有界阻塞队列,规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的。数组的长度是不可变的。2、LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的。 LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。LinkedBlockingQueue和ArrayBlockingQueue区别:LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue.生产者消费者的示例代码: 见代码
package cn.itcast_02_blockingqueue.main;import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue;//阻塞队列 public class Test {public static void main(String[] args) throws Exception {//队列里面随便放什么都可以BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);// BlockingQueue<String> queue = new LinkedBlockingQueue<String>();// 不设置的话,LinkedBlockingQueue默认大小为Integer.MAX_VALUE// BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);Consumer consumer = new Consumer(queue);Producer producer = new Producer(queue);for (int i = 0; i < 3; i++) {new Thread(producer, "Producer" + (i + 1)).start();}for (int i = 0; i < 5; i++) {new Thread(consumer, "Consumer" + (i + 1)).start();}new Thread(producer, "Producer" + (5)).start();} }class Consumer implements Runnable{ BlockingQueue<String> queue; public Consumer(BlockingQueue<String> queue){ this.queue = queue; } @Override public void run() { try { String consumer = Thread.currentThread().getName();System.out.println(consumer); String temp = queue.take();//如果队列为空,会阻塞当前线程 System.out.println(consumer+"get a product:"+temp); } catch (InterruptedException e) { e.printStackTrace(); } } } //一个Runnable就是一个任务,放在一个线程里面执行 /*class MyThreadWithImpliment implements RunnableThread thread1 = new Thread(new MyThreadWithImpliment(1), "thread-1");thread1.start(); */class Producer implements Runnable { BlockingQueue<String> queue; public Producer(BlockingQueue<String> queue) { this.queue = queue; } @Override public void run() { try { System.out.println("I have made a product:" + Thread.currentThread().getName()); String temp = "A Product, 生产线程:" + Thread.currentThread().getName(); queue.put(temp);//如果队列是满的话,会阻塞当前线程 } catch (InterruptedException e) { e.printStackTrace(); } } }
package cn.itcast_02_blockingqueue.main;import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue;public class TestBlockingQueue {public static void main(String[] args) {//队列里面随便放什么都可以BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);// BlockingQueue<String> queue = new LinkedBlockingQueue<String>();// 不设置的话,LinkedBlockingQueue默认大小为Integer.MAX_VALUE// BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);TestBlockingQueueConsumer consumer = new TestBlockingQueueConsumer(queue);TestBlockingQueueProducer producer = new TestBlockingQueueProducer(queue);for (int i = 0; i < 3; i++) {new Thread(producer, "Producer" + (i + 1)).start();}for (int i = 0; i < 5; i++) {new Thread(consumer, "Consumer" + (i + 1)).start();}new Thread(producer, "Producer" + (5)).start();} }class TestBlockingQueueConsumer implements Runnable{ BlockingQueue<String> queue; Random random = new Random();public TestBlockingQueueConsumer(BlockingQueue<String> queue){ this.queue = queue; } @Override public void run() { try { Thread.sleep(random.nextInt(10));System.out.println(Thread.currentThread().getName()+ "trying...");String temp = queue.take();//如果队列为空,会阻塞当前线程 System.out.println(Thread.currentThread().getName() + " get a job " +temp); } catch (InterruptedException e) { e.printStackTrace(); } } }class TestBlockingQueueProducer implements Runnable {BlockingQueue<String> queue;Random random = new Random();public TestBlockingQueueProducer(BlockingQueue<String> queue) {this.queue = queue;}@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(random.nextInt(10));String task = Thread.currentThread().getName() + " made a product " + i;System.out.println(task);queue.put(task);} catch (InterruptedException e) {e.printStackTrace();}}} }
hadoop10---消息队列相关推荐
- Redis 笔记(04)— list类型(作为消息队列使用、在列表头部添加元素、尾部删除元素、查看列表长度、遍历指定列表区间元素、获取指定区间列表元素、阻塞式获取列表元素)
Redis 的列表是链表而不是数组.这意味着 list 的插入和删除操作非常快,时间复杂度为 O(1),但是索引定位很慢,时间复杂度为 O(n). 当列表弹出了最后一个元素之后,该数据结构自动被删除, ...
- 2021年大数据Kafka(一):❤️消息队列和Kafka的基本介绍❤️
全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 消息队列和Kafka的基本介绍 一.什么是消息队列 二.消息队列的应用场景 ...
- java多线程消息队列_java多线程消息队列的实现
1.定义一个队列缓存池: private static List queueCache = new LinkedList(); 2.定义队列缓冲池最大消息数,如果达到该值,那么队列检入将等待检出低于该 ...
- 关于创建zeromq消息队列,设置和更改IP地址,远程可以访问,不只是本地链接。python代码。
关于zeromq的创建,绑定本地,和绑定其他客户端的方法. 网上一大堆关于zmq的通信模式的介绍,包括三种类型,具体我就不在描述. 但是他们给的demo,都是创建本地作为server服务端,也作为cl ...
- Linux进程间通信(IPC)-------消息队列
消息队列是进程间通信的一种方法,他有两个操作,一个进程来发送消息(也就是向内存中写入数据),另一个是获取消息(也就是另外一个进程在内存中读取数据) 下面来看消息队列的 创建,写入,读取等需要用到的函数 ...
- 【部署类】专题:消息队列MQ、进程守护Supervisor
目录 1 背景需求 2 技术方案 2.1 消息队列 2.2 进程守护 3 源码介绍 3.1 supervisor部分 3.1.1 supervisord.conf 内容 3.1.2 MM3D.conf ...
- websphere mq 查看队列中是否有数据_全网最全的 “消息队列”
消息队列的使用场景 以下介绍消息队列在实际应用常用的使用场景.异步处理.应用解耦.流量削锋和消息通讯四个场景. 1]异步处理:场景说明:用户注册后,需要发注册邮件和注册短信. 引入消息队列后架构如下: ...
- linux进程间通信:POSIX 消息队列 ----异步通信
在上一篇中linux进程间通信:POSIX 消息队列我们知道消息队列中在消息个数达到了队列所能承载的上限,就会发生消息的写阻塞. 阻塞式的通信影响系统效率,进程之间在通信收到阻塞时并不能去做其他事情, ...
- linux进程间通信:POSIX 消息队列
文章目录 基本介绍 相关编程接口 编程实例 消息队列通信实例 消息队列属性设置实例 基本介绍 关于消息队列的基本介绍,前面在学习system V的消息队列时已经有过了解,linux进程间通信:syst ...
- linux进程间通信:消息队列实现双端通信
双端通信描述 利用消息队列针对发送接受消息的类型唯一性 进行多个客户端之间消息传递,而不需要server端进行消息转发. 同时消息队列的读阻塞和写阻塞特性(消息队列中已经写入数据,如果再不读出来,则无 ...
最新文章
- oracle valueerror,Oracle VALUE_ERROR异常(挑战题编号000005)
- 重磅!泰晤士发布重量级学科排名,90所中国大陆高校上榜
- (深入理解)matplotlib的交互模式(block,interactive,ion,ioff,draw,show,plot等的区别)
- Codeforces Round #313 (Div. 1) B. Equivalent Strings
- opencv获取摄像头帧率分辨率
- sparksql自定义函数
- 小程序,修改数组或对象中的值,通过input动态修改数组对象中的值
- 2017.8.15 阿狸的打字机 失败总结
- Keil MDK下载程序时的相关设置
- C语言实现顺序栈的初始化进栈出栈读取栈顶元素
- Python中系统命令
- java api 第一个类是_java_8_第一个API
- 三角形度数计算机公式,角度数换算公式(三角函数计算换算角度)
- Blender导出模型到maya
- amCharts 5.1.12 Crack
- Eric6中使用PYQT5在窗口显示图片
- 浅论语言与认知的关系 | NLP基础
- 透视全球AI治理十大事件:站在创新十字路口,人工智能会失控吗?
- 共享文件 服务器存储空间不足,Win7 文件共享报错 存储空间不足,无法处理此命令...
- 最硬核的独立蒙特卡洛抽样法