一:RocketMQ简介

RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:

1.能够保证严格的消息顺序

2.提供丰富的消息拉取模式

3.高效的订阅者水平扩展能力

4.实时的消息订阅机制

5.亿级消息堆积能力

二:安装RocketMQ

下载源码

首先我们从githup上获取RocketMQ的源码,目前最新的版本为3.5.8,下载地址为:https://github.com/alibaba/RocketMQ/releases 或者 wget  https://github.com/alibaba/RocketMQ/releases/alibaba/RocketMQ/archive/v3.5.8.tar.gz。请注意:此时我们下载的是源码,直接解压时不能用的,所以我们需要编译之后才能使用。

编译源码

在进行编译源码之前我们需要安装JDK。如果你已经安装过了,请跳过这里。如果你还没有安装过JDK,请参考这篇文章(Linux环境下安装JDK)。然后我们还需要安装一下Maven。Maven的安装还是比较简单,只需要去官方上下载的安装吧,然后直接解压,再配置一下环境变量就OK。接下来我们把刚才下载来的RockeMQ的源码解压到/usr/local/rockemq-source文件夹中。在源码中有一个Install.sh。如图所示:
。运行sh install.sh。在编译完成之后,我们只要target目录下的alibaba-rocketmq这个文件夹中内容,把alibaba-rocketmq文件夹中的内容移动到/usr/local/rocketmq中。如果你不想编译的话,可以从这里下载编译之后的rocketmq。(rocketmq3.5.8)。

配置环境变量

接下来我们需要配置一下环境变量。在终端中输入以下命令:vi /etc/profile ,在文件的末尾中添加如下两句话:export rocketmq=/usr/local/rocketmq  export PATH=$PATH:$rocketmq/bin。接下来我们使配置的换将变量生效:source /etc/profile.

三:启动RocketMQ

接下来我们启动一下刚才编译的RocketMQ.在启动之前我们需要修改一下RocketMQ启动的内存大小(如果你的系统内存比较大的话,请忽略)。我们进入到/usr/local/rocketmq/bin中,在终端中输入以下命令修改mqnamesrv的内存大小:vi runserver.sh.修改为如图的内容:
,接下来修改broker的内存大小:vi runbroker.sh:

启动mqnameserver

进入到/usr/local/rocketmq/bin中输入以下命令:nohup sh mqnamesrv > ~/logs/rocketmqlogs/namesrv.log 2>&1 &。注意最后的这个 & 不要少。

启动mqbroker

