超越昨天的自己系列(2)

1、P2P模型
在P2P模型中,有下列概念:消息队列(Queue)、发送者(Sender)、接收者(Receiver)。每个消息都被发送到一个特定的队 列,接收者从队列中获取消息。队列保留着消息,直到它们被消费或超时 。
每个消息只有一个消费者 (Consumer)(即一旦被消费,消息就不再在消息队列中)
发送者和接收者之间在时间上没有依赖性 ,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列。
接收者在成功接收消息之后需向队列应答成功
如果你希望发送的每个消息都应该被成功处理 的话,那么你需要P2P模型。

适用场合:想让接收者进行且只进行一次处理

customer:

import javax.jms.Connection;public class Consumer {private static String brokerURL = "tcp://localhost:61616";private static transient ConnectionFactory factory;private transient Connection connection;private transient Session session;private String jobs[] = new String[]{"suspend", "delete"};public Consumer() throws JMSException {factory = new ActiveMQConnectionFactory(brokerURL);connection = factory.createConnection();connection.start();session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);}public void close() throws JMSException {if (connection != null) {connection.close();}}    public static void main(String[] args) throws JMSException {Consumer consumer = new Consumer();for (String job : consumer.jobs) {Destination destination = consumer.getSession().createQueue("JOBS." + job);MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);messageConsumer.setMessageListener(new Listener(job));}}public Session getSession() {return session;}}

listener:

