生产者

下面是生产者的示例代码:


import java.util.Date;
import java.util.HashMap;
import java.util.Map;import com.downtown.enums.EnvEnum;
import com.downtown.pos.util.EnvConfig;
import com.downtown.utils.enums.Bo;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;import com.alibaba.fastjson.JSON;
import com.downtown.pos.log.OLog;/*** MQ服务*/
@Component
public class StoreMqService {@AutowiredRabbitTemplate rabbitTemplate;@AutowiredOLog oLog;//   @Autowired
//  MQTTService mQTTService;@Asyncpublic void sendMq(String requstId ,  String action ,Object data ) {readyGo(  requstId ,  action ,data );}private void readyGo(String requstId  ,String action  ,Object data ) {String routeKey = "storex";if(StoreMqTopicConfig.getKdsMap().containsKey(action) ) {routeKey += ".kds";}if(StoreMqTopicConfig.getFeeMap().containsKey(action) ) {routeKey += ".fee";}if(StoreMqTopicConfig.getMktMap().containsKey(action) ) {routeKey += ".mkt";}//      if(StoreMqTopicConfig.getMemberMap().containsKey(action) ) {//          routeKey += ".member";
//      }if(StoreMqTopicConfig.getPayMap().containsKey(action) ) {routeKey += ".pay";}if(StoreMqTopicConfig.getPosMap().containsKey(action) ) {routeKey += ".pos";}if (Bo.cn.name().equals(EnvEnum.getByEvn(EnvConfig.envName).getBo())){if(StoreMqTopicConfig.getAllianceMap().containsKey(action) ) {routeKey += ".alliance";}}go( requstId,    action,  routeKey  ,data  );}private void go(String requstId , String action, String routeKey   ,Object data) {try {Map<String,Object> mqOrdersData = new HashMap<>(16);mqOrdersData.put("data",data);mqOrdersData.put("id",requstId);mqOrdersData.put("addTime",new Date());mqOrdersData.put("action",action);CorrelationData correlationData = new CorrelationData(requstId);rabbitTemplate.convertAndSend(StoreMqTopicConfig.TOPICExchange, routeKey,JSON.toJSONStringWithDateFormat(mqOrdersData, "yyyy-MM-dd HH:mm:ss"),correlationData);oLog.info( "投递MQ:"+action,  JSON.toJSONStringWithDateFormat(mqOrdersData, "yyyy-MM-dd HH:mm:ss"));
//          logger.info("send to mq ,ordersId:" + ordersId + ",action:" + action +",requstId:" + requstId);}catch(Exception e){e.printStackTrace();oLog.info( "投递MQ失败:"+action,  routeKey);}}
}

交换机


