文章目录

  • RocketMQ集群基本信息
  • 目标
  • 知识预习
    • 发送方式
    • 发送结果
  • 环境搭建
  • 使用Java API操作RocketMQ—Simple Message
    • Step1. pom.xml增加依赖和bulid 插件
    • Step2.日志文件 logback.xml
    • 发送同步消息
    • 发送异步消息
    • 发送one way 消息
    • 消费消息
      • push模式
      • pull模式
  • 结构变化
  • 代码
  • 更多示例


RocketMQ集群基本信息

右侧的部署模式 ,双机互为主备

如何搭建的,请移步: RocketMQ-初体验RocketMQ(03)_RocketMQ多机集群部署

为了更好的观察RocketMQ在我们发送和消费消息的过程中,给我们产生了什么样的文件,我们把 storePathRootDir 和 storePathCommitLog 自定义到一个新的目录下


130 节点的 broker-m.conf 和 broker-s.conf中的 storePathRootDir 和 storePathCommitlog配置

一个broker节点对应一个commitlog, 所以130主机的 master broker 和 slave broker 会对应两个存储路径和两个commitlog . 注意看存储路径

broker-m.conf

broker-s.conf

启动 130的 namesrv 和 master broker 、 slave broker 节点,观察我们配置的
storePathRootDir 和 storePathCommitlog

同样的 也把131 修改下


目标

1. 使用RocketMQ 发送3种类型的消息: reliable synchronous、 reliable asynchronous、one-way transmission

2. 使用RocketMQ消费消息


知识预习

发送方式

在Producer端

org.apache.rocketmq.client.impl.CommunicationMode 枚举类中定义了3种发送方式

public enum CommunicationMode {SYNC,ASYNC,ONEWAY,
}
  • Sync:同步的发送方式,会等待发送结果后才返回
  • Async:异步的发送方式,发送完后,立刻返回。Client 在拿到 Broker 的响应结果后,会回调指定的 callback. 可以指定 Timeout。默认的 3000ms.
  • Oneway:发出去后,什么都不管直接返回

发送结果

org.apache.rocketmq.client.produce.SendStatus 枚举类中定义了如下4种发送结果

package org.apache.rocketmq.client.producer;public enum SendStatus {SEND_OK,FLUSH_DISK_TIMEOUT,FLUSH_SLAVE_TIMEOUT,SLAVE_NOT_AVAILABLE,
}

1. SEND_OK : 消息发送成功。要注意的是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或 SYNC_FLUSH。

2. FLUSH_DISK_TIMEOUT:消息发送成功但是服务器刷盘超时。此时消息已经进入服务器队列(内存),只有服务器宕机,消息才会丢失。消息存储配置参数中可以设置刷盘方式和同步刷盘时间长度,如果Broker服务器设置了刷盘方式为同步刷盘,即FlushDiskType=SYNC_FLUSH(默认为异步刷盘方式),当Broker服务器未在同步刷盘时间内(默认为5s)完成刷盘,则将返回该状态——刷盘超时。

3. FLUSH_SLAVE_TIMEOUT :消息发送成功,但是服务器同步到Slave时超时。此时消息已经进入服务器队列,只有服务器宕机,消息才会丢失。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master即ASYNC_MASTER),并且从Broker服务器未在同步刷盘时间(默认为5秒)内完成与主服务器的同步,则将返回该状态——数据同步到Slave服务器超时

4. SLAVE_NOT_AVAILABLE:消息发送成功,但是此时Slave不可用。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master服务器即ASYNC_MASTER),但没有配置slaveBroker服务器,则将返回该状态——无Slave服务器可用。


环境搭建

移步 : IDEA-使用IDEA创建maven多模块父子工程


使用Java API操作RocketMQ—Simple Message

官方指导: 戳这里

Step1. pom.xml增加依赖和bulid 插件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>rocketmqMaster</artifactId><groupId>com.artisan</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>rocketmq_base</artifactId><dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.3.2</version></dependency></dependencies><build><pluginManagement><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target><encoding>utf8</encoding><compilerArgument>-XDignore.symbol.file=true -Xlint</compilerArgument><testCompilerArgument>-XDignore.symbol.file=true -Xlint</testCompilerArgument></configuration></plugin></plugins></pluginManagement></build></project>

