最近在看消息队列框架 ,alibaba的RocketMQ单机支持1万以上的持久化队列,支持诸多特性,

目前RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景

比kafka还是有过之无不及,其实kafka文档很丰富

但RocketMQ网上的文章太少,找不到相关的操作教程

于是研究了下源码 做个单机操作的教程,如果你也对此有兴趣不妨共同研究

下载源码的地址 https://github.com/alibaba/RocketMQ/releases

  • 首选通过在java项目里面Maven依赖方式引用RocketMQ Java SDK

    <dependency><groupId>com.alibaba.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>3.2.6</version>
    </dependency>

Downloads

  • 11.3 MBalibaba-rocketmq-3.2.6.tar.gz
  • 2.46 MBalibaba-rocketmq-client-java-3.2.6.tar.gz
  • Source code (zip)
  • Source code (tar.gz)

在linux 下用wget 下载源码然后解压出来

在runserver.sh里面可以配置 jvm启动的参数 JAVA_OPT_1="-server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"

可以 vi runserver.sh

分别给 mqnamesrv mqbroker play.sh 执行的权限

chmod +x  mqnamersrv

chmod +x  mqbroker

chmod +x  play.sh

下面红线框的这段 命令输入错误了,忽略不用看

通过 nohup sh mqnamesrv& 启动 RocketMq

目前没看到结束的命令,也没找到相关的介绍,

我这里用的 ps -ef|grep rocketmq  查到进程pid

然后kill pid号

或则pkill -9 java [慎用]

用jps -v 查看下java进程的参数

rocketmq启动后监听 9876端口,这里还是在看源码里面看到的,资料实在是太少了

在防火墙配置里面加上 9876端口,设置iptables对外开放

部署Broker

nohup sh mqbroker -n "127.0.0.1:9876" -c ../conf/2m-2s-async/broker-a.properties &

这里ip换成本机的就是单机实例,如果配置主从 这里可以配其他的ip

Master和Slave的配置文件参考conf目录下的配置文件

Master与Slave通过指定相同的brokerName参数来配对,Master的BrokerId必须是0,Slave的BrokerId必须是大于0的数

一个Master下面可以挂载多个Slave,同一Master下的多个Slave通过指定不同的BrokerId来区分

部署一Master一Slave,集群采用异步复制方式:

Master: nohup sh mqbroker -n "192.168.1.23:9876" -c ../conf/2m-2s-async/broker-a.properties &

Slave:   nohup sh mqbroker -n "192.168.1.23:9876" -c ../conf/2m-2s-async/broker-a-s.properties &

package com.pgsqlmybatis.common.rocketmq;/*
***************************************************************
* 公司名称    :
* 系统名称    :信用管家专业版
* 类 名 称    :Ios渠道idfa统计,推广统计用
* 功能描述    :
* 业务描述    :
* 作 者 名    :@Author Royal
* 开发日期    :2016-05-15
* Created     :IntelliJ IDEA
***************************************************************
* 修改日期    :
* 修 改 者    :
* 修改内容    :
***************************************************************
*/import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;public class Producer {public static void main(String[] args) {DefaultMQProducer producer = new DefaultMQProducer("Producer");producer.setNamesrvAddr("xxxxxxxxxx:9876");try {producer.start();String pushMsg="kafka activeMq rocketMq 消息队列使用1";Message msg = new Message("PushTopic","push","1",pushMsg.getBytes("UTF-8"));SendResult result = producer.send(msg);System.out.println("id:" + result.getMsgId() +" result:" + result.getSendStatus());String pushMsg2="海量级消息记录单机测试2";msg = new Message("PushTopic","push","2",pushMsg2.getBytes("UTF-8"));result = producer.send(msg);System.out.println("id:" + result.getMsgId() +" result:" + result.getSendStatus());String pushMsg3="海量级消息记录单机测试3";msg = new Message("PullTopic","pull","1",pushMsg3.getBytes());result = producer.send(msg);System.out.println("id:" + result.getMsgId() +" result:" + result.getSendStatus());} catch (Exception e) {e.printStackTrace();} finally {producer.shutdown();}}
}

  

启动生成者

启动消费者

package com.pgsqlmybatis.common.rocketmq;/*
***************************************************************
* 公司名称    :
* 系统名称    :信用管家专业版
* 类 名 称    :Ios渠道idfa统计,推广统计用
* 功能描述    :
* 业务描述    :
* 作 者 名    :@Author Royal
* 开发日期    :2016-05-15
* Created     :IntelliJ IDEA
***************************************************************
* 修改日期    :
* 修 改 者    :
* 修改内容    :
***************************************************************
*/import java.io.UnsupportedEncodingException;
import java.util.List;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;public class Consumer {public static void main(String[] args){DefaultMQPushConsumer consumer =new DefaultMQPushConsumer("PushConsumer");consumer.setNamesrvAddr("xxxxxxxxxxxx:9876");try {consumer.subscribe("PushTopic", "push");/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,ConsumeConcurrentlyContext Context) {Message msg = list.get(0);System.out.println(msg.toString());String recString= null;try {recString = new String(msg.getBody() ,"UTF-8");} catch (UnsupportedEncodingException e) {e.printStackTrace();}System.out.println(recString);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();} catch (Exception e) {e.printStackTrace();}}
}

  

