开始先分享一下借鉴的帖子有:
1.Spring整合JMS(二)——三种消息监听器
2.RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知
推荐先把上面两个文章看完,会对后面的阅读有帮助。

消息队列

消息队列有很多种:ActiveMQ,RabbitMQ,ZeroMQ,Kafka,RocketMQ等。本文不再细述AMQ是什么、如何搭建使用,初学者可先学习AMQ原理和用法。
翻了很多帖子很少有用AMQ实现梯度通知的,所以特此写一个。

应用场景

一个的交易系统,当用户消费成功后,需要发送通知,告知这笔订单的商户,该订单已被支付了。这种业务场景与微信和支付宝的支付成功回调通知雷同。微信支付结果通知接口文档

代码实现

业务代码:
代码用了lombok,所以对 @Data 这个注解陌生的同学先导入相应的jar包才能使用。以下是几个简单的POJO类。

/*** @Author: Gavin* @DateTime: 2018/8/14 * @Des: 消费的请求类,一些敏感字段我就不展示了*/
@Data
public class SaleReq extends BaseReq {//商户号private String merchId;//金额private String totallAmt;//卡号private String ecardId;//订单号private String orderId;//回调通知地址private String notifyUrl;private String appId;//订单的附属数据,可以理解为备注remarkprivate String attach;//支付通知发送次数private int notifyTime;
}
/*** @Author: Gavin* @DateTime: 2018/8/14 * @Des: 回调通知请求类*/
@Data
public class Notify4ConsumReq implements Serializable {private String appId;private String timestamp;private String orderId;private String amount;private String ecardId;private String storeId;private String attach;private String resultCode;private String errCode;private String errorMsg;private String sign;
}

/*** @Author: Gavin* @DateTime: 2018/8/14* @Des:*/
@Data
public class Notify4ConsumRsp implements Serializable {//响应码private String retCode;//响应信息private String retMsg;
}

消费业务执行完后判断消费是否成功,成功并且请求带有支付成功回调通知url和appid则调用回调通知方法,产生一个通知消息
PS:appid是用于报文验签,防止第三方商户收到假的支付成功回调通知。

