• JAVA操作rocketmq:

1.导入rocketmq所需要的依赖:

    <dependency><groupId>com.alibaba.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>3.0.10</version></dependency><dependency><groupId>com.alibaba.rocketmq</groupId><artifactId>rocketmq-all</artifactId><version>3.0.10</version><type>pom</type></dependency>

2.创建生产者

package com.example.producer;import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;public class Producer {public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("producer-group");producer.setNamesrvAddr("192.168.31.165:9876;192.168.31.144:9876");producer.setInstanceName("producer");producer.start();try {for (int i = 0; i < 10; i++) {// Thread.sleep(1000); // 每秒发送一次MQMessage msg = new Message("producer-topic", // topic 主题名称"msg", // pull 临时值 在消费者消费的时候 可以根据msg类型进行消费("pushmsg-" + i).getBytes()// body 内容
                );SendResult sendResult = producer.send(msg);System.out.println(sendResult.toString());}} catch (Exception e) {e.printStackTrace();}producer.shutdown();}}

3.创建消费者

package com.example.consumer;import java.util.List;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");consumer.setNamesrvAddr("192.168.31.165:9876;192.168.31.144:9876");consumer.setInstanceName("consumer");consumer.subscribe("producer-topic", "msg");//此处是根据Message对象的参数来获取consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println("消息id:"+msg.getMsgId() + "---" + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}}

4.运行结果:

生产者运行结果:

消费者运行结果:

  • rocetmq幂等性问题:

在Activemq中 jms规范支持两种消息模型:点对点和发布订阅,在rocketmq中 有两种消费模式:广播消费,和集群消费。

在消费的过程中,如果消费者出现异常或者超时,导致mq没有及时的相应消费的状态,则可能让mq重试,重试机制就有可能导致出现幂等性,而rocketmq的幂等性 只会出现在集群消费(类似activemq中的点对点消息模型)

生产者:

package com.example.producer;import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;public class Producer {public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("producer-group");producer.setNamesrvAddr("192.168.31.169:9876;192.168.31.177:9876");producer.setInstanceName("producer");producer.start();try {for (int i = 0; i < 10; i++) {Message msg = new Message("topic", // topic 主题名称"msg", // pull 临时值 在消费者消费的时候 可以根据msg类型进行消费(i + "条消息").getBytes()// body 内容
                );SendResult sendResult = producer.send(msg);System.out.println(sendResult.toString());}} catch (Exception e) {e.printStackTrace();}producer.shutdown();}}

消费者:

package com.example.consumer;import java.util.List;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");consumer.setNamesrvAddr("192.168.31.169:9876;192.168.31.177:9876");consumer.setInstanceName("consumer");consumer.subscribe("topic1", "msg");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println("消息id:" + msg.getMsgId() + "---" + new String(msg.getBody()));}// 超时的情况 或者程序异常int i = 2 / 0;return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}}

消费结果:

消息id:C0A81FB100002A9F00000000000268EC---5条消息
消息id:C0A81FB100002A9F000000000002686E---4条消息
消息id:C0A81FA900002A9F0000000000037E6A---1条消息
消息id:C0A81FB100002A9F000000000002696A---6条消息
消息id:C0A81FB100002A9F00000000000269E8---7条消息
消息id:C0A81FA900002A9F0000000000038062---9条消息
消息id:C0A81FA900002A9F0000000000037EE8---2条消息
消息id:C0A81FA900002A9F0000000000037FE4---8条消息
消息id:C0A81FA900002A9F0000000000037F66---3条消息
消息id:C0A81FA900002A9F0000000000037DEC---0条消息
消息id:C0A81FA900002A9F0000000000038704---1条消息
消息id:C0A81FA900002A9F000000000003880C---9条消息
消息id:C0A81FA900002A9F0000000000038914---2条消息
消息id:C0A81FA900002A9F0000000000038A1C---0条消息
消息id:C0A81FA900002A9F0000000000038B24---3条消息
消息id:C0A81FA900002A9F0000000000038C2C---8条消息
消息id:C0A81FB100002A9F0000000000026E7E---4条消息
消息id:C0A81FB100002A9F0000000000026F86---7条消息
消息id:C0A81FB100002A9F0000000000027196---5条消息
消息id:C0A81FB100002A9F000000000002708E---6条消息

在Activimq中,可以通过消息id 来作为全局变量,检测是不是重复消费。但是在rocketmq中消费重试的结果中,任意选出两条相同的消息,可以看出 重试的时候消息id是不同的,此时在用消息id作为全局变量判断是否重复消费显然是不可能的。rocketmq中提供了一个消息的key,可以将业务id作为该key。例如:订单号什么的。可以将消息设置的key 在第一次消费的时候存放到数据库之中

幂等性消费者:

package com.example.consumer;import java.util.HashMap;
import java.util.List;
import java.util.Map;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;public class Consumer {public static Map<String, String> map = new HashMap<String, String>();// 模拟内存,实际情况可以将key放在redis之中public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");consumer.setNamesrvAddr("192.168.31.169:9876;192.168.31.177:9876");consumer.setInstanceName("consumer");consumer.subscribe("topic1", "msg");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {if (!map.containsKey(msg.getKeys())) {// 如果此时的业务逻辑是将收到的消息存放到数据库System.out.println("消息id:" + msg.getMsgId() + "---" + new String(msg.getBody()));map.put(msg.getKeys(), new String(msg.getBody()));} else {System.out.println("重复消费");return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}// 超时的情况 或者程序异常int i = 2 / 0;return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}}

转载于:https://www.cnblogs.com/920913cheng/p/10730497.html

rocketmq(三 java操作rocket API, rocketmq 幂等性)相关推荐

  1. 转载:JAVA 操作 Ant API

    2019独角兽企业重金招聘Python工程师标准>>> Java调用Ant API用法  用法 API 调用 JAVA  Ant是Java程序员的一个好的工具,主要可以帮助程序员进行 ...

  2. JAVA操作HDFS API(hadoop)

    http://www.cnblogs.com/alisande/archive/2012/06/06/2537903.html HDFS API详解 Hadoop中关于文件操作类基本上全部是在&quo ...

  3. RocketMQ学习笔记(8)----RocketMQ的Producer API简介

    在RocketMQ中提供了三种发送消息的模式: 1.NormalProducer(普通) 2.OrderProducer(顺序) 3.TransactionProducer(事务) 下面来介绍一下pr ...

  4. Java 8 Stream Api 中的 skip 和 limit 操作

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 1. 前言 Java 8 Stream API 中的sk ...

  5. Java操作Excel三种方式POI、Hutool、EasyExcel

    Java操作Excel三种方式POI.Hutool.EasyExcel 1. Java操作Excel概述 1.1 Excel需求概述 1.2 Excel操作三种方式对比 2. ApachePOIExc ...

  6. Java 操作excel表格 - JXL(Java excel api)

    Java 操作excel表格 Java 操作 Excel 最常用的就是JXL(Java excel api)和POI,用起来挺简单的,不过相应的其功能也并非很强大,够用就行! 首先,下载jxl.jar ...

  7. java操作Excel有两种方式 方式1:jxl操作Excel jxl的API

    java操作Excel有两种方式 方式1:jxl操作Excel 方式2:poi操作Excel 下面介绍jxl API: 使用Windows操作系统的朋友对Excel(电子表格)一定不会陌生,但是要使用 ...

  8. rocketmq java例子_SpringBoot和RocketMQ的简单实例

    1,引用jar包 build.gradle文件添加jar包引用 compile group: 'org.apache.rocketmq', name: 'rocketmq-spring-boot-st ...

  9. Java操作mongoDB2.6的常见API使用方法

    对于mongoDB而言,学习方式和学习关系型数据库差不太多 開始都是学习怎样insert.find.update.remove,然后就是分页.排序.索引,再接着就是主从复制.副本集.分片等等 最后就是 ...

最新文章

  1. 使用 PEAR的Text_CAPTCHA保护Web表单[翻译]
  2. 如何在JavaScript中将字符串转换为布尔值?
  3. 使用自定义线程池处理并行数据库流
  4. Android耗电优化
  5. 设计资源类网站|日常必逛设计导航
  6. php环境模拟stphp_一个模拟浏览器请求的php类,模拟请求ua设置
  7. Android开发笔记(三十九)Activity的生命周期
  8. 解决iview中</Input>标签报错的方法
  9. Windows Workflow HOL学习笔记(七):添加一个验证来检查Email参数
  10. Mac OS 开启三指拖移,三指缩放,拖拽窗口,切换全面页面变成四指
  11. Android 自定义标尺滑动选择值
  12. 维基百科的语料库下载以及后续操作(一)2020年6月【包括opencc下载避雷,繁转简】
  13. ShareX+七牛云搭建博客图床
  14. 操作系统笔记整理12——磁盘存储器的管理
  15. 名帖236 俞和 行书《次韵韩伯清见寄之什凡五首》
  16. 求助!!!pycharm第一次安装后启动时报错怎么解决
  17. error: #268: declaration may not appear after executable statement in block
  18. swift学习笔记之navigationController的设置以及使用
  19. mysql命令创建用户_使用MySQL命令行新建用户并授予权限的方法
  20. 电子火折子的电路原理

热门文章

  1. form 表单提交,防止重复提交,加token
  2. 简单的鼠标可拖动div 兼容IE/FF
  3. HTML5实践之歌词同步播放器
  4. windows-服务端口
  5. 田志刚:智慧的员工,个人知识管理
  6. RichTextBox实现关键字自定义颜色显示(C#)
  7. 使用openpyxl去操作Excel表格
  8. Qt应用程序主窗口之一:主窗口框架
  9. C++实现 找出10000以内的完数
  10. Tomcat报错:Failed to start component [StandardEngine[Catalina].StandardHost[localhost]]