docker下安装单例RocketMq和集群

暂时之后再进行更新

rocket 各个角色分工

  1. Producer:消息的发送者;举例:发信者

  2. Consumer:消息接收者;举例:收信者

  3. Broker:暂存和传输消息;举例:邮局

  4. NameServer:管理Broker;举例:各个邮局的管理机构

  5. Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息
    类似于 rabbitmq 的 路由键 ( topic交换机 )

  6. Message Queue:相当于是Topic的分区;用于并行发送和接收消息

消息发送和接收

主题包含多个标签,例如主题春节,标签可以是放鞭炮,团年饭
同一个消费组,给不同的消费者设置不同的tag时,后启动的消费者会覆盖先启动的消费者设置的tag。生产消息
1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息 (这里可以使用发送异步消息 )
6.关闭生产者producer消费消息
1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer
ps: 这里的 消费者和rabbitmq 有很大的不同的, rabbitmq 是从队列中取出消费消费 , 这里是从订阅主题消费消息 ,和redis 消费消息的方式很像

下面的 案例 分组可以不同 “g1”组的消费者(DefaultMQPushConsumer) 可以接收 “g2”组的生产者发送的 消息 , tag和topic 必须相同才能接收消息 指定唯一消费路径

发送同步消息

//发送同步消息
public class SyncProducer {public static void main(String[] args) throws Exception {// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("g1");// 设置NameServer的地址producer.setNamesrvAddr("116.205.161.47:9876");//增大超时时间,防止超时producer.setSendMsgTimeout(1000000);// 启动Producer实例producer.start();for (int i = 0; i < 10; i++) {// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// 发送消息到一个BrokerSendResult sendResult = producer.send(msg);// 通过sendResult返回消息是否成功送达System.out.println("%s%n"+ sendResult);System.out.println("发送状态:"+sendResult.getSendStatus());System.out.println("发送消息的id:"+sendResult.getMsgId());System.out.println("消息队列的id: "+sendResult.getMessageQueue().getQueueId());Thread.sleep(1000L);}// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}

发送异步消息

这里要注意 ,需要防止主线程先关闭 导致Topic 被删除,消息找不到topic 发送失败咯~~