import javax.jms.Message;public class Listener implements MessageListener {private String job;public Listener(String job) {this.job = job;}public void onMessage(Message message) {try {//do something hereSystem.out.println(job + " id:" + ((ObjectMessage)message).getObject());} catch (Exception e) {e.printStackTrace();}}}

publish:

import javax.jms.Connection;public class Publisher {private static String brokerURL = "tcp://localhost:61616";private static transient ConnectionFactory factory;private transient Connection connection;private transient Session session;private transient MessageProducer producer;private static int count = 10;private static int total;private static int id = 1000000;private String jobs[] = new String[]{"suspend", "delete"};public Publisher() throws JMSException {factory = new ActiveMQConnectionFactory(brokerURL);connection = factory.createConnection();connection.start();session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);producer = session.createProducer(null);}    public void close() throws JMSException {if (connection != null) {connection.close();}}    public static void main(String[] args) throws JMSException {Publisher publisher = new Publisher();while (total < 1000) {for (int i = 0; i < count; i++) {publisher.sendMessage();}total += count;System.out.println("Published '" + count + "' of '" + total + "' job messages");try {Thread.sleep(1000);} catch (InterruptedException x) {}}publisher.close();}public void sendMessage() throws JMSException {int idx = 0;while (true) {idx = (int)Math.round(jobs.length * Math.random());if (idx < jobs.length) {break;}}String job = jobs[idx];Destination destination = session.createQueue("JOBS." + job);Message message = session.createObjectMessage(id++);System.out.println("Sending: id: " + ((ObjectMessage)message).getObject() + " on queue: " + destination);producer.send(destination, message);}    }

pub/sub方式
概念:topic,publisher,subscriber,主题,发布者,订阅者三个角色。主题和订阅者是一对多关系,一个主题可以被多个订阅者订 阅。当发布者向某个主题发送一条消息时,所有的订阅者都会收到。
如何理解订阅的概念呢,个人理解,分两种情况:
一、 创建一个订阅者时使用session.createDurableSubscriber(topic, clientId )方法创建为持久型订阅者时,该topic和clientId的订阅关系将被保存在服务器上,即产生了订阅关系。这样clientId这个id的订阅者将 在离线的时候,也不会丢失消息。这个订阅关系也可以理解为被持久化在jms服务器上了,使用jmx的监视控制台(我使用的activeMq),可以看到有 一个Subscription的节点,下面有你订阅主题时给定的客户端名字。可以使用unsubscribe 方法 取消订阅关系。

二、 创建一个非持久化方式的订阅者时,只有在客户端订阅者连接到jms服务器时,订阅关系成立,订阅者离线时,订阅关系即取消,不会保存在服务器上,这也是非 持久化方式订阅者不能离线接收消息的原因吧。默认为广播方式,在没有订阅者连接到服务器时,发送的消息将丢失,不会保存在服务器。

subSession.createConsumer(destination);
     subSession.createSubscriber(topic);

subSession.createDurableSubscriber(topic, name);    //name  是 一个jms 用来区别定阅者的id

取消定阅

subscriber.close();

session.unsubscribe(name);  //只对持久定阅,

customer:

import javax.jms.Connection;public class Consumer {private static String brokerURL = "tcp://localhost:61616";private static transient ConnectionFactory factory;private transient Connection connection;private transient Session session;public Consumer() throws JMSException {factory = new ActiveMQConnectionFactory(brokerURL);connection = factory.createConnection();connection.start();/*通知方式                    效果DUPS_OK_ACKNOWLEDGE         session延迟通知。如果JMS服务器宕机,会造成重复消息的情况。程序必须保证处理重复消息而不引起程序逻辑的混乱。AUTO_ACKNOWLEDGE             当receive或MessageListener方法成功返回后自动通知。CLIENT_ACKNOWLEDGE            客户端调用消息的acknowledge方法通知*/session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);}public void close() throws JMSException {if (connection != null) {connection.close();}}    public static void main(String[] args) throws JMSException {Consumer consumer = new Consumer();for (String stock : args) {Destination destination = consumer.getSession().createTopic("STOCKS." + stock);// A client uses a MessageConsumer object to receive messages from a destinationMessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);// 注册一个Message Listener,实现异步messageConsumer.setMessageListener(new Listener());}}public Session getSession() {return session;}}

listener:

import java.text.DecimalFormat;
//A MessageListener object is used to receive asynchronously delivered messages
public class Listener implements MessageListener {// 处理messagepublic void onMessage(Message message) {try {MapMessage map = (MapMessage)message;String stock = map.getString("stock");double price = map.getDouble("price");double offer = map.getDouble("offer");boolean up = map.getBoolean("up");DecimalFormat df = new DecimalFormat( "#,###,###,##0.00" );System.out.println(stock + "\t" + df.format(price) + "\t" + df.format(offer) + "\t" + (up?"up":"down"));} catch (Exception e) {e.printStackTrace();}}}

Publisher:

import java.util.Hashtable;public class Publisher {protected int MAX_DELTA_PERCENT = 1;protected Map<String, Double> LAST_PRICES = new Hashtable<String, Double>();protected static int count = 10;protected static int total;protected static String brokerURL = "tcp://localhost:61616";protected static transient ConnectionFactory factory;protected transient Connection connection;protected transient Session session;protected transient MessageProducer producer;public Publisher() throws JMSException {factory = new ActiveMQConnectionFactory(brokerURL);connection = factory.createConnection();try {connection.start();} catch (JMSException jmse) {connection.close();throw jmse;}session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);producer = session.createProducer(null);}public void close() throws JMSException {if (connection != null) {connection.close();}}public static void main(String[] args) throws JMSException {Publisher publisher = new Publisher();while (total < 1000) {for (int i = 0; i < count; i++) {publisher.sendMessage(args);}total += count;System.out.println("Published '" + count + "' of '" + total + "' price messages");try {Thread.sleep(1000);} catch (InterruptedException x) {}}publisher.close();}protected void sendMessage(String[] stocks) throws JMSException {int idx = 0;while (true) {idx = (int)Math.round(stocks.length * Math.random());if (idx < stocks.length) {break;}}String stock = stocks[idx];// Topic和Queue都继承与Destination,也就这两种模式Destination destination = session.createTopic("STOCKS." + stock);Message message = createStockMessage(stock, session);System.out.println("Sending: " + ((ActiveMQMapMessage)message).getContentMap() + " on destination: " + destination);producer.send(destination, message);}protected Message createStockMessage(String stock, Session session) throws JMSException {Double value = LAST_PRICES.get(stock);if (value == null) {value = new Double(Math.random() * 100);}// lets mutate the value by some percentagedouble oldPrice = value.doubleValue();value = new Double(mutatePrice(oldPrice));LAST_PRICES.put(stock, value);double price = value.doubleValue();double offer = price * 1.001;boolean up = (price > oldPrice);MapMessage message = session.createMapMessage();message.setString("stock", stock);message.setDouble("price", price);message.setDouble("offer", offer);message.setBoolean("up", up);return message;}protected double mutatePrice(double price) {double percentChange = (2 * Math.random() * MAX_DELTA_PERCENT) - MAX_DELTA_PERCENT;return price * (100 + percentChange) / 100;}}

--------------------------

让我们继续前行

jms两种模式例子-超越昨天的自己系列(2)相关推荐

  1. Doctype? 严格模式与混杂模式-如何触发这两种模式,区分它们有何意义?

    严格模式与混杂模式--如何触发这两种模式,区分它们有何意义. 在标准模式中,浏览器根据规范呈现页面: 在混杂模式中,页面以一种比较宽松的向后兼容的方式显示. 浏览器根据DOCTYPE是否存在以及使用的 ...

  2. epoll的两种模式

    From: http://haoningabc.iteye.com/blog/1432958 linux异步IO浅析  http://hi.baidu.com/_kouu/blog/item/e225 ...

  3. RTI1.3时间管理支持的两种模式

    RTI 1.3目前支持两种模式,一种是轮询模式,一种是异步模式,无论在那种模式下LRC都要做大量工作,比如和其他的联邦成员的LRC交换信息,完成这些工作需要一定时间,tick()函数就是为了给RTI提 ...

  4. vue-router前端路由的两种模式的区别

    一.前端路由存在的意义 前端路由主要应用在spa项目中. 核心---在无刷新(不向后端发送请求)的情况下,可以根据不同url更改视图. 二.浏览器提供hash 和history 两种模式支持(可以说, ...

  5. 严格模式与混杂模式-如何触发这两种模式,区分它们有何意义

    Doctype:(Document Type)文档类型,它位于文档中最前面的位置,处于标签之前.如果你想制作符合标准的页面,一个必不可少的关键组成部分就是DOCTYPE的声明.确定了正确的Doctyp ...

  6. Spark Standalone -- 独立集群模式、Spark 提交任务的两种模式、spark在yarn上运行的环境搭建、自己写的spark代码如何提交到yarn上并运行...

    目录 Spark Standalone -- 独立集群模式 Standalone 架构图 Standalone 的搭建 1.上传.解压.重命名 2.配置环境变量 3.修改配置文件 conf 4.同步到 ...

  7. Epoll两种模式浅析(ET or LT)

    linux异步IO浅析  http://hi.baidu.com/_kouu/blog/item/e225f67b337841f42f73b341.html epoll有两种模式,Edge Trigg ...

  8. 2021年大数据Spark(九):Spark On Yarn两种模式总结

    目录 Spark On Yarn两种模式 引入 一.当一个MR应用提交运行到Hadoop YARN上时 二.当一个Spark应用提交运行在集群上时 注意 client 模式 cluster 模式 总结 ...

  9. 简述python中怎样导入模块_Python中导入模块的两种模式,import

    import import pandas import pandas as pd 使用函数方式:.(),或者.() 比如 pandas.read_csv("data/stock.csv&qu ...

  10. FTP的两种模式和在实际工作中应用

    FTP是一种文件传输协议,它支持两种模式,一种方式叫做Standard (也就是 Active,主动方式),一种是 Passive (也就是PASV,被动方式). Standard模式 FTP的客户端 ...

最新文章

  1. SVN更换修改用户名
  2. 揭秘PHP深受Web开发者喜爱的原因
  3. QCon北京2015:18个热门专题,出品人全部确认,新版网站上线
  4. mysql备份数据库命令
  5. 工作流集成第三方jsp表单
  6. redistemplate hash 过期时间_Redis开发 —— 过期消息通知实现(Springboot)
  7. matlab序列谱分析,基于MATLAB序列谱分析及FFT实现快速卷积.pdf
  8. ArrayList与LinkedList
  9. 看看高手做的ARM开发板
  10. 离屏渲染在车载导航中的应用
  11. java定时器 并发_【java多线程与并发库】— 定时器的应用 | 学步园
  12. Python 字符串和列表的转化 ,简单到尖叫
  13. raft算法_学习分布式一致性协议:自己实现一个Raft算法
  14. [IHS] No.2 程序员一生的读书计划
  15. 2016蓝桥杯C++A:快速排序(详解版,很快掌握)
  16. Atitit 减少财政支出----普通人如何蹭政府补贴措施 attilax大总结.docx
  17. 百度地图 baidu-map 地图根据范围画圈
  18. 软件测试周刊(第63期):凡是持久的,必是温和的与可持续的。
  19. Linux实战——Shell编程练习(更新12题)
  20. 网络安全笔记8——虚拟专网技术

热门文章

  1. Android 创建与解析XML(三)—— Sax方式
  2. HDU 6395 Sequence(分段矩阵快速幂)题解
  3. 团队作业4——第一次项目冲刺(Alpha版本)4.23
  4. HDU1847 博弈论 水题
  5. 为什么要使用多层开发?
  6. 【R图秀】情人节快乐!
  7. 权限系统(第一次测试)
  8. Microsoft.AlphaImageLoader滤镜解说
  9. Docker网络详解
  10. 在linux下将当前目录文件全部小写含目录名