1)发送同步消息

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;import java.util.concurrent.TimeUnit;/*** 发送同步消息*/
public class SyncProducer {public static void main(String[] args) throws Exception {//1.创建消息生产者producer,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");//2.指定Nameserver地址producer.setNamesrvAddr("192.168.86.129:9876");//3.启动producerproducer.start();for (int i = 0; i < 10; i++) {//4.创建消息对象,指定主题Topic、Tag和消息体/*** 参数一:消息主题Topic* 参数二:消息Tag* 参数三:消息内容*/Message msg = new Message("base", "Tag1", ("Hello World" + i).getBytes());//5.发送消息SendResult result = producer.send(msg);//发送状态SendStatus status = result.getSendStatus();System.out.println("发送结果:" + result);//线程睡1秒TimeUnit.SECONDS.sleep(1);}//6.关闭生产者producerproducer.shutdown();}
}
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80108546C18B4AAC2574563DB0000, offsetMsgId=C0A8568100002A9F0000000000057D64, messageQueue=MessageQueue [topic=base, brokerName=java, queueId=3], queueOffset=0]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80108546C18B4AAC2574567E80001, offsetMsgId=C0A8568100002A9F0000000000057E0D, messageQueue=MessageQueue [topic=base, brokerName=java, queueId=0], queueOffset=0]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80108546C18B4AAC257456BDB0002, offsetMsgId=C0A8568100002A9F0000000000057EB6, messageQueue=MessageQueue [topic=base, brokerName=java, queueId=1], queueOffset=0]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80108546C18B4AAC257456FC90003, offsetMsgId=C0A8568100002A9F0000000000057F5F, messageQueue=MessageQueue [topic=base, brokerName=java, queueId=2], queueOffset=0]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80108546C18B4AAC2574573B80004, offsetMsgId=C0A8568100002A9F0000000000058008, messageQueue=MessageQueue [topic=base, brokerName=java, queueId=3], queueOffset=1]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80108546C18B4AAC2574577A90005, offsetMsgId=C0A8568100002A9F00000000000580B1, messageQueue=MessageQueue [topic=base, brokerName=java, queueId=0], queueOffset=1]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80108546C18B4AAC257457B9C0006, offsetMsgId=C0A8568100002A9F000000000005815A, messageQueue=MessageQueue [topic=base, brokerName=java, queueId=1], queueOffset=1]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80108546C18B4AAC257457F890007, offsetMsgId=C0A8568100002A9F0000000000058203, messageQueue=MessageQueue [topic=base, brokerName=java, queueId=2], queueOffset=1]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80108546C18B4AAC2574583810008, offsetMsgId=C0A8568100002A9F00000000000582AC, messageQueue=MessageQueue [topic=base, brokerName=java, queueId=3], queueOffset=2]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80108546C18B4AAC2574587770009, offsetMsgId=C0A8568100002A9F0000000000058355, messageQueue=MessageQueue [topic=base, brokerName=java, queueId=0], queueOffset=2]Process finished with exit code 0

