操作消息队列的另一种方法,SpringCloud是Spring的组件之一,官方定义Spring Cloud Stream,给微服务应用构建消息驱动能力的框架,下面我简称Stream,应用程序通过Input,或者Output,来与Stream交互,而Stream中的Binder与中间件交互,Binder是Stream的一个抽象概念,是应用与消息中间件之间,联合记,使用Stream最大的方便之处,对于消息中间件的进一步封装,可以做到代码方法的无感知,并且动态的切换成连接,切换Topic,当然他也有局限,Stream支持的只有两种,一种是RabbitMQ,另外一种是Kafka,现在我们就来实际使用一下,找一下感觉

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.learn</groupId><artifactId>order</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><parent><groupId>cn.learn</groupId><artifactId>microcloud02</artifactId><version>0.0.1</version></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version><thymeleaf.version>3.0.9.RELEASE</thymeleaf.version><thymeleaf-layout-dialect.version>2.2.2</thymeleaf-layout-dialect.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-eureka</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency></dependencies><!-- 这个插件,可以将应用打包成一个可执行的jar包 --><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
server.port=8010eureka.client.serviceUrl.defaultZone=http://admin:1234@10.40.8.152:8761/eurekaspring.application.name=order
eureka.instance.prefer-ip-address=true
eureka.instance.instance-id=${spring.application.name}:${spring.cloud.client.ipAddress}:${spring.application.instance_id:${server.port}}spring.rabbitmq.host=59.110.138.145
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672spring.cloud.stream.bindings.input.group=order
package com.learn.message;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;public interface StreamClient {// 报错:Invalid bean definition with name 'myMessageOrdersssss' // defined in com.imooc.order.message.StreamClient: bean definition with this name already exists//解决方法:@Input和@Output不可一样,同一服务里面的信道名字不能一样,在不同的服务里可以相同名字的信道String INPUT = "input";String OUTPUT = "output";@Input(StreamClient.INPUT)SubscribableChannel input();@Output(StreamClient.OUTPUT)MessageChannel output();//    String INPUT2 = "input2";
//    String OUTPUT2 = "output2";
//    @Input(StreamClient.INPUT2)
//    SubscribableChannel input2();
//    @Output(StreamClient.OUTPUT2)
//    MessageChannel output2();
}
package com.learn.message;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;import lombok.extern.slf4j.Slf4j;@Component
@EnableBinding(StreamClient.class)
@Slf4j
public class StreamReceiver {private  final Logger log = LoggerFactory.getLogger(StreamReceiver.class);@StreamListener(StreamClient.INPUT)public void process(Object message){System.out.println(message);log.info("StreamReceiver:{}",message);}/*** 接收orderDTO对象消息* */
//    @StreamListener(value = StreamClient.INPUT)
//    @SendTo(StreamClient.INPUT2) //消息发送给谁
//    public String process(OrderDTO message){
//        log.info("StreamReceiver:{}",message);
//        return"收到了";
//    }/*** 接收到消息之后返回的消息* */
//    @StreamListener(value = StreamClient.INPUT2)
//    public void process2(OrderDTO message){
//        log.info("StreamReceiver2:{}",message);
//    }
}
package com.learn.controller;import java.util.Date;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import com.learn.message.StreamClient;@RestController
public class SendMessageController {@Autowiredprivate StreamClient streamClient;@GetMapping("/sendMessage")public void process(){String message = "now "+new Date();streamClient.input().send(MessageBuilder.withPayload(message).build());
//        streamClient.output().send(MessageBuilder.withPayload(message).build());}//    @GetMapping("/sendMessage")
//    public void process(){
//        OrderDTO orderDTO = new OrderDTO();
//        orderDTO.setOrderId("123456");
//        streamClient.input().send(MessageBuilder.withPayload(orderDTO).build());
//    }
}
spring-cloud-starter-stream-rabbit<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId><version>2.2.1.RELEASE</version>
</dependency>KAFKA你就改成KAFKA,第二步,这里用的是RabbitMQ,所以你要在配置文件里面配置一下,但是我们上节课已经配置过了,不需要再配置,接下来我们就来看如何来使用Stream,来发送和接收消息,一般我们会这么来做,StreamClientlocalhost:8010/sendMessage我们已经完成发送和接收,功能上没有多大的问题spring.cloud.stream.bindings.myMessage.group=orderspring.cloud.stream.bindings.input.group=order
package com.learn.dto;import java.math.BigDecimal;
import java.util.List;import com.learn.dataobject.OrderDetail;public class OrderDTO {/** 订单id. */private String orderId;/** 买家名字. */private String buyerName;/** 买家手机号. */private String buyerPhone;/** 买家地址. */private String buyerAddress;/** 买家微信Openid. */private String buyerOpenid;/** 订单总金额. */private BigDecimal orderAmount;/** 订单状态, 默认为0新下单. */private Integer orderStatus;/** 支付状态, 默认为0未支付. */private Integer payStatus;private List<OrderDetail> orderDetailList;public String getOrderId() {return orderId;}public void setOrderId(String orderId) {this.orderId = orderId;}public String getBuyerName() {return buyerName;}public void setBuyerName(String buyerName) {this.buyerName = buyerName;}public String getBuyerPhone() {return buyerPhone;}public void setBuyerPhone(String buyerPhone) {this.buyerPhone = buyerPhone;}public String getBuyerAddress() {return buyerAddress;}public void setBuyerAddress(String buyerAddress) {this.buyerAddress = buyerAddress;}public String getBuyerOpenid() {return buyerOpenid;}public void setBuyerOpenid(String buyerOpenid) {this.buyerOpenid = buyerOpenid;}public BigDecimal getOrderAmount() {return orderAmount;}public void setOrderAmount(BigDecimal orderAmount) {this.orderAmount = orderAmount;}public Integer getOrderStatus() {return orderStatus;}public void setOrderStatus(Integer orderStatus) {this.orderStatus = orderStatus;}public Integer getPayStatus() {return payStatus;}public void setPayStatus(Integer payStatus) {this.payStatus = payStatus;}public List<OrderDetail> getOrderDetailList() {return orderDetailList;}public void setOrderDetailList(List<OrderDetail> orderDetailList) {this.orderDetailList = orderDetailList;}@Overridepublic String toString() {return "OrderDTO [orderId=" + orderId + ", buyerName=" + buyerName + ", buyerPhone=" + buyerPhone+ ", buyerAddress=" + buyerAddress + ", buyerOpenid=" + buyerOpenid + ", orderAmount=" + orderAmount+ ", orderStatus=" + orderStatus + ", payStatus=" + payStatus + ", orderDetailList=" + orderDetailList+ "]";}}
package com.learn.dataobject;import java.math.BigDecimal;public class OrderDetail {private String detailId;/** 订单id. */private String orderId;/** 商品id. */private String productId;/** 商品名称. */private String productName;/** 商品单价. */private BigDecimal productPrice;/** 商品数量. */private Integer productQuantity;/** 商品小图. */private String productIcon;public String getDetailId() {return detailId;}public void setDetailId(String detailId) {this.detailId = detailId;}public String getOrderId() {return orderId;}public void setOrderId(String orderId) {this.orderId = orderId;}public String getProductId() {return productId;}public void setProductId(String productId) {this.productId = productId;}public String getProductName() {return productName;}public void setProductName(String productName) {this.productName = productName;}public BigDecimal getProductPrice() {return productPrice;}public void setProductPrice(BigDecimal productPrice) {this.productPrice = productPrice;}public Integer getProductQuantity() {return productQuantity;}public void setProductQuantity(Integer productQuantity) {this.productQuantity = productQuantity;}public String getProductIcon() {return productIcon;}public void setProductIcon(String productIcon) {this.productIcon = productIcon;}@Overridepublic String toString() {return "OrderDetail [detailId=" + detailId + ", orderId=" + orderId + ", productId=" + productId+ ", productName=" + productName + ", productPrice=" + productPrice + ", productQuantity="+ productQuantity + ", productIcon=" + productIcon + "]";}}

Spring Cloud Stream的使用(上)相关推荐

