Spring Cloud消息驱动整合
Spring Cloud Stream
- 使用场景
消息驱动的微服务应用 - 目的
简化代码
统一抽象
- 主要概念
1.应用模型
2.Binder抽象
3.持久化 发布/订阅支持
4.消费分组支持
5.分区支持
- 基本概念
Source:Stream发送源
近义词:Producer、Publisher
Sink:Stream接收器
近义词:Consumer、Subscriber
Processor
编程模型
激活:- @EnableBinding- @Configuration
- @EnableIntegration
Source
@Output
MessgeChannel
编程模型
Sink- @Input
SubschribableChannel
- @Input
@ServiceActivator
@StreamListener
Spring Cloud 消息驱动整合
整合kafka
改造 user-service-client 消息发送源(Kafka 原生 API)
User 模型实现序列化接口
package com.segumentfault.spring.cloud.lesson12.domain;import java.io.Serializable;/*** 用户模型**/
public class User implements Serializable {private static final long serialVersionUID = -5688097732613347904L;/*** ID*/private Long id;/*** 用户名称*/private String name;public Long getId() {return id;}public void setId(Long id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}
}
增加kafka依赖
<!-- 整合 Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
利用kafkaTemplate实现消息发送
package com.segumentfault.spring.cloud.lesson12.user.service.client.web.controller;import com.segumentfault.spring.cloud.lesson12.api.UserService;
import com.segumentfault.spring.cloud.lesson12.domain.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;import java.util.List;/*** {@link UserService} 客户端 {@link RestController}* <p>* 注意:官方建议 客户端和服务端不要同时实现 Feign 接口* 这里的代码只是一个说明,实际情况最好使用组合的方式,而不是继承**/
@RestController
public class UserServiceClientController implements UserService {@Autowiredprivate UserService userService;private final KafkaTemplate<String, Object> kafkaTemplate;@Autowiredpublic UserServiceClientController(KafkaTemplate kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}@PostMapping("/user/save/message")public boolean saveUserByMessage(@RequestBody User user) {ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("sf-users", 0, user);return future.isDone();}// 通过方法继承,URL 映射 :"/user/save"@Overridepublic boolean saveUser(@RequestBody User user) {return userService.saveUser(user);}// 通过方法继承,URL 映射 :"/user/find/all"@Overridepublic List<User> findAll() {return userService.findAll();}}
实现kafka序列化器:Java序列化协议
package com.segumentfault.spring.cloud.lesson12.user.service.client.serializer;import org.apache.kafka.common.serialization.Serializer;import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.Map;/*** Java 序列化协议**/
public class ObjectSerializer implements Serializer<Object> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}@Overridepublic byte[] serialize(String topic, Object object) {System.out.println("topic : " + topic + " , object : " + object);byte[] dataArray = null;ByteArrayOutputStream outputStream = new ByteArrayOutputStream();try {ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);objectOutputStream.writeObject(object);dataArray = outputStream.toByteArray();} catch (Exception e) {throw new RuntimeException(e);}return dataArray;}@Overridepublic void close() {}
}
Spring Cloud Stream整合
改造 user-service-provider 消息接收器(Sink)
引入 spring-cloud-stream-binder-kafka
<!-- 依赖 Spring Cloud Stream Binder Kafka --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-kafka</artifactId></dependency>
用户消息Stream接口定义
package com.segumentfault.spring.cloud.lesson12.user.service;import com.segumentfault.spring.cloud.lesson12.user.stream.UserMessage;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.netflix.hystrix.EnableHystrix;
import org.springframework.cloud.stream.annotation.EnableBinding;/*** 引导类**/
@SpringBootApplication
@EnableHystrix
@EnableDiscoveryClient // 激活服务发现客户端
@EnableBinding(UserMessage.class) // 激活 Stream Binding 到 UserMessage
public class UserServiceProviderApplication {public static void main(String[] args) {SpringApplication.run(UserServiceProviderApplication.class, args);}
}
配置kafka以及Stream Destination
## Spring Cloud Stream Binding 配置
### destination 指定 Kafka Topic
### userMessage 为输入管道名称
spring.cloud.stream.bindings.user-message.destination = sf-users## Kafka 生产者配置spring.kafka.BOOTSTRAP-SERVERS=localhost:9092
spring.kafka.consumer.group-id=sf-group
spring.kafka.consumer.clientId=user-service-provider
添加 User 消息监听器
SubscribableChannel 实现
package com.segumentfault.spring.cloud.lesson12.user.service.provider.service;import com.segumentfault.spring.cloud.lesson12.api.UserService;
import com.segumentfault.spring.cloud.lesson12.domain.User;
import com.segumentfault.spring.cloud.lesson12.user.stream.UserMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;/*** 用户 消息服务**/
@Service
public class UserMessageService {@Autowiredprivate UserMessage userMessage;@Autowired@Qualifier("inMemoryUserService")private UserService userService;private void saveUser(byte[] data) {// message body 是字节流 byte[]ByteArrayInputStream inputStream = new ByteArrayInputStream(data);try {ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);User user = (User) objectInputStream.readObject(); // 反序列化成 User 对象userService.saveUser(user);} catch (Exception e) {throw new RuntimeException(e);}}@PostConstructpublic void init() {SubscribableChannel subscribableChannel = userMessage.input();subscribableChannel.subscribe(message -> {System.out.println("Subscribe by SubscribableChannel");// message body 是字节流 byte[]byte[] body = (byte[]) message.getPayload();saveUser(body);});}}
ServiceActivator 实现
@ServiceActivator(inputChannel = INPUT)public void listen(byte[] data) {System.out.println("Subscribe by @ServiceActivator");saveUser(data);}
@StreamListener 实现
@StreamListener(INPUT)public void onMessage(byte[] data) {System.out.println("Subscribe by @StreamListener");saveUser(data);}
改造 user-service-client 消息发送源( Stream Binder : Rabbit MQ)
增加 spring-cloud-stream-binder-rabbitmq 依赖
<!-- 整合 Spring Cloud Stream Binder Rabbit MQ --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-rabbit</artifactId></dependency>
配置发送源管道
添加用户消息接口
package com.segumentfault.spring.cloud.lesson12.user.service.client.stream;import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;/*** 用户消息(输出)**/
public interface UserMessage {@Output("user-message-out")MessageChannel output();}
激活用户消息接口
package com.segumentfault.spring.cloud.lesson12.user.service.client;import com.netflix.loadbalancer.IRule;
import com.segumentfault.spring.cloud.lesson12.api.UserService;
import com.segumentfault.spring.cloud.lesson12.user.service.client.rule.MyRule;
import com.segumentfault.spring.cloud.lesson12.user.service.client.stream.UserMessage;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.netflix.feign.EnableFeignClients;
import org.springframework.cloud.netflix.ribbon.RibbonClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;/*** 引导类**/
@SpringBootApplication
@RibbonClient("user-service-provider") // 指定目标应用名称
@EnableCircuitBreaker // 使用服务短路
@EnableFeignClients(clients = UserService.class) // 申明 UserService 接口作为 Feign Client 调用
@EnableDiscoveryClient // 激活服务发现客户端
@EnableBinding(UserMessage.class)
public class UserServiceClientApplication {public static void main(String[] args) {SpringApplication.run(UserServiceClientApplication.class, args);}/*** 将 {@link MyRule} 暴露成 {@link Bean}** @return {@link MyRule}*/@Beanpublic IRule myRule() {return new MyRule();}/*** 申明 具有负载均衡能力 {@link RestTemplate}** @return*/@Bean@LoadBalancedpublic RestTemplate restTemplate() {return new RestTemplate();}}
实现消息发送到 RabbitMQ
@PostMapping("/user/save/message/rabbit")public boolean saveUserByRabbitMessage(@RequestBody User user) throws JsonProcessingException {MessageChannel messageChannel = userMessage.output();// User 序列化成 JSONString payload = objectMapper.writeValueAsString(user);GenericMessage<String> message = new GenericMessage<String>(payload);// 发送消息return messageChannel.send(message);}
启动 Rabbit MQ
改造 user-service-provider 消息接收器( Stream Binder : Rabbit MQ)
替换依赖
<!--<!– 依赖 Spring Cloud Stream Binder Kafka –>--><!--<dependency>--><!--<groupId>org.springframework.cloud</groupId>--><!--<artifactId>spring-cloud-stream-binder-kafka</artifactId>--><!--</dependency>--><!-- 整合 Spring Cloud Stream Binder Rabbit MQ --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-rabbit</artifactId></dependency>
Spring Cloud消息驱动整合相关推荐
- Spring Cloud消息驱动整合 1
Spring Cloud Stream 使用场景 消息驱动的微服务应用 目的 简化代码 统一抽象 主要概念 1.应用模型 2.Binder抽象 3.持久化 发布/订阅支持 4.消费分组支持 5.分区支 ...
- Spring Cloud Data Flow整合UAA使用外置数据库和API接口
我最新最全的文章都在 南瓜慢说 www.pkslow.com ,欢迎大家来喝茶! 1 前言 之前的文章<Spring Cloud Data Flow整合Cloudfoundry UAA服务做权限 ...
- Spring Cloud Data Flow整合UAA之使用LDAP进行账号管理
我最新最全的文章都在 南瓜慢说 www.pkslow.com ,欢迎大家来喝茶! 1 前言 Spring Cloud Data Flow整合UAA的文章已经写了两篇,之前的方案是把用户信息保存在数据库 ...
- spring cloud+zookeeper+feign整合 简单实例(一)
一.前言 各位热爱知识的小伙伴们大家好呀!很高兴大家能点开这个博客,这是我个人的第一篇博客,之后也会持续的更新java以及spring项目的相关代码,希望大家持续关注.如果对本篇博客有什么不懂的地方或 ...
- spring cloud笔记(1) 整合消息中间件ONS(商业版rocketmq)
之前一直用spring boot+dubbo+zookeeper来搭建分布式项目,后来在网上看到有人说用spring cloud更好,前两天抽时间了解了一下,spring cloud 为我们提供了许多 ...
- Spring Cloud_27_消息驱动/RabbitMQ的使用
转载自 版权声明:本文为博主原创文章,艾特米工作室,欢迎转载 https://blog.csdn.net/zhaozao5757/article/details/79693670 消息驱动/Rabbi ...
- Spring Cloud Data Flow整合Cloudfoundry UAA服务做权限控制
我最新最全的文章都在 南瓜慢说 www.pkslow.com ,欢迎大家来喝茶! 1 前言 关于Spring Cloud Data Flow这里不多介绍,有兴趣可以看下面的文章.本文主要介绍如何整合D ...
- Spring Cloud 入门 ---- Security 整合 Oauth2 认证授权【随笔】
Spring Cloud Security Oauth2 文档参考:https://docs.spring.io/spring-security/site/docs/5.4.1/reference/h ...
- Spring Cloud Gateway 网关整合 Knife4j
文章目录 1:环境准备 2:gateway服务设置 1:导包 2:yml配置 3:添加配置类,从网关服务中获取服务列表 4:重写并覆盖/swagger-resources接口 3:其他业务逻辑服务设置 ...
- 回顾:oauth2和 spring cloud alibaba gateway 整合
一.oauth2认证中心:登录用户进行认证,生成token, 同时定义受保护的api服务 (一)oauth2的四种认证模式:授权码模式,简化模式,密码模式,客户端模式.其中授权码模式和密码模式用的最多 ...
最新文章
- ppt文本框显示缺字,信息显示不全
- 【建站系列教程】6、.htaccess文件的url重写规则-网页伪静态化
- PPT讲解机器人产业发展现状与未来展望,重磅资料
- editplus配置 linux shell 语法高亮 自动补全
- [COCI 2017-2018-2]-San
- 基于Matlab的跨孔电磁波\跨孔雷达的胖射线追踪(一)
- php商城关键字拆词搜索,做搜索查询的时候 怎么拆分词?
- Linux2.6内核本地提权
- 手机各种JS语法,随时更新
- python在自动化中的应用_python中在自动化运维的应用
- 《算法导论》.pdf
- 基于FPGA的四轴运动控制IC设计(verilog)
- kpi绩效考核流程图_XX公司KPI绩效考核案例.doc
- PAT a1138
- 入门力扣自学笔记118 C++ (题目编号1413)
- 纯css svg 改变图片颜色 ios android 小程序
- 函数逼近和曲线拟合、插值
- mc服务器常用指令_掌控MC服务器 我的世界服务器指令大全
- 动态设置html根字体大小(随着设备屏幕的大小而变化,从而实现响应式)
- 我的Fitbit Force手环使用体验
热门文章
- Apache Flink ML 2.0.0 发布公告
- Flink 1.10 SQL、HiveCatalog 与事件时间整合示例
- android动画之从源码角度分析动画原理
- 同事乱用 Redis 卡爆,我真是醉了
- 一条 update 语句引起的事故,这回可以长长记性了
- findViewById中NullPointerException的错误
- python基础知识——函数(中)
- mysql工厂模式_工厂模式IDAL详解
- datastage连接mysql库_Datastage 8.5 连接远程Oracle 数据库
- docker server 容器连接sql_借力 Docker ,三分钟搞定 MySQL 主从复制!