2)发送异步消息

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.util.concurrent.TimeUnit;/*** 发送异步消息*/
public class AsyncProducer {public static void main(String[] args) throws Exception {//1.创建消息生产者producer,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");//2.指定Nameserver地址producer.setNamesrvAddr("192.168.86.129:9876");//3.启动producerproducer.start();for (int i = 0; i < 10; i++) {//4.创建消息对象,指定主题Topic、Tag和消息体/*** 参数一:消息主题Topic* 参数二:消息Tag* 参数三:消息内容*/Message msg = new Message("base", "Tag2", ("Hello World" + i).getBytes());//5.发送异步消息producer.send(msg, new SendCallback() {/*** 发送成功回调函数* @param sendResult*/public void onSuccess(SendResult sendResult) {System.out.println("发送结果:" + sendResult);}/*** 发送失败回调函数* @param e*/public void onException(Throwable e) {System.out.println("发送异常:" + e);}});//线程睡1秒TimeUnit.SECONDS.sleep(1);}//6.关闭生产者producerproducer.shutdown();}
}
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80108089018B4AAC2574DD1090000, offsetMsgId=C0A8568100002A9F000000000005884E, messageQueue=MessageQueue [topic=base, brokerName=java, queueId=3], queueOffset=4]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80108089018B4AAC2574DD4060001, offsetMsgId=C0A8568100002A9F00000000000588F7, messageQueue=MessageQueue [topic=base, brokerName=java, queueId=3], queueOffset=5]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80108089018B4AAC2574DD7F70002, offsetMsgId=C0A8568100002A9F00000000000589A0, messageQueue=MessageQueue [topic=base, brokerName=java, queueId=0], queueOffset=5]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80108089018B4AAC2574DDBEE0003, offsetMsgId=C0A8568100002A9F0000000000058A49, messageQueue=MessageQueue [topic=base, brokerName=java, queueId=0], queueOffset=6]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80108089018B4AAC2574DDFDD0004, offsetMsgId=C0A8568100002A9F0000000000058AF2, messageQueue=MessageQueue [topic=base, brokerName=java, queueId=1], queueOffset=4]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80108089018B4AAC2574DE3D10005, offsetMsgId=C0A8568100002A9F0000000000058B9B, messageQueue=MessageQueue [topic=base, brokerName=java, queueId=3], queueOffset=6]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80108089018B4AAC2574DE7BF0006, offsetMsgId=C0A8568100002A9F0000000000058C44, messageQueue=MessageQueue [topic=base, brokerName=java, queueId=3], queueOffset=7]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80108089018B4AAC2574DEBA80007, offsetMsgId=C0A8568100002A9F0000000000058CED, messageQueue=MessageQueue [topic=base, brokerName=java, queueId=1], queueOffset=5]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80108089018B4AAC2574DEF930008, offsetMsgId=C0A8568100002A9F0000000000058D96, messageQueue=MessageQueue [topic=base, brokerName=java, queueId=3], queueOffset=8]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80108089018B4AAC2574DF3800009, offsetMsgId=C0A8568100002A9F0000000000058E3F, messageQueue=MessageQueue [topic=base, brokerName=java, queueId=1], queueOffset=6]

3)单向发送消息

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;import java.util.concurrent.TimeUnit;/*** 发送单向消息*/
public class OneWayProducer {public static void main(String[] args) throws Exception, MQBrokerException {//1.创建消息生产者producer,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");//2.指定Nameserver地址producer.setNamesrvAddr("192.168.86.129:9876");//3.启动producerproducer.start();for (int i = 0; i < 3; i++) {//4.创建消息对象,指定主题Topic、Tag和消息体/*** 参数一:消息主题Topic* 参数二:消息Tag* 参数三:消息内容*/Message msg = new Message("base", "Tag3", ("Hello World,单向消息" + i).getBytes());//5.发送单向消息producer.sendOneway(msg);//线程睡1秒TimeUnit.SECONDS.sleep(5);}//6.关闭生产者producerproducer.shutdown();}
}

4)消费消息基本流程

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/*** 消息的接受者*/
public class Consumer {public static void main(String[] args) throws Exception {//1.创建消费者Consumer,制定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定Nameserver地址consumer.setNamesrvAddr("192.168.86.129:9876");//3.订阅主题Topic和Tagconsumer.subscribe("base", "Tag1");//设定消费模式:负载均衡|广播模式//consumer.setMessageModel(MessageModel.BROADCASTING);//4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {//接受消息内容@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者consumerconsumer.start();}
}
consumeThread=ConsumeMessageThread_1,Hello World0
consumeThread=ConsumeMessageThread_2,Hello World1
consumeThread=ConsumeMessageThread_3,Hello World2
consumeThread=ConsumeMessageThread_4,Hello World3
consumeThread=ConsumeMessageThread_5,Hello World4
consumeThread=ConsumeMessageThread_6,Hello World5
consumeThread=ConsumeMessageThread_7,Hello World6
consumeThread=ConsumeMessageThread_8,Hello World7
consumeThread=ConsumeMessageThread_9,Hello World8
consumeThread=ConsumeMessageThread_10,Hello World9

5 消费消息

1)负载均衡模式

消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同