  1. 【本人秃顶程序员】使用Spring Cloud Stream和RabbitMQ实现事件驱动的微服务

    ←←←←←←←←←←←← 快!点关注 让我们展示如何使用Spring Cloud Stream来设计事件驱动的微服务.首先,Spring Cloud Stream首先有什么好处?因为Spring AM ...

  2. 一文了解Spring Cloud Stream体系

    点击蓝色"程序猿DD"关注我哟 加个"星标",不忘签到哦 来源:阿里巴巴中间件 Spring Cloud Stream 在 Spring Cloud 体系内用于 ...

  3. Spring Cloud Stream 学习小清单

    由于最近一直在写Spring Cloud Stream相关的内容,在2018年最后一天,把之前写过的Spring Cloud Stream内容从基础,到入门,到深入,做一些小清单,方便大家查阅. 如果 ...

  4. Spring Cloud Stream消费失败后的处理策略(四):重新入队(RabbitMQ)

    应用场景 之前我们已经通过<Spring Cloud Stream消费失败后的处理策略(一):自动重试>一文介绍了Spring Cloud Stream默认的消息重试功能.本文将介绍Rab ...

  5. Spring Cloud Stream消费失败后的处理策略(三):使用DLQ队列(RabbitMQ)

    应用场景  前两天我们已经介绍了两种Spring Cloud Stream对消息失败的处理策略: 自动重试:对于一些因环境原因(如:网络抖动等不稳定因素)引发的问题可以起到比较好的作用,提高消息处理的 ...

