简述

现在生产中用的最多的消息队列有Activemq,rabbitmq,kafka,rocketmq等。

JMS规范

rocketmq虽然不完全基于jms规范,但是他参考了jms规范和 CORBA Notification 规范等,
可以说是青出于蓝而胜于蓝,

什么是jms呢

   jms其实就是类似于jdbc的一套接口规范,但不同的是他是面向的消息服务,提供一套标准API接口,
大部分厂商都会参考jms规范,不过我们后面要讲到的rocketmq却没有严格遵守jms规范,后面我们会讲到。一些常见的jms厂商有:IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,
还有APACHE开源的ActiveMQ。这里面Activemq这个也是我接触到的第一个mq,现在市场份额也是很大的,
京东商城采用的就是这个。

基本概念

发送者( Sender)也就是消息的生产者,俗的将就是创建并发送消息的JMS客户端。接收者( Receiver)也就是消息消费者,接收订制消息的并按照相应的业务逻辑进行处理,最终将结果反馈给mq的服务端。点对点( Point-to-Point(P2P) )点对点就是一对一的关系,一个消息发出只有一个接受者所处理。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。发布订阅( Publish/Subscribe(Pub/Sub) )1、客户端将消息发送到主题。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。消息队列(Queue)
一个容纳那些被发送的等待阅读的消息的区域。与队列名字所暗示的意思不同,
消息的接受顺序并不一定要与消息的发送顺序相同。一旦一个消息被阅读,该消息将被从队列中移走。主题(Topic)一种支持发送消息给多个订阅者的机制。发布者(Publisher)
同生产者订阅者(Subscriber)
针对同一主题的多个消费者

对象模型

(1) ConnectionFactory创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种(很显然是基于点对点和和发布订阅的两种方式分别创建连接工厂的)。
可以通过JNDI来查找ConnectionFactory对象。(2) DestinationDestination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,
它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,
它的Destination也是某个队列或主题(即消息来源)。所以,
Destination实际上就是两种类型的对象:Queue、Topic可以通过JNDI来查找Destination。(3) ConnectionConnection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。
Connection可以产生一个或多个Session。跟ConnectionFactory一样,
Connection也有两种类型:QueueConnection和TopicConnection。(4) SessionSession是我们操作消息的接口。可以通过session创建生产者、消费者、消息等。
Session提供了事务的功能。当我们需要使用session发送/接收多个消息时,
可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。(5) 消息的生产者消息生产者由Session创建,并用于将消息发送到Destination。
同样,消息生产者分两种类型:QueueSender和TopicPublisher。
可以调用消息生产者的方法(send或publish方法)发送消息。(6) 消息消费者
消息消费者由Session创建,用于接收被发送到Destination的消息。
两种类型:QueueReceiver和TopicSubscriber。
可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。
当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。(7) MessageListener消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。
我们后面消息消费还会看到。

消息消费

在JMS中,消息的产生和消息是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。○ 同步
订阅者或接收者调用receive方法来接收消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞○ 异步
订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。

