一、关于Spring-Cloud-Stream

  Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。通过使用Spring Cloud Stream,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。

  在这里我先放一张官网的图:

  应用程序通过Spring Cloud Stream注入到输入和输出通道与外界进行通信。根据此规则我们很容易的实现消息传递,订阅消息与消息中转。并且当需要切换消息中间件时,几乎不需要修改代码,只需要变更配置就行了。

  在用例图中 Inputs代表了应用程序监听消息 、outputs代表发送消息、binder的话大家可以理解为将应用程序与消息中间件隔离的抽象,类似于三层架构下利用dao屏蔽service与数据库的实现的原理。

  springcloud默认提供了rabbitmq与kafka的实现。

二、springcloud集成kafka

1、添加gradle依赖:

dependencies{compile('org.springframework.cloud:spring-cloud-stream')compile('org.springframework.cloud:spring-cloud-stream-binder-kafka')compile('org.springframework.kafka:spring-kafka')
}

View Code

2、定义一个接口:

  spring-cloud-stream已经给我们定义了最基本的输入与输出接口,他们分别是 Source,Sink, Processor

  Sink接口:

package org.springframework.cloud.stream.messaging;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;public interface Sink {String INPUT = "input";@Input("input")SubscribableChannel input();
}

View Code

  Source接口:

package org.springframework.cloud.stream.messaging;import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;public interface Source {String OUTPUT = "output";@Output("output")MessageChannel output();
}

View Code

  Processor接口:

package org.springframework.cloud.stream.messaging;public interface Processor extends Source, Sink {
}

View Code

  这里面Processor这个接口既定义输入通道又定义了输出通道。同时我们也可以自己定义通道接口,代码如下:

package com.bdqn.lyrk.shop.channel;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;public interface ShopChannel {/*** 发消息的通道名称*/String SHOP_OUTPUT = "shop_output";/*** 消息的订阅通道名称*/String SHOP_INPUT = "shop_input";/*** 发消息的通道** @return*/@Output(SHOP_OUTPUT)MessageChannel sendShopMessage();/*** 收消息的通道** @return*/@Input(SHOP_INPUT)SubscribableChannel recieveShopMessage();}

View Code

3、定义服务类

package com.bdqn.lyrk.shop.server;import com.bdqn.lyrk.shop.channel.ShopChannel;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@RestController
public class ShopService {@Resource(name = ShopChannel.SHOP_OUTPUT)private MessageChannel sendShopMessageChannel;@GetMapping("/sendMsg")public String sendShopMessage(String content) {boolean isSendSuccess = sendShopMessageChannel.send(MessageBuilder.withPayload(content).build());return isSendSuccess ? "发送成功" : "发送失败";}@StreamListener(ShopChannel.SHOP_INPUT)public void receive(Message<String> message) {System.out.println(message.getPayload());}
}

View Code

  这里面大家注意 @StreamListener。这个注解可以监听输入通道里的消息内容,注解里面的属性指定我们刚才定义的输入通道名称,而MessageChannel则可以通过

输出通道发送消息。使用@Resource注入时需要指定我们刚才定义的输出通道名称

4、定义启动类

package com.bdqn.lyrk.shop;import com.bdqn.lyrk.shop.channel.ShopChannel;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;@SpringBootApplication
@EnableBinding(ShopChannel.class)
public class ShopServerApplication {public static void main(String[] args) {SpringApplication.run(ShopServerApplication.class, args);}
}

View Code

  注意@EnableBinding注解,这个注解指定刚才我们定义消息通道的接口名称,当然这里也可以传多个相关的接口

5、定义application.yml文件

spring:application:name: shop-servercloud:stream:bindings:#配置自己定义的通道与哪个中间件交互shop_input: #ShopChannel里Input和Output的值destination: zhibo #目标主题shop_output:destination: zhibodefault-binder: kafka #默认的binder是kafkakafka:bootstrap-servers: localhost:9092 #kafka服务地址consumer:group-id: consumer1producer:key-serializer: org.apache.kafka.common.serialization.ByteArraySerializervalue-serializer: org.apache.kafka.common.serialization.ByteArraySerializerclient-id: producer1
server:port: 8100

View Code

  这里是重头戏,我们必须指定所有通道对应的消息主题,同时指定默认的binder为kafka,紧接着定义Spring-kafka的外部化配置,在这里指定producer的序列化类为ByteArraySerializer

启动程序成功后,我们访问 http://localhost:8100/sendMsg?content=2 即可得到如下结果

转载于:https://www.cnblogs.com/niechen/p/8687206.html