进入到/usr/local/rocketmq/bin中输入以下命令:nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true > ~/logs/rocketmqlogs/broker.log 2>&1 &。注意:localhost可以换成你刚才启动mqnamesrv的IP。autoCreateTopicEnable=true 这句话不要少了。最后的 & 也不要少了。
我们可以通过 ps aux | grep java命令来查看启动的情况。
到此,rocketmq的安装完毕。
四:RocketMQ的小例子
producer:
package com.zkn.newlearn.rocketmq;import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;import java.util.concurrent.TimeUnit;/*** Created by zkn on 2016/10/27.*/
public class ProducerTest01 {public static void main(String[] args) {/*** 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br>* 注意:ProducerGroupName需要由应用来保证唯一<br>* ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,* 因为服务器会回查这个Group下的任意一个Producer*/DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");//producer.setNamesrvAddr("192.168.180.1:9876");producer.setNamesrvAddr("192.168.180.133:9876");producer.setInstanceName("Producer");/*** Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>* 注意:切记不可以在每次发送消息时,都调用start方法*/try {producer.start();} catch (MQClientException e) {e.printStackTrace();}for (int i = 0; i < 100; i++) {try {/*** 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。* 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br>* 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br>* 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。*/{Message msg = new Message("TopicTest1",// topic"TagA",// tag"OrderID001",// key("Hello MetaQ").getBytes());// bodySendResult sendResult = producer.send(msg);System.out.println(sendResult);}{Message msg = new Message("TopicTest2","TagB","OrderID001",("Hello MetaQ TagB".getBytes()));SendResult sendResult = producer.send(msg);System.out.println(sendResult);}{Message msg = new Message("TopicTest3","TagC","OrderID001",("Hello MetaQ TagC").getBytes());SendResult sendResult = producer.send(msg);System.out.println(sendResult);}TimeUnit.MILLISECONDS.sleep(1000);} catch (MQClientException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} catch (RemotingException e) {e.printStackTrace();} catch (MQBrokerException e) {e.printStackTrace();}}/*** 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己* 注意:我们建议应用在JBOSS、Tomcat等容器的退出销毁方法里调用shutdown方法*/producer.shutdown();}
}

consumer:

package com.zkn.newlearn.rocketmq;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;import java.util.List;/*** Created by zkn on 2016/10/27.*/
public class ConsumerTest01 {/*** 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br>* 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法<br>*/public static void main(String[] args) {/*** 注意:ConsumerGroupName需要由应用来保证唯一*/DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("ProducerGroupName");//pushConsumer.setNamesrvAddr("192.168.180.1:9876");pushConsumer.setNamesrvAddr("192.168.180.133:9876");pushConsumer.setInstanceName("Consumer");try {/*** 订阅指定topic下tags分别等于TagA或TagC或TagD* 两个参数:第一个参数是topic第二个参数是tags*/pushConsumer.subscribe("TopicTest1", "TagA || TagC || TagD");/*** 订阅指定topic下所有消息<br>* 注意:一个consumer对象可以订阅多个topic*///pushConsumer.subscribe("TopicTest2", "*");pushConsumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size());MessageExt messageExt = msgs.get(0);if("TopicTest1".equals(messageExt.getTopic())){// 执行TopicTest1的消费逻辑if (messageExt.getTags() != null && messageExt.getTags().equals("TagA")) {// 执行TagA的消费System.out.println(new String(messageExt.getBody()));}else if(messageExt.getTags() != null && messageExt.getTags().equals("TagB")){System.out.println(new String(messageExt.getBody()));}else if(messageExt.getTags() != null && messageExt.getTags().equals("TagC")) {System.out.println(new String(messageExt.getBody()));}}else if("TopicTest2".equals(messageExt.getTopic())){System.out.println(new String(messageExt.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});} catch (MQClientException e) {e.printStackTrace();}/*** Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>*/try {pushConsumer.start();} catch (MQClientException e) {e.printStackTrace();}System.out.println("Consumer Started.");}
}
package com.zkn.newlearn.rocketmq;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;import java.util.List;/*** Created by zkn on 2016/10/30.*/
public class ConsumerTest02 extends ConsumerTest01 {public static void main(String[] args) {DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("ProducerGroupName");//pushConsumer.setNamesrvAddr("192.168.180.1:9876");pushConsumer.setNamesrvAddr("192.168.180.133:9876");pushConsumer.setInstanceName("Consumer");/*** 订阅指定topic下所有消息<br>* 注意:一个consumer对象可以订阅多个topic*/try {pushConsumer.subscribe("TopicTest2", "*");pushConsumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {MessageExt messageExt = msgs.get(0);if("TopicTest2".equals(messageExt.getTopic())){System.out.println(new String(messageExt.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});} catch (MQClientException e) {e.printStackTrace();}try {pushConsumer.start();} catch (MQClientException e) {e.printStackTrace();}}
}

Linux环境下安装RocketMQ(MetaQ)相关推荐

  1. Linux环境下安装RocketMQ

    文章目录 简介 前言 一.Linux安装JDK 1.下载连接 2.配置jdk环境 解压jdk文件 配置jdk环境变量 测试jdk环境是否生效 二.安装RocketMQ 1.下载rocketmq安装包 ...

  2. Linux环境下安装nginx