/*** @Author: Gavin* @DateTime: 2018/8/14 * @Des: 消费的controller*/
@Controller
public class SaleController{@AutowiredISaleService mSaleService;@AutowiredISaleAsynService saleAsynService;@RequestMapping(value = "/Sale", method = RequestMethod.POST, produces = {"application/json;charset=UTF-8"})@ResponseBodypublic String Sale(@RequestBody CommunicationReq param) {try {//消费业务,具体代码忽略rsp = mSaleService.doSale(procid, req, rsp);} catch (Exception e) {rsp.setRetCode(Constant.FAIL);rsp.setRetMsg("系统错误");return rsp;}//支付回调,当消费成功 && 有回调地址 && 请求带有appid 则调用支付回调方法(消息队列的生产者)if(Constant.SUCCESS.equals(rsp.getRetCode()) && StringUtils.isNotBlank(req.getNotifyUrl()) && StringUtils.isNotBlank(req.getAppId())){try {saleAsynService.notify4Produce(procid, req);}catch (Exception e){}}return rsp;}
}
/*** @Author: Gavin* @DateTime: 2018/8/14 * @Des: 消费的异步处理业务*/
@Service("saleAsynService")
public class SaleAsynServiceImpl implements ISaleAsynService {@AutowiredProducerService producerService;@Autowired@Qualifier("adapterQueue")Destination adapterQueue;@AutowiredIAppInfoDao appInfoDao;@Overridepublic void notify4Produce(String procid, SaleReq saleReq) {//产生消息log.info("发送支付通知内容:" + JsonUtil.Object2Json(saleReq));//初始发送次数为第一次 这是一个关键点saleReq.setNotifyTime(1);producerService.sendMessage(adapterQueue, saleReq, 0);log.info("发送支付通知成功!!!");}@Transactional(readOnly = true)@Overridepublic Notify4ConsumRsp notify4Consum(String procid, SaleReq saleReq) {Notify4ConsumRsp rsp = new Notify4ConsumRsp();rsp.setRetCode("success");//查找appid//找不到appid,无法做签名,所以直接不发通知。if(StringUtils.isBlank(saleReq.getAppId())){log.info("无appid1,不发送通知");return rsp;}AppInfo appInfo = appInfoDao.get(saleReq.getAppId());if(appInfo==null){log.info("无appid2,不发送通知");return rsp;}//拼装通知请求对象Notify4ConsumReq req = new Notify4ConsumReq();req.setAppId(saleReq.getAppId());req.setAmount(saleReq.getTotallAmt());req.setAttach(saleReq.getAttach());req.setEcardId(saleReq.getEcardId());req.setOrderId(saleReq.getOrderId());req.setResultCode("success");req.setStoreId(saleReq.getMerchId());//时间戳是为了签名的不确定性,此字段可以换成一个随机字符串req.setTimestamp(DateUtil.formatDateToString(new Date(),"yyyyMMddHHmmsss"));//生成签名String sign="";try {sign = SignUtil.generateSignature(SignUtil.beanToMap(req),appInfo.getAppSecret(),SignType.valueOf(appInfo.getSignType()));} catch (Exception e) {log.info("签名失败="+CommonUtil.getTrace(e));}req.setSign(sign);//向第三方支付系统发送校验请求String str=null;if(saleReq.getNotifyUrl().contains("http")){str = HttpClientUtil.postHttpJson(saleReq.getNotifyUrl() , JsonUtil.Object2Json(req), 10*1000);}else{str = HttpClientUtil.postHttpsJson(saleReq.getNotifyUrl() , JsonUtil.Object2Json(req), Constant.TIME_OUT);}if (str == null) {log.info("请求支付通知系统失败,数据返回为空");rsp.setRetCode("fail");return rsp;}log.info("支付通知系统返回结果:" + str);rsp = (Notify4ConsumRsp) JsonUtil.json2Object(str,Notify4ConsumRsp.class);return rsp;}
}

上面的代码都是和业务有关,以下是AMQ的代码实现了。

/*** @Author: Gavin* @DateTime: 2018/8/14 * @Des: 消息生产者*/
@Component
public class ProducerServiceImpl implements ProducerService {@Autowiredprivate JmsTemplate jmsTemplate;@Autowired@Qualifier("responseQueue")private Destination responseDestination;@Overridepublic void sendMessage(String procid,Destination destination, final String message,final int delayTime) {log.info("---------------生产者开始发送消息-----------------");log.info("消息内容:" + message);jmsTemplate.send(destination, new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {TextMessage textMessage = session.createTextMessage(message);textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delayTime);return textMessage;}});log.info("---------------生产者发送消息完毕-----------------");}@Overridepublic void sendMessage(final Destination destination, final Serializable obj,final long delayTime) {jmsTemplate.convertAndSend(destination, obj, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws JMSException {message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delayTime);return message;}});}
}
/*** @Author: Gavin* @DateTime: 2018/8/14 * @Des: 消息消费者,监听接收消息*/
public class ConsumerMessageListener implements SessionAwareMessageListener {@Autowired@Qualifier("myMessageConverter")private MessageConverter messageConverter;@Autowired@Qualifier("adapterQueue")private Destination adapterQueue;@AutowiredISaleAsynService saleAsynService;@AutowiredProducerService producerService;private static final Map<Integer, Long> myMap;//初始话一个map,这个map的key代表通知重发次数,value是通知间隔时间//比如,第一次重发间隔5秒,第二次重发间隔10秒以此类推//通知频率为5/10/30/180/1800,单位:秒static {myMap = new HashMap<Integer, Long>();myMap.put(1, 5 * 1000L);myMap.put(2, 10 * 1000L);myMap.put(3, 30 * 1000L);myMap.put(4, 180 * 1000L);myMap.put(5, 1800 * 1000L);
//        myMap.put(6, 1800*1000L);
//        myMap.put(7, 1800*1000L);
//        myMap.put(8, 3600*1000L);}@Overridepublic void onMessage(Message message, Session session) {try {if  (message instanceof ObjectMessage) {ObjectMessage objMessage = (ObjectMessage) message;Object obj = messageConverter.fromMessage(objMessage);log.info("接收到一个ObjectMessage:" + JsonUtil.Object2Json(obj));if (obj instanceof SaleReq) {SaleReq saleReq = (SaleReq) obj;log.info("这是一个SaleReq类");Notify4ConsumRsp rsp = saleAsynService.notify4Consum(procid, saleReq);log.info("Notify4ConsumRsp=" + JsonUtil.Object2Json(rsp));if ("success".equals(rsp.getRetCode())) {//通知成功} else {//通知失败,重发//重发一次,通知次数+1,当次数超过预设的值5时,就不再重发通知。int notifytime = saleReq.getNotifyTime();LogUtil.addProclog(procid, "Time=" + notifytime);if (notifytime <= 5) {saleReq.setNotifyTime(saleReq.getNotifyTime() + 1);producerService.sendMessage(adapterQueue, saleReq, myMap.get(notifytime));}}} else {log.info("不能识别的JAVA类");}}} catch (Exception e) {log.info("消费者处理错误=" + CommonUtil.getTrace(e));}}}
/*** @Author: Gavin* @DateTime: 2018/8/14 * @Des: 这个是message的转换类。*/
public class MyMessageConverter implements MessageConverter {@Overridepublic Message toMessage(Object object, Session session)throws JMSException, MessageConversionException {return session.createObjectMessage((Serializable) object);}@Overridepublic Object fromMessage(Message message) throws JMSException,MessageConversionException {ObjectMessage objMessage = (ObjectMessage) message;return objMessage.getObject();}}

ActiveMQ的配置文件就不贴出来了,看完第一篇文章应该已经把ActiveMQ和spring融合一起了。
代码有很多删减过,所以直接复制代码是行不通的,仅展示主要的业务逻辑。

第二次写博,如有疑问或者建议可以给我留言哦,期待你的留言(●’◡’●)

使用ActiveMQ实现阶梯式消息通知相关推荐

