文章目录

  • 概述
  • springboot整合接入
    • mq消费者
    • mq 生产者

概述

消息队列 RocketMQ 是阿里巴巴集团基于高可用分布式集群技术,自主研发的云正式商用的专业消息中间件,既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性,是阿里巴巴双 11 使用的核心产品

阿里云官方接入文档:https://help.aliyun.com/document_detail/29553.html?spm=a2c4g.11186623.6.570.632b7059ZI0shf

springboot整合接入

pom文件

<dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</artifactId><version>1.8.0.Final</version></dependency>

application.yml

mq:consumerId: @ons.consumerId@accessKeyId: @ons.accessKeyId@accessKeySecret: @ons.accessKeySecret@onsAddr: @ons.onsAddr@automaticPackagingTopic: @ons.automaticPackagingTopic@maxReconsumeTimes: @ons.maxReconsumeTimes@

不同的环境中对应不同的properties文件

mq.consumerId=xxx
mq.accessKeyId=xxx
mq.accessKeySecret=xxx
mq.onsAddr=xxx
mq.automaticPackagingTopic=xxx
mq.maxReconsumeTimes=3

mq消费者

@Component
public class MessageConsumer {@Value("${mq.consumerId}")private String consumerId;@Value("${mq.accessKeyId}")private String accessKey;@Value("${mq.accessKeySecret}")private String secretKey;@Value("${mq.onsAddr}")private String onsAddr;@Value("${mq.automaticPackagingTopic}")private String topic;@Value("${mq.maxReconsumeTimes}")private String maxReconsumeTimes;private Consumer consumer;private final Map<String, MessageListener> listenerMap =new ConcurrentHashMap<>();@Autowiredprivate MessageConsumer(Map<String, MessageListener> listenerMap){this.listenerMap.clear();listenerMap.forEach((k,v)->this.listenerMap.put(v.getType(),v));}@PostConstructpublic void init(){LogFactory.mqlog.info("Consumer 开始启动...");Properties properties = new Properties();properties.put(PropertyKeyConst.GROUP_ID, consumerId);properties.put(PropertyKeyConst.AccessKey, accessKey);properties.put(PropertyKeyConst.SecretKey, secretKey);properties.put(PropertyKeyConst.NAMESRV_ADDR, onsAddr);properties.put(PropertyKeyConst.MaxReconsumeTimes, maxReconsumeTimes);consumer = ONSFactory.createConsumer(properties);consumer.subscribe(topic,"*", (message,context)->listenerMap.get(message.getTag()).consume(message,context));consumer.start();LogFactory.mqlog.info("Consumer 启动完成");}}

例如消费两种消息

public interface MessageListener {/*** 获取listener类型* @return*/String getType();/***  消费消息* @param message* @param context* @return*/Action consume(final Message message, final ConsumeContext context);
}

一类接收足球消息

@Component
public class FootBallListener implements MessageListener {private static final String TAG_FootBall = "TAG_FootBall";@Overridepublic Action consume(Message message, ConsumeContext context) {LogFactory.mqlog.info("receive :" + message.toString());try {// do what u should doLogFactory.mqlog.info("consume success");return Action.CommitMessage;} catch (Exception e) {LogFactory.mqlog.error("consume fail:" + e);return Action.ReconsumeLater;}}@Overridepublic String getType() {return TAG_FootBall;}
}

接收乒乓球消息

@Component
public class PingPangListener implements MessageListener {public static final String TAG_PingPang = "TAG_PingPang";@Overridepublic Action consume(Message message, ConsumeContext context) {LogFactory.jvopfLog.info("receive:" + message.toString());try {//你的业务代码if(true){LogFactory.mqlog.info("consume Success");return Action.CommitMessage;}else {//比较重要的消息,失败后重试LogFactory.mqlog.info("consume Fail");return Action.ReconsumeLater;}}catch (Exception e){LogFactory.mqlog.error("consume Exception:" + e);return Action.ReconsumeLater;}}@Overridepublic String getType() {return TAG_PingPang;}
}

mq 生产者

摘自 : https://help.aliyun.com/document_detail/29553.html?spm=a2c4g.11186623.6.570.dc4d2e77x4Y8BP