public static void main(String[] args) throws Exception {// 实例化消息生产者,指定组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");// 指定Namesrv地址信息.consumer.setNamesrvAddr("localhost:9876");// 订阅Topicconsumer.subscribe("Test", "*");//负载均衡模式消费consumer.setMessageModel(MessageModel.CLUSTERING);// 注册回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消息者consumer.start();System.out.printf("Consumer Started.%n");
}

2)广播模式

消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的

public static void main(String[] args) throws Exception {// 实例化消息生产者,指定组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");// 指定Namesrv地址信息.consumer.setNamesrvAddr("localhost:9876");// 订阅Topicconsumer.subscribe("Test", "*");//广播模式消费consumer.setMessageModel(MessageModel.BROADCASTING);// 注册回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消息者consumer.start();System.out.printf("Consumer Started.%n");
}

顺序消息发送者

/**
* 订单的步骤
*/

import java.util.ArrayList;
import java.util.List;/*** 订单构建者*/
public class OrderStep {private long orderId;private String desc;public long getOrderId() {return orderId;}public void setOrderId(long orderId) {this.orderId = orderId;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}@Overridepublic String toString() {return "OrderStep{" +"orderId=" + orderId +", desc='" + desc + '\'' +'}';}public static List<OrderStep> buildOrders() {//  1039L   : 创建    付款 推送 完成//  1065L   : 创建   付款//  7235L   :创建    付款List<OrderStep> orderList = new ArrayList<OrderStep>();OrderStep orderDemo = new OrderStep();orderDemo.setOrderId(1039L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1065L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1039L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(7235L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1065L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(7235L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1065L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1039L);orderDemo.setDesc("推送");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(7235L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1039L);orderDemo.setDesc("完成");orderList.add(orderDemo);return orderList;}
}
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;import java.util.List;public class Producer {public static void main(String[] args) throws Exception {//1.创建消息生产者producer,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");//2.指定Nameserver地址producer.setNamesrvAddr("192.168.86.132:9876");//3.启动producerproducer.start();//producer.setVipChannelEnabled(false);//构建消息集合List<OrderStep> orderSteps = OrderStep.buildOrders();//发送消息for (int i = 0; i < orderSteps.size(); i++) {String body = orderSteps.get(i) + "";Message message = new Message("OrderTopic", "Order", "i" + i, body.getBytes());/*** 参数一:消息对象* 参数二:消息队列的选择器* 参数三:选择队列的业务标识(订单ID)*/SendResult sendResult = producer.send(message, new MessageQueueSelector() {/**** @param mqs:队列集合* @param msg:消息对象* @param arg:业务标识的参数* @return*/@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {long orderId = (long) arg;long index = orderId % mqs.size();return mqs.get((int) index);}}, orderSteps.get(i).getOrderId());System.out.println("发送结果:" + sendResult);}producer.shutdown();}}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer {public static void main(String[] args) throws MQClientException {//1.创建消费者Consumer,制定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定Nameserver地址consumer.setNamesrvAddr("192.168.86.132:9876");//3.订阅主题Topic和Tagconsumer.subscribe("OrderTopic", "*");//4.注册消息监听器consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {System.out.println("线程名称:【" + Thread.currentThread().getName() + "】:" + new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});//5.启动消费者consumer.start();System.out.println("消费者启动");}
}
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801066C6418B4AAC2031EBAE70000, offsetMsgId=C0A8568400002A9F000000000005CAAE, messageQueue=MessageQueue [topic=OrderTopic, brokerName=java, queueId=3], queueOffset=21]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801066C6418B4AAC2031EBAEE0001, offsetMsgId=C0A8568400002A9F000000000005CB80, messageQueue=MessageQueue [topic=OrderTopic, brokerName=java, queueId=1], queueOffset=9]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801066C6418B4AAC2031EBAF00002, offsetMsgId=C0A8568400002A9F000000000005CC52, messageQueue=MessageQueue [topic=OrderTopic, brokerName=java, queueId=3], queueOffset=22]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801066C6418B4AAC2031EBAF30003, offsetMsgId=C0A8568400002A9F000000000005CD24, messageQueue=MessageQueue [topic=OrderTopic, brokerName=java, queueId=3], queueOffset=23]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801066C6418B4AAC2031EBAF50004, offsetMsgId=C0A8568400002A9F000000000005CDF6, messageQueue=MessageQueue [topic=OrderTopic, brokerName=java, queueId=1], queueOffset=10]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801066C6418B4AAC2031EBAF60005, offsetMsgId=C0A8568400002A9F000000000005CEC8, messageQueue=MessageQueue [topic=OrderTopic, brokerName=java, queueId=3], queueOffset=24]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801066C6418B4AAC2031EBAF80006, offsetMsgId=C0A8568400002A9F000000000005CF9A, messageQueue=MessageQueue [topic=OrderTopic, brokerName=java, queueId=1], queueOffset=11]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801066C6418B4AAC2031EBAF90007, offsetMsgId=C0A8568400002A9F000000000005D06C, messageQueue=MessageQueue [topic=OrderTopic, brokerName=java, queueId=3], queueOffset=25]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801066C6418B4AAC2031EBAFB0008, offsetMsgId=C0A8568400002A9F000000000005D13E, messageQueue=MessageQueue [topic=OrderTopic, brokerName=java, queueId=3], queueOffset=26]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801066C6418B4AAC2031EBAFF0009, offsetMsgId=C0A8568400002A9F000000000005D210, messageQueue=MessageQueue [topic=OrderTopic, brokerName=java, queueId=3], queueOffset=27]Process finished with exit code 0
消费者启动
线程名称:【ConsumeMessageThread_1】:OrderStep{orderId=1065, desc='创建'}
线程名称:【ConsumeMessageThread_2】:OrderStep{orderId=1039, desc='创建'}
线程名称:【ConsumeMessageThread_1】:OrderStep{orderId=1065, desc='付款'}
线程名称:【ConsumeMessageThread_2】:OrderStep{orderId=1039, desc='付款'}
线程名称:【ConsumeMessageThread_1】:OrderStep{orderId=1065, desc='完成'}
线程名称:【ConsumeMessageThread_2】:OrderStep{orderId=7235, desc='创建'}
线程名称:【ConsumeMessageThread_2】:OrderStep{orderId=7235, desc='付款'}
线程名称:【ConsumeMessageThread_2】:OrderStep{orderId=1039, desc='推送'}
线程名称:【ConsumeMessageThread_2】:OrderStep{orderId=7235, desc='完成'}
线程名称:【ConsumeMessageThread_2】:OrderStep{orderId=1039, desc='完成'}

