java publisher_Publisher/Subscriber(发布/订阅者)消息模式开发流程
发布者和订阅者都充当 生产者和消费者
发布者
package publisher.to.subscriber;
import java.awt.font.TextMeasurer;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
public class Publisher implements MessageListener {
/**
* @param args
* @throws JMSException
*/
public static void main(String[] args) throws JMSException {
// TODO Auto-generated method stub
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
connection.start();
// 第一个参数设置是否需要事务支持
Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
// 发送消息 而 订阅者接受消息必须有个topic 名字也叫 msg.send
Destination sendTopic = new ActiveMQTopic("msg.message");
// 接受消息 而 订阅者发送消息必须有个topic 名字也叫 msg.receive
Destination sendReceive = new ActiveMQTopic("msg.control");
MessageProducer producer = session.createProducer(sendTopic);
MessageConsumer consumer = session.createConsumer(sendReceive);
Rec rec = new Rec(consumer,session,connection,producer);
rec.start();
}
public void onMessage(Message msg) {
// TODO Auto-generated method stub
}
}
class Rec extends Thread {
private MessageConsumer consumer = null;
private Session session = null;
private Connection connection=null;
MessageProducer producer=null;
public Rec(MessageConsumer consumer, Session session,Connection connection,MessageProducer producer) {
this.consumer = consumer;
this.session = session;
this.connection=connection;
this.producer=producer;
}
@Override
public void run() {
// TODO Auto-generated method stub
while (true) {
try {
TextMessage smsg = session.createTextMessage("hello my name is liaomin");
producer.send(smsg);
session.commit();
TextMessage msg = (TextMessage) consumer.receive();
System.out.println(msg.getText());
// 接收时必须提交以下 否则会出现上次收到的旧数据
session.commit();
session.close();
connection.close();
break;
} catch (Exception e) {
}
}
}
}
订阅者
package publisher.to.subscriber;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
public class Subscriber {
/**
* @param args
* @throws JMSException
*/
public static void main(String[] args) throws JMSException {
// TODO Auto-generated method stub
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
connection.start();
// 第一个参数设置是否需要事务支持
Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
//订阅者接受消息
Destination receiveTopic = new ActiveMQTopic("msg.message");
//订阅者发送消息
Destination sendTopic = new ActiveMQTopic("msg.control");
MessageProducer producer = session.createProducer(sendTopic);
MessageConsumer consumer = session.createConsumer(receiveTopic);
Rec1 rec = new Rec1(consumer,session,connection,producer);
rec.start();
}
public void onMessage(Message msg) {
// TODO Auto-generated method stub
}
}
class Rec1 extends Thread {
private MessageConsumer consumer = null;
private Session session = null;
private Connection connection;
private MessageProducer producer;
public Rec1(MessageConsumer consumer, Session session,Connection connection,MessageProducer producer) {
this.consumer = consumer;
this.session = session;
this.connection=connection;
this.producer=producer;
}
@Override
public void run() {
// TODO Auto-generated method stub
while (true) {
try {
TextMessage msg = (TextMessage) consumer.receive();
session.commit();
System.out.println(msg.getText());
TextMessage remsg = session.createTextMessage("ok receive");
producer.send(remsg);
// 接收时必须提交以下 否则会出现上次收到的旧数据
session.commit();
session.close();
connection.close();
break;
} catch (Exception e) {
}
}
}
}
先运行 订阅者 在运行 发布者
java publisher_Publisher/Subscriber(发布/订阅者)消息模式开发流程相关推荐
- kafka redis vs 发布订阅_发布订阅的消息系统 Kafka的深度解析
背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统.主要设计目标如下: 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能 高吞吐 ...
- 发布订阅的消息系统 Kafka的深度解析
发布&订阅的消息系统 Kafka的深度解析 2015-01-27 10:25 Jason Guo Jason Guo的博客 字号: T | T 一个典型的kafka集群中包含若干produce ...
- uni-app微信小程序订阅消息功能开发(流程讲解篇)
温馨提示 微信小程序中废弃了"模板消息",,微信小程序模板消息 使用场景 首先我们需要明白微信订阅消息使用场景,比如客户点了一份美团外卖客户需要知道当前订单商家是否接单,或订单是否 ...
- php网站怎么对接微信群,PHP对接微信公众平台消息接口开发流程教程
PHP(外文名:PHP: Hypertext Preprocessor,中文名:"超文本预处理器")是一种通用开源脚本语言.语法吸收了C语言.Java和Perl的特点,利于学习,使 ...
- php对接微信提醒,PHP对接微信公众平台消息接口开发流程教程
PHP对接微信公众平台消息接口开发流程教程 发布于 2015-02-15 08:54:13 | 157 次阅读 | 评论: 1 | 来源: 网友投递 PHP开源脚本语言PHP(外文名: Hyperte ...
- php微信公众号怎么开发_PHP对接微信公众平台消息接口开发流程详解及实例
这篇文章主要介绍了PHP对接微信公众平台消息接口开发流程,如何使用PHP版接口操作公众平台消息,需要的朋友可以参考下 一.写好接口程序 在你的服务器上上传好一个接口程序文件内容如下: 代码如下:< ...
- Java中使用发布订阅模式
发布订阅者模式 概述 使用意图 使用场景 与MVC模式之间的关系 逻辑方法展示 代码演示 概述 发布订阅者模式其实在意图上等同于观察者模式,但是在结构上又有所不同. 在意图上,两者都是为对象创建一对多 ...
- java实现rabbitmq发布/订阅模型(Publish/Subscribe queues), 生产者 消费者 交换机 消息队列
发布/订阅模型又称扇出模型,或者是广播模型,可以有多个消费者,每个消费者有自己的队列,每个队列都要绑定到交换机,生产者发送的消息只需要发送到交换机,再由交换机决定要发送到哪些队列,生产者无法自行决定. ...
- Redis的发布订阅(消息队列,比如ActiveMQ,一方得到数据后,多方得到信息)
什么是发布订阅? 发布和订阅是进程间的一种消息通信模式:发送者(publisher)将消息发送给一个第三方,订阅者(subscriber)从第三方那里接收消息. 这个第三方我们通常称之为 消息中间件, ...
- redisson究极爽文-手把手带你实现redisson的发布订阅,消息队列,延迟队列(死信队列),(模仿)分布式线程池
参考资料 :分布式中间件实战:java版 (书籍), 多线程视频教程(视频)- 项目启动环境 导入依赖 <parent><groupId>org.springframework ...
最新文章
- SpringBoot部署脚本,拿走即用!
- 运维自动化之ansible playbook安装apache
- boost::hana::drop_front用法的测试程序
- StringBuffer(字符串缓冲区)
- 前端三大技术 HTML、CSS、JavaScript 快速入门手册
- access mysql oracle数据库_Oracle Access 数据库连接 使用
- 第七章:跨程序共享数据-探究内容提供器
- 取字符串以逗号隔开的两个值
- 基础表比商户对账和汇总多了一笔退款数据
- 【Python】PIL库介绍
- 宿主机mac os无法连接到虚拟机centos
- Microsoft Visual Studio Ultimate 2012 ISO 映像
- 文本编辑器(Editor)and 文件上传功能
- linux设置汉语输入法,在Deepin系统下使用韩语(韩文)输入法的方法
- 打造属于自己的量化投资系统3——利用backtrader创建加权移动平均线策略
- N1 armbian cups安装hp m126a打印机
- 谷歌的天敌竟然是亚马逊:从开放7-Mic阵列授权说起
- 已解决报错UnboundLocalError: local variable ‘title‘ referenced before assignment
- Jest 组件库单元测试【基础语法篇】
- fullgc问题解决:Full GC (Metadata GC Threshold)
热门文章
- 卷积神经网络——Faster Rcnn中的anchor和Proposal
- Creator打字机效果
- ZYNQ系列(一) Petalinux建立工程
- 【最新版】贝塔智能挪车v2.5.2+前端-已测试
- html挪车隐藏手机,警告,千万不能留挪车电话!!!教你两招,这样做就可以挪车,避免麻烦...
- 剪刀石头布java流程图_青岛能源所基于“剪刀石头布”策略实现快速多轮基因编辑...
- SAP License:FI/CO模块常用表及事务代码
- iphone8进入恢复模式或DFU模式
- 微生活完成3200万元A轮融资,助企业快速搭建移动互联CRM
- DXP改变字体的方法