Step2.日志文件 logback.xml

<configuration><!-- 应用名称 --><property name="APP_NAME" value="rocketmq_base" /><!--日志文件的保存路径,首先查找系统属性-Dlog.dir,如果存在就使用其;否则,在当前目录下创建名为logs目录做日志存放的目录 --><property name="LOG_HOME" value="${log.dir:-logs}/${APP_NAME}" /><!-- 日志输出格式 --><property name="ENCODER_PATTERN"value="%d{yyyy-MM-dd  HH:mm:ss.SSS} [%thread] %-5level %logger{80} - %msg%n" /><contextName>${APP_NAME}</contextName><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>${ENCODER_PATTERN}</pattern></encoder></appender><!-- 文件日志:输出全部日志到文件 --><appender name="FILE"class="ch.qos.logback.core.rolling.RollingFileAppender"><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><fileNamePattern>${LOG_HOME}/output.%d{yyyy-MM-dd}.log</fileNamePattern><maxHistory>7</maxHistory></rollingPolicy><encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"><pattern>${ENCODER_PATTERN}</pattern></encoder></appender><!-- 错误日志:用于将错误日志输出到独立文件 --><appender name="ERROR_FILE"class="ch.qos.logback.core.rolling.RollingFileAppender"><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><fileNamePattern>${LOG_HOME}/error.%d{yyyy-MM-dd}.log</fileNamePattern><maxHistory>7</maxHistory></rollingPolicy><encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"><pattern>${ENCODER_PATTERN}</pattern></encoder><filter class="ch.qos.logback.classic.filter.ThresholdFilter"><level>WARN</level></filter></appender><!-- 独立输出的同步日志 --><appender name="SYNC_FILE"  class="ch.qos.logback.core.rolling.RollingFileAppender"><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><fileNamePattern>${LOG_HOME}/sync.%d{yyyy-MM-dd}.log</fileNamePattern><maxHistory>7</maxHistory></rollingPolicy><encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"><pattern>${ENCODER_PATTERN}</pattern></encoder></appender><logger name="rocketmq_base" level="ERROR" addtivity="true"><appender-ref ref="SYNC_FILE" /></logger><root level="ERROR"><appender-ref ref="STDOUT" /><appender-ref ref="FILE" /><appender-ref ref="ERROR_FILE" /></root>
</configuration>

发送同步消息

package com.artisan.rocketmq.simple.producer;import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;import java.io.UnsupportedEncodingException;/*** @author 小工匠* @version v1.0* @create 2019-11-10 1:46* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description 同步发送消息**/public class SyncProducer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {//Instantiate with a producer group name.DefaultMQProducer producer = newDefaultMQProducer("Artisan_ProducerGroup");// Specify name server addresses.producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");// 设置超时时间,默认3秒producer.setSendMsgTimeout(10_000);//Launch the instance.producer.start();
//        for (int i = 0; i < 100; i++) {//            //Create a message instance, specifying topic, tag and message body.
//            Message msg = new Message("TopicArtisan" /* Topic */,
//                    "TagArtisan" /* Tag */,
//                    ("Artisan:Hello RocketMQ  " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
//            );
//            //Call send message to deliver message to one of brokers.
//            SendResult sendResult = producer.send(msg);
//            System.out.printf("%s%n", sendResult);
//        }//Create a message instance, specifying topic, tag and message body.Message msg = new Message("TopicArtisan" /* Topic */,"TagArtisan" /* Tag */,("Artisan:Hello RocketMQ  ").getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);//Call send message to deliver message to one of brokers.SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);//Shut down once the producer instance is not longer in use.producer.shutdown();}
}

如果超时的话,设置下超时时间 producer.setSendMsgTimeout(10_000);

发送一条消息

返回:

SendResult [sendStatus=SEND_OK, msgId=C0A81F891D2418B4AAC230A647AD0000, offsetMsgId=C0A8128300002A9F00000000000025B0, messageQueue=MessageQueue [topic=TopicArtisan, brokerName=broker‐b, queueId=3], queueOffset=13]

可知 发送到了 集群中的 节点 的queueId=3 第四个队列里去了

到控制台根据msgId查看一下


发送异步消息

