Spring Cloud Stream

  • 使用场景
    消息驱动的微服务应用
  • 目的
    简化代码

统一抽象

  • 主要概念
    1.应用模型

2.Binder抽象
3.持久化 发布/订阅支持
4.消费分组支持
5.分区支持

  • 基本概念
    Source:Stream发送源

近义词:Producer、Publisher
Sink:Stream接收器
近义词:Consumer、Subscriber
Processor

  • 编程模型
    激活:- @EnableBinding

    • @Configuration
    • @EnableIntegration
      Source

@Output
MessgeChannel

  • 编程模型
    Sink

    • @Input
      SubschribableChannel

@ServiceActivator
@StreamListener

Spring Cloud 消息驱动整合

整合kafka
改造 user-service-client 消息发送源(Kafka 原生 API)
User 模型实现序列化接口

package com.segumentfault.spring.cloud.lesson12.domain;import java.io.Serializable;/*** 用户模型**/
public class User implements Serializable {private static final long serialVersionUID = -5688097732613347904L;/*** ID*/private Long id;/*** 用户名称*/private String name;public Long getId() {return id;}public void setId(Long id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}
}

增加kafka依赖

 <!-- 整合 Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

利用kafkaTemplate实现消息发送

package com.segumentfault.spring.cloud.lesson12.user.service.client.web.controller;import com.segumentfault.spring.cloud.lesson12.api.UserService;
import com.segumentfault.spring.cloud.lesson12.domain.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;import java.util.List;/*** {@link UserService} 客户端 {@link RestController}* <p>* 注意:官方建议 客户端和服务端不要同时实现 Feign 接口* 这里的代码只是一个说明,实际情况最好使用组合的方式,而不是继承**/
@RestController
public class UserServiceClientController implements UserService {@Autowiredprivate UserService userService;private final KafkaTemplate<String, Object> kafkaTemplate;@Autowiredpublic UserServiceClientController(KafkaTemplate kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}@PostMapping("/user/save/message")public boolean saveUserByMessage(@RequestBody User user) {ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("sf-users", 0, user);return future.isDone();}// 通过方法继承,URL 映射 :"/user/save"@Overridepublic boolean saveUser(@RequestBody User user) {return userService.saveUser(user);}// 通过方法继承,URL 映射 :"/user/find/all"@Overridepublic List<User> findAll() {return userService.findAll();}}

实现kafka序列化器:Java序列化协议

package com.segumentfault.spring.cloud.lesson12.user.service.client.serializer;import org.apache.kafka.common.serialization.Serializer;import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.Map;/*** Java 序列化协议**/
public class ObjectSerializer implements Serializer<Object> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}@Overridepublic byte[] serialize(String topic, Object object) {System.out.println("topic : " + topic + " , object : " + object);byte[] dataArray = null;ByteArrayOutputStream outputStream = new ByteArrayOutputStream();try {ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);objectOutputStream.writeObject(object);dataArray = outputStream.toByteArray();} catch (Exception e) {throw new RuntimeException(e);}return dataArray;}@Overridepublic void close() {}
}

Spring Cloud Stream整合

改造 user-service-provider 消息接收器(Sink)

引入 spring-cloud-stream-binder-kafka

 <!-- 依赖 Spring Cloud Stream Binder Kafka --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-kafka</artifactId></dependency>

用户消息Stream接口定义

package com.segumentfault.spring.cloud.lesson12.user.service;import com.segumentfault.spring.cloud.lesson12.user.stream.UserMessage;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.netflix.hystrix.EnableHystrix;
import org.springframework.cloud.stream.annotation.EnableBinding;/*** 引导类**/
@SpringBootApplication
@EnableHystrix
@EnableDiscoveryClient // 激活服务发现客户端
@EnableBinding(UserMessage.class) // 激活 Stream Binding 到 UserMessage
public class UserServiceProviderApplication {public static void main(String[] args) {SpringApplication.run(UserServiceProviderApplication.class, args);}
}

配置kafka以及Stream Destination

## Spring Cloud Stream Binding 配置
### destination 指定 Kafka Topic
### userMessage 为输入管道名称
spring.cloud.stream.bindings.user-message.destination = sf-users## Kafka 生产者配置spring.kafka.BOOTSTRAP-SERVERS=localhost:9092
spring.kafka.consumer.group-id=sf-group
spring.kafka.consumer.clientId=user-service-provider

添加 User 消息监听器
SubscribableChannel 实现