    public static void main(String[] args) throws Exception {// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("g2");// 设置NameServer的地址producer.setNamesrvAddr("116.205.161.47:9876");//增大超时时间,防止超时producer.setSendMsgTimeout(1000000);// 启动Producer实例producer.start();producer.setRetryTimesWhenSendAsyncFailed(0);for (int i = 0; i < 10; i++) {final int index = i;// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("TopicTest","TagA","OrderID188","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));// SendCallback接收异步返回结果的回调producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.printf("%-10d OK %s %n", index,sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});}// 防止主线程先关闭!!!Thread.sleep(60000);// 如果不再发送消息,关闭Producer实例。producer.shutdown();}

发送单项消息

单向消息 是不需要关心结果的消息 ,例如日志收集


public class OnewayProducer {public static void main(String[] args) throws Exception{// 实例化生产者DefaultMQProducer producer = new DefaultMQProducer("g3");// 设置NameServer的地址producer.setNamesrvAddr("116.205.161.47:9876");//增大超时时间,防止超时producer.setSendMsgTimeout(1000000);producer.start();for (int i = 0; i < 10; i++) {Message message = new Message("topic", "hi~~ one way message".getBytes(StandardCharsets.UTF_8));producer.sendOneway(message);}producer.shutdown();}
}

轮询接收消息

这里的

     --------   消费者1 /** 1.创建消费者Consumer,制定消费者组名2.指定Nameserver地址3.订阅主题Topic和Tag4.设置回调函数,处理消息5.启动消费者consumer* */public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g1");// 设置NameServer的地址consumer.setNamesrvAddr("116.205.161.47:9876");//增大超时时间,防止超时consumer.setConsumeTimeout(1000000);// 消费 tagconsumer.subscribe("topic","tag");// 设置回调函数 用来接收消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println(consumeConcurrentlyContext);//返回消费 成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();  }------------  消费者2public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g1");// 设置NameServer的地址consumer.setNamesrvAddr("116.205.161.47:9876");//增大超时时间,防止超时consumer.setConsumeTimeout(1000000);// 消费 tagconsumer.subscribe("TopicTest","TagA");// 设置回调函数 用来接收消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt messageExt : list) {log.info("我是消费者2 ,我消费了消息:{}",messageExt.getMsgId());byte[] body = messageExt.getBody();System.out.println(new String(body));}System.out.println(consumeConcurrentlyContext);//返回消费 成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info(" ");}

topic 和tag 相同 这里默认是轮询的接收消息 消费者1 接收1 3 5 ,消费者2 就接收2 4 6 …

广播模式接收消息


@Slf4j
public class Consumer02 {/** 1.创建消费者Consumer,制定消费者组名2.指定Nameserver地址3.订阅主题Topic和Tag4.设置回调函数,处理消息5.启动消费者consumer* */public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g1");// 设置NameServer的地址consumer.setNamesrvAddr("116.205.161.47:9876");//增大超时时间,防止超时consumer.setConsumeTimeout(1000000);//设置广播模式consumer.setMessageModel(MessageModel.BROADCASTING);// 消费 tagconsumer.subscribe("TopicTest","TagA");// 设置回调函数 用来接收消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt messageExt : list) {log.info("我是消费者2 ,我消费了消息:{}",messageExt.getMsgId());byte[] body = messageExt.getBody();System.out.println(new String(body));}System.out.println(consumeConcurrentlyContext);//返回消费 成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info(" ");}}

这里十条消息 有两个消费者 消费者1 (默认消息接收方式(默认轮询)). 消费者2(广播接收消息模式)
消费者1:接收到了 8 4 6 5 9 0 7
消费者2 :接收了 1–10 全部消息

继续实验
这里有3个消费者
消费者1 消费者3 是默认轮询模式
消费者2 是广播模式
消费者1 3 平分了消息
消费者2 接收了所有的消息

这里出现了个小bug,,,, 消费者1 和消费者2 加起来一共才消费了8 条消息 , 消费者3接收了10条消息
第二次实验 消息9 不见了…

实验3 ,关闭广播模式 的消费者 ,两个消费者不仅能正常监听到消息,还能把之前未监听到的消息 重新监听到…

总结:当 轮询消费者和 广播消费者 混一起监听同一个topic 下的 tag的时候 ,广播模式的消费者可以监听全部的消息 ,
所有 轮询模式消费者 加起来还是会有几条消息没有监听到 ,,,所以 rabbitmq 中的手动ack加消息回退 是多么的重要!!!(题外话)

局部消息顺序

原始 , 消息的发送者会把消息 发送到各个不同的队列 ,消费者也是 同时监听所有队列
局部消息顺序 ,将 具有顺序性的消息发送给一个队列 , 消费者专门开辟一条线程监听那个队列 ,这样就能实现消息的顺序性
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

// 消息的生产者
// 按照订单号 的不同 订单消息进入不同的 消息队列 (类比一下 哈希取余 )public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException,InterruptedException {// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("g2");// 设置NameServer的地址producer.setNamesrvAddr("116.205.161.47:9876");//增大超时时间,防止超时producer.setSendMsgTimeout(1000000);// 启动Producrdeproducer.start();//发送消息List<OrderStep> orderSteps = OrderStep.buildOrders();//构建消息集合  根据订单id 有选择的去发for (int i=0;i<orderSteps.size();i++) {byte[] bytes = JSON.toJSONString(orderSteps.get(i)).getBytes(StandardCharsets.UTF_8);Message message = new Message("orderTopic","order","i:"+i,bytes);//new MessageQueueSelector() :消息队列的选择器 ,根据消息的业务标识 实现方法,这里只要订单id 一样,进入的队列就是一样的// orderId: 业务标识producer.send(message, new MessageQueueSelector() {/** list:队列集合* message:消息对象* arg :业务标识参数 === orderSteps.get(i).getOrderId()* */// 选择队列的方法@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object arg) {long orderId = (long) arg; // 订单idlong index = orderId % list.size();// 消息被发送的队列return list.get((int) index);}},orderSteps.get(i).getOrderId());}producer.shutdown();}// 消费者
public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g1");// 设置NameServer的地址consumer.setNamesrvAddr("116.205.161.47:9876");//增大超时时间,防止超时consumer.setConsumeTimeout(1000000);// 消费 tagconsumer.subscribe("orderTopic","*");//注册监听器consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {System.out.println(" ThreadName:"+Thread.currentThread().getName()+" QueueID:"+msg.getQueueId()+",获得的消息是:"+new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println("消费者启动");}
}

打印的结果 (我 区分了一下)

 ThreadName:ConsumeMessageThread_1 QueueID:3,获得的消息是:{"desc":"创建","orderId":1039}ThreadName:ConsumeMessageThread_1 QueueID:3,获得的消息是:{"desc":"付款","orderId":1039}ThreadName:ConsumeMessageThread_1 QueueID:3,获得的消息是:{"desc":"推送","orderId":1039}ThreadName:ConsumeMessageThread_1 QueueID:3,获得的消息是:{"desc":"完成","orderId":1039}ThreadName:ConsumeMessageThread_1 QueueID:3,获得的消息是:{"desc":"创建","orderId":7235}ThreadName:ConsumeMessageThread_1 QueueID:3,获得的消息是:{"desc":"付款","orderId":7235}ThreadName:ConsumeMessageThread_1 QueueID:3,获得的消息是:{"desc":"完成","orderId":7235}ThreadName:ConsumeMessageThread_2 QueueID:1,获得的消息是:{"desc":"创建","orderId":1065}ThreadName:ConsumeMessageThread_2 QueueID:1,获得的消息是:{"desc":"付款","orderId":1065}ThreadName:ConsumeMessageThread_2 QueueID:1,获得的消息是:{"desc":"完成","orderId":1065}

可以很清楚的看到 线程id 和消息队列id 是一一对应的… 不会出现 一个消息队列里的消息同时被两条线程消费 ,这样打印出来的消息就是顺序的了

延迟消息

message.setDelayTimeLevel(2); 消息延迟两秒
消息会在消息队列延迟2秒
这里可以做成定时任务 ,
实验了一下 ,,这里不能做消息队列
我 模拟了一百条消息 , 每条消息都设置了随机 的延迟时间,并没有发现 延迟时间短的先出来 ,接收消息都是随机的,乱序的
总之一句话 和rabbitmq 差别很大…

批量消息发送

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {producer.send(messages);
} catch (Exception e) {e.printStackTrace();//处理error
}

如果消息的总长度可能大于4MB时,这时候最好把消息进行分割
解决方案

public class ListSplitter implements Iterator<List<Message>> {private final int SIZE_LIMIT = 1024 * 1024 * 4;private final List<Message> messages;private int currIndex;public ListSplitter(List<Message> messages) {this.messages = messages;}@Override public boolean hasNext() {return currIndex < messages.size();}@Override public List<Message> next() {int nextIndex = currIndex;int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {Message message = messages.get(nextIndex);int tmpSize = message.getTopic().length() + message.getBody().length;Map<String, String> properties = message.getProperties();for (Map.Entry<String, String> entry : properties.entrySet()) {tmpSize += entry.getKey().length() + entry.getValue().length();}tmpSize = tmpSize + 20; // 增加日志的开销20字节if (tmpSize > SIZE_LIMIT) {//单个消息超过了最大的限制//忽略,否则会阻塞分裂的进程if (nextIndex - currIndex == 0) {//假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环nextIndex++;}break;}if (tmpSize + totalSize > SIZE_LIMIT) {break;} else {totalSize += tmpSize;}}List<Message> subList = messages.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;}
}
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {try {List<Message>  listItem = splitter.next();producer.send(listItem);} catch (Exception e) {e.printStackTrace();//处理error}

消息过滤

利用tag实施过滤

// 消息发送者public static void main(String[] args) throws Exception{// 实例化生产者DefaultMQProducer producer = new DefaultMQProducer("g3");// 设置NameServer的地址producer.setNamesrvAddr("116.205.161.47:9876");//增大超时时间,防止超时producer.setSendMsgTimeout(1000000);producer.start();String topic = "BatchTest";List<Message> messages = new ArrayList<>();messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));messages.add(new Message(topic, "TagB", "OrderID002", "Hello world 1".getBytes()));messages.add(new Message(topic, "TagC", "OrderID003", "Hello world 2".getBytes()));producer.send(messages);producer.shutdown();}// 消息接收者 public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g1");// 设置NameServer的地址consumer.setNamesrvAddr("116.205.161.47:9876");//增大超时时间,防止超时consumer.setConsumeTimeout(1000000);// 消费 tag  TagA||TagB :只消费 这两种tag , * 消费BatchTest下全部的tagconsumer.subscribe("BatchTest","TagA||TagB");// 设置回调函数 用来接收消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt messageExt : list) {log.info("我消费了消息:{},消息延迟了:{},消息队列是:{},消息TAG是:{}",messageExt.getMsgId(),System.currentTimeMillis()-messageExt.getStoreTimestamp(),messageExt.getQueueId(),messageExt.getTags().toString());byte[] body = messageExt.getBody();System.out.println(new String(body));}System.out.println(consumeConcurrentlyContext);//返回消费 成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info(" ");}

利用SQL 进行过滤

RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,3.1415;
  • 字符,比如:‘abc’,必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUEFALSE

只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:

Exception in thread “main” org.apache.rocketmq.client.exception.MQClientException: CODE: 1 DESC: The broker does not support consumer to filter message by SQL92
需要更改 enablePropertyFilter 属性
sql 过滤失效


这样就好了

消费者 和 生产者

public static void main(String[] args) throws Exception{// 实例化生产者DefaultMQProducer producer = new DefaultMQProducer("g1");// 设置NameServer的地址producer.setNamesrvAddr("116.205.161.47:9876");//增大超时时间,防止超时producer.setSendMsgTimeout(1000000);producer.start();for (int i = 0; i < 20; i++) {Random random = new Random();Message message = new Message("topic1", "TagA","hi~~ one way message".getBytes(StandardCharsets.UTF_8));// putUserProperty : 便于 消费者使用sql 语句进行 过滤 ~~message.putUserProperty("i", String.valueOf(i));producer.send(message);}producer.shutdown();}public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g1");// 设置NameServer的地址consumer.setNamesrvAddr("116.205.161.47:9876");//增大超时时间,防止超时consumer.setConsumeTimeout(1000000);//MessageSelector messageSelector = MessageSelector.bySql("i > 5");consumer.subscribe("topic1",messageSelector);// 设置回调函数 用来接收消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt messageExt : list) {log.info("我消费了消息:{},消息队列是:{},消息TAG是:{},消息的properties是:{}",messageExt.getMsgId(),messageExt.getTags().toString(),messageExt.getProperties().toString());byte[] body = messageExt.getBody();System.out.println(new String(body));}System.out.println(consumeConcurrentlyContext);//返回消费 成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info(" ");}

执行结果

6:44:52.956 [main] INFO com.example.rocketmq.Consumer -
16:50:06.561 [ConsumeMessageThread_1] INFO com.example.rocketmq.Consumer - 我消费了消息:C0A82BF2316818B4AAC26FC24DF50006,消息队列是:TagA,消息TAG是:{MIN_OFFSET=0, MAX_OFFSET=32, i=6, CONSUME_START_TIME=1661158204994, UNIQ_KEY=C0A82BF2316818B4AAC26FC24DF50006, WAIT=true, TAGS=TagA},消息的properties是:{}
16:50:06.561 [ConsumeMessageThread_11] INFO com.example.rocketmq.Consumer - 我消费了消息:C0A82BF2316818B4AAC26FC250560010,消息队列是:TagA,消息TAG是:{MIN_OFFSET=0, MAX_OFFSET=35, i=16, CONSUME_START_TIME=1661158205592, UNIQ_KEY=C0A82BF2316818B4AAC26FC250560010, WAIT=true, TAGS=TagA},消息的properties是:{}
16:50:06.561 [ConsumeMessageThread_9] INFO com.example.rocketmq.Consumer - 我消费了消息:C0A82BF2316818B4AAC26FC24FEC000E,消息队列是:TagA,消息TAG是:{MIN_OFFSET=0, MAX_OFFSET=34, i=14, CONSUME_START_TIME=1661158205468, UNIQ_KEY=C0A82BF2316818B4AAC26FC24FEC000E, WAIT=true, TAGS=TagA},消息的properties是:{}
16:50:06.562 [ConsumeMessageThread_8] INFO com.example.rocketmq.Consumer - 我消费了消息:C0A82BF2316818B4AAC26FC24FBB000D,消息队列是:TagA,消息TAG是:{MIN_OFFSET=0, MAX_OFFSET=34, i=13, CONSUME_START_TIME=1661158205421, UNIQ_KEY=C0A82BF2316818B4AAC26FC24FBB000D, WAIT=true, TAGS=TagA},消息的properties是:{}
hi~~ one way message
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext@17042905
hi~~ one way message

事务消息

(1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

当消息发送之后如果没有被 提交 消息是不能 被 消费者消费的!!
executeLocalTransaction() 给消息设置是否能提交
checkLocalTransaction() 回查

* TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
* TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
* TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。

生产者

import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.StringUtils;import java.nio.charset.StandardCharsets;
import java.util.Random;public class OnewayProducer {public static void main(String[] args) throws Exception{// 实例化生产者TransactionMQProducer producer = new TransactionMQProducer("g1");// 设置NameServer的地址producer.setNamesrvAddr("116.205.161.47:9876");//设置监听者producer.setTransactionListener(new TransactionListener() {// 当消息被 mq接收到了 会触发该方法 ,对消息进行处理  COMMIT_MESSAGE(提交) ROLLBACK_MESSAGE(丢丢)UNKNOW(被checkLocalTransaction()处理)//该方法中执行本地事务 ROLLBACK_MESSAGE:消息会被丢掉@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {if (message.getTags().equals("A")){return LocalTransactionState.COMMIT_MESSAGE;}else if (message.getTags().equals("B")){return LocalTransactionState.ROLLBACK_MESSAGE;}else if (message.getTags().equals("C")){return LocalTransactionState.UNKNOW ;}else {return LocalTransactionState.UNKNOW ;}}//该方法 对事务状态的回查// unknow 的消息会被 checkLocalTransaction() 处理@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt messageExt) {System.out.println("消息Tag:"+messageExt.getTags());return LocalTransactionState.COMMIT_MESSAGE;}});//增大超时时间,防止超时producer.setSendMsgTimeout(1000000);producer.start();String[] tag = {"A","B","C"} ;for (int i = 0; i < 3; i++) {Message message = new Message("topic1", tag[i],"hi~~ one way message".getBytes(StandardCharsets.UTF_8));TransactionSendResult result = producer.sendMessageInTransaction(message, null);System.out.println("发送结果是: "+result.getSendStatus());}// producer.shutdown();}
}
  1. 事务消息不支持延时消息和批量消息。
  2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
  3. 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
  4. 事务性消息可能不止一次被检查或消费。
  5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
  6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

rocketmq-spring-boot-starter使用

导入

   <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.1</version></dependency>

@Testvoid contextLoads() {rocketMQTemplate.convertAndSend("springtest","hello Rocket");}@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "my-group", topic = "springtest")
public class Mqlistener implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {log.info("接收到的消息是:{}",s);}
}

消息的存储和发送

实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式, 尤其是SYNC_FLUSH方式,由于频繁地触发磁盘写动作,会明显降低 性能。通常情况下,应该把Master和Save配置成ASYNC_FLUSH的刷盘 方式,主从之间配置成SYNC_MASTER的复制方式,这样即使有一台 机器出故障,仍然能保证数据不丢,是个不错的选择。
```![在这里插入图片描述](https://img-blog.csdnimg.cn/7b0e0978ab8346eeb0ba5d450b8fadbd.png)# 消息重试
>对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。```c
# 异常重试
public class MessageListenerImpl implements MessageListener {@Overridepublic Action consume(Message message, ConsumeContext context) {//处理消息doConsumeMessage(message);//方式1:返回 Action.ReconsumeLater,消息将重试return Action.ReconsumeLater;//方式2:返回 null,消息将重试return null;//方式3:直接抛出异常, 消息将重试throw new RuntimeException("Consumer Message exceotion");}
}# 异常不重试
public class MessageListenerImpl implements MessageListener {@Overridepublic Action consume(Message message, ConsumeContext context) {try {doConsumeMessage(message);} catch (Throwable e) {//捕获消费逻辑中的所有异常,并返回 Action.CommitMessage;return Action.CommitMessage;}//消息处理正常,直接返回 Action.CommitMessage;return Action.CommitMessage;}
}

自定义消息重试次数

  • 消息最大重试次数的设置对相同 Group ID 下的所有 Consumer 实例有效。
  • 如果只对相同 Group ID 下两个 Consumer 实例中的其中一个设置了 MaxReconsumeTimes,那么该配置对两个 Consumer 实例均生效。
  • 配置采用覆盖的方式生效,即最后启动的 Consumer 实例会覆盖之前的启动实例的配置
Properties properties = new Properties();
//配置对应 Group ID 的最大消息重试次数为 20 次
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
Consumer consumer =ONSFactory.createConsumer(properties);

消费幂

因为 Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。 最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 进行设置, key存入 数据库 来搞幂等性:

Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);
consumer.subscribe("ons_test", "*", new MessageListener() {public Action consume(Message message, ConsumeContext context) {String key = message.getKey()// 根据业务唯一标识的 key 做幂等处理}
});

RocketMq最强总结 带你rocket从入门到入土为安相关推荐

  1. 强哥带你零基础学java-03运行第一个java程序

    java是什么,java是一种编程语言的名字,在生活中,我们知道的有汉语,英语.运用汉语语法,我们可以写一篇文章.同样的道理,用java的语法,我们就可以写程序了. 那么现在的问题是,要如何来写呢?现 ...

  2. python带我起飞_Python带我起飞:入门、进阶、商业实战

    <Python带我起飞:入门.进阶.商业实战>针对Python3.5以上版本,采用"理论+实践"的形式编写,通过大量的实例(共42个),全面而深入地讲解"Py ...

  3. 初二上册计算机编程入门先学什么,8年级以上学生必读,这项AP课程带你零基础入门编程!...

    原标题:8年级以上学生必读,这项AP课程带你零基础入门编程! 导读 作为全北美通行的预科课程,AP考试的分数不仅能够换取大学学分,还能对申请有超级重要的加分作用.对于中国学生而言,AP计算机科学(以下 ...

  4. 视频教程-赵强老师:大数据从入门到精通(6)MapReduce-Hadoop

    赵强老师:大数据从入门到精通(6)MapReduce 毕业于清华大学,拥有超过13年的工作经验. Oracle认证讲师,拥有6年以上授课经验.精通Oracle数据库.中间(Weblogic)和大数据H ...

  5. 视频教程-赵强老师:大数据从入门到精通(7)HBase-Hbase

    赵强老师:大数据从入门到精通(7)HBase 毕业于清华大学,拥有超过13年的工作经验. Oracle认证讲师,拥有6年以上授课经验.精通Oracle数据库.中间(Weblogic)和大数据Hadoo ...

  6. 视频教程-赵强老师:大数据从入门到精通(1)Linux基础-Linux

    赵强老师:大数据从入门到精通(1)Linux基础 毕业于清华大学,拥有超过13年的工作经验. Oracle认证讲师,拥有6年以上授课经验.精通Oracle数据库.中间(Weblogic)和大数据Had ...

  7. 视频教程-赵强老师:大数据从入门到精通(15)Storm-大数据

    赵强老师:大数据从入门到精通(15)Storm 毕业于清华大学,拥有超过13年的工作经验. Oracle认证讲师,拥有6年以上授课经验.精通Oracle数据库.中间(Weblogic)和大数据Hado ...

  8. 视频教程-赵强老师:大数据从入门到精通(23)配置Hive On Spark-Spark

    赵强老师:大数据从入门到精通(23)配置Hive On Spark 毕业于清华大学,拥有超过13年的工作经验. Oracle认证讲师,拥有6年以上授课经验.精通Oracle数据库.中间(Weblogi ...

  9. 视频教程-赵强老师:大数据从入门到精通(12)集群HA-Hadoop

    赵强老师:大数据从入门到精通(12)集群HA 毕业于清华大学,拥有超过13年的工作经验. Oracle认证讲师,拥有6年以上授课经验.精通Oracle数据库.中间(Weblogic)和大数据Hadoo ...

最新文章

  1. 人工智能值得关注的技术研究方向
  2. DELPHI FMX 获取系统版本 ANDROID IOS通用
  3. mongodb 导出txt_(干货)前端实现导出excel的功能
  4. 在windows server 2003下如何了启动远程管理(html)
  5. java之接口interface
  6. 每天一道剑指offer-二叉搜索数的后序遍历序列
  7. 智能优化算法:闪电连接过程算法 - 附代码
  8. 使用putty上传文件到linux
  9. 使用Gpu恢复7z密码
  10. FPGA写约束文件+固化+上板抓信号方法(vivado软件)
  11. vue使用高德地图web端JSAPI 路线规划、搜索提示教程
  12. 【微信小程序】农历公历互相转换
  13. 串口接收完整一帧数据包的3种方法
  14. 场地预约管理微信小程序开发过程中的错误记录
  15. 劳务派遣经营许可证怎样办理
  16. pyqt QLineEdit 详细用法
  17. 服务器微信支付接口笔记-(与app端对接)
  18. 重叠社区发现-LFM算法
  19. windows无法连接到打印机_打印机无法连接怎么办
  20. 929. 独特的电子邮件地址(简单,字符串)(12.20)

热门文章

  1. w7计算机虚拟内存设置,win7虚拟内存怎么设置最好?win7虚拟内存设置方法
  2. Android 调节屏幕亮度(当前应用和系统亮度)
  3. Effective C++ T23:宁以non-member、non-friend替换member函数
  4. python模型预测控制_【模型工具】耦合python和 SWMM的城市排水系统模型预测算法...
  5. php 同步微信大量粉丝在数据表,微粉丝—— 微信加粉统计系统/复制统计准确率90%以上...
  6. 好多粉如何统计微信加粉复制次数?
  7. 微信h5页面提交表单后返回键重复提交的问题
  8. FPGA能做什么?比单片机厉害吗?
  9. jsp自定义标签的问题Unable to load tag handler class
  10. nova-week2