package com.artisan.rocketmq.simple.producer;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;/*** @author 小工匠* @version v1.0* @create 2019-11-10 12:18* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description 异步消息**/public class AsyncProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("Artisan_ProducerGroup");producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");//设置发送失败重试机制producer.setRetryTimesWhenSendAsyncFailed(5);producer.start();int messageCount = 1;final CountDownLatch countDownLatch = new CountDownLatch(messageCount);for (int i = 0; i < messageCount; i++) {final int index = i;Message msg = new Message("TopicAsyn","TagAsyn","OrderID188","I m sending msg content xxx".getBytes(RemotingHelper.DEFAULT_CHARSET));//消息发送成功后,执行回调函数producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf("%-10d OK %s %n", index,sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});}countDownLatch.await(5, TimeUnit.SECONDS);producer.shutdown();}
}

日志:

控制台查询


发送one way 消息

package com.artisan.rocketmq.simple.producer;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;/*** @author 小工匠* @version v1.0* @create 2019-11-10 12:45* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/public class OnewayProducer {public static void main(String[] args) throws Exception{DefaultMQProducer producer = new DefaultMQProducer("tl_message_group");// Specify name server addresses.producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");producer.setSendMsgTimeout(10000);producer.start();for (int i = 0; i < 1; i++) {Message msg = new Message("TopicOneWay" /* Topic */,"TagSendOne" /* Tag */,"OrderID198",("Hello RocketMQ test i " +i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);producer.sendOneway(msg);}//Shut down once the producer instance is not longer in use.producer.shutdown();}
}

消费消息

push模式