延迟消息

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;import java.util.concurrent.TimeUnit;public class Producer {public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {//1.创建消息生产者producer,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");//2.指定Nameserver地址producer.setNamesrvAddr("192.168.86.132:9876");//3.启动producerproducer.start();for (int i = 0; i < 10; i++) {//4.创建消息对象,指定主题Topic、Tag和消息体/*** 参数一:消息主题Topic* 参数二:消息Tag* 参数三:消息内容*/Message msg = new Message("DelayTopic", "Tag1", ("Hello World" + i).getBytes());//设定延迟时间msg.setDelayTimeLevel(2);//5.发送消息SendResult result = producer.send(msg);//发送状态SendStatus status = result.getSendStatus();System.out.println("发送结果:" + result);//线程睡1秒TimeUnit.SECONDS.sleep(1);}//6.关闭生产者producerproducer.shutdown();}}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer {public static void main(String[] args) throws Exception {//1.创建消费者Consumer,制定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定Nameserver地址consumer.setNamesrvAddr("192.168.86.132:9876");//3.订阅主题Topic和Tagconsumer.subscribe("DelayTopic", "*");//4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {//接受消息内容@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println("消息ID:【" + msg.getMsgId() + "】,延迟时间:" + (System.currentTimeMillis() - msg.getStoreTimestamp()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者consumerconsumer.start();System.out.println("消费者启动");}
}
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80106691418B4AAC2032E5CA20000, offsetMsgId=C0A8568400002A9F000000000005D2E2, messageQueue=MessageQueue [topic=DelayTopic, brokerName=java, queueId=3], queueOffset=0]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80106691418B4AAC2032E609C0001, offsetMsgId=C0A8568400002A9F000000000005D3C3, messageQueue=MessageQueue [topic=DelayTopic, brokerName=java, queueId=0], queueOffset=1]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80106691418B4AAC2032E648E0002, offsetMsgId=C0A8568400002A9F000000000005D4A4, messageQueue=MessageQueue [topic=DelayTopic, brokerName=java, queueId=1], queueOffset=2]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80106691418B4AAC2032E68840003, offsetMsgId=C0A8568400002A9F000000000005D585, messageQueue=MessageQueue [topic=DelayTopic, brokerName=java, queueId=2], queueOffset=3]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80106691418B4AAC2032E6C770004, offsetMsgId=C0A8568400002A9F000000000005D666, messageQueue=MessageQueue [topic=DelayTopic, brokerName=java, queueId=3], queueOffset=4]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80106691418B4AAC2032E706E0005, offsetMsgId=C0A8568400002A9F000000000005D81F, messageQueue=MessageQueue [topic=DelayTopic, brokerName=java, queueId=0], queueOffset=5]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80106691418B4AAC2032E74640006, offsetMsgId=C0A8568400002A9F000000000005D9D8, messageQueue=MessageQueue [topic=DelayTopic, brokerName=java, queueId=1], queueOffset=6]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80106691418B4AAC2032E78580007, offsetMsgId=C0A8568400002A9F000000000005DB91, messageQueue=MessageQueue [topic=DelayTopic, brokerName=java, queueId=2], queueOffset=7]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80106691418B4AAC2032E7C440008, offsetMsgId=C0A8568400002A9F000000000005DD4A, messageQueue=MessageQueue [topic=DelayTopic, brokerName=java, queueId=3], queueOffset=8]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80106691418B4AAC2032E80330009, offsetMsgId=C0A8568400002A9F000000000005DF03, messageQueue=MessageQueue [topic=DelayTopic, brokerName=java, queueId=0], queueOffset=9]Process finished with exit code 0
消费者启动
消息ID:【C0A80106691418B4AAC2032E68840003】,延迟时间:21647
消息ID:【C0A80106691418B4AAC2032E78580007】,延迟时间:17595
消息ID:【C0A80106691418B4AAC2032E609C0001】,延迟时间:26608
消息ID:【C0A80106691418B4AAC2032E706E0005】,延迟时间:22557
消息ID:【C0A80106691418B4AAC2032E648E0002】,延迟时间:25599
消息ID:【C0A80106691418B4AAC2032E5CA20000】,延迟时间:27618
消息ID:【C0A80106691418B4AAC2032E80330009】,延迟时间:18522
消息ID:【C0A80106691418B4AAC2032E6C770004】,延迟时间:23574
消息ID:【C0A80106691418B4AAC2032E74640006】,延迟时间:21546
消息ID:【C0A80106691418B4AAC2032E7C440008】,延迟时间:19529