package com.segumentfault.spring.cloud.lesson12.user.service.provider.service;import com.segumentfault.spring.cloud.lesson12.api.UserService;
import com.segumentfault.spring.cloud.lesson12.domain.User;
import com.segumentfault.spring.cloud.lesson12.user.stream.UserMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;/*** 用户 消息服务**/
@Service
public class UserMessageService {@Autowiredprivate UserMessage userMessage;@Autowired@Qualifier("inMemoryUserService")private UserService userService;private void saveUser(byte[] data) {// message body 是字节流 byte[]ByteArrayInputStream inputStream = new ByteArrayInputStream(data);try {ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);User user = (User) objectInputStream.readObject(); // 反序列化成 User 对象userService.saveUser(user);} catch (Exception e) {throw new RuntimeException(e);}}@PostConstructpublic void init() {SubscribableChannel subscribableChannel = userMessage.input();subscribableChannel.subscribe(message -> {System.out.println("Subscribe by SubscribableChannel");// message body 是字节流 byte[]byte[] body = (byte[]) message.getPayload();saveUser(body);});}}

ServiceActivator 实现

   @ServiceActivator(inputChannel = INPUT)public void listen(byte[] data) {System.out.println("Subscribe by @ServiceActivator");saveUser(data);}

@StreamListener 实现