package com.downtown.pos.mq;import java.util.HashMap;
import java.util.Map;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;import com.downtown.pos.log.OLog;
import com.downtown.pos.store.storeEvent.StoreEventsActionEnum;/*** mq交换机* @author zhoucheng**/
@Configuration
public class StoreMqTopicConfig {private final Logger logger = LoggerFactory.getLogger(this.getClass());@AutowiredOLog ordersLogerService;public final static String TOPICExchange = "posTopicExchange";public static Map<String,Integer> KdsMap =  new HashMap<>();public static Map<String,Integer> PayMap =  new HashMap<>();public static Map<String,Integer> PosMap =  new HashMap<>();public static Map<String,Integer> MktMap =  new HashMap<>();public static Map<String,Integer> FeeMap =  new HashMap<>();public static Map<String,Integer> AllianceMap =  new HashMap<>();static  {//{//  [RabbitmqTopic.posAddBrand]: ['.pay', '.kds', '.fee', '.mkt.1'],//  [RabbitmqTopic.posAddStore]: ['.pay', '.kds', '.member', '.fee', '.mkt', '.pos.1'],//  [RabbitmqTopic.posDelStore]: ['.pay', '.kds', '.member', '.fee', '.mkt.1'],//  [RabbitmqTopic.posMenuChange]: ['.pos.1']//}KdsMap.put(StoreEventsActionEnum.CREATE.getAction(),1);PayMap.put(StoreEventsActionEnum.CREATE.getAction(),1);FeeMap.put(StoreEventsActionEnum.CREATE.getAction(),1);MktMap.put(StoreEventsActionEnum.CREATE.getAction(),1);PosMap.put(StoreEventsActionEnum.CREATE.getAction(),1);KdsMap.put(StoreEventsActionEnum.UPDATE.getAction(),1);MktMap.put(StoreEventsActionEnum.UPDATE.getAction(),1);AllianceMap.put(StoreEventsActionEnum.UPDATE.getAction(),1);KdsMap.put(StoreEventsActionEnum.DELETE.getAction(),1);MktMap.put(StoreEventsActionEnum.DELETE.getAction(),1);AllianceMap.put(StoreEventsActionEnum.DELETE.getAction(),1);}@BeanTopicExchange exchange() {return new TopicExchange(StoreMqTopicConfig.TOPICExchange);}//  static KdsSimpleQueue = 'KdsSimpleQueue'
//    [RabbitmqQueue.KdsSimpleQueue]: '#.kds.#',@Beanpublic Queue KdsQueue() {return new Queue("KdsSimpleQueue");}@BeanBinding bindingKdsExchangeMessage() {return BindingBuilder.bind(KdsQueue()).to(exchange()).with("#.kds.#");}//  static PayQueue = 'StorePayQueue'
//  [RabbitmqQueue.PayQueue]: '#.pay.#',@Beanpublic Queue PayQueue() {return new Queue("StorePayQueue");}@BeanBinding bindingPayExchangeMessage() {return BindingBuilder.bind(PayQueue()).to(exchange()).with("#.pay.#");}//  static PosQueue = 'PosQueue'
//  [RabbitmqQueue.PosQueue]: "#.pos.#",@Beanpublic Queue PosQueue() {return new Queue("PosQueue");}@BeanBinding bindingPosExchangeMessage() {return BindingBuilder.bind(PosQueue()).to(exchange()).with("#.pos.#");}//  static MemberSimpleQueue = 'MemberSimpleQueue'
//  [RabbitmqQueue.MemberSimpleQueue]: "#.member.#",//    @Bean
//  public Queue MemberSimpleQueue() {//        return new Queue("MemberSimpleQueue");
//    }
//
//    @Bean
//    Binding bindingMemberExchangeMessage() {//        return BindingBuilder.bind(MemberSimpleQueue()).to(exchange()).with("#.member.#");
//    }//  static FeeSimpleQueue = 'FeeSimpleQueue'
//  [RabbitmqQueue.FeeSimpleQueue]: "#.fee.#",@Beanpublic Queue FeeSimpleQueue() {return new Queue("FeeSimpleQueue");}@BeanBinding bindingFeeExchangeMessage() {return BindingBuilder.bind(FeeSimpleQueue()).to(exchange()).with("#.fee.#");}//    [RabbitmqQueue.PosMkt]: "#.mkt.#"
//  [RabbitmqQueue.PosMkt]: "#.mkt.#"@Beanpublic Queue PosMkt() {return new Queue("PosMkt");}@BeanBinding bindingMktExchangeMessage() {return BindingBuilder.bind(PosMkt()).to(exchange()).with("#.mkt.#");}@Beanpublic Queue PosAlliance() {return new Queue("PosAlliance");}@BeanBinding bindingAllianceExchangeMessage() {return BindingBuilder.bind(PosAlliance()).to(exchange()).with("#.alliance.#");}@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setMandatory(true);RetryTemplate retryTemplate = new RetryTemplate();retryTemplate.setRetryPolicy(new SimpleRetryPolicy(10));rabbitTemplate.setRetryTemplate(retryTemplate);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(!ack) {logger.info("ConfirmCallback:"+"确认情况:"+ack);logger.info("ConfirmCallback:     "+"相关数据:"+correlationData);logger.info("ConfirmCallback:     "+"原因:"+cause);}else {//                  int cnt = ordersEventDao.updateStatus(new Date(),correlationData.getId());
//                  if(cnt > 0) {//                      try {//                          OrdersEvent oe = ordersEventDao.findByRequstIdAndStatus(correlationData.getId(),1).get(0);
//                          ordersLogerService.info(oe.getOrdersId(),  correlationData.getId(), "投递成功", oe);
//                      }catch(Exception e) {//                          e.printStackTrace();
//                      }
//                  }}}});rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {logger.error("ReturnCallback:     "+"消息:"+message);logger.error("ReturnCallback:     "+"回应码:"+replyCode);logger.error("ReturnCallback:     "+"回应信息:"+replyText);logger.error("ReturnCallback:     "+"交换机:"+exchange);logger.error("ReturnCallback:     "+"路由键:"+routingKey);
//              ordersLogerService.error(message., null, null, correlationData.getId(), "投递成功", message);}});return rabbitTemplate;}public static Map<String, Integer> getKdsMap() {return KdsMap;}public static Map<String, Integer> getPayMap() {return PayMap;}public static Map<String, Integer> getPosMap() {return PosMap;}// public static Map<String, Integer> getMemberMap() {//      return MemberMap;
//  }public static Map<String, Integer> getMktMap() {return MktMap;}public static Map<String, Integer> getFeeMap() {return FeeMap;}public static Map<String, Integer> getAllianceMap() { return AllianceMap; }}

消费者

package com.downtown.service.mq;import com.alibaba.fastjson.JSON;
import com.downtown.service.alliance.storeSaleResource.service.StoreSaleResourceService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import javax.validation.Valid;
import java.io.IOException;/*** 新门店创建消息处理* @author Zhaobin*/
@Slf4j
@Component
/*@RabbitListener(queues = "StorePayQueue")*/
public class CreateBrandStoreQueueListener {private static final String ACTION_ADD_STORE = "Pos_StoreAdd";private static final String ACTION_ADD_BRAND = "Pos_BrandAdd";private static final String POS_TOPIC_EXCHANGE = "posTopicExchange";private static final String STORE_ALLIANCE_QUEUE = "StoreAllianceQueue";private static final String STORE_PAY_QUEUE = "StorePayQueue";/*** 借用支付接口的BindingKey*/private static final String ALLIANCE_BINDING_KEY = "#.pay.#";@Value("${mq.create-brand-store.enabled}")private boolean enabled;@Resourceprivate StoreSaleResourceService storeSaleResourceService;@RabbitListener(bindings =@QueueBinding(value = @Queue(value = STORE_ALLIANCE_QUEUE, durable = "true"),exchange =@Exchange(value = POS_TOPIC_EXCHANGE,ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),key = {ALLIANCE_BINDING_KEY}))@RabbitHandlerpublic void process(Message message, Channel channel) throws IOException {String content = new String(message.getBody());log.info("StoreAllianceQueue MQ String Process 收到授权MQ消息:" + content);goprocess(content, message, channel);}//    @RabbitListener(
//          bindings =
//          @QueueBinding(
//                  value = @Queue(value = STORE_ALLIANCE_QUEUE, durable = "true"),
//                  exchange =
//                  @Exchange(
//                          value = POS_TOPIC_EXCHANGE,
//                          ignoreDeclarationExceptions = "true",
//                          type = ExchangeTypes.TOPIC),
//                  key = {ALLIANCE_BINDING_KEY})
//                  ack
//  )
//  @RabbitHandler
//  public void process(Message message, Channel channel) throws IOException {//      String content = new String(message.getBody());
//      log.info("StoreAllianceQueue MQ String Process 收到授权MQ消息:" + content);
//      goprocess(content, message, channel);
//  }/*** @param content* @param message* @param channel*/public void goprocess(String content, Message message, Channel channel) {if (!enabled) {return;}try {CreateBrandStoreMsg createBrandStoreMsg = JSON.parseObject(content, CreateBrandStoreMsg.class);String action = createBrandStoreMsg.getAction();if (null != message && null != channel) {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}CreateBrandStoreDataDTO data = createBrandStoreMsg.getData();if (null == data) {log.error("消息格式错误,无法获取object节点");return;}// 新增门店,所有的可授权模块自动授权5天试用时间if (action.equals(ACTION_ADD_STORE)) {storeSaleResourceService.autoGrantNewStore(data.getBrandId(), data.getStoreId(), data.getStoreName());} else if (action.equals(ACTION_ADD_BRAND)) {storeSaleResourceService.autoGrantNewBrand(data.getBrandId(), data.getBrandName());}} catch (Exception e) {e.printStackTrace();}}
}

延时队列

package com.downtown.service.mq;import com.downtown.log.OLog;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.concurrent.TimeUnit;/*** @author zcz* rabbitMq生产者*/
@Component
public class RabbitProduct {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate OLog logger;@Value("${time-zone.offset}")String timeZone;/*** 发送延时消息* @param params* @param delayMill*/public void sendDelayMessage(Object params, long delayMill) {//这里的消息可以是任意对象,无需额外配置,直接传即可this.logger.info("发送延时消息", params);this.rabbitTemplate.convertAndSend("deliveryDelayExchange","#.delivery.#",params,message -> {//注意这里时间可以使long,而且是设置headermessage.getMessageProperties().setHeader("x-delay", delayMill);return message;});}public void sendDelayMessage(Object params, LocalDateTime delayTime) {LocalDateTime nowTime = LocalDateTime.now(ZoneId.of(timeZone));Duration duration = Duration.between(nowTime, delayTime);long delayMill = duration.toMillis();this.sendDelayMessage(params, delayMill);}/**** @param params 参数* @param delayTime 延迟时间*/public void sendBatchCallMessage(Object params, long delayTime) {long delayDate = TimeUnit.MINUTES.toMillis(delayTime);this.sendDelayMessage(params, delayDate);}
}

延时队列消费者

package com.downtown.service.mq;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.downtown.enums.TaskTypeEnum;
import com.downtown.log.OLog;
import com.downtown.service.service.OrderService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;/*** @author zcz* 配送延时队列*/
@Component
public class DeliveryDelayQueue {@AutowiredOLog logger;@AutowiredOrderService orderService;@RabbitListener(queues = "DeliveryDelayQueue")@RabbitHandlerpublic void process(Message oMessage, Channel channel) throws IOException {String messageString = new String(oMessage.getBody());try {logger.info("配送延迟任务", messageString);JSONObject objContent = JSON.parseObject(messageString);String type = objContent.getString("type");String uniqueId = objContent.getString("uniqueId");if (TaskTypeEnum.RESERVE.getTaskType().equals(type)) {this.orderService.dealReserveDelayTask(new Integer(uniqueId));}//分批呼叫if (TaskTypeEnum.BATCH_CALL.getTaskType().equals(type)) {this.logger.info("mq分批呼叫延迟消费", objContent);this.orderService.batchCallOrderProcessingDelayTask(new Integer(uniqueId));}}catch(Exception err) {this.logger.info("订单配送单取消失败", err);}finally {channel.basicAck(oMessage.getMessageProperties().getDeliveryTag(), false);}}
}
配置
package com.downtown.service.mq;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** @author zcz* RabbitMq配置*/
@Configuration
public class RabbitMqConfig {@Beanpublic CustomExchange delayExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");//属性参数 交换机名称 交换机类型 是否持久化 是否自动删除 配置参数return new CustomExchange("deliveryDelayExchange", "x-delayed-message", true, false, args);}@Beanpublic Queue delayQueue() {//属性参数 队列名称 是否持久化return new Queue("DeliveryDelayQueue", true);}@Beanpublic Binding cfgDelayBinding() {return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("#.delivery.#").noargs();}
}<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

rabbitmq 生产者和消费者相关推荐