批量消息发送

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;public class Producer {public static void main(String[] args) throws Exception {//1.创建消息生产者producer,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");//2.指定Nameserver地址producer.setNamesrvAddr("192.168.86.132:9876");//3.启动producerproducer.start();List<Message> msgs = new ArrayList<Message>();//4.创建消息对象,指定主题Topic、Tag和消息体/*** 参数一:消息主题Topic* 参数二:消息Tag* 参数三:消息内容*/Message msg1 = new Message("BatchTopic", "Tag1", ("Hello World" + 1).getBytes());Message msg2 = new Message("BatchTopic", "Tag1", ("Hello World" + 2).getBytes());Message msg3 = new Message("BatchTopic", "Tag1", ("Hello World" + 3).getBytes());msgs.add(msg1);msgs.add(msg2);msgs.add(msg3);//5.发送消息SendResult result = producer.send(msgs);//发送状态SendStatus status = result.getSendStatus();System.out.println("发送结果:" + result);//线程睡1秒TimeUnit.SECONDS.sleep(1);//6.关闭生产者producerproducer.shutdown();}}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer {public static void main(String[] args) throws Exception {//1.创建消费者Consumer,制定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定Nameserver地址consumer.setNamesrvAddr("192.168.86.132:9876");//3.订阅主题Topic和Tagconsumer.subscribe("BatchTopic", "*");//4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {//接受消息内容@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者consumerconsumer.start();System.out.println("消费者启动");}
}

发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A8010647B418B4AAC2033C13190000,C0A8010647B418B4AAC2033C13190001,C0A8010647B418B4AAC2033C13190002, offsetMsgId=C0A8568400002A9F000000000005E41C,C0A8568400002A9F000000000005E4CB,C0A8568400002A9F000000000005E57A, messageQueue=MessageQueue [topic=BatchTopic, brokerName=java, queueId=3], queueOffset=0]Process finished with exit code 0
消费者启动
consumeThread=ConsumeMessageThread_3,Hello World3
consumeThread=ConsumeMessageThread_2,Hello World2
consumeThread=ConsumeMessageThread_1,Hello World1

