发布者和订阅者都充当 生产者和消费者

发布者

package publisher.to.subscriber;

import java.awt.font.TextMeasurer;

import javax.jms.Connection;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.MapMessage;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.MessageListener;

import javax.jms.MessageProducer;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

import org.apache.activemq.command.ActiveMQTopic;

public class Publisher implements MessageListener {

/**

* @param args

* @throws JMSException

*/

public static void main(String[] args) throws JMSException {

// TODO Auto-generated method stub

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();

Connection connection = factory.createConnection();

connection.start();

// 第一个参数设置是否需要事务支持

Session session = connection.createSession(true,

Session.AUTO_ACKNOWLEDGE);

// 发送消息 而 订阅者接受消息必须有个topic 名字也叫 msg.send

Destination sendTopic = new ActiveMQTopic("msg.message");

// 接受消息 而 订阅者发送消息必须有个topic 名字也叫 msg.receive

Destination sendReceive = new ActiveMQTopic("msg.control");

MessageProducer producer = session.createProducer(sendTopic);

MessageConsumer consumer = session.createConsumer(sendReceive);

Rec rec = new Rec(consumer,session,connection,producer);

rec.start();

}

public void onMessage(Message msg) {

// TODO Auto-generated method stub

}

}

class Rec extends Thread {

private MessageConsumer consumer = null;

private Session session = null;

private Connection connection=null;

MessageProducer producer=null;

public Rec(MessageConsumer consumer, Session session,Connection connection,MessageProducer producer) {

this.consumer = consumer;

this.session = session;

this.connection=connection;

this.producer=producer;

}

@Override

public void run() {

// TODO Auto-generated method stub

while (true) {

try {

TextMessage smsg = session.createTextMessage("hello my name is liaomin");

producer.send(smsg);

session.commit();

TextMessage msg = (TextMessage) consumer.receive();

System.out.println(msg.getText());

// 接收时必须提交以下 否则会出现上次收到的旧数据

session.commit();

session.close();

connection.close();

break;

} catch (Exception e) {

}

}

}

}

订阅者

package publisher.to.subscriber;

import javax.jms.Connection;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.MapMessage;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.MessageProducer;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

import org.apache.activemq.command.ActiveMQTopic;

public class Subscriber {

/**

* @param args

* @throws JMSException

*/

public static void main(String[] args) throws JMSException {

// TODO Auto-generated method stub

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();

Connection connection = factory.createConnection();

connection.start();

// 第一个参数设置是否需要事务支持

Session session = connection.createSession(true,

Session.AUTO_ACKNOWLEDGE);

//订阅者接受消息

Destination receiveTopic = new ActiveMQTopic("msg.message");

//订阅者发送消息

Destination sendTopic = new ActiveMQTopic("msg.control");

MessageProducer producer = session.createProducer(sendTopic);

MessageConsumer consumer = session.createConsumer(receiveTopic);

Rec1 rec = new Rec1(consumer,session,connection,producer);

rec.start();

}

public void onMessage(Message msg) {

// TODO Auto-generated method stub

}

}

class Rec1 extends Thread {

private MessageConsumer consumer = null;

private Session session = null;

private Connection connection;

private MessageProducer producer;

public Rec1(MessageConsumer consumer, Session session,Connection connection,MessageProducer producer) {

this.consumer = consumer;

this.session = session;

this.connection=connection;

this.producer=producer;

}

@Override

public void run() {

// TODO Auto-generated method stub

while (true) {

try {

TextMessage msg = (TextMessage) consumer.receive();

session.commit();

System.out.println(msg.getText());

TextMessage remsg = session.createTextMessage("ok receive");

producer.send(remsg);

// 接收时必须提交以下 否则会出现上次收到的旧数据

session.commit();

session.close();

connection.close();

break;

} catch (Exception e) {

}

}

}

}

先运行 订阅者 在运行 发布者

