为什么80%的码农都做不了架构师?>>>   

背景

最近公司的终端机需要接通推送,然后查看对比了一下,觉得rocketMq更合适一点,所以选择了rocketMq进行测试。

软件清单

a、git(yum intsall git);b、jdk1.8; c、maven; d、rocketmq

操作步骤

1、首先下载rocketMq3.5.8

2、tar -zxvf rocketmq-3.5.8.tar.gz

3、进入rocketmq-3.5.8

4、sh install.sh,初始化rocketmq(这是初始化和下载东西,是必须项目)

5、下载完毕,执行 cd devenv(进入这个文件夹devenv)

6、编写rocket环境 (1)"ROCKETMQ_HOME=pwd" >> ~/.bash_profile

(2)source ~/.bash_profile

7、修改内存大小bin文件夹下面的runserver.sh文件夹的内存,从4G修改成128m和256m(因为我的测试服务器内存不大)

8、修改内存大小bin文件夹下面的runbroker.sh件夹的内存,从4G修改成128m和256m(因为我的测试服务器内存不大) 9、初始化并且授予权限

(1)cd /usr/local/rocketmq/rocketmq3.5.8/target/alibaba-rocketmq-broker/alibaba-rocketmq/bin

(2)授予执行权限 chmod +x mqadmin mqbroker mqfiltersrv mqnamesrv mqshutdown

10、运行namesrv服务(同时将日志归类)

(1)nohup mqnamesrv 1>/usr/local/rocketmq/rocketmq3.5.8/target/alibaba-rocketmq-broker/alibaba-rocketmq/log/ng.log 2>/usr/local/rocketmq/rocketmq3.5.8/target/alibaba-rocketmq-broker/alibaba-rocketmq/log/ng-err.log &

(2)查看ng.log,如果出现 The Name Server boot success. serializeType=JSON,则表示nameServer开启成功

11、启动mqbroker

(1)nohup sh mqbroker -n ip:9876 autoCreateTopicEnable=true > /usr/local/rocketmq/rocketmq3.5.8/target/alibaba-rocketmq-broker/alibaba-rocketmq/log/broker.log 2>&1 &

(2)查看broker.log,如果出现 The Name Server boot success. serializeType=JSON,则表示nameServer开启成功

12、查看是否运行成功 ps aux | grep java

如果代码里面生产者和消费者加上这句话,就走端口10909,不加就走vip的10911,看服务器日志启动哪个端口而定加不加 producer.setVipChannelEnabled(false);

代码区

1、消费者代码