 package demo;import com.aliyun.openservices.ons.api.Message;import com.aliyun.openservices.ons.api.Producer;import com.aliyun.openservices.ons.api.SendResult;import com.aliyun.openservices.ons.api.exception.ONSClientException;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class ProduceWithSpring {public static void main(String[] args) {/*** 生产者 Bean 配置在 producer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(比如具体的 Controller)中*/ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");Producer producer = (Producer) context.getBean("producer");//循环发送消息for (int i = 0; i < 100; i++) {Message msg = new Message( //// Message 所属的 Topic"TopicTestMQ",// Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列 RocketMQ 的服务器过滤"TagA",// Message Body 可以是任何二进制形式的数据, 消息队列 RocketMQ 不做任何干预// 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式"Hello MQ".getBytes());// 设置代表消息的业务关键属性,请尽可能全局唯一// 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发// 注意:不设置也不会影响消息正常收发msg.setKey("ORDERID_100");// 发送消息,只要不抛异常就是成功try {SendResult sendResult = producer.send(msg);assert sendResult != null;System.out.println("send success: " + sendResult.getMessageId());}catch (ONSClientException e) {System.out.println("发送失败");}}}}

阿里云ONS消息队列入门指南相关推荐

  1. 【结果很简单,过程很艰辛】记阿里云Ons消息队列服务.NET接口填坑过程

    Maybe 这个问题很简单,因为解决方法是非常简单,但填坑过程会把人逼疯,在阿里云ONS工作人员.同事和朋友的协助下,经过一天的调试和瞎捣鼓,终于解决了这个坑,把问题记下来,也许更多人在碰到类似问题的 ...

  2. C#实现阿里云微消息队列LMQ

    本文属于个人原创作品.个人总结,谢绝转载.抄袭.如果您有疑问或者希望沟通交流,可以联系QQ:865562060. 一.简介 MQ 微消息队列(Light Message Queue,简称 LMQ): ...

  3. 阿里云微消息队列 MQTT

    前言 因为工作上的事情比较繁忙,近期的博客更新率已经创了新低,所以想着把一些工作上的调研笔记

  4. springboot集成阿里ons消息队列发布订阅消息功能

    此处的项目是springboot项目.使用队列的产品是阿里云ons 消息队列 阿里云的ons消息队列是基于rockermq 项目环境.jdk1.8 使用阿里ons开发的api接口实现发布定于功能生产和 ...

  5. 阿里云ONS而微软Azure Service Bus体系结构和功能比较

    阿里云ONS而微软Azure Service bus体系结构和功能比较 版权所有所有,转载请注明出处http://blog.csdn.net/yangzhenping.谢谢! 阿里云的开放消息服务: ...

  6. 项目分布式部署那些事(1):ONS消息队列、基于Redis的Session共享,开源共享

    因业务发展需要现在的系统不足以支撑现在的用户量,于是我们在一周之前着手项目的性能优化与分布式部署的相关动作. 概况 现在的系统是基于RabbitHub(一套开源的开发时框架)和Rabbit.WeiXi ...

  7. JAVA整合阿里云ONS(RocketMQ)

    前言 关于阿里云ONS我这里不多介绍,用的话直接去看官网,这里提几个实际对接的问题 问题一: TCP版的不支持本地调试,只支持公网链接 问题二: ONS不支持批量消息 问题三: ONS延迟消息的时间是 ...

  8. 阿里云机器学习PAI-快速上手指南

    阿里云机器学习PAI-快速上手指南 What is 机器学习 机器学习指的是机器通过统计学算法,对大量的历史数据进行学习从而生成经验模型,利用经验模型指导业务.目前机器学习主要在以下一些方面发挥作用: ...

  9. 千亿级金融场景下,基于Pulsar的云原生消息队列有怎样的表现?

    导语 | 云原生场景,多语言.多种协议兼容,任意多的消息 Topic.任意多的消费者,性能的按需快速扩展成为消息队列基本的要求.本文是对腾讯TEG技术委员会专家工程师刘德志老师在云+社区沙龙 onli ...

最新文章

  1. 深度解析ASP.NET2.0中的Callback机制
  2. Python中的正则
  3. mysql备份工具xtr_mysql-xtrbackup备份与恢复
  4. 学python五大理由_学习Python的五大理由
  5. 二、Java 面向对象高级——Collection、泛型
  6. 在Java中调用Python,java面试题,java初级笔试题
  7. jacket for matlab,打印本页 - 在联想系统上使用Jacket For Matlab
  8. 香肠派对电脑版_6款好玩的吃鸡小游戏,和平精英、香肠派对、迷你攻势、、、...
  9. 语文招教考试-古今中外神话故事汇总,教育心理学知识点
  10. Win8.1激活方法
  11. 大话开发板技术支持——在www.ouravr.com上看到一个老兄对一个开发板淘宝代理提出控诉之后...
  12. 销售管理系统er图_这套电商订单管理系统,90%电商玩家都受用
  13. 做PPT使用的矢量图标网站
  14. 吴晓慧讲述:“随手记安全吗”网贷平台“出清”利好行业发展
  15. Windows安全中心打开空白
  16. 【线性代数】6-6:相似矩阵(Similar Matrices)
  17. 洛谷P2664 树上游戏 【点分治 + 差分】
  18. 【FinE】FamaFrench 5 Factors asset pricing Model(FF五因子模型)
  19. 实时操作系统---任务管理
  20. 扫描电镜下的人体感官结构,超震撼

热门文章

  1. Centos7.9最小化安装与初始化环境配置
  2. 探秘谷歌地球,它如何绘制全球98%的地图?
  3. [益智]:3个女儿的年龄
  4. css渐变写法 从左到右颜色渐变
  5. 音频质量的评价方法:简单梳理
  6. 学习在kvm上创建vtpm
  7. 百万投资血本无归,细数外汇资金盘《云腾科技》的八宗罪
  8. RocksDB问题点解决及相关学习记录
  9. 监控电脑屏幕python
  10. 使用计算机数据采集的优点,什么是数据采集器