package com.artisan.rocketmq.simple.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/*** @author 小工匠* @version v1.0* @create 2019-11-10 12:49* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Artisan_ProducerGroup");consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");// Subscribe one more more topics to consume.consumer.subscribe("TopicAsyn", "*");// Register callback to execute on arrival of messages fetched from brokers.consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs){System.out.println(new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}

pull模式

package com.artisan.rocketmq.simple.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;/*** @author 小工匠* @version v1.0* @create 2019-11-10 13:51* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/public class PullConsumer {private static final Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();public static void main(String[] args) throws MQClientException {DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("tl_message_group");consumer.setNamesrvAddr("192.168.241.198:9876");consumer.start();Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicStudent");for (MessageQueue mq : mqs) {System.err.println("Consume from the queue: " + mq);SINGLE_MQ:while (true) try {PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);System.out.println(pullResult);putMessageQueueOffset(mq, pullResult.getNextBeginOffset());switch (pullResult.getPullStatus()) {case FOUND:List<MessageExt> messageExtList = pullResult.getMsgFoundList();for (MessageExt m : messageExtList) {System.out.println(new String(m.getBody()));}break;case NO_MATCHED_MSG:break;case NO_NEW_MSG:break SINGLE_MQ;case OFFSET_ILLEGAL:break;default:break;}} catch (Exception e) {e.printStackTrace();}}consumer.shutdown();}private static void putMessageQueueOffset(MessageQueue mq, long offset) {offsetTable.put(mq, offset);}private static long getMessageQueueOffset(MessageQueue mq) {Long offset = offsetTable.get(mq);if (offset != null)return offset;return 0;}
}

结构变化

自动创建了commitlog目录,其中目录下的文件固定为 1024M

自动创建了 index 索引目录

自动创建consumequeue目录


代码

请移步:https://github.com/yangshangwei/rocketmqMaster


更多示例

RocketMQ Gihub官网example工程 :戳这里

RocketMQ-初体验RocketMQ(06)-使用API操作RocketMQ ,理解RocketMQ的存储结构相关推荐

  1. rocketmq 初体验(二)AsyncProducer No name server address, please set it.

    AsyncProducer No name server address, please set it. 报错log No name server address, please set it. 原因 ...

  2. 全文搜索引擎Elasticsearch的初体验:基本概念和操作

    一.简介 关于Java Web的开发周边技术,搜索引擎也是经常被用到的,其中solr和es是被当作技术选型经常出现的,他们都是基于lucene,但是,你没法直接用 Lucene,必须自己写代码去调用它 ...

  3. 微信小程序初体验-苏州实时公交API

    利用聚合数据API快速写出小程序,过程简单. 1.申请小程序账号 2.进入开发 3.调用API.比如"苏州实时公交"小程序,选择的是苏州实时公交API. 苏州实时公交API文档:h ...

  4. ap接口 php_小白php API初体验 php api文档 php api接口开发 php web ap

    这里的php 写API其实就是指提供一个WebServiceWebSite : 1.以html格式响应返回 2.由用户通过浏览器来接入 WebService : 1.以json/Xml格式返回 2.由 ...

  5. Hadoop3——集群搭建以及初体验

    1. 匹配主机名 2.下载安装hadoop 3. 配置Hadoop环境 4. 启动Hadoop环境 5. Hadoop初体验 建议先整体浏览一遍再做 (关于创建虚拟机的操作日后有需要的话我再补上) 1 ...

  6. RocketMQ-初体验RocketMQ(01)_RocketMQ初体验

    文章目录 RocketMQ的由来 RocketMQ 版本 RocketMQ 基本概念 消息模型 消息生产者(producer) 消息消费者(Consumer) 主题(Topic) 代理服务器(Brok ...

  7. vue3.0 Composition API 上手初体验 使用 vue-router 构建多页面应用

    vue3.0 Composition API 上手初体验 使用 vue-router 构建多页面应用 前两讲,我们已经顺利的使用 vue3.0 将项目跑起来了.但是实在是过于简陋,目前我们几乎不可能开 ...

  8. ros 消息队列与缓冲区_Spring Boot消息队列系统:RocketMQ初入门

    前言 来啦老铁! 笔者学习Spring Boot有一段时间了,截至目前已实践.总结了26篇Spring Boot系列学习文章,感兴趣的同学可以关注专题一起学习吧! Spring Boot全家桶 在上一 ...

  9. 三、Hadoop系统应用之Hadoop集群测试及初体验(超详细步骤指导操作,WIN10,VMware Workstation 15.5 PRO,CentOS-6.7)

    Hadoop集群搭建前安装准备参考: 一.Hadoop系统应用之安装准备(一)(超详细步骤指导操作,WIN10,VMware Workstation 15.5 PRO,CentOS-6.7) 一.Ha ...

最新文章

  1. M_Map画南海水深地形图
  2. 代码的演化-DI(理解依赖注入di,控制反转ioc)
  3. 【k8s】kuboard获取token命令
  4. YBTOJ:魔法数字(数位dp)
  5. uva 1203—— Argus
  6. onenetsim定位功能吗_餐饮空间的设计原则和特点,你了解吗?
  7. autosar中bsw架构组成_AUTOSAR分层架构深度解析
  8. windows 2003活动目录更名操作[图]
  9. Android获取所在城市坐标及城市信息(逆地理位置编码)
  10. Android经典的大牛博客推荐(排名不分先后)!!
  11. linux下svn图形客户端,CentOS6.3下svn图形客户端SmartSVN安装
  12. 移动安全-IOS逆向第三天——实战HOOK RSA/DES加密
  13. sql中字符串转换成日期
  14. 新房怎么做到全屋网络覆盖?
  15. 华为杯山东理工大学第二届团体程序设计天梯赛
  16. Linux系统如何更新升级
  17. 【行为分析】(二)前端埋点实现及原理分析
  18. BAT超强IOS面试题116道,助你拿到高薪offer
  19. CTF封神台第三关通关
  20. 多妙招解除极域电子教室的控制权限

热门文章

  1. android 分段显示百分比,按百分比设置排名-Android DisplayMetrics
  2. 从一个提问引发到你是怎么看待编程语言是一种工具这句话的?【笔记自用】
  3. 真人秀制作网站_[BoA] 出道20周年真人秀Nobody Talks To BoA上演与李秀满总制作人充满默契的对话!...
  4. 异常检测-孤立森林(IsolationForest)
  5. 点云网络的论文理解(七)-Frustum PointNets for 3D Object Detection from RGB-D Data
  6. 错误记录 Could NOT find GTest (missing: GTEST_INCLUDE_DIR)
  7. tensorboardX笔记:理解graph
  8. 【数学建模】MATLAB应用实战系列(八十二)-【数学建模】非线性多元回归(附MATLAB代码)
  9. 编程语言中的Lambda 函数是如何产生的 它究竟有什么用
  10. MATLAB可视化实战系列(四十二)-图像特征提取-使用低秩 SVD 进行图像压缩实例