  1. Redis消息通知系统的实现

    Redis消息通知系统的实现 Posted on 2012-02-29 by 老王 http://huoding.com/2012/02/29/146 最近忙着用Redis实现一个消息通知系统,今天大 ...

  2. [UWP]实现一个轻量级的应用内消息通知控件

    [UWP]实现一个轻量级的应用内消息通知控件 原文:[UWP]实现一个轻量级的应用内消息通知控件 在UWP应用开发中,我们常常有向用户发送一些提示性消息的需求.这种时候我们一般会选择MessageDi ...

  3. SCOM警报通知新特性:即时消息通知

    使用过SCOM的朋友应该了解,相对MOM,SCOM增加了"即时消息"和"短信"两种警报通知方式.今天我将给大家介绍其中之一,"即时消息"告警 ...

  4. NSNotification消息通知实现源码(观察者模式原理)

    先简单介绍苹果封装的消息通知,再献上根据观察者模式原理实现的源码供参考. 消息通知 对于观察者模式,苹果封装了消息通知(NSNotification)和通知中心(NSNotificationCente ...

  5. html 消息通知功能,HTML5之消息通知的使用(Web Notification)

    关于 HTML5 ,写了不少文章,总觉得相关的高级 API 都得过一遍.系统的了解,站在更高的高度去思考问题,这样才能事半功倍. 一.先睹为快 我们先来尝试一个最简单的例子,打开 chrome 开发者 ...

  6. Laravel——消息通知

    有的时候,在做一些业务的时候,可能会遇到这么个需求.那就是,别人评论了你的某个东西,或者是关注你,再或者是收藏了你的文章,那么作者,应该是需要被通知一下,以展现一下作者该有的成果,也可以满足一下作者小 ...

  7. Java中集成极光推送实现给Android提送消息通知(附代码下载)

    场景 Android中集成极光推送实现推送消息通知与根据别名指定推送附示例代码下载: https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details ...

  8. Android中集成Jpush实现推送消息通知与根据别名指定推送附示例代码下载

    场景 经常会有后台服务向Android推送消息通知的情况. 实现 首先在Android Studio中新建一个Android应用 在Project根目录的build.gradle中配置了jcenter ...

  9. html5的消息通知

    这里介绍一个HTML5的notification demo: <!DOCTYPE html> <html><head><meta charset=" ...

最新文章

  1. JavaScript 的同源策略
  2. jQuery获取json数据
  3. CentOS系统基础优化16条知识汇总
  4. JavaScript 教程(二)
  5. 【C++教程】02.环境配置
  6. rpm安装mysql报错_【CentOS-65】通过rpm包安装mysql57解决了server报错和mysqld启动报错的问题...
  7. 大数据将如何重构汽车产业的商业模式?
  8. chrome浏览器崩溃_不只是您:Chrome浏览器在Windows 10的2018年4月更新中崩溃
  9. RMI强制Full GC每小时运行一次
  10. 向一个数组中插入元素
  11. qq如何用其他进制登录
  12. php为什么要有非静态方法,php中非静态方法的静态调用【解释】
  13. java python混合编程_python+java混合编程
  14. 计算机怎么算对数的反函数,Excel 计算对数分布函数反函数:LOGINV函数
  15. weka API,创建线性回归时出现缺少no.uib.cipr.matrix和org.netlib.blas的解决方案
  16. 山海经异兽录找不到服务器,星辰山海经异兽录
  17. iPhone 记录之 点与像素
  18. 以太坊的单位wei是什么?
  19. 手机里拍摄的照片误删了也不怕,2招教你快速找回照片!
  20. 什么是动态域名解析服务?

热门文章

  1. Orica 如何维护安全、质量、管理风险、高标准的客户服务和员工福利所需的大量文档和内容
  2. 日内趋势票如何把握?
  3. jenkin设置定时构建及时区修改
  4. android 多语言的实现
  5. 手把手教学安装pycharm(社区版)
  6. 使用Boost::Log记录日志
  7. php的表达爱意的一句代码,一句表达爱意的古词 最能表达爱的一句话
  8. [毕业设计]基于springboot线上教学平台的管理系统
  9. python小工具开发_python音乐下载小工具源码(tkinter)
  10. 中国黑客档案:识别黑客犯罪的蛛丝马迹