java publisher_Publisher/Subscriber(发布/订阅者)消息模式开发流程相关推荐

  1. kafka redis vs 发布订阅_发布订阅的消息系统 Kafka的深度解析

    背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统.主要设计目标如下: 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能 高吞吐 ...

  2. 发布订阅的消息系统 Kafka的深度解析

    发布&订阅的消息系统 Kafka的深度解析 2015-01-27 10:25 Jason Guo Jason Guo的博客 字号: T | T 一个典型的kafka集群中包含若干produce ...

  3. uni-app微信小程序订阅消息功能开发(流程讲解篇)

    温馨提示 微信小程序中废弃了"模板消息",,微信小程序模板消息 使用场景 首先我们需要明白微信订阅消息使用场景,比如客户点了一份美团外卖客户需要知道当前订单商家是否接单,或订单是否 ...

  4. php网站怎么对接微信群,PHP对接微信公众平台消息接口开发流程教程

    PHP(外文名:PHP: Hypertext Preprocessor,中文名:"超文本预处理器")是一种通用开源脚本语言.语法吸收了C语言.Java和Perl的特点,利于学习,使 ...

  5. php对接微信提醒,PHP对接微信公众平台消息接口开发流程教程

    PHP对接微信公众平台消息接口开发流程教程 发布于 2015-02-15 08:54:13 | 157 次阅读 | 评论: 1 | 来源: 网友投递 PHP开源脚本语言PHP(外文名: Hyperte ...

  6. php微信公众号怎么开发_PHP对接微信公众平台消息接口开发流程详解及实例

    这篇文章主要介绍了PHP对接微信公众平台消息接口开发流程,如何使用PHP版接口操作公众平台消息,需要的朋友可以参考下 一.写好接口程序 在你的服务器上上传好一个接口程序文件内容如下: 代码如下:< ...

  7. Java中使用发布订阅模式

    发布订阅者模式 概述 使用意图 使用场景 与MVC模式之间的关系 逻辑方法展示 代码演示 概述 发布订阅者模式其实在意图上等同于观察者模式,但是在结构上又有所不同. 在意图上,两者都是为对象创建一对多 ...

  8. java实现rabbitmq发布/订阅模型(Publish/Subscribe queues), 生产者 消费者 交换机 消息队列

    发布/订阅模型又称扇出模型,或者是广播模型,可以有多个消费者,每个消费者有自己的队列,每个队列都要绑定到交换机,生产者发送的消息只需要发送到交换机,再由交换机决定要发送到哪些队列,生产者无法自行决定. ...

  9. Redis的发布订阅(消息队列,比如ActiveMQ,一方得到数据后,多方得到信息)

    什么是发布订阅? 发布和订阅是进程间的一种消息通信模式:发送者(publisher)将消息发送给一个第三方,订阅者(subscriber)从第三方那里接收消息. 这个第三方我们通常称之为 消息中间件, ...

  10. redisson究极爽文-手把手带你实现redisson的发布订阅,消息队列,延迟队列(死信队列),(模仿)分布式线程池

    参考资料 :分布式中间件实战:java版 (书籍), 多线程视频教程(视频)- 项目启动环境 导入依赖 <parent><groupId>org.springframework ...

最新文章

  1. SpringBoot部署脚本,拿走即用!
  2. 运维自动化之ansible playbook安装apache
  3. boost::hana::drop_front用法的测试程序
  4. StringBuffer(字符串缓冲区)
  5. 前端三大技术 HTML、CSS、JavaScript 快速入门手册
  6. access mysql oracle数据库_Oracle Access 数据库连接 使用
  7. 第七章:跨程序共享数据-探究内容提供器
  8. 取字符串以逗号隔开的两个值
  9. 基础表比商户对账和汇总多了一笔退款数据
  10. 【Python】PIL库介绍
  11. 宿主机mac os无法连接到虚拟机centos
  12. Microsoft Visual Studio Ultimate 2012 ISO 映像
  13. 文本编辑器(Editor)and 文件上传功能
  14. linux设置汉语输入法,在Deepin系统下使用韩语(韩文)输入法的方法
  15. 打造属于自己的量化投资系统3——利用backtrader创建加权移动平均线策略
  16. N1 armbian cups安装hp m126a打印机
  17. 谷歌的天敌竟然是亚马逊:从开放7-Mic阵列授权说起
  18. 已解决报错UnboundLocalError: local variable ‘title‘ referenced before assignment
  19. Jest 组件库单元测试【基础语法篇】
  20. fullgc问题解决:Full GC (Metadata GC Threshold)

热门文章

  1. 卷积神经网络——Faster Rcnn中的anchor和Proposal
  2. Creator打字机效果
  3. ZYNQ系列(一) Petalinux建立工程
  4. 【最新版】贝塔智能挪车v2.5.2+前端-已测试
  5. html挪车隐藏手机,警告,千万不能留挪车电话!!!教你两招,这样做就可以挪车,避免麻烦...
  6. 剪刀石头布java流程图_青岛能源所基于“剪刀石头布”策略实现快速多轮基因编辑...
  7. SAP License:FI/CO模块常用表及事务代码
  8. iphone8进入恢复模式或DFU模式
  9. 微生活完成3200万元A轮融资,助企业快速搭建移动互联CRM
  10. DXP改变字体的方法