activemq的部分代码来简单说明一下上面说道的一些JMS规范

    public void init(){try {//创建一个链接工厂(用户名,密码,broker的url地址)connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);//从工厂中创建一个链接connection  = connectionFactory.createConnection();//开启链接connection.start();//创建一个会话session = connection.createSession(true,Session.SESSION_TRANSACTED);} catch (JMSException e) {e.printStackTrace();}}公共部分:也就是说不管你是消息的生产者还是消息的消费者都需要这些步骤
1.首先我们需要创建一个连接工厂,当然这里我们需要输入用户性和密码还有就是broker的url
2.然后我们根据连接工厂创建了一个连接,此刻这个工厂并没有和broker建立连接
3.调用start方法就和broker建立了连接,这里我大概解释一下broker
4.创建一个session,上面我们提到过所有的消息操作都是与session进行的public void sendMsg(String queueName){try {//创建一个消息队列(此处也就是在创建Destination)Queue queue = session.createQueue(queueName);//消息生产者MessageProducer messageProducer = null;if(threadLocal.get()!=null){messageProducer = threadLocal.get();}else{messageProducer = session.createProducer(queue);threadLocal.set(messageProducer);}while(true){Thread.sleep(1000);int num = count.getAndIncrement();//创建一条消息TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+"productor:生产消息,count:"+num);//发送消息messageProducer.send(msg);//提交事务session.commit();}} catch (JMSException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}

生产:配置完上面的公共部分我们就迫不及待的把消息生产出来吧,我这边说的是点对点的方式

1.通过session创建一个Destination,我这边直接就用了queue了
2.接下来我们需要创建一个消息的生产者
3.我这边就循环每1s发送一条消息
4.这边看到我们的消息也是用session来创建的,这里面我们用的是文本的消息类型
5.发送消息
6.提交这次发送,至此我们的消息就发送到了broker上了,用过activemq的同学都知道,
activemq提供了一个很好用的界面可以查到你的消息的状态,包括是否消费等消费:消费我们上面也提到了两种方式,同步和异步,我这边准备了两份代码分别说明了一下public void doMessage(String queueName){try {//创建DestinationQueue queue = session.createQueue(queueName);MessageConsumer consumer = null;while(true){Thread.sleep(1000);TextMessage msg = (TextMessage) consumer.receive();if(msg!=null) {msg.acknowledge();System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());}else {break;}}} catch (JMSException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}

同步:可以看到消息会一直阻塞到有消息才会继续

分布式消息中间件-Rocketmq相关推荐

  1. 分布式消息中间件rocketmq的原理与实践

    RocketMQ作为阿里开源的一款高性能.高吞吐量的消息中间件,它是怎样来解决这两个问题的?RocketMQ 有哪些关键特性?其实现原理是怎样的? 关键特性以及其实现原理 一.顺序消息 消息有序指的是 ...

  2. 消息中间件学习总结(2)——RocketMQ之阿里开源消息中间件RocketMQ的前世今生

    摘要: 昨天,我们将分布式消息中间件RocketMQ捐赠给了开源软件基金会Apache. 孵化成功后,RocketMQ或将成为国内首个互联网中间件在Apache上的顶级项目. 消息一出,本以为群众的反 ...

  3. 分布式消息中间件 : Rocketmq

    简述 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家.点击跳转到教程. 分布式消息中间件,主要是实现分布式系统中解耦.异步消息.流量销锋.日志处理等场景.生产中用的最 ...

  4. 分布式消息中间件中的一些概念(接上一篇的《什么是分布式消息中间件?》)...

    接上一篇的<什么是分布式消息中间件?>,这一篇来介绍一下消息中间件相关的一些概念和专业术语. Topic 主题,从逻辑上讲一个Topic就是一个Queue,即一个队列:从存储上讲,一个To ...

  5. [RocketMQ]消息中间件—RocketMQ消息消费(一)

    2019独角兽企业重金招聘Python工程师标准>>> 文章摘要:在发送消息给RocketMQ后,消费者需要消费.消息的消费比发送要复杂一些,那么RocketMQ是如何来做的呢? 在 ...

  6. 消息中间件RocketMQ

    消息中间件RocketMQ   RocketMQ 是阿里巴巴开源的分布式消息中间件.支持事务消息.顺序消息.批量消息.延时消息.消息回溯等.它里面有几个区别于标准消息中件间的概念,如Group.Top ...

  7. 分布式消息中间件设计

    目录 1.什么是分布式消息中间件 1.1.单体架构 1.2.分布式系统架构 2.基于消息中间件的分布式系统架构 2.1.什么是消息中间件 2.2消息中间件概述 3.消息中间件的核心设计 3.1.本质 ...

  8. 分布式技术与实战第五课 分布式-消息中间件选型

    第26讲:消息队列有哪些应用场景? 分布式系统不同模块之间的通信,除了远程服务调用以外,消息中间件是另外一个重要的手段,在各种互联网系统设计中,消息队列有着广泛的应用.从本课时开始,专栏进入分布式消息 ...

  9. 开源软件成熟度评测报告-分布式消息中间件

    一.背景 随着互联网技术和金融科技的不断发展,从RPC到Web Service,从SOA的推行再到RESTful以及云计算中PaaS与SaaS的推广,分布式架构在金融企业中得到了广泛应用,消息中间件则 ...

最新文章

  1. iOS视频播放器之ZFPlayer剖析
  2. 大数据之MySql笔记-0916
  3. 车辆动力学及控制_第一届国际轮胎动力学仿真技术峰会在长春举行
  4. 20140704笔试面试总结(java)
  5. 手机中如何处理Excel格式转换PDF格式
  6. 【数学模型】基于Matlab实现洪水调度运算
  7. linux编译lame,linux 下安装lame以及tritonus-mp3enc
  8. 网站如何做域名转移?闲置域名要及时处理
  9. modelsim 常用快捷键
  10. 计算机截图工具无法运行,重装win7系统后打开截图工具显示“截图工具当前未在计算机上运行”如何解决...
  11. 2018省赛第九届蓝桥杯真题C语言B组第十题题解 乘积最大
  12. Git详细使用大全- rebase, merge, switch, cherry-pick, tag
  13. 机器自动翻译古文拼音 - 十大宋词 - 江城子·乙卯正月二十日夜记梦 苏轼
  14. 快速蒙版应用——撕纸效果
  15. 汇智动力——北大研究生的抉择!
  16. 无人机倾斜摄影和三维实景模型 实施流程
  17. 【C语言】——文件内容排序
  18. 构建一个简单的go-web镜像
  19. python--jupyter notebook 转化为PDF教程
  20. 7年测试工程师分享的20K的测试“卷王真经”

热门文章

  1. modbus-tcp qt4-socket ---------micro2440 as device
  2. Boost智能指针——weak_ptr
  3. 0301 - 一个比价的小项目
  4. 运营一个网站,新增加一个功能,容易忽视哪些问题就匆匆上线?
  5. Java Integer的缓存策略
  6. php7连接mongodb,批量添加数据
  7. Amdahl’s law (阿姆达尔定律)的演化和思考
  8. linux实例大全学习笔记1
  9. UBoot常用命令手册
  10. 剑道训练很有趣的一种手段