Spring Cloud Stream

  • 使用场景
    消息驱动的微服务应用
  • 目的
    简化代码

统一抽象

  • 主要概念
    1.应用模型

2.Binder抽象
3.持久化 发布/订阅支持
4.消费分组支持
5.分区支持

  • 基本概念
    Source:Stream发送源

近义词:Producer、Publisher
Sink:Stream接收器
近义词:Consumer、Subscriber
Processor

  • 编程模型
    激活:- @EnableBinding

    • @Configuration
    • @EnableIntegration
      Source

@Output
MessgeChannel

  • 编程模型
    Sink

    • @Input
      SubschribableChannel

@ServiceActivator
@StreamListener

Spring Cloud 消息驱动整合

整合kafka
改造 user-service-client 消息发送源(Kafka 原生 API)
User 模型实现序列化接口

package com.segumentfault.spring.cloud.lesson12.domain;import java.io.Serializable;/*** 用户模型**/
public class User implements Serializable {private static final long serialVersionUID = -5688097732613347904L;/*** ID*/private Long id;/*** 用户名称*/private String name;public Long getId() {return id;}public void setId(Long id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}
}

增加kafka依赖

 <!-- 整合 Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

利用kafkaTemplate实现消息发送

package com.segumentfault.spring.cloud.lesson12.user.service.client.web.controller;import com.segumentfault.spring.cloud.lesson12.api.UserService;
import com.segumentfault.spring.cloud.lesson12.domain.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;import java.util.List;/*** {@link UserService} 客户端 {@link RestController}* <p>* 注意:官方建议 客户端和服务端不要同时实现 Feign 接口* 这里的代码只是一个说明,实际情况最好使用组合的方式,而不是继承**/
@RestController
public class UserServiceClientController implements UserService {@Autowiredprivate UserService userService;private final KafkaTemplate<String, Object> kafkaTemplate;@Autowiredpublic UserServiceClientController(KafkaTemplate kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}@PostMapping("/user/save/message")public boolean saveUserByMessage(@RequestBody User user) {ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("sf-users", 0, user);return future.isDone();}// 通过方法继承,URL 映射 :"/user/save"@Overridepublic boolean saveUser(@RequestBody User user) {return userService.saveUser(user);}// 通过方法继承,URL 映射 :"/user/find/all"@Overridepublic List<User> findAll() {return userService.findAll();}}

实现kafka序列化器:Java序列化协议

package com.segumentfault.spring.cloud.lesson12.user.service.client.serializer;import org.apache.kafka.common.serialization.Serializer;import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.Map;/*** Java 序列化协议**/
public class ObjectSerializer implements Serializer<Object> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}@Overridepublic byte[] serialize(String topic, Object object) {System.out.println("topic : " + topic + " , object : " + object);byte[] dataArray = null;ByteArrayOutputStream outputStream = new ByteArrayOutputStream();try {ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);objectOutputStream.writeObject(object);dataArray = outputStream.toByteArray();} catch (Exception e) {throw new RuntimeException(e);}return dataArray;}@Overridepublic void close() {}
}

Spring Cloud Stream整合

改造 user-service-provider 消息接收器(Sink)

引入 spring-cloud-stream-binder-kafka

 <!-- 依赖 Spring Cloud Stream Binder Kafka --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-kafka</artifactId></dependency>

用户消息Stream接口定义

package com.segumentfault.spring.cloud.lesson12.user.service;import com.segumentfault.spring.cloud.lesson12.user.stream.UserMessage;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.netflix.hystrix.EnableHystrix;
import org.springframework.cloud.stream.annotation.EnableBinding;/*** 引导类**/
@SpringBootApplication
@EnableHystrix
@EnableDiscoveryClient // 激活服务发现客户端
@EnableBinding(UserMessage.class) // 激活 Stream Binding 到 UserMessage
public class UserServiceProviderApplication {public static void main(String[] args) {SpringApplication.run(UserServiceProviderApplication.class, args);}
}

配置kafka以及Stream Destination

## Spring Cloud Stream Binding 配置
### destination 指定 Kafka Topic
### userMessage 为输入管道名称
spring.cloud.stream.bindings.user-message.destination = sf-users## Kafka 生产者配置spring.kafka.BOOTSTRAP-SERVERS=localhost:9092
spring.kafka.consumer.group-id=sf-group
spring.kafka.consumer.clientId=user-service-provider

添加 User 消息监听器
SubscribableChannel 实现

