spring cloud笔记(1) 整合消息中间件ONS(商业版rocketmq)
之前一直用spring boot+dubbo+zookeeper来搭建分布式项目,后来在网上看到有人说用spring cloud更好,前两天抽时间了解了一下,spring cloud 为我们提供了许多分布式系统的组件(服务注册与发现、服务网关、客户端负载均衡、配置中心、断路器、消息总线等),我们写少量的代码和配置就能搭建分布式的应用。不过我个人觉得,在搭建系统的时候,不一定要全部使用spring cloud里面所有的组件,可以根据自己的喜好或者习惯使用其它第三方的组件来代替spring cloud里面的部分组件,就像玩王者农药你可以选择推荐出装,也可以根据自己的喜好来搭配。
前两天基于spring cloud写了几个demo,本次在之前的基础上来整合阿里的消息队列rocketmq,因为开源版本不支持事务消息,所以我选择了商业版ONS,项目结构如下:
- spcd-registry 服务注册中心
- spcd-core 系统核心组件 其它子项目都会依赖它
- spcd-wsapi 所有服务接口都定义在这里面
- spcd-order 订单微服务,对外提供订单相关接口
- spcd-message 消息微服务
- spcd-mobile 业务网关层
使用商业版ONS需要在阿里云控制台去开通,然后创建Topic、创建ProducerId、创建ConsumerId、授权...
一切准备就绪,开工,核心部分代码定义在spcd-core中
- 添加maven依赖
<dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</artifactId><version>1.6.0.Final</version></dependency>
- 创建消息发送者,这里创建一个代理类MQClient用来供调用方发送消息
package com.christy.spcd.core.mq;import java.nio.charset.Charset;
import java.util.UUID;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;import com.alibaba.fastjson.JSON;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;public class MQClient implements InitializingBean,DisposableBean {private static final Logger LOGGER = LoggerFactory.getLogger(MQClient.class);private Producer producer;private TransactionProducer transactionProducer;public MQClient(Producer producer,TransactionProducer transactionProducer) {this.producer = producer;this.transactionProducer = transactionProducer;}public MQSendResult sendMessage(ConsumeTag tag,Object body,String key){try{Message message = wrapMessage(tag, body, key);MQSendResult result = MQSendResult.convert(producer.send(message));LOGGER.info("MQ send result {}",JSON.toJSON(result));return result;}catch (Exception e){LOGGER.error(e.getMessage(),e);}return null;}public MQSendResult sendTransactionMessage(ConsumeTag tag,Object body,String key,LocalTransactionExecuter executer,Object arg){try{Message message = wrapMessage(tag, body, key);MQSendResult result = MQSendResult.convert(transactionProducer.send(message, executer, arg));LOGGER.info("MQ send result {}",JSON.toJSON(result));return result;}catch (Exception e){LOGGER.error(e.getMessage(),e);}return null;}private Message wrapMessage(ConsumeTag tag,Object body,String key){byte[] byteBody = null;String topic = tag.getTopic();if(body.getClass().isPrimitive() || body.getClass() == String.class) {byteBody = String.valueOf(body).getBytes(Charset.forName("UTF-8"));} else {byteBody = JSON.toJSONBytes(body);}key = topic + "_" + tag.name() + "_" + key;Message message = new Message(topic,tag.name(),key,byteBody);message.putUserProperties("messageType", body.getClass().getName());message.putUserProperties("_uuid", System.nanoTime() + UUID.randomUUID().toString().replaceAll("-", ""));return message;}@Overridepublic void afterPropertiesSet() throws Exception {if(producer != null){producer.start();}if(transactionProducer != null){transactionProducer.start();}}@Overridepublic void destroy() throws Exception {if(producer != null){producer.shutdown();}if(transactionProducer != null){transactionProducer.shutdown();}}
}
sendMessage用于发送普通消息,sendTransactionMessage用于发送事务消息
- 实例化MQClient(这里spcd-order作为消息发送方)
package com.christy.spcd.order;import java.util.ArrayList;
import java.util.List;
import java.util.Properties;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.christy.spcd.core.mq.ConsumeSpec;
import com.christy.spcd.core.mq.DefaultMQConfig;
import com.christy.spcd.core.mq.MQClient;
import com.christy.spcd.core.mq.MQClientBuilder;
import com.christy.spcd.core.mq.ProducerId;@Configuration
public class MQConfig extends DefaultMQConfig{@Beanpublic MQClient mqClient(){Properties properties = mqProperties();properties.put(PropertyKeyConst.ProducerId, producerId().name());return new MQClientBuilder(springFactory, properties).build();}@Overridepublic List<ConsumeSpec> registerConsumeTags() {List<ConsumeSpec> consumeSpecs = new ArrayList<ConsumeSpec>();return consumeSpecs;}@Overrideprotected ProducerId producerId() {return ProducerId.PID_ORDER_007;}}
package com.christy.spcd.core.mq;import java.util.Properties;import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.christy.spcd.core.SpringFactory;public class MQClientBuilder {private SpringFactory springFactory;private Properties properties;public MQClientBuilder(SpringFactory springFactory,Properties properties) {this.springFactory = springFactory;this.properties = properties;}public MQClient build(){Producer producer =ONSFactory.createProducer(properties);TransactionProducer transactionProducer = ONSFactory.createTransactionProducer(properties,localTransactionChecker());return new MQClient(producer, transactionProducer);}private LocalTransactionChecker localTransactionChecker(){DefaultTransactionChecker checker = new DefaultTransactionChecker();springFactory.initializeBean(checker);return checker;}}
package com.christy.spcd.core.mq;import java.util.List;
import java.util.Properties;import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.christy.spcd.core.SpringFactory;public abstract class DefaultMQConfig implements ApplicationContextAware{protected SpringFactory springFactory;public Properties mqProperties(){Properties properties = new Properties();properties.put(PropertyKeyConst.AccessKey, "在阿里云后台创建的AccessKey");properties.put(PropertyKeyConst.SecretKey, "在阿里云后台创建的AccessSecretKey");return properties;}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.springFactory = applicationContext.getBean(SpringFactory.class);}public abstract List<ConsumeSpec> registerConsumeTags();protected ProducerId producerId() {return null;}protected ConsumerId consumerId(){return null;}}
到此消息的发送者已经准备就绪,我们模拟一下订单支付的业务,spcd-order对外提供一个订单支付接口pay, spcd-mobile作为业务网关层调用spcd-order的支付接口,spcd-order在完成支付动作后,发出一个订单支付成功的消息,spcd-message负责消费这个消息。
- 首先我们定义一个TAG 其中ORDER1 为TOPIC
package com.christy.spcd.core.mq;public enum ConsumeTag {ORDER_PAID_SUCCEED("ORDER1"),;private String topic;ConsumeTag(String topic) {this.topic = topic;}public String getTopic() {return topic;}
}
- 然后定义一个消息体OrderPaidSucceedMessage(如果是基础数据类型或者String类型也可以不定义),为了在消费方能看到这个消息体,这里将它定义在spcd-wsapi中
package com.christy.spcd.wsapi.order.message;import java.io.Serializable;public class OrderPaidSucceedMessage implements Serializable{private static final long serialVersionUID = 8673315516088031608L;private Integer orderId;public OrderPaidSucceedMessage() {}public OrderPaidSucceedMessage(Integer orderId) {this.orderId = orderId;}public Integer getOrderId() {return orderId;}public void setOrderId(Integer orderId) {this.orderId = orderId;}
}
- 在spcd-order 的支付接口的实现中通过注入MQClient来发送消息
@Overridepublic String pay(Integer orderId) {MQSendResult result = mqClient.sendMessage(ConsumeTag.ORDER_PAID_SUCCEED, new OrderPaidSucceedMessage(orderId), String.valueOf(orderId));if(result == null){return "支付失败了";}return result.getMessageId();}
消息发送方部分已完成,接下来实现消费方
- 注册消费者以及订阅消息
package com.spcd.message;import java.util.ArrayList;
import java.util.List;
import java.util.Properties;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.christy.spcd.core.mq.ConsumeSpec;
import com.christy.spcd.core.mq.ConsumeTag;
import com.christy.spcd.core.mq.ConsumerId;
import com.christy.spcd.core.mq.DefaultMQConfig;
import com.christy.spcd.core.mq.MQConsumerFactory;
import com.spcd.message.common.listener.OrderPaidSucceedMessageListener;
@Configuration
public class MQConfig extends DefaultMQConfig{@Beanpublic MQConsumerFactory mqConsumerFactory(){Properties properties = mqProperties();properties.put(PropertyKeyConst.ConsumerId, consumerId().name());return new MQConsumerFactory(properties,registerConsumeTags());}@Overridepublic List<ConsumeSpec> registerConsumeTags() {List<ConsumeSpec> consumeSpecs = new ArrayList<ConsumeSpec>();consumeSpecs.add(new ConsumeSpec(ConsumeTag.ORDER_PAID_SUCCEED, OrderPaidSucceedMessageListener.class));return consumeSpecs;}@Overrideprotected ConsumerId consumerId() {return ConsumerId.CID_MESSAGE_007;}}
package com.christy.spcd.core.mq;import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.christy.spcd.core.SpringFactory;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Properties;public class MQConsumerFactory {@Autowiredprivate SpringFactory springFactory;private Properties properties;private List<ConsumeSpec> consumeSpecs;public MQConsumerFactory( Properties properties, List<ConsumeSpec> consumeSpecs){this.springFactory = springFactory;this.properties = properties;this.consumeSpecs = consumeSpecs;}@PostConstructpublic void createConsumers(){if(CollectionUtils.isNotEmpty(consumeSpecs)){for (ConsumeSpec consumeSpec : consumeSpecs) {Consumer consumer = ONSFactory.createConsumer(properties);MessageListener messageListener = springFactory.getOrCreateBean(consumeSpec.getMessageListenerCls());consumer.subscribe(consumeSpec.getTag().getTopic(), consumeSpec.getTag().name(), messageListener);consumer.start();}}}}
- 实现消息监听器
package com.christy.spcd.core.mq;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.alibaba.fastjson.JSON;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;public abstract class MQMessageListener<T> implements MessageListener{private static final Logger LOGGER = LoggerFactory.getLogger(MQMessageListener.class);@SuppressWarnings("unchecked")@Overridepublic Action consume(Message message, ConsumeContext consumecontext) {T messageBody = null;Class<?> messageType;try {messageType = Class.forName(message.getUserProperties("messageType"));if(messageType.isPrimitive() || messageType == String.class) {messageBody = (T)new String(message.getBody());} else {messageBody = JSON.parseObject(message.getBody(), messageType);}return onMessage(messageBody);} catch (Exception e) {LOGGER.error(e.getMessage(),e);return Action.ReconsumeLater;}}public abstract Action onMessage(T message);}
package com.spcd.message.common.listener;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.aliyun.openservices.ons.api.Action;
import com.christy.spcd.core.mq.MQMessageListener;
import com.christy.spcd.wsapi.order.message.OrderPaidSucceedMessage;public class OrderPaidSucceedMessageListener extends MQMessageListener<OrderPaidSucceedMessage>{private static final Logger LOGGER = LoggerFactory.getLogger(OrderPaidSucceedMessageListener.class);@Overridepublic Action onMessage(OrderPaidSucceedMessage message) {LOGGER.info("received message is {}",message.getOrderId());return Action.CommitMessage;}}
OK 普通消息的发送与消费已实现,我们在spcd-mobile中开放一个外部访问接口
@RequestMapping("/pay")public String pay(@RequestParam("orderId")Integer orderId){return orderApi.pay(orderId);}
依次启动spcd-registry spcd-order spcd-message spcd-mobile
然后访问 http://127.0.0.1:7777/mobile/pay?orderId=123
如果消息发送成功会在页面输出一个msgId,这个时候如果spcd-message中接收到消息 会在控制台输出 received message is 123
如果控制台报错提示权限问题,可以通过阿里云管理控制台配置一下访问权限
如果消息发送成功而spcd-message中收不到消息可以通过阿里云管理控制台发送测试消息,检查是否发送方或消费方配置有问题
接下来我们开始实现发送事务消息,使用事务消息能够保证发送方的本地事务能够执行完毕,否则消费方将不会收到该消息。而消费方的事务完整性一般是通过重试机制来实现最终一致,所以需要注意在消费方做到幂等。
发送事务消息的接口如下所示,可以看到它比发送普通消息多了两个参数,一个是LocalTransactionExecuter类型, 这个就是用来执行本地事务,另外一个Object类型,这个是在执行本地事务的参数,没有可以设置null
@Overridepublic String pay2(Integer orderId) {MQSendResult result = mqClient.sendTransactionMessage(ConsumeTag.ORDER_PAID_SUCCEED, new OrderPaidSucceedMessage(orderId), String.valueOf(orderId),orderPaidTransactionExecuter,orderId);if(result == null){return "支付失败了";}return result.getMessageId();}
我们需要为每个事务消息实现对应的本地事务执行器和消息回查方法
- 消息回查(如果在执行本地事务时返回Unknow,MQ会定时回调我们的消息回查方法)
package com.christy.spcd.core.mq;import java.util.List;import org.springframework.beans.factory.annotation.Autowired;import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;public class DefaultTransactionChecker implements LocalTransactionChecker{@Autowired(required=false)private List<TransactionCheckStrategy<?>> strategies;@Overridepublic TransactionStatus check(Message message) {for (TransactionCheckStrategy<?> strategy : strategies) {if(strategy.support(ConsumeTag.valueOf(message.getTag()))){return strategy.check(message);}}return TransactionStatus.Unknow;}}
package com.christy.spcd.core.mq;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.alibaba.fastjson.JSON;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;public abstract class TransactionCheckStrategy<T> implements LocalTransactionExecuter {private static final Logger LOGGER = LoggerFactory.getLogger(TransactionCheckStrategy.class);@SuppressWarnings("unchecked")public TransactionStatus check(Message message){T messageBody = null;Class<?> messageType;try {messageType = Class.forName(message.getUserProperties("messageType"));if(messageType.isPrimitive() || messageType == String.class) {messageBody = (T)new String(message.getBody());} else {messageBody = JSON.parseObject(message.getBody(), messageType);}return checkLocalTransaction(messageBody);} catch (Exception e) {LOGGER.error(e.getMessage(),e);return TransactionStatus.Unknow;}}public abstract boolean support(ConsumeTag tag);public abstract TransactionStatus checkLocalTransaction(T message);}
- 为消息发送方定制消息本地事务处理器和消息回查方法,为了简单明了,我将它们都放到一个类中实现。
package com.christy.spcd.order.tx;import org.springframework.stereotype.Service;import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import com.christy.spcd.common.config.TransactionConfig;
import com.christy.spcd.core.mq.ConsumeTag;
import com.christy.spcd.core.mq.TransactionCheckStrategy;
import com.christy.spcd.wsapi.order.message.OrderPaidSucceedMessage;
@Service
public class OrderPaidTransactionExecuter extends TransactionCheckStrategy<OrderPaidSucceedMessage>{@Overridepublic TransactionStatus execute(Message message, Object arg) {Integer orderId = (Integer) arg;TransactionConfig.put(orderId, true);return TransactionStatus.CommitTransaction;}@Overridepublic boolean support(ConsumeTag tag) {return ConsumeTag.ORDER_PAID_SUCCEED == tag;}@Overridepublic TransactionStatus checkLocalTransaction(OrderPaidSucceedMessage message) {if(TransactionConfig.isComplated(message.getOrderId())){return TransactionStatus.CommitTransaction;}return TransactionStatus.RollbackTransaction;}
}
这样发送事务消息部分就完成了,因为事务是针对发送方的所以消费方不需要做改动。
https://github.com/sergewu/spcd
THE END
转载于:https://my.oschina.net/serge/blog/1510298
spring cloud笔记(1) 整合消息中间件ONS(商业版rocketmq)相关推荐
- spring学习笔记06-spring整合junit(出现的问题,解决的思路)
spring学习笔记06-spring整合junit(出现的问题,解决的思路) 文章目录 spring学习笔记06-spring整合junit(出现的问题,解决的思路) 3.1测试类中的问题和解决思路 ...
- Spring Cloud Data Flow整合UAA之使用LDAP进行账号管理
我最新最全的文章都在 南瓜慢说 www.pkslow.com ,欢迎大家来喝茶! 1 前言 Spring Cloud Data Flow整合UAA的文章已经写了两篇,之前的方案是把用户信息保存在数据库 ...
- Spring Cloud消息驱动整合 1
Spring Cloud Stream 使用场景 消息驱动的微服务应用 目的 简化代码 统一抽象 主要概念 1.应用模型 2.Binder抽象 3.持久化 发布/订阅支持 4.消费分组支持 5.分区支 ...
- Spring Cloud Data Flow整合UAA使用外置数据库和API接口
我最新最全的文章都在 南瓜慢说 www.pkslow.com ,欢迎大家来喝茶! 1 前言 之前的文章<Spring Cloud Data Flow整合Cloudfoundry UAA服务做权限 ...
- Spring Cloud 笔记
文章目录 第⼀部分 微服务架构 第 1 节 互联⽹应⽤架构发展(回顾) 第 2 节 微服务架构体现的思想及优缺点 第 3 节 微服务架构中的⼀些概念 第⼆部分 Spring Cloud 综述 第 1 ...
- Spring Cloud笔记 中级篇
Hystrix(豪猪哥)断路器 背景分析 我们微服务的出现满足了高内聚低耦合的设计思想.这样固然是好的,但随之而来的是微服务的不断增多,经常会出现80端口调用8001,8001再去调用8002,800 ...
- Spring Cloud笔记
目录 1.如何调用底层服务? RestTemplate 2.是如何找到底层服务的呢?Eureka 3.服务负载均衡 Ribbon 4.写RestTemplate太繁琐?提供方与调用方沟通成本大,效率不 ...
- Spring Cloud Data Flow整合Cloudfoundry UAA服务做权限控制
我最新最全的文章都在 南瓜慢说 www.pkslow.com ,欢迎大家来喝茶! 1 前言 关于Spring Cloud Data Flow这里不多介绍,有兴趣可以看下面的文章.本文主要介绍如何整合D ...
- spring cloud+zookeeper+feign整合 简单实例(一)
一.前言 各位热爱知识的小伙伴们大家好呀!很高兴大家能点开这个博客,这是我个人的第一篇博客,之后也会持续的更新java以及spring项目的相关代码,希望大家持续关注.如果对本篇博客有什么不懂的地方或 ...
最新文章
- 关于阿里面试的的一个小题
- 【django】配置URLconf
- 缓冲区溢出(buffer overflow)机理分析
- SAP Spartacus加载delivery region的实现
- 使用opencv的LBF算法进行人脸关键点检测
- mysql存储引擎的区别_Mysql的两种存储引擎以及区别
- 4步教你玩转可视化大屏设计|内附实际操作
- container_of详解
- logback 配置详解
- 计算机应用基础工作页,计算机应用基础工作页
- Linux7.2虚拟机连接电脑相关配置
- oracle恢复被覆盖的存储过程
- Java笔记1(2015-8-30)
- 做好产品经理,需要具备哪些技能?
- vue——常用的第三方插件安装合集(可详细了!持续更新)
- android 清理工具,安卓清理君深度清理软件/真心强
- AndroidStudio如何删除Modle
- 三步快速搭建android开发环境 (下载包已集成可用sdk,无需费心到google相应网站下载,快哉!)
- 超级女生最适合最妻子指数评析
- 40 个超棒的免费 Bootstrap HTML5 网站模板
热门文章
- QT + VS + C++ <QComboBox基础认识>
- 【李宏毅2020机器学习深度学习(完整版)国语】P22 Semi-supervised
- Java---toString
- html在苹果手机上显示不出来,苹果手机下拉菜单显示不出来也下不来的解决方法...
- 蓝桥杯大赛(大学B组)——2020省赛 跑步锻炼 (C语言)
- 大数据也要有自已的特色
- labview通过串口控制风扇
- BIOS调整服务器性能模式,如何修改BIOS的设置,让显卡发挥最佳性能?
- ik分词器的下载和使用
- 美团配送数据治理实践【转载美团技术团队博客】