过滤消息的两种方式

Tag过滤
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;import java.util.concurrent.TimeUnit;public class Producer {public static void main(String[] args) throws Exception {//1.创建消息生产者producer,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");//2.指定Nameserver地址producer.setNamesrvAddr("192.168.86.132:9876");//3.启动producerproducer.start();for (int i = 0; i < 3; i++) {//4.创建消息对象,指定主题Topic、Tag和消息体/*** 参数一:消息主题Topic* 参数二:消息Tag* 参数三:消息内容*/Message msg = new Message("FilterTagTopic", "Tag2", ("Hello World" + i).getBytes());//5.发送消息SendResult result = producer.send(msg);//发送状态SendStatus status = result.getSendStatus();System.out.println("发送结果:" + result);//线程睡1秒TimeUnit.SECONDS.sleep(1);}//6.关闭生产者producerproducer.shutdown();}}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer {public static void main(String[] args) throws Exception {//1.创建消费者Consumer,制定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定Nameserver地址consumer.setNamesrvAddr("192.168.86.132:9876");//3.订阅主题Topic和Tagconsumer.subscribe("FilterTagTopic", "Tag1 || Tag2 ");//4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {//接受消息内容@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者consumerconsumer.start();System.out.println("消费者启动");}
}
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80106695818B4AAC2037F28920000, offsetMsgId=C0A8568400002A9F000000000005E629, messageQueue=MessageQueue [topic=FilterTagTopic, brokerName=java, queueId=2], queueOffset=0]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80106695818B4AAC2037F2C8A0001, offsetMsgId=C0A8568400002A9F000000000005E6DC, messageQueue=MessageQueue [topic=FilterTagTopic, brokerName=java, queueId=3], queueOffset=0]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80106695818B4AAC2037F307D0002, offsetMsgId=C0A8568400002A9F000000000005E78F, messageQueue=MessageQueue [topic=FilterTagTopic, brokerName=java, queueId=0], queueOffset=0]
消费者启动
consumeThread=ConsumeMessageThread_1,Hello World2
consumeThread=ConsumeMessageThread_2,Hello World1
consumeThread=ConsumeMessageThread_3,Hello World0
SQL语法过滤
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;import java.util.concurrent.TimeUnit;public class Producer {public static void main(String[] args) throws Exception {//1.创建消息生产者producer,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");//2.指定Nameserver地址producer.setNamesrvAddr("192.168.86.132:9876");//3.启动producerproducer.start();for (int i = 0; i < 10; i++) {//4.创建消息对象,指定主题Topic、Tag和消息体/*** 参数一:消息主题Topic* 参数二:消息Tag* 参数三:消息内容*/Message msg = new Message("FilterSQLTopic", "Tag1", ("Hello World" + i).getBytes());msg.putUserProperty("i", String.valueOf(i));//5.发送消息SendResult result = producer.send(msg);//发送状态SendStatus status = result.getSendStatus();System.out.println("发送结果:" + result);//线程睡1秒TimeUnit.SECONDS.sleep(2);}//6.关闭生产者producerproducer.shutdown();}}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer {public static void main(String[] args) throws Exception {//1.创建消费者Consumer,制定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定Nameserver地址consumer.setNamesrvAddr("192.168.86.132:9876");//3.订阅主题Topic和Tagconsumer.subscribe("FilterSQLTopic", MessageSelector.bySql("i>5"));//4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {//接受消息内容@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者consumerconsumer.start();System.out.println("消费者启动");}
}

事务消息的实现

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;import java.util.concurrent.TimeUnit;/*** 发送同步消息*/
public class Producer {public static void main(String[] args) throws Exception {//1.创建消息生产者producer,并制定生产者组名TransactionMQProducer producer = new TransactionMQProducer("group5");//2.指定Nameserver地址producer.setNamesrvAddr("192.168.86.132:9876");//添加事务监听器producer.setTransactionListener(new TransactionListener() {/*** 在该方法中执行本地事务* @param msg* @param arg* @return*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {if (StringUtils.equals("TAGA", msg.getTags())) {return LocalTransactionState.COMMIT_MESSAGE;} else if (StringUtils.equals("TAGB", msg.getTags())) {return LocalTransactionState.ROLLBACK_MESSAGE;} else if (StringUtils.equals("TAGC", msg.getTags())) {return LocalTransactionState.UNKNOW;}return LocalTransactionState.UNKNOW;}/*** 该方法时MQ进行消息事务状态回查* @param msg* @return*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {System.out.println("消息的Tag:" + msg.getTags());return LocalTransactionState.COMMIT_MESSAGE;}});//3.启动producerproducer.start();String[] tags = {"TAGA", "TAGB", "TAGC"};for (int i = 0; i < 3; i++) {//4.创建消息对象,指定主题Topic、Tag和消息体/*** 参数一:消息主题Topic* 参数二:消息Tag* 参数三:消息内容*/Message msg = new Message("TransactionTopic", tags[i], ("Hello World" + i).getBytes());//5.发送消息SendResult result = producer.sendMessageInTransaction(msg, null);//发送状态SendStatus status = result.getSendStatus();System.out.println("发送结果:" + result);//线程睡1秒TimeUnit.SECONDS.sleep(2);}//6.关闭生产者producer//producer.shutdown();}
}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/*** 消息的接受者*/
public class Consumer {public static void main(String[] args) throws Exception {//1.创建消费者Consumer,制定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定Nameserver地址consumer.setNamesrvAddr("192.168.86.132:9876");//3.订阅主题Topic和Tagconsumer.subscribe("TransactionTopic", "*");//4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {//接受消息内容@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者consumerconsumer.start();System.out.println("生产者启动");}
}
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80106651818B4AAC203A813A50000, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=java, queueId=1], queueOffset=0]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80106651818B4AAC203A81B880001, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=java, queueId=2], queueOffset=1]
发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80106651818B4AAC203A8235C0002, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=java, queueId=3], queueOffset=2]
消息的Tag:TAGC
生产者启动
consumeThread=ConsumeMessageThread_1,Hello World2
consumeThread=ConsumeMessageThread_2,Hello World0