package com.segumentfault.spring.cloud.lesson12.user.service.provider.service;import com.segumentfault.spring.cloud.lesson12.api.UserService;
import com.segumentfault.spring.cloud.lesson12.domain.User;
import com.segumentfault.spring.cloud.lesson12.user.stream.UserMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;/*** 用户 消息服务**/
@Service
public class UserMessageService {@Autowiredprivate UserMessage userMessage;@Autowired@Qualifier("inMemoryUserService")private UserService userService;private void saveUser(byte[] data) {// message body 是字节流 byte[]ByteArrayInputStream inputStream = new ByteArrayInputStream(data);try {ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);User user = (User) objectInputStream.readObject(); // 反序列化成 User 对象userService.saveUser(user);} catch (Exception e) {throw new RuntimeException(e);}}@PostConstructpublic void init() {SubscribableChannel subscribableChannel = userMessage.input();subscribableChannel.subscribe(message -> {System.out.println("Subscribe by SubscribableChannel");// message body 是字节流 byte[]byte[] body = (byte[]) message.getPayload();saveUser(body);});}}

ServiceActivator 实现

   @ServiceActivator(inputChannel = INPUT)public void listen(byte[] data) {System.out.println("Subscribe by @ServiceActivator");saveUser(data);}

@StreamListener 实现

