阿里云ONS消息队列入门指南
文章目录
- 概述
- springboot整合接入
- mq消费者
- mq 生产者
概述
消息队列 RocketMQ 是阿里巴巴集团基于高可用分布式集群技术,自主研发的云正式商用的专业消息中间件,既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性,是阿里巴巴双 11 使用的核心产品
阿里云官方接入文档:https://help.aliyun.com/document_detail/29553.html?spm=a2c4g.11186623.6.570.632b7059ZI0shf
springboot整合接入
pom文件
<dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</artifactId><version>1.8.0.Final</version></dependency>
application.yml
mq:consumerId: @ons.consumerId@accessKeyId: @ons.accessKeyId@accessKeySecret: @ons.accessKeySecret@onsAddr: @ons.onsAddr@automaticPackagingTopic: @ons.automaticPackagingTopic@maxReconsumeTimes: @ons.maxReconsumeTimes@
不同的环境中对应不同的properties文件
mq.consumerId=xxx
mq.accessKeyId=xxx
mq.accessKeySecret=xxx
mq.onsAddr=xxx
mq.automaticPackagingTopic=xxx
mq.maxReconsumeTimes=3
mq消费者
@Component
public class MessageConsumer {@Value("${mq.consumerId}")private String consumerId;@Value("${mq.accessKeyId}")private String accessKey;@Value("${mq.accessKeySecret}")private String secretKey;@Value("${mq.onsAddr}")private String onsAddr;@Value("${mq.automaticPackagingTopic}")private String topic;@Value("${mq.maxReconsumeTimes}")private String maxReconsumeTimes;private Consumer consumer;private final Map<String, MessageListener> listenerMap =new ConcurrentHashMap<>();@Autowiredprivate MessageConsumer(Map<String, MessageListener> listenerMap){this.listenerMap.clear();listenerMap.forEach((k,v)->this.listenerMap.put(v.getType(),v));}@PostConstructpublic void init(){LogFactory.mqlog.info("Consumer 开始启动...");Properties properties = new Properties();properties.put(PropertyKeyConst.GROUP_ID, consumerId);properties.put(PropertyKeyConst.AccessKey, accessKey);properties.put(PropertyKeyConst.SecretKey, secretKey);properties.put(PropertyKeyConst.NAMESRV_ADDR, onsAddr);properties.put(PropertyKeyConst.MaxReconsumeTimes, maxReconsumeTimes);consumer = ONSFactory.createConsumer(properties);consumer.subscribe(topic,"*", (message,context)->listenerMap.get(message.getTag()).consume(message,context));consumer.start();LogFactory.mqlog.info("Consumer 启动完成");}}
例如消费两种消息
public interface MessageListener {/*** 获取listener类型* @return*/String getType();/*** 消费消息* @param message* @param context* @return*/Action consume(final Message message, final ConsumeContext context);
}
一类接收足球消息
@Component
public class FootBallListener implements MessageListener {private static final String TAG_FootBall = "TAG_FootBall";@Overridepublic Action consume(Message message, ConsumeContext context) {LogFactory.mqlog.info("receive :" + message.toString());try {// do what u should doLogFactory.mqlog.info("consume success");return Action.CommitMessage;} catch (Exception e) {LogFactory.mqlog.error("consume fail:" + e);return Action.ReconsumeLater;}}@Overridepublic String getType() {return TAG_FootBall;}
}
接收乒乓球消息
@Component
public class PingPangListener implements MessageListener {public static final String TAG_PingPang = "TAG_PingPang";@Overridepublic Action consume(Message message, ConsumeContext context) {LogFactory.jvopfLog.info("receive:" + message.toString());try {//你的业务代码if(true){LogFactory.mqlog.info("consume Success");return Action.CommitMessage;}else {//比较重要的消息,失败后重试LogFactory.mqlog.info("consume Fail");return Action.ReconsumeLater;}}catch (Exception e){LogFactory.mqlog.error("consume Exception:" + e);return Action.ReconsumeLater;}}@Overridepublic String getType() {return TAG_PingPang;}
}
mq 生产者
摘自 : https://help.aliyun.com/document_detail/29553.html?spm=a2c4g.11186623.6.570.dc4d2e77x4Y8BP
package demo;import com.aliyun.openservices.ons.api.Message;import com.aliyun.openservices.ons.api.Producer;import com.aliyun.openservices.ons.api.SendResult;import com.aliyun.openservices.ons.api.exception.ONSClientException;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class ProduceWithSpring {public static void main(String[] args) {/*** 生产者 Bean 配置在 producer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(比如具体的 Controller)中*/ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");Producer producer = (Producer) context.getBean("producer");//循环发送消息for (int i = 0; i < 100; i++) {Message msg = new Message( //// Message 所属的 Topic"TopicTestMQ",// Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列 RocketMQ 的服务器过滤"TagA",// Message Body 可以是任何二进制形式的数据, 消息队列 RocketMQ 不做任何干预// 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式"Hello MQ".getBytes());// 设置代表消息的业务关键属性,请尽可能全局唯一// 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发// 注意:不设置也不会影响消息正常收发msg.setKey("ORDERID_100");// 发送消息,只要不抛异常就是成功try {SendResult sendResult = producer.send(msg);assert sendResult != null;System.out.println("send success: " + sendResult.getMessageId());}catch (ONSClientException e) {System.out.println("发送失败");}}}}
阿里云ONS消息队列入门指南相关推荐
- 【结果很简单,过程很艰辛】记阿里云Ons消息队列服务.NET接口填坑过程
Maybe 这个问题很简单,因为解决方法是非常简单,但填坑过程会把人逼疯,在阿里云ONS工作人员.同事和朋友的协助下,经过一天的调试和瞎捣鼓,终于解决了这个坑,把问题记下来,也许更多人在碰到类似问题的 ...
- C#实现阿里云微消息队列LMQ
本文属于个人原创作品.个人总结,谢绝转载.抄袭.如果您有疑问或者希望沟通交流,可以联系QQ:865562060. 一.简介 MQ 微消息队列(Light Message Queue,简称 LMQ): ...
- 阿里云微消息队列 MQTT
前言 因为工作上的事情比较繁忙,近期的博客更新率已经创了新低,所以想着把一些工作上的调研笔记
- springboot集成阿里ons消息队列发布订阅消息功能
此处的项目是springboot项目.使用队列的产品是阿里云ons 消息队列 阿里云的ons消息队列是基于rockermq 项目环境.jdk1.8 使用阿里ons开发的api接口实现发布定于功能生产和 ...
- 阿里云ONS而微软Azure Service Bus体系结构和功能比较
阿里云ONS而微软Azure Service bus体系结构和功能比较 版权所有所有,转载请注明出处http://blog.csdn.net/yangzhenping.谢谢! 阿里云的开放消息服务: ...
- 项目分布式部署那些事(1):ONS消息队列、基于Redis的Session共享,开源共享
因业务发展需要现在的系统不足以支撑现在的用户量,于是我们在一周之前着手项目的性能优化与分布式部署的相关动作. 概况 现在的系统是基于RabbitHub(一套开源的开发时框架)和Rabbit.WeiXi ...
- JAVA整合阿里云ONS(RocketMQ)
前言 关于阿里云ONS我这里不多介绍,用的话直接去看官网,这里提几个实际对接的问题 问题一: TCP版的不支持本地调试,只支持公网链接 问题二: ONS不支持批量消息 问题三: ONS延迟消息的时间是 ...
- 阿里云机器学习PAI-快速上手指南
阿里云机器学习PAI-快速上手指南 What is 机器学习 机器学习指的是机器通过统计学算法,对大量的历史数据进行学习从而生成经验模型,利用经验模型指导业务.目前机器学习主要在以下一些方面发挥作用: ...
- 千亿级金融场景下,基于Pulsar的云原生消息队列有怎样的表现?
导语 | 云原生场景,多语言.多种协议兼容,任意多的消息 Topic.任意多的消费者,性能的按需快速扩展成为消息队列基本的要求.本文是对腾讯TEG技术委员会专家工程师刘德志老师在云+社区沙龙 onli ...
最新文章
- 深度解析ASP.NET2.0中的Callback机制
- Python中的正则
- mysql备份工具xtr_mysql-xtrbackup备份与恢复
- 学python五大理由_学习Python的五大理由
- 二、Java 面向对象高级——Collection、泛型
- 在Java中调用Python,java面试题,java初级笔试题
- jacket for matlab,打印本页 - 在联想系统上使用Jacket For Matlab
- 香肠派对电脑版_6款好玩的吃鸡小游戏,和平精英、香肠派对、迷你攻势、、、...
- 语文招教考试-古今中外神话故事汇总,教育心理学知识点
- Win8.1激活方法
- 大话开发板技术支持——在www.ouravr.com上看到一个老兄对一个开发板淘宝代理提出控诉之后...
- 销售管理系统er图_这套电商订单管理系统,90%电商玩家都受用
- 做PPT使用的矢量图标网站
- 吴晓慧讲述:“随手记安全吗”网贷平台“出清”利好行业发展
- Windows安全中心打开空白
- 【线性代数】6-6:相似矩阵(Similar Matrices)
- 洛谷P2664 树上游戏 【点分治 + 差分】
- 【FinE】FamaFrench 5 Factors asset pricing Model(FF五因子模型)
- 实时操作系统---任务管理
- 扫描电镜下的人体感官结构,超震撼