package cn.shopin.ssm.testRocketMq;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;/*** Created by Administrator on 2017/12/26.*/
public class ConsumerTest01 {/*** 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br>* 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法<br>*/public static void main(String[] args) {/*** 注意:ConsumerGroupName需要由应用来保证唯一*/DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("ProducerGroupName");//pushConsumer.setNamesrvAddr("192.168.180.1:9876");pushConsumer.setNamesrvAddr("ip:9876");pushConsumer.setInstanceName("Consumer");/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>* 如果非第一次启动,那么按照上次消费的位置继续消费*/pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);try {/*** 订阅指定topic下tags分别等于TagA或TagC或TagD* 两个参数:第一个参数是topic第二个参数是tags*/pushConsumer.subscribe("TopicTest1", "TagA || TagC || TagD");pushConsumer.subscribe("TopicTest2", "*");pushConsumer.subscribe("TopicTest3", "*");/*** 订阅指定topic下所有消息<br>* 注意:一个consumer对象可以订阅多个topic*///pushConsumer.subscribe("TopicTest2", "*");final AtomicInteger count = new AtomicInteger();pushConsumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size());MessageExt messageExt = msgs.get(0);count.getAndIncrement();if ("TopicTest1".equals(messageExt.getTopic())) {// 执行TopicTest1的消费逻辑if (messageExt.getTags() != null && messageExt.getTags().equals("TagA")) {// 执行TagA的消费System.out.println("TopicTest1的TagA:"+new String(messageExt.getBody()));} else if (messageExt.getTags() != null && messageExt.getTags().equals("TagB")) {System.out.println("TopicTest1的TagB:"+new String(messageExt.getBody()));} else if (messageExt.getTags() != null && messageExt.getTags().equals("TagC")) {System.out.println("TopicTest1的TagC:"+new String(messageExt.getBody()));}} else if ("TopicTest2".equals(messageExt.getTopic())) {// 执行TopicTest1的消费逻辑if (messageExt.getTags() != null && messageExt.getTags().equals("TagA")) {// 执行TagA的消费System.out.println("TopicTest2的TagA:"+new String(messageExt.getBody()));} else if (messageExt.getTags() != null && messageExt.getTags().equals("TagB")) {System.out.println("TopicTest2的TagB:"+new String(messageExt.getBody()));} else if (messageExt.getTags() != null && messageExt.getTags().equals("TagC")) {System.out.println("TopicTest2的TagC:"+new String(messageExt.getBody()));}} else if ("TopicTest3".equals(messageExt.getTopic())) {// 执行TopicTest1的消费逻辑if (messageExt.getTags() != null && messageExt.getTags().equals("TagA")) {// 执行TagA的消费System.out.println("TopicTest3的TagA:"+new String(messageExt.getBody()));} else if (messageExt.getTags() != null && messageExt.getTags().equals("TagB")) {System.out.println("TopicTest3的TagB:"+new String(messageExt.getBody()));} else if (messageExt.getTags() != null && messageExt.getTags().equals("TagC")) {System.out.println("TopicTest3的TagC:"+new String(messageExt.getBody()));}}System.out.println("当前的count的值为:"+count);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});} catch (MQClientException e) {e.printStackTrace();}/*** Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>*/try {pushConsumer.start();} catch (MQClientException e) {e.printStackTrace();}System.out.println("Consumer Started.");}
}

2、生产者代码

package cn.shopin.ssm.testRocketMq;import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;import java.util.concurrent.TimeUnit;/*** Created by Administrator on 2017/12/26.*/
public class ProducerTest01 {public static void main(String[] args) {/*** 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br>* 注意:ProducerGroupName需要由应用来保证唯一<br>* ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,* 因为服务器会回查这个Group下的任意一个Producer*/DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");//producer.setNamesrvAddr("192.168.180.1:9876");producer.setNamesrvAddr("ip:9876");producer.setInstanceName("Producer");//producer.setVipChannelEnabled(false);/*** Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>* 注意:切记不可以在每次发送消息时,都调用start方法*/try {producer.start();} catch (MQClientException e) {e.printStackTrace();}for (int i = 0; i < 10; i++) {try {/*** 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。* 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br>* 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br>* 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。*/{Message msg = new Message("TopicTest1",// topic"TagA",// tag"OrderID001",// key("按一下").getBytes());// bodySendResult sendResult = producer.send(msg);System.out.println("按一下:"+sendResult);}{Message msg = new Message("TopicTest2","TagB","OrderID001",("Hello MetaQ TagB".getBytes()));SendResult sendResult = producer.send(msg);System.out.println("Hello MetaQ TagB"+sendResult);}{Message msg = new Message("TopicTest3","TagC","OrderID001",("Hello MetaQ TagC").getBytes());SendResult sendResult = producer.send(msg);System.out.println("Hello MetaQ TagC"+sendResult);}TimeUnit.MILLISECONDS.sleep(1000);} catch (MQClientException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} catch (RemotingException e) {e.printStackTrace();} catch (MQBrokerException e) {e.printStackTrace();}}/*** 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己* 注意:我们建议应用在JBOSS、Tomcat等容器的退出销毁方法里调用shutdown方法*/producer.shutdown();}
}

转载于:https://my.oschina.net/grkj/blog/1596142

Linux的centos7.2部署rocketMq3.5.8相关推荐

  1. linux(centos7)部署kubernetes(k8s 1.16.2)集群环境及测试

    k8s作为容器集群管理系统有着明显的优势,比如动态扩容/缩容. 1. 准备环境 最基本的集群需要三个节点,在三个节点上都安装k8s Node,在其中一个节点上安装Master. 操作系统 IP hos ...

  2. linux 生成密码本,Linux下CentOS7使用OTPW实现双因子密码本登录