 @StreamListener(INPUT)public void onMessage(byte[] data) {System.out.println("Subscribe by @StreamListener");saveUser(data);}

改造 user-service-client 消息发送源( Stream Binder : Rabbit MQ)
增加 spring-cloud-stream-binder-rabbitmq 依赖

<!-- 整合 Spring Cloud Stream Binder Rabbit MQ --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-rabbit</artifactId></dependency>

配置发送源管道
添加用户消息接口

package com.segumentfault.spring.cloud.lesson12.user.service.client.stream;import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;/*** 用户消息(输出)**/
public interface UserMessage {@Output("user-message-out")MessageChannel output();}

激活用户消息接口

package com.segumentfault.spring.cloud.lesson12.user.service.client;import com.netflix.loadbalancer.IRule;
import com.segumentfault.spring.cloud.lesson12.api.UserService;
import com.segumentfault.spring.cloud.lesson12.user.service.client.rule.MyRule;
import com.segumentfault.spring.cloud.lesson12.user.service.client.stream.UserMessage;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.netflix.feign.EnableFeignClients;
import org.springframework.cloud.netflix.ribbon.RibbonClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;/*** 引导类**/
@SpringBootApplication
@RibbonClient("user-service-provider") // 指定目标应用名称
@EnableCircuitBreaker // 使用服务短路
@EnableFeignClients(clients = UserService.class) // 申明 UserService 接口作为 Feign Client 调用
@EnableDiscoveryClient // 激活服务发现客户端
@EnableBinding(UserMessage.class)
public class UserServiceClientApplication {public static void main(String[] args) {SpringApplication.run(UserServiceClientApplication.class, args);}/*** 将 {@link MyRule} 暴露成 {@link Bean}** @return {@link MyRule}*/@Beanpublic IRule myRule() {return new MyRule();}/*** 申明 具有负载均衡能力 {@link RestTemplate}** @return*/@Bean@LoadBalancedpublic RestTemplate restTemplate() {return new RestTemplate();}}

实现消息发送到 RabbitMQ

@PostMapping("/user/save/message/rabbit")public boolean saveUserByRabbitMessage(@RequestBody User user) throws JsonProcessingException {MessageChannel messageChannel = userMessage.output();// User 序列化成 JSONString payload = objectMapper.writeValueAsString(user);GenericMessage<String> message = new GenericMessage<String>(payload);// 发送消息return messageChannel.send(message);}

启动 Rabbit MQ
改造 user-service-provider 消息接收器( Stream Binder : Rabbit MQ)
替换依赖

<!--&lt;!&ndash; 依赖 Spring Cloud Stream Binder Kafka &ndash;&gt;--><!--<dependency>--><!--<groupId>org.springframework.cloud</groupId>--><!--<artifactId>spring-cloud-stream-binder-kafka</artifactId>--><!--</dependency>--><!-- 整合 Spring Cloud Stream Binder Rabbit MQ --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-rabbit</artifactId></dependency>

Spring Cloud消息驱动整合 1相关推荐

  1. Spring Cloud Data Flow整合UAA使用外置数据库和API接口

    我最新最全的文章都在 南瓜慢说 www.pkslow.com ,欢迎大家来喝茶! 1 前言 之前的文章<Spring Cloud Data Flow整合Cloudfoundry UAA服务做权限 ...

  2. Spring Cloud Data Flow整合UAA之使用LDAP进行账号管理

    我最新最全的文章都在 南瓜慢说 www.pkslow.com ,欢迎大家来喝茶! 1 前言 Spring Cloud Data Flow整合UAA的文章已经写了两篇,之前的方案是把用户信息保存在数据库 ...

  3. spring cloud+zookeeper+feign整合 简单实例(一)

    一.前言 各位热爱知识的小伙伴们大家好呀!很高兴大家能点开这个博客,这是我个人的第一篇博客,之后也会持续的更新java以及spring项目的相关代码,希望大家持续关注.如果对本篇博客有什么不懂的地方或 ...

  4. spring cloud笔记(1) 整合消息中间件ONS(商业版rocketmq)

    之前一直用spring boot+dubbo+zookeeper来搭建分布式项目,后来在网上看到有人说用spring cloud更好,前两天抽时间了解了一下,spring cloud 为我们提供了许多 ...

  5. Spring Cloud_27_消息驱动/RabbitMQ的使用

    转载自 版权声明:本文为博主原创文章,艾特米工作室,欢迎转载 https://blog.csdn.net/zhaozao5757/article/details/79693670 消息驱动/Rabbi ...

  6. Spring Cloud Data Flow整合Cloudfoundry UAA服务做权限控制

    我最新最全的文章都在 南瓜慢说 www.pkslow.com ,欢迎大家来喝茶! 1 前言 关于Spring Cloud Data Flow这里不多介绍,有兴趣可以看下面的文章.本文主要介绍如何整合D ...

  7. Spring Cloud 入门 ---- Security 整合 Oauth2 认证授权【随笔】

    Spring Cloud Security Oauth2 文档参考:https://docs.spring.io/spring-security/site/docs/5.4.1/reference/h ...

  8. Spring Cloud Gateway 网关整合 Knife4j

    文章目录 1:环境准备 2:gateway服务设置 1:导包 2:yml配置 3:添加配置类,从网关服务中获取服务列表 4:重写并覆盖/swagger-resources接口 3:其他业务逻辑服务设置 ...

  9. 回顾:oauth2和 spring cloud alibaba gateway 整合

    一.oauth2认证中心:登录用户进行认证,生成token, 同时定义受保护的api服务 (一)oauth2的四种认证模式:授权码模式,简化模式,密码模式,客户端模式.其中授权码模式和密码模式用的最多 ...

最新文章

  1. ads1115寄存器操作
  2. 变量的存储类别以及作用空间
  3. HTML+CSS+JS实现 ❤️520爱心背景表白网页动画特效❤️
  4. flyme7 android彩蛋,魅族 15 系列开启预约,Flyme7 或是发布会彩蛋
  5. linux邮件加密码,linux系统的邮件服务器的加密与验证
  6. Gogs代码托管系统安装配置手册
  7. 设计模式之依赖倒置原则
  8. C++实现基于mfc的仓库管理系统(可连MySQL数据库)
  9. linux 远程安装teamviewer,如何在CentOS 7上安装TeamViewer
  10. 用Python看穿马赛克!
  11. 实验二 译码器及其应用
  12. 花生壳内网穿透操作文档
  13. 阿里云开启ipv6通过appstore审核的方法
  14. leet198.打家劫舍
  15. C++的future,promise,packaged_task
  16. 有了这些网站,让你有看不完的好电影!
  17. 前端面试题汇总(含答案)(HTML+CSS篇)
  18. 好用的在线奖状生成工具推荐!
  19. 山东大学计算机专业知名教授,胡思煌 山东大学主页平台管理系统--胡思煌
  20. 世界geojson_开源项目使用 Stata 绘制全世界所有国家以及部分国家的区划地图代码和数据...

热门文章

  1. ECMWF时间序列处理
  2. 解决maven项目install之后报出Perhaps you are running on a JRE rather than a JDK?错误
  3. 5.7之前的MySQL版本不完全支持两阶段提交
  4. 论看过的书对我产生了什么影响?
  5. 杰里之ANC版级配置【篇】
  6. Eclipse 3.4 Ganymede 新增的插件安装方式
  7. 右键压缩包,没有WinRAR解压选项,怎么办?
  8. 万豪发布后疫情时代餐饮业十大新兴趋势;凯悦旗下中高端酒店品牌逸扉在上海亮相 | 美通企业日报...
  9. SCRM领域真能长出To B的BAT吗?
  10. 鸿蒙 harmonyOS 使用Text文字超长时使用省略号结尾