spring cloud stream kafka

        <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId></dependency>

自定义管道

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;@Component
public interface MyProcessor {String MESSAGE_INPUT = "log_input";String MESSAGE_OUTPUT = "log_output";String LOG_FORMAT_INPUT = "log_format_input";String LOG_FORMAT_OUTPUT = "log_format_output";@Input(MESSAGE_INPUT)SubscribableChannel logInput();@Output(MESSAGE_OUTPUT)MessageChannel logOutput();@Input(LOG_FORMAT_INPUT)SubscribableChannel logFormatInput();@Output(LOG_FORMAT_OUTPUT)MessageChannel logFormatOutput();}

发送,需要绑定@EnableBinding(MyProcessor.class)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;@EnableBinding(MyProcessor.class)
public class SendService {@Autowiredprivate MyProcessor source;public void sendMsg(String msg) {source.logOutput().send(MessageBuilder.withPayload(msg).build());}public void sendFormatMsg(String msg) {source.logFormatOutput().send(MessageBuilder.withPayload(msg).build());}
}

接收,receive方法在接收到消息后,可以直接通过@SendTo 转发到其他的消息队列中

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;@EnableBinding(MyProcessor.class)
@Slf4j
public class ReceiveListener {@StreamListener(value = MyProcessor.MESSAGE_INPUT)@SendTo(value = MyProcessor.LOG_FORMAT_OUTPUT)public String receive(String payload) {log.info("接收到的信息:" + payload);String result="格式化的消息:"+payload;return result;}@StreamListener(value = MyProcessor.LOG_FORMAT_INPUT)public void receiveFormat(String payload) {log.info("接收到的信息:" + payload);}}

绑定了2个kafka,注意要指定默认的绑定器default-binder,不然启动报错

spring:cloud:stream:default-binder: kafka1bindings:#与@StreamListener注解中的value一致,是绑定的渠道名log_input:binder: kafka1#绑定的kafka topic名称为testdestination: topicOne#消费组#          group: cloud-test2-group1#          content-type: application/jsonlog_format_input:binder: kafka2#绑定的kafka topic名称为testdestination: topicTwo#消费组# group: cloud-test2-group1#          content-type: application/jsonlog_output:binder: kafka1destination: topicOne#          content-type: application/jsonlog_format_output:binder: kafka2destination: topicTwobinders:kafka1:type: kafkaenvironment:spring:cloud:stream:kafka:binder:brokers: localhost:9092zkNodes: localhost:2181kafka2:type: kafkaenvironment:spring:cloud:stream:kafka:binder:brokers: localhost:9093zkNodes: localhost:2182

spring cloud stream kafka 处理消息相关推荐

  1. spring cloud stream kafka 动态写入不同的topic(Using dynamically bound destinations)

    引入依赖: <dependency><groupId>org.springframework.boot</groupId><artifactId>spr ...

  2. Spring Cloud Stream 使用延迟消息实现定时任务(RabbitMQ)

    应用场景 我们在使用一些开源调度系统(比如:elastic-job等)的时候,对于任务的执行时间通常都是有规律性的,可能是每隔半小时执行一次,或者每天凌晨一点执行一次.然而实际业务中还存在另外一种定时 ...

  3. Spring Cloud Stream 与 Kafka 整合

    重要注意事项: 1. 尽管 Spring Cloud Stream Binder 中存在 Kafka 的整合,然而Spring Kafka 和 Spring Cloud Stream Kafka 在处 ...

  4. Kafka的Spring Cloud Stream

    总览 该示例项目演示了如何使用事件驱动的体系结构 , Spring Boot ,Spring Cloud Stream, Apache Kafka和Lombok构建实时流应用程序. 在本教程结束时,您 ...

  5. 一. spring cloud gateway集成 spring cloud stream binder kafka,实现“动态路由“刷新与加载之采坑记录

    一.前言 Spring Cloud Stream是用于构建消息驱动的微服务应用程序的框架. 本文主要介绍如何集成 Spring Cloud Stream,以 Kafka发布订阅模式(topic),实现 ...

  6. Spring Cloud Stream消息中间件通过RabbitMQ实现消息推送

    一.Spring Cloud Stream是什么 Spring Cloud Stream 是一个构建消息驱动微服务的框架. Spring Cloud Stream解决了开发人员无感知的使用消息中间件的 ...

  7. 【本人秃顶程序员】使用Spring Cloud Stream和RabbitMQ实现事件驱动的微服务

    ←←←←←←←←←←←← 快!点关注 让我们展示如何使用Spring Cloud Stream来设计事件驱动的微服务.首先,Spring Cloud Stream首先有什么好处?因为Spring AM ...

  8. Spring Cloud Stream核心原理介绍

    一.简介  目前,市面上消息中间件产品种类繁多,譬如RabbitMq,RocektMq,Kafka,Azure EventHub, Amazon Kenesis.各种中间件的原理.机制差异很大,但归根 ...

  9. Spring Cloud Stream中文指导手册

    Spring Cloud Stream中文指导手册 source 文章目录 Spring Cloud Stream中文指导手册 @[toc] Spring Cloud Stream 核心 1.简介 2 ...

最新文章

  1. [sqlserver脚本]查看指定SQL语句生成了哪些执行计划
  2. Python属性简介__name__
  3. 深度残差网络和Highway网络
  4. 计算图片的宽和高 动态设置图片的位置
  5. mysql之union合并查询
  6. 雷军:互联网思维本质上就是群众路线
  7. 解决scanf_s函数报错:没有为格式字符串传递足够的参数
  8. 计算机网络技术放块队解说词,基于《计算机网络技术》课程多媒体课件制作与设计.doc...
  9. python创建类和类方法
  10. 图片不存在显示默认图片
  11. PostgreSQL 优势,MySQL 数据库自身的特性并不十分丰富,触发器和存储过程的支持较弱,Greenplum、AWS 的 Redshift 等都是基于 PostgreSQL 开发的...
  12. Image.FormFile引起的若干问题
  13. 安装esxi时候的No Network Adapters报错 解决办法
  14. 线程创建的三种方式和线程池创建的四种方式
  15. linux删除文件名的文件夹,Linux删除文件夹和修改文件名
  16. Android关于BottomNavigationView效果实现指南
  17. 【C语言】如何优雅地进行嵌入式C开发?(万字总结)
  18. 一篇论文的正确格式是什么?
  19. 日文windows系统 oracle数据库乱码的处理
  20. tieredImageNet 数据集介绍

热门文章

  1. 那些年我们的青葱岁月
  2. WebSocket——vue3简易聊天室
  3. Dell T110 服务器安装
  4. windows7 python 指定源组播 10049_Windows Python多播上的错误10049
  5. 【HanLP】eclipse/myeclipse下配置HanLP
  6. 信息安全体系建设☞通过防火墙实现内部网络的微隔离
  7. qt子控件单独指定鼠标形状等样式
  8. 黑猴子的家:stop-cluster.sh
  9. 获取数据库服务器时间
  10. web前端数据库基础知识