 @StreamListener(INPUT)public void onMessage(byte[] data) {System.out.println("Subscribe by @StreamListener");saveUser(data);}

改造 user-service-client 消息发送源( Stream Binder : Rabbit MQ)
增加 spring-cloud-stream-binder-rabbitmq 依赖

<!-- 整合 Spring Cloud Stream Binder Rabbit MQ --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-rabbit</artifactId></dependency>

配置发送源管道
添加用户消息接口

package com.segumentfault.spring.cloud.lesson12.user.service.client.stream;import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;/*** 用户消息(输出)**/
public interface UserMessage {@Output("user-message-out")MessageChannel output();}

激活用户消息接口

package com.segumentfault.spring.cloud.lesson12.user.service.client;import com.netflix.loadbalancer.IRule;
import com.segumentfault.spring.cloud.lesson12.api.UserService;
import com.segumentfault.spring.cloud.lesson12.user.service.client.rule.MyRule;
import com.segumentfault.spring.cloud.lesson12.user.service.client.stream.UserMessage;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.netflix.feign.EnableFeignClients;
import org.springframework.cloud.netflix.ribbon.RibbonClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;/*** 引导类**/
@SpringBootApplication
@RibbonClient("user-service-provider") // 指定目标应用名称
@EnableCircuitBreaker // 使用服务短路
@EnableFeignClients(clients = UserService.class) // 申明 UserService 接口作为 Feign Client 调用
@EnableDiscoveryClient // 激活服务发现客户端
@EnableBinding(UserMessage.class)
public class UserServiceClientApplication {public static void main(String[] args) {SpringApplication.run(UserServiceClientApplication.class, args);}/*** 将 {@link MyRule} 暴露成 {@link Bean}** @return {@link MyRule}*/@Beanpublic IRule myRule() {return new MyRule();}/*** 申明 具有负载均衡能力 {@link RestTemplate}** @return*/@Bean@LoadBalancedpublic RestTemplate restTemplate() {return new RestTemplate();}}

实现消息发送到 RabbitMQ

@PostMapping("/user/save/message/rabbit")public boolean saveUserByRabbitMessage(@RequestBody User user) throws JsonProcessingException {MessageChannel messageChannel = userMessage.output();// User 序列化成 JSONString payload = objectMapper.writeValueAsString(user);GenericMessage<String> message = new GenericMessage<String>(payload);// 发送消息return messageChannel.send(message);}

启动 Rabbit MQ
改造 user-service-provider 消息接收器( Stream Binder : Rabbit MQ)
替换依赖

<!--&lt;!&ndash; 依赖 Spring Cloud Stream Binder Kafka &ndash;&gt;--><!--<dependency>--><!--<groupId>org.springframework.cloud</groupId>--><!--<artifactId>spring-cloud-stream-binder-kafka</artifactId>--><!--</dependency>--><!-- 整合 Spring Cloud Stream Binder Rabbit MQ --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-rabbit</artifactId></dependency>

Spring Cloud消息驱动整合相关推荐

  1. Spring Cloud消息驱动整合 1

    Spring Cloud Stream 使用场景 消息驱动的微服务应用 目的 简化代码 统一抽象 主要概念 1.应用模型 2.Binder抽象 3.持久化 发布/订阅支持 4.消费分组支持 5.分区支 ...

  2. Spring Cloud Data Flow整合UAA使用外置数据库和API接口

    我最新最全的文章都在 南瓜慢说 www.pkslow.com ,欢迎大家来喝茶! 1 前言 之前的文章<Spring Cloud Data Flow整合Cloudfoundry UAA服务做权限 ...

  3. Spring Cloud Data Flow整合UAA之使用LDAP进行账号管理

    我最新最全的文章都在 南瓜慢说 www.pkslow.com ,欢迎大家来喝茶! 1 前言 Spring Cloud Data Flow整合UAA的文章已经写了两篇,之前的方案是把用户信息保存在数据库 ...

  4. spring cloud+zookeeper+feign整合 简单实例(一)

    一.前言 各位热爱知识的小伙伴们大家好呀!很高兴大家能点开这个博客,这是我个人的第一篇博客,之后也会持续的更新java以及spring项目的相关代码,希望大家持续关注.如果对本篇博客有什么不懂的地方或 ...

  5. spring cloud笔记(1) 整合消息中间件ONS(商业版rocketmq)

    之前一直用spring boot+dubbo+zookeeper来搭建分布式项目,后来在网上看到有人说用spring cloud更好,前两天抽时间了解了一下,spring cloud 为我们提供了许多 ...

  6. Spring Cloud_27_消息驱动/RabbitMQ的使用

    转载自 版权声明:本文为博主原创文章,艾特米工作室,欢迎转载 https://blog.csdn.net/zhaozao5757/article/details/79693670 消息驱动/Rabbi ...

  7. Spring Cloud Data Flow整合Cloudfoundry UAA服务做权限控制

    我最新最全的文章都在 南瓜慢说 www.pkslow.com ,欢迎大家来喝茶! 1 前言 关于Spring Cloud Data Flow这里不多介绍,有兴趣可以看下面的文章.本文主要介绍如何整合D ...

  8. Spring Cloud 入门 ---- Security 整合 Oauth2 认证授权【随笔】

    Spring Cloud Security Oauth2 文档参考:https://docs.spring.io/spring-security/site/docs/5.4.1/reference/h ...

  9. Spring Cloud Gateway 网关整合 Knife4j

    文章目录 1:环境准备 2:gateway服务设置 1:导包 2:yml配置 3:添加配置类,从网关服务中获取服务列表 4:重写并覆盖/swagger-resources接口 3:其他业务逻辑服务设置 ...

  10. 回顾:oauth2和 spring cloud alibaba gateway 整合

    一.oauth2认证中心:登录用户进行认证,生成token, 同时定义受保护的api服务 (一)oauth2的四种认证模式:授权码模式,简化模式,密码模式,客户端模式.其中授权码模式和密码模式用的最多 ...

最新文章

  1. ppt文本框显示缺字,信息显示不全
  2. 【建站系列教程】6、.htaccess文件的url重写规则-网页伪静态化
  3. PPT讲解机器人产业发展现状与未来展望,重磅资料
  4. editplus配置 linux shell 语法高亮 自动补全
  5. [COCI 2017-2018-2]-San
  6. 基于Matlab的跨孔电磁波\跨孔雷达的胖射线追踪(一)
  7. php商城关键字拆词搜索,做搜索查询的时候 怎么拆分词?
  8. Linux2.6内核本地提权
  9. 手机各种JS语法,随时更新
  10. python在自动化中的应用_python中在自动化运维的应用
  11. 《算法导论》.pdf
  12. 基于FPGA的四轴运动控制IC设计(verilog)
  13. kpi绩效考核流程图_XX公司KPI绩效考核案例.doc
  14. PAT a1138
  15. 入门力扣自学笔记118 C++ (题目编号1413)
  16. 纯css svg 改变图片颜色 ios android 小程序
  17. 函数逼近和曲线拟合、插值
  18. mc服务器常用指令_掌控MC服务器 我的世界服务器指令大全
  19. 动态设置html根字体大小(随着设备屏幕的大小而变化,从而实现响应式)
  20. 我的Fitbit Force手环使用体验

热门文章

  1. Apache Flink ML 2.0.0 发布公告
  2. Flink 1.10 SQL、HiveCatalog 与事件时间整合示例
  3. android动画之从源码角度分析动画原理
  4. 同事乱用 Redis 卡爆,我真是醉了
  5. 一条 update 语句引起的事故,这回可以长长记性了
  6. findViewById中NullPointerException的错误
  7. python基础知识——函数(中)
  8. mysql工厂模式_工厂模式IDAL详解
  9. datastage连接mysql库_Datastage 8.5 连接远程Oracle 数据库
  10. docker server 容器连接sql_借力 Docker ,三分钟搞定 MySQL 主从复制!