SpringCloud学习之SpringCloudStream集成kafka相关推荐

  1. 最简单的SpringCloudStream集成Kafka教程

    开发中,服务与服务之间通信通常会用到消息中间件,如果我们使用了某一个MQ,那么消息中间件与我们的系统算是高耦合.将来有一天,要替换成另外的MQ,我们的改动就会比较大.为了解决这个问题,我们可以使用Sp ...

  2. SpringCloud学习笔记027---SpringBoot集成MyBatis_实现多数据源_可以自定义数据库类型

    JAVA技术交流QQ群:170933152 1.可以静态配置数据库 2.也可以动态切换数据库 项目提交测试,趁着中当间的这个空档期,把springboot的多数据源配置学习一下,总体来说多数据源配置有 ...

  3. SpringCloud学习笔记024---SpringBoot集成Lucene实现全文检索_分词_索引_更新_删除文档_词条搜索_多条件查询

    JAVA技术交流QQ群:170933152 先看代码实现,下面有lucene介绍: 测试用例 Github 代码 代码我已放到 Github ,导入spring-boot-lucene-demo 项目 ...

  4. SpringCloud学习笔记023---SpringBoot集成Dubbo_依赖zookeeper实现分布式应用一致性以及远程服务调用

    JAVA技术交流QQ群:170933152 刚开始的时候zookeeper,配置的是本地127.0.0.1:2181,这个是可以连接上的,但是如果是远程的 比如: 172.19.128.67:2181 ...

  5. 分布式系统服务注册与发现原理 SpringCloud 学习笔记

    分布式系统服务注册与发现原理 & SpringCloud 学习笔记 分布式系统服务注册与发现原理 引入服务注册与发现组件的原因 单体架构 应用与数据分离 集群部署 微服务架构 架构演进总结 服 ...

  6. SpringBoot集成kafka全面实战

    本文是SpringBoot+Kafka的实战讲解,如果对kafka的架构原理还不了解的读者,建议先看一下<大白话kafka架构原理>.<秒懂kafka HA(高可用)>两篇文章 ...

  7. SpringBoot笔记:SpringBoot2.3集成Kafka组件配置

    文章目录 说明 Springboot集成Kafka 依赖配置 配置文件yml配置 Producer生产者 Consumer消费者 测试代码 注意事项 说明 本文是接<Kafka学习:CentOS ...

  8. SpringCloud学习笔记(1)- Spring Cloud Netflix

    文章目录 SpringCloud学习笔记(1)- Spring Cloud Netflix 单体应用存在的问题 Spring Cloud Eureka Eureka Server代码实现 Eureka ...

  9. springboot 集成kafka 实现多个customer不同group

    springboot正常集成kafka 这个网上很多资料都有些集成,我就不浪费太多篇幅和时间了,笔者找了篇还算很容易理解的博客,自行学习 https://blog.csdn.net/tzs_10412 ...

最新文章

  1. java项目导入包报错_转!java web项目 build path 导入jar包,tomcat启动报错 找不到该类...
  2. Linux iperf 网络性能测试工具
  3. android适配器持有者模式
  4. 《软件设计精要与模式》推荐序三
  5. vector父类类型可以存放子类吗_拼夕夕三轮面经:被问到反射和泛型的bug,你踏空了吗?...
  6. java-抽象类和接口对区别
  7. [目录]搭建一个简单的WebGIS应用程序
  8. 亚马逊创始人下月将乘自家火箭进入太空 亲弟弟同行
  9. 芜湖计算机专业哪个学校最好,芜湖IT行业那么火,到底哪些人适合学IT呢?
  10. 零售分析用vba还是python_数据分析?Excel、VBA和Python?营销套路还是大势所趋!...
  11. [渝粤教育] 同济大学 线性代数学习指导 参考 资料
  12. (转)研究报告:区块链+供应链金融
  13. 数据库试题及答案 两套
  14. 基本知识 100101
  15. Ubuntu20.04安装Qemu-SGX
  16. 腾讯校园招聘笔试——逛街能看到楼的数量
  17. 2000-2019全中国省、市、县企业注册数据(包含经纬度、注册数目等多指标信息)
  18. 雪豹学院主办“2019年《ASM敏捷大咖》见修系列公开课”深圳宝安站活动圆满结束
  19. 摩托罗拉Android系统,摩托罗拉Droid官方Android 2.2系统详细功能展示
  20. Linux /dev/mapper/ubuntu--vg-ubuntu--lv 磁盘空间不足的问题

热门文章

  1. Android技能树 — 网络小结(6)之 OkHttp超超超超超超超详细解析
  2. Linux 获取grep前后几行
  3. ECS启动新“模”法
  4. 单元格填充为0的html,Excel如何在数据信息表中把空单元格填写为数字“0”
  5. python高端写法_python高级篇:使用元类方式实现单例模式详解
  6. mysql 种子表_mysql之3种子查询
  7. 非对称加密原理及RSA加密步骤
  8. 【GWT系列】实现远程过程调用
  9. Netty RPC Demo 实现
  10. php大文件怎么排序去重,简单实现大文件的排序和去重