  6. Spring Cloud Stream消费失败后的处理策略(二):自定义错误处理逻辑

    应用场景  上一篇<Spring Cloud Stream消费失败后的处理策略(一):自动重试>介绍了默认就会生效的消息重试功能.对于一些因环境原因.网络抖动等不稳定因素引发的问题可以起到 ...

  7. Spring Cloud Stream消费失败后的处理策略(一):自动重试

    之前写了几篇关于Spring Cloud Stream使用中的常见问题,比如: 如何处理消息重复消费? 如何消费自己生产的消息? 下面几天就集中来详细聊聊,当消息消费失败之后该如何处理的几种方式. 不 ...

  8. Spring Cloud Stream如何消费自己生产的消息?

    在上一篇<Spring Cloud Stream如何处理消息重复消费?>中,我们通过消费组的配置解决了多实例部署情况下消息重复消费这一入门时的常见问题.本文将继续说说在另外一个被经常问到的 ...

  9. Spring Cloud Stream 体系及原理介绍

    https://mp.weixin.qq.com/s/e_pDTFmFcSqHH-uSIzNmMg Spring Cloud Stream 在 Spring Cloud 体系内用于构建高度可扩展的基于 ...

最新文章

  1. 正则表达式匹配多个字符(*、+、?、{m}、{m,n})
  2. 如何设置VSS源代码管理工具使用KDiff3
  3. gunzip 和 unzip 解压文件到指定的目录
  4. Android规范文档
  5. 深度学习100例-卷积神经网络(CNN)3D医疗影像识别 | 第23天
  6. 斯坦福机器学习公开课学习笔记(1)—机器学习的动机与应用
  7. 初步认识Volatile-缓存一致性协议
  8. android 行布局选择器,『自定义View实战』—— 银行种类选择器
  9. 滑动轨迹 曲线 python_python – 计算轨迹(路径)中的转折点/枢轴点
  10. 反转链表python
  11. 工龄是怎么计算的?几个月算工龄吗?
  12. C++算法学习(力扣:1254. 统计封闭岛屿的数目)
  13. 一个将汉字转换成拼音的npm包
  14. 模数转换器ADC的常用术语和主要技术指标(一)
  15. 基于华为云ModelArts(实现垃圾分类识别)
  16. 基于MATLAB的一维条码二维码识别
  17. Android--- Drawer and Tab Navigation with ViewPager
  18. 基于微信小程序的学习记录与提醒应用设计与实现-计算机毕业设计源码+LW文档
  19. DIY自己的超级PE
  20. 数字集成电路版图设计(附录)——持续补充...

热门文章

  1. hdu2489 Minimal Ratio Tree
  2. 自由鸟书评排行网开始上线试运行,欢迎前往评论!
  3. MySql中的varchar类型
  4. JavaScript jQuery获取radio/下拉框的选中值
  5. 用java分组查elasticsearch
  6. weh shell高大上?一文教你实现
  7. [ZOJ 4024] Peak
  8. Dart 基礎 - 4
  9. JAVASCRIPT实现绚丽TAB选项卡
  10. Method Tracking