黑马程序员rocketmq第一章相关推荐

  1. 黑马程序员C++第一章

    第一章 1.书写Hello word 编写一个C++程序总共分为四个步骤: 创建项目 创建文件 编写代码 运行程序 2.注释-单行注释和多行注释 作用:在代码中多加一行说明和注释,方便自己或其他程序员 ...

  2. 软考 程序员教程-第一章 计算机系统基础知识

    软考 程序员教程-第一章 计算机系统基础知识 为了督促自己学习,告别懒惰,在此先给自己定个小目标,请大家监督哟! 目标:一个月内过一遍<程序员教程>,下一个月开始上真题. 简单看了下,我在 ...

  3. HTML+CSS+JavaScript网页制作案例教程-黑马程序员-第五章课后习题(课程介绍专栏效果)

    黑马程序员编著的教材  HTML+CSS+JavaScript网页制作案例教程 第五章:"课程介绍"专栏-课后习题参考代码 题目原型: 请结合给出的素材,运用列表标记,超链接标记以 ...

  4. HTML+CSS+JavaScript网页制作案例教程-黑马程序员-第四章课后习题(播放器图标)

    黑马程序员编著的教材  HTML+CSS+JavaScript网页制作案例教程 第四章:播放器图标-课后习题参考代码 ........ 记得 关注,收藏,评论哦,作者将持续更新.... 我自己做的效果 ...

  5. 黑马程序员C++ 第一阶段 C++基础语法入门

    B站黑马C++内容,自己手动敲了一遍代码,文章之间也加入了一些我个人的理解,仅供学习和参考用,程序代码均来自黑马程序员 . 一.C++基础入门 1.1第一个c++程序 (输出一个hello world ...

  6. 【传智播客】Javaweb程序设计任务教程 黑马程序员 第六章 课后答案

    所有章节答案合集-->传送门 [测一测] 学习完前面的内容,下面来动手测一测吧,请思考以下问题: 1.请简述JSP的运行原理. 1.请简述JSP中的9个隐式对象. 2.请使用include标签编 ...

  7. 【传智播客】Javaweb程序设计任务教程 黑马程序员 第五章 课后答案

    所有章节答案合集-->传送门 [测一测] 学习完前面的内容,下面来动手测一测吧,请思考以下问题: 1.简述什么是会话技术? 2.简述Cookie与Session的区别?(至少写出3点) 3.请设 ...

  8. 【传智播客】Javaweb程序设计任务教程 黑马程序员 第四章 课后答案

    所有章节答案合集-->传送门 [测一测] 学习完前面的内容,下面来动手测一测吧,请思考以下问题: 1.简述请求转发与重定向的异同?(至少写3点) 2.请写出禁止浏览器缓存页面的核心代码. 3.请 ...

  9. 【传智播客】Javaweb程序设计任务教程 黑马程序员 第三章 课后答案

    所有章节答案合集-->传送门 [测一测] 学习完前面的内容,下面来动手测一测吧,请思考以下问题: 1.请列举Servlet接口中的方法,并分别说明这些方法的特点及其作用. 2.简述Servlet ...

最新文章

  1. JVM调优实战:G1中的to-space exhausted问题
  2. Py之textrank4zh:textrank4zh的简介、安装、使用方法之详细攻略
  3. SSM三大框架整合Springfox(Swagger2)详细解决方案
  4. no need for pictures
  5. ELK+Kafka 企业日志收集平台(二)这是原版
  6. python网络编程——实现简单聊天
  7. linux 核显驱动程序,支持下代核显 Intel放出Linux图形驱动
  8. leetcode417. 太平洋大西洋水流问题(bfs)
  9. 开博啦——半路出家做运维以来的一些杂感
  10. PHP常用函数大全500+
  11. 已知函数ex可以展开为幂级数。现给定一个实数x,要求利用此幂级数部分和求ex的近似值,求和一直继续到最后一项的绝对值小于0.00001。
  12. .json格式转为.yml格式
  13. 平面设计图文混排要怎么做
  14. 双绞线连接布线方案(计算机网络)
  15. iOS_44_导入第3方APP内的文件_UTI
  16. java方向考什么证_java认证证书两个工作方向
  17. 【中科院】分子生物学-朱玉贤第四版-笔记-第11-12讲 基因功能研究技术
  18. appinventor飞机大战案例_第一个AppInventor 开发案例 Hello Kitty(下)
  19. Stata分位数回归I:理解边际效应和条件边际效应
  20. List循环中指定删除元素(不止一个)

热门文章

  1. 基于MATLAB的线性规划解决方法——单纯形法
  2. 单源最短路径算法java_数据结构 - 单源最短路径之迪杰斯特拉(Dijkstra)算法详解(Java)...
  3. wget linux
  4. Gunicorn+django部署
  5. xshell / xftp个人免费版 seafile个人云盘下载
  6. k8s1.18 StorageClass 使用rbd-provisioner提供ceph rbd持久化存储
  7. 六十星系之43紫微天相坐辰戌
  8. Linux命令find -perm使用方法
  9. 手机基站伪装潜伏引强烈关注
  10. 【C++】Placement New