    大家好,本篇文章主要讲的是Linux环境下安装nginx教程,感兴趣的同学赶快来看一看吧,对你有帮助的话记得收藏一下,方便下次浏览 1.安装所需环境 //安装gcc yum install gcc-c ...

  3. 尚学python课程---11、linux环境下安装python注意

    尚学python课程---11.linux环境下安装python注意 一.总结 一句话总结: 准备安装依赖包:zlib.openssl:yum install zlib* openssl*:pytho ...

  4. Linux环境下安装Tigase XMPP Server

    Tigase是一种XMPP服务器,可以作为采用XMPP协议的各种IM(Instant Messeging)工具(如Pandion.Spark等)的服务器. 在Linux环境下安装Tigase的步骤如下 ...

  5. linux环境下安装多个任意版本的python环境

    linux环境下安装多个任意版本的python环境 安装方法可以归结为 [下载 + 编译 + 配置环境变量] 下载Python Windows下载发送到linux上 step1. 下载linux版本的 ...

  6. Linux环境下安装Mysql5.7

    本文记录下我近期在Linux环境下安装Mysql5.7的实践经历. 服务器版本 Mysql版本 Centos 7.6 5.7.32 1. 下载Mysql 下载地址:https://downloads. ...

  7. Linux环境下安装单实例MySQL 5.7

    一.安装环境 1.操作系统版本:CentOS 7.5 2.MySQL版本:5.7.22(社区版) 3.MySQL安装包:mysql-5.7.22-linux-glibc2.12-x86_64.tar. ...

  8. Linux 环境下安装 GitLab 与配置

    什么是 GitLab? GitLab 是一个用于仓库管理系统的开源项目,使用 Git 作为代码管理工具,并在此基础上搭建起来的 web 服务. GitLab的功能特点 提供了管理,计划,创建,验证,打 ...

  9. 服务器Linux环境下安装Matlab2018b

    服务器Linux环境下安装Matlab2018b 一.下载Linux版本Matlab2018b 二.上传Matlab2018b镜像 三.安装Matlab2018b 四.激活Matlab 一.下载Lin ...

最新文章

  1. TensorFlow中的random_normal()函数
  2. 剑指offer-11.数值的整数次方实现power
  3. [ios2]ios系统中各种设置项的url链接
  4. 《MySQL——恢复数据-误删行、表、库》
  5. python判断语句入门教程_Python中的条件判断语句基础学习教程
  6. mavon-editor文本编辑器初体验(一)
  7. Javascript第六章JavaScript字面量加数组创建对象第三课
  8. 【丐中丐】废旧光驱改装激光雕刻机
  9. CSS进阶(15)—— CSS世界的层叠规则(上)
  10. mac+微信打开连接到服务器,MAC OS系统 ,微信接收到的文件,打开wo… - Apple 社区...
  11. Web前端:什么是前端框架?
  12. springboot从零开始,快捷键 快捷操作
  13. 第二证券|钠电池三种技术路线谁更将率先取代锂电池?
  14. Java——数据类型
  15. Linux 磁盘合并挂载
  16. 高斯函数半高宽FWHM、拐点差值绝对值一半以及标准差σ的关系
  17. GraalVM Native Image
  18. SQL Server存储过程调用WebService
  19. matlab 混合高斯背景建模的实现
  20. 《计算之魂》--- 思考题0.3 【读书笔记】

热门文章

  1. 如何解决ABBYY FineReader中表格检测不到问题
  2. 统计学习方法 李航---第7章 支持向量机
  3. kickstart及引导镜像制作
  4. wordpress 伪静态
  5. 模板技巧之:费用科目条件过滤
  6. Cisco C2960 升级IOS
  7. sql语句练习(三):LeetCode
  8. ShopEx 中规格属性增加时,自动计算其对应的销售价格,同时注意模板中的变量间的计算
  9. C语言经典算法100例
  10. echo count(“abc”); 输出什么?