    otpw优点: 1.前缀密码+一次性随机码,相当于双重加密.就算随机码列表泄露,没有前缀密码也是无法登录. 2.如果一次没登录成功,会启用三重随机码. 3.用户目录下保存密码的文件可通用,适合批量部署 ...

  3. Centos7安装部署BookStack

    Centos7安装部署BookStack 参考文章链接: 1.安装epel-release 2.安装nginx 3.下载php-fpm以及所需依赖组件 4.配置PHP 5.更改php-fpm配置文件 ...

  4. 新装linux系统(centOs7)使用nginx驱动vue项目

    新装linux系统(centOs7)使用nginx驱动vue项目 配置linux环境 centos安装nginx 安装nginx前首先要确认系统中安装了gcc.pcre-devel.zlib-deve ...

  5. Centos7安装部署免费confluence wiki

    Confluence是一个专业的企业知识管理与协同软件, 也可以用于构建企业wiki.使用简单, 但它强大的编辑和站点管理特征能够帮助团队成员之间共享信息. 文档协作.集体讨论,信息推送. Cento ...

  6. Linux(Centos7)系统基本操作

    Linux(Centos7)系统基本操作 目录结构 - bin 普通用户使用的命令 - sbin 管理员使用的命令 - dev 设备文件 - proc 虚拟文件系统,反映内核进程信息实时状态 - us ...

  7. CentOS7下部署CDH5.9(HA)

    CentOS7下部署CDH5.9(HA) 文件下载 1.Cloudera Manager http://archive.cloudera.com/cm5/cm/5/cloudera-manager-c ...

  8. 记一次 Centos7 安装部署 gogs-v0.12.1

    一.基本信息 Gogs 的目标是打造一个最简单.最快速和最轻松的方式搭建自助 Git 服务.使用 Go 语言开发使得 Gogs 能够通过独立的二进制分发,并且支持 Go 语言支持的 所有平台,包括 L ...

  9. Linux(CentOS7)在VMware上的安装以及认识操作系统

    Linux操作系统实战之Day 01 认识操作系统 一.操作系统概述 1.计算机发展史 第一台计算机是1946 年2 月14 日诞生日,第一台名称ENIAC.体积一间屋子的大小,重量高达28t. 第一 ...

最新文章

  1. java实现redis缓存_java实现redis缓存功能
  2. iOS多线程之7.NSOperation的初识
  3. JAVA从下载到浏览器运行完整篇,写给lewis
  4. python实现文件上传功能_python实现上传下载文件功能
  5. 配置apache虚拟主机
  6. mysql高级查询 二_MySQL高级查询(二)
  7. Selenium介绍
  8. 安装了Node.js 从VScode 使用node -v 和 npm -v等命令却无效
  9. 快手上市首日涨近161% 两大创始人身家破千亿
  10. ofdm误码率仿真代码_共享源代码之MSK仿真(1)
  11. 一张思维导图完成淘宝精细化运营
  12. 黑客+马拉松=? 黑客马拉松?
  13. 入侵检测领域数据集总结
  14. 华为上机题之Word Maze(单词迷宫)
  15. 【违规举报】违规举报方法步骤
  16. android系统最近删除照片,安卓手机最近删除的照片怎么恢复?专家教你这样做...
  17. wps合并所有sheet页_WPS怎么把几个工作表合并到一个工作表 - 卡饭网
  18. 切比雪夫------切比雪夫不等式
  19. 【思维导图】冒险岛职业分类(按职业系、职业群分类)
  20. Cheat Engine 的使用 一

热门文章

  1. git-svn:通过git来管理svn代码
  2. C#获得枚举类型的长度
  3. jQueryDOM操作
  4. 喜得千金,升级做爸爸喽
  5. 铁路网上购票需要完善但值得鼓励
  6. 企业级工作流解决方案(十五)--集成Abp和ng-alain--Abp其他改造
  7. NUC1003 Hangover
  8. SecureCRT 绝佳配色方案, 保护你的眼睛
  9. 艾伟_转载:.NET 4.0中数组的新增功能
  10. DEDECMS v5.5 GBK Final 的一个鸡肋漏洞