以上为单机实例配置

如果你遇到什么问题可以私信我,如果觉得此文对你很有帮助,点下赞推荐下额^_^

参考:http://blog.csdn.net/a19881029/article/details/34446629

http://sofar.blog.51cto.com/353572/1540874

http://blog.csdn.net/loongshawn/article/details/51086876

RocketMq最佳实践

《RocketMQ原理简介》

分布式开放消息系统(RocketMQ)的原理与实践

《RocketMQ用户指南》

转载于:https://www.cnblogs.com/fangyuan303687320/p/5495481.html

RocketMq消息队列使用相关推荐

  1. RocketMQ 消息队列中丢失消息的场景举例及解决办法

    既然使用在项目中使用了MQ,那么就不可避免的需要考虑消息丢失问题.在一些涉及到了金钱交易的场景下,消息丢失还是很致命的.那么在RocketMQ中存在哪几种消息丢失的场景呢? 先来一张最简单的消费流程图 ...

  2. Spring Cloud Alibaba 消息队列:基于 RocketMQ 实现服务异步通信

    本讲咱们将学习以下三方面内容: 介绍消息队列与 Alibaba RocketMQ: 掌握 RocketMQ 的部署方式: 讲解微服务接入 RocketMQ 的开发技巧: 首先咱们先来认识什么是消息队列 ...

  3. 消息队列之延时消息应用解析及实践

    简介:消息队列常用于实现业务需要的异步.解耦以及削峰功能.但在某些特殊的业务场景中,还需要消息队列服务本身支持一些特殊的消息类型,比如常见的延时消息.本次直播为您深入剖析延时消息的特性.应用场景,对比 ...

  4. RocketMq_02_消息队列及角色

    文章目录 RocketMq 消息队列介绍 消息中间件功能 应用解耦 流量削峰 大数据处理 异构系统 RocketMQ的角色 broker broker集群 producer consumer name ...

  5. 分布式消息队列基础知识

    本文主要整理消息队列的一些基本概念,为后面的Rocketmq消息队列组件深入学习打下基础. 一.什么是消息队列? 维基百科介绍:消息队列(Message Queue)是一种进程间通信或同一进程的不同线 ...

  6. RabbitMQ消息队列常见面试题总结

    1.什么是消息队列: 1.1.消息队列的优点: (1)解耦:将系统按照不同的业务功能拆分出来,消息生产者只管把消息发布到 MQ 中而不用管谁来取,消息消费者只管从 MQ 中取消息而不管是谁发布的.消息 ...

  7. 从源码分析RocketMQ系列-RocketMQ消息设计详解

    1 消息存储   消息存储是RocketMQ中最为复杂和最为重要的一部分,本节将分别从RocketMQ的消息存储整体架构.PageCache与Mmap内存映射以及RocketMQ中两种不同的刷盘方式三 ...

  8. 面试总结(四):消息队列

    问题导读: 1.什么是异步处理? 2.P2P的特点是什么? 3.如何防止消息丢失? 二.消息队列 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题 实现高性能,高可用 ...

  9. 17 个方面,综合对比 Kafka、RabbitMQ、RocketMQ、ActiveMQ 四个分布式消息队列

    点击上方"方志朋",选择"置顶公众号" 技术文章第一时间送达! 作者:28cm不含头(来自:知乎) 原文链接: https://www.zhihu.com/qu ...

最新文章

  1. 17DOM之操作元素
  2. 表格转化为Latex代码
  3. python 标准差_标准差python
  4. 谈“80后”程序员为什么找不到工作? [转]
  5. sql.xml大于小于号处理的方法
  6. 成本管控难题怎么破?BI大神带你一步步拆解分析,节省成本390万
  7. .net source code download
  8. Oracle 中给表添加主键、外键
  9. django urls路由匹配分发
  10. linux malloc和free解析
  11. Paip. DDBS 分布式 数据库系统 attilax总结C0G
  12. Matlab学习第一部分:基础知识
  13. 《啊哈算法》知识点总结
  14. 用python实现猜数字游戏
  15. Scale和Resolution的相互转换算法
  16. 以下描述中最不适合用计算机编程来处理的是,以下描述中最适合用计算机编程来处理的问题是( ?)。...
  17. Deploying JRE (Native Plug-in) for Windows Clients in Oracle E-Business Suite 11i (文档 ID 290807.1)
  18. 逻辑公式之吸收律理解
  19. switchport trunk native 的原理与作用
  20. SpringBoot初始化过程核心源码剖析

热门文章

  1. Boost:等待和通知操作的模糊测试
  2. Boost:基于Boost的管道pipeline通信
  3. ITK:在灰度图像中标记连接的组件
  4. ITK:比较两个图像并将输出像素设置为最大
  5. VTK:可视化算法之CutStructuredGrid
  6. VTK:模型之ContourTriangulator
  7. C语言递归方式实现冒泡排序(bubble排序)算法(附完整源码)
  8. c++如何定义一个只能在堆上(栈上)生成对象的类?
  9. QT的QGraphicsProxyWidget类的使用
  10. STL常用的集合算法