  1. Go 学习笔记(57)— Go 第三方库之 amqp (RabbitMQ 生产者、消费者整个流程)

    1. 安装 rabbitmq 的 golang 包 golang 可使用库 github.com/streadway/amqp 操作 rabbitmq .使用下面命令安装 RabbitMQ . go ...

  2. RabbitMQ生产者和消费者Java实现

    添加Maven依赖: 使用rabbitmq-client的最新Maven坐标: <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp ...

  3. rabbitmq生产者和消费者

    如果你曾经在工作中使用过网络软件,脑海中应该会有客户端和服务器端的概念.不管是浏览器和Web服务器,还是应用程序和MySQL服务器,都是其中一方发送请求,而另一方服务这些请求.你可以将其视为快餐车模式 ...

  4. Spring Cloud Stream与RabbitMQ 生产者和消费者位于同一个应用服务

    第一种模型:交换机类型为topic,路由key为"#",这是简单的使用模型 当前Spring Cloud Rabbit的版本为2.1.2 <dependency>< ...

  5. mq多个消费者消费一个消息_消息中间件——RabbitMQ(五)快速入门生产者与消费者...

    求关注 快速入门生产者与消费者,SpringBoot整合RabbitMQ! 前言 本章我们来一次快速入门RabbitMQ--生产者与消费者.需要构建一个生产端与消费端的模型.什么意思呢?我们的生产者发 ...

  6. SpringBoot整合RabbitMQ(包含生产者和消费者)

    生产者 创建一个SpringBoot项目springboot-producer,作为RabbitMQ的生产者. 在pom文件中引入相关的依赖坐标 <dependency><group ...

  7. RabbitMQ 入门系列(2)— 生产者、消费者、信道、代理、队列、交换器、路由键、绑定、交换器

    本系列是「RabbitMQ实战:高效部署分布式消息队列」和 「RabbitMQ实战指南」书籍的读书笔记. RabbitMQ 中重要概念 1. 生产者 生产者(producer)创建消息,然后发送到代理 ...

  8. RabbitMQ消息队列生产者和消费者

    概述 生产者生产数据至 RabbitMQ 队列,消费者消费 RabbitMQ 队列里的数据. 详细 代码下载:http://www.demodashi.com/demo/10723.html 一.准备 ...

  9. Spring整合RabbitMQ(包含生产者和消费者)

    生产者 创建一个MAVEN项目spring-exchange-producer作为消息队列的生产者 导入相关的依赖坐标 <dependencies><!-- https://mvnr ...

最新文章

  1. 运算符的优先级和实际操作
  2. String/Stringbuilder/StringBuffer
  3. 专题突破一之分块——Untitled Problem II,Balanced Lineup,[ioi2009]Regions
  4. 发送广播_DHCP服务器什么时候发送?为什么request要广播发送?那还不看?
  5. asp.net中MaintainScrollPositionOnPostback属性的使用
  6. 模块概念与使用及注意事项
  7. Linux的实际操作:文件目录类的实用指令(帮助指令 man help)
  8. 最优化学习笔记(九)——基本的共轭方向算法
  9. key没有引号的字符串如何转json变为字典格式
  10. Web2.0创业者面临艰难选择:出售还是发展
  11. mysql清空数据库_mysql命令行快速清空数据库的方法
  12. 与VX msn 聊天记录
  13. 没有学过C语言可以学Java吗?
  14. Linux(CentOS 7)如何创建软件桌面启动图标
  15. 计算机ram和rom的特点的是,什么是ROM和RAM?它们各有什么特点?
  16. 亚马逊的规则你知道多少
  17. springboot社区快递代取服务系统毕业设计-附源码
  18. c语言关键词中英翻译机编程,C语言关键字中英翻译机.doc
  19. 工作中的完美主义 感悟_如何克服设计中的完美主义
  20. Ubuntu BBR加速

热门文章

  1. iframe 的使用理解
  2. Oracle优化 latch free问题Result Cache:RC Latch引起数据库缓慢
  3. 大头菜价格预测详解+模型
  4. Java代码加密混淆工具有哪些?
  5. http状态码-504
  6. Android模拟点击的四种方式
  7. 京东怎么做《IOS系统APP耗电量检测分析和优化》?
  8. 使用蓝桥杯单片机做一个智能密码锁可以修改密码
  9. 【小工具】极客时间GitChat专栏下载脚本
  10. Dijkstra算法(单源最短路径)