一:利用阻塞队列:

Producer:

package ;import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;public class Producer implements Runnable {private BlockingQueue<PCData> queue;// 内存缓冲区private static AtomicInteger count = new AtomicInteger();// 总数 原子操作public Producer(BlockingQueue<PCData> queue) {this.queue = queue;}@Overridepublic void run() {PCData data = null;Random r = new Random();try {while (true) {Thread.sleep(r.nextInt(1000));data = new PCData(count.incrementAndGet());System.out.println(data + " 加入队列" );if (!queue.offer(data)) {System.err.println(" 加入队列失败");}}} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}}
}

Consumer:

import java.util.Random;
import java.util.concurrent.BlockingQueue;public class Consumer implements Runnable {private BlockingQueue<PCData> queue;private static final int SLEEPTIME = 1000;public Consumer(BlockingQueue<PCData> queue) {this.queue = queue;}@Overridepublic void run() {System.out.println("start Consumer id :" + Thread.currentThread().getId());Random r = new Random();try {while (true) {PCData data = queue.take();if (data != null) {System.out.println("消费了:" + data);Thread.sleep(r.nextInt(SLEEPTIME));}}} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}}
}

PCData:

public class PCData {private final int intData;public PCData(int d) {intData = d;}public PCData(String d) {intData = Integer.valueOf(d);}public int getData() {return intData;}@Overridepublic String toString() {return "data:" + intData;}
}

ProducterAndConsumer:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;public class ProducterAndConsumer {public static void main(String[] args) throws InterruptedException {BlockingQueue<PCData> queue = new LinkedBlockingDeque<>(5);Producer p1 = new Producer(queue);Producer p2 = new Producer(queue);Producer p3 = new Producer(queue);Consumer c1 = new Consumer(queue);//Consumer c2 = new Consumer(queue);// Consumer c3 = new Consumer(queue);ExecutorService service = Executors.newCachedThreadPool();service.execute(p1);service.execute(p2);service.execute(p3);service.execute(c1);//service.execute(c2);// service.execute(c3);
    }
}

二:利用wait/notify:

Producer:

package 生产者消费者模式;import java.util.List;
import java.util.Random;public class Producer implements Runnable {private List<PCData> queue;private int length;public Producer(List<PCData> queue, int length) {this.queue = queue;this.length = length;}public void run() {try {while(true) {if (Thread.currentThread().isInterrupted())  //无限判断线程状态,中断了就返回truebreak;Random r = new Random();long temp = r.nextInt(100);System.out.println(Thread.currentThread().getId() + "生产了:" + temp + "缓存大小:" + queue.size());PCData data = new PCData();data.set(temp);synchronized (queue) {if (queue.size() >= length) {//System.out.println("满啦~~~,无法放入");
                        queue.notifyAll();queue.wait();} else {queue.add(data);}Thread.sleep(2000);}}} catch (InterruptedException e) {// TODO Auto-generated catch block
            e.printStackTrace();}}
}

Consumer:

package 生产者消费者模式;import java.util.List;public class Consumer implements Runnable {private List<PCData> queue;public Consumer(List<PCData> queue) {this.queue = queue;}public void run() {try {while(true) {if(Thread.currentThread().isInterrupted())break;PCData data = null;synchronized (queue) {if (queue.size() == 0) {//System.out.println("空啦~~~~,无法获取");
                        queue.wait();queue.notifyAll();} data = queue.remove(0);}System.out.println(Thread.currentThread().getId()+"消费了" + data.get() +"缓存大小:" + queue.size());Thread.sleep(1000);}} catch (InterruptedException e) {// TODO Auto-generated catch block
            e.printStackTrace();}}
}

PCData:

package 生产者消费者模式;public class PCData {private long value;public void set(long value) {this.value = value;}public long get(){return value;}
}

ProducterAndConsumer:

package 生产者消费者模式;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class ProducterAndConsumer {public static void main(String[] args) {List<PCData> queue = new ArrayList<>();int length = 10;Producer p1 = new Producer(queue, length);//Producer p2 = new Producer(queue, length);
        Consumer c1 = new Consumer(queue);Consumer c2 = new Consumer(queue);ExecutorService service = Executors.newCachedThreadPool();service.execute(c1);service.execute(p1);//service.execute(p2);
        service.execute(c2);}
}

转载于:https://www.cnblogs.com/buder-cp/p/7740248.html

2017-10-26 消息队列设计相关推荐

  1. ActiveMQ学习总结(8)——消息队列设计精要

    2019独角兽企业重金招聘Python工程师标准>>> 消息队列已经逐渐成为企业IT系统内部通信的核心手段.它具有低耦合.可靠投递.广播.流量控制.最终一致性等一系列功能,成为异步R ...

  2. 【kafka】消息队列设计精要

    1.概述 转载:消息队列设计精要 好文章,建议大家去看原文. 消息队列已经逐渐成为企业IT系统内部通信的核心手段.它具有低耦合.可靠投递.广播.流量控制.最终一致性等一系列功能,成为异步RPC的主要手 ...

  3. 【共享内存】基于共享内存的无锁消息队列设计

    上交所技术服务 2018-09-05 https://mp.weixin.qq.com/s/RqHsX3NIZ4_BS8O30KWYhQ 目录 一.背景 二.消息队列的应用需求 (一)  通信架构的升 ...

  4. 消息队列设计的精髓基本都藏在本文里了

    当今市面上有很多主流的消息中间件,如老牌的ActiveMQ.RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发的Notify.MetaQ.RocketMQ等.本文不会一一介绍这些消息队列的所有特 ...

  5. 应用消息队列设计可以解决哪些实际问题?

    消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ ...

  6. 架构文摘:消息队列设计精要

    消息队列已经逐渐成为企业IT系统内部通信的核心手段.它具有低耦合.可靠投递.广播.流量控制.最终一致性等一系列功能,成为异步RPC的主要手段之一. 当今市面上有很多主流的消息中间件,如老牌的Activ ...

  7. 去哪儿网消息队列设计与实现

    去哪儿网近日在GitHub上开源了其内部广泛使用的消息队列(内部代号QMQ),本文从去哪儿网使用消息队列所碰到的各种问题出发探讨去哪儿网消息队列的设计与实现. 背景 2012年,随着公司业务的快速增长 ...

  8. 2017.10.26 社团活动总结

    这次到的人二十以内,感谢其他部门负责人捧场.上课的人是高二(15)班的罗颖,他还是要好好锻炼一下的 下面总结课程内容: 想偷个懒,就讲下button控件的用法,其他看bilibili视频 1.下载安装 ...

  9. 2017.10.26模拟赛day1

    -- T1为爱追寻 问题描述 历经了半年的停课之后,紫萱学姐回到了陌生又熟悉的班里,她想找到学长的位置.于是她决定采用一种高效率的寻找方法:瞎找法. 我们将学姐的班级视为一个二维平面,每个整数坐标对应 ...

  10. 2017.10.26 四校联测D1

    题目比以前的题不知道高明到哪里去了,虽然std有错+数据有问题 T1 地精部落原题 #include<iostream> #include<cstdio> using name ...

最新文章

  1. Animation Override Controller动画重载器
  2. criscriter英语测试软件,English test (英语测验)
  3. 怎样的中奖算法能让人信服(转)
  4. python3 配置文件处理 configparser 库简介
  5. UA MATH571B 试验设计 总结 试验的类型与选择
  6. 关于python的全局变量和局部变量、以下描述错误的是_Python:全局变量与局部变量的问题...
  7. apache ranger_Apache Ranger插件的美丽简洁
  8. 拼多多被指洗钱 官方回应:将起诉“差评”并索赔1000万元
  9. Git(3):git clone远程GitHub仓库代码出现“Permission Denied (publickey)”问题
  10. 关于IP地址定位、IP查询和IP地址库 你想了解的历史都在这里
  11. 我的世界源代码java复制_《我的世界》你做主,微软逐步开放源代码
  12. AppScan下载安装教程
  13. DOS窗口查找电脑端口占用情况
  14. 程序设计基础java_Java程序设计基础
  15. 用C语言实现求水仙花数
  16. docker更改映射端口(实践篇)
  17. 阿里云在家实践计划主机申请教程
  18. 南京热门美食网html,江苏南京十大特色美食排名榜单揭晓
  19. QTableView 设置行间距
  20. 恶梦护士 asa_Web开发人员的10大噩梦

热门文章

  1. 建议0 不要让main函数返回void
  2. 【机器人操作系统】ROS文件结构
  3. lua_shared_dict的incr方法
  4. 微信开发--获取用户信息中文乱码的解决方案
  5. Linux mkdir 与 mkdir -p 的区别
  6. Server Tomcat v8.0 Server at localhost failed to start.
  7. mongoDB's Optimization example
  8. C# App.config 详解
  9. java多线程都有几种方式实现
  10. FZU_2019_Mountain Number题解