总览

该示例项目演示了如何使用事件驱动的体系结构 , Spring Boot ,Spring Cloud Stream, Apache Kafka和Lombok构建实时流应用程序。

在本教程结束时,您将运行一个简单的基于Spring Boot的Greetings微服务

  1. 从REST API获取消息
  2. 将其写入Kafka主题
  3. 从主题中读取
  4. 将其输出到控制台

让我们开始吧!

顺便说一句,您可以在此处找到源代码。

什么是Spring Cloud Streaming?

Spring Cloud Stream是基于Spring Boot构建的框架,用于构建消息驱动的微服务。

什么是卡夫卡?

Kafka是最初由LinkedIn开发的流行的高性能和水平可伸缩的消息传递平台。

安装Kafka

从这里下载Kafka并将其解压缩:

> tar -xzf kafka_2.11-1.0.0.tgz
> cd kafka_2.11-1.0.0

启动Zookeeper和Kafka

在Windows上:

> bin\windows\zookeeper-server-start.bat config\zookeeper.properties
> bin\windows\kafka-server-start.bat config\server.properties

在Linux或Mac上:

> bin/zookeeper-server-start.sh config/zookeeper.properties
> bin/kafka-server-start.sh config/server.properties

如果计算机从休眠状态唤醒后,Kafka没有运行并且无法启动,请删除<TMP_DIR>/kafka-logs文件夹,然后再次启动Kafka。

什么是Lombok?

Lombok是一个Java框架,可在代码中自动生成getter,setter,toString(),构建器,记录器等。

Maven依赖

转到https://start.spring.io创建一个Maven项目:

  1. 添加必要的依赖项: Spring Cloud StreamKafkaDevtools (用于在开发过程中进行热重新部署,可选), Actuator (用于监视应用程序,可选), Lombok (确保在IDE中也安装了Lombok插件)
  2. 单击生成项目按钮以zip文件形式下载项目
  3. 解压缩zip文件并将maven项目导入到您喜欢的IDE

注意pom.xml文件中的Maven依赖项:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId></dependency><!-- Also install the Lombok plugin in your IDE --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!-- hot reload - press Ctrl+F9 in IntelliJ after a code change while application is running --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><optional>true</optional></dependency>

…还有<dependencyManagement>部分:

<dependencyManagement><dependencies><dependency><!-- Import dependency management from Spring Boot --><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-dependencies</artifactId><version>${spring-cloud-stream.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement>

…和<repository>部分:

<repository><id>spring-milestones</id><name>Spring Milestones</name><url>http://repo.spring.io/libs-milestone</url><snapshots><enabled>false</enabled></snapshots>
</repository>

定义卡夫卡流

package com.kaviddiss.streamkafka.stream;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;  public interface GreetingsStreams {String INPUT = "greetings-in";String OUTPUT = "greetings-out";@Input(INPUT)SubscribableChannel inboundGreetings();@Output(OUTPUT)MessageChannel outboundGreetings();
}

为了使我们的应用程序能够与Kafka进行通信,我们需要定义一个出站流以将消息写入Kafka主题,并定义一个入站流以读取来自Kafka主题的消息。

通过简单地创建一个接口为每个流定义单独的方法,Spring Cloud提供了一种方便的方法。

inboundGreetings()方法定义要从Kafka读取的入站流,而outboundGreetings()方法定义要写入Kafka的出站流。

在运行时,Spring将为GreetingsStreams接口创建一个基于Java代理的实现,该实现可以作为Spring Bean注入到代码中的任何位置,以访问我们的两个流。

配置Spring Cloud Stream

下一步是将Spring Cloud Stream配置为绑定到GreetingsStreams接口中的流。 这可以通过使用以下代码创建@Configurationcom.kaviddiss.streamkafka.config.StreamsConfig来完成:

package com.kaviddiss.streamkafka.config;import com.kaviddiss.streamkafka.stream.GreetingsStreams;
import org.springframework.cloud.stream.annotation.EnableBinding;@EnableBinding(GreetingsStreams.class)
public class StreamsConfig {
}

使用@EnableBinding批注(将GreatingsService接口传递到该批注)完成@EnableBindingGreatingsService

Kafka的配置属性

默认情况下,配置属性存储在src/main/resources/application.properties文件中。

但是,我更喜欢使用YAML格式,因为它不太冗长,并且允许将公共属性和特定于环境的属性保留在同一文件中。

现在,让我们将application.properties重命名为application.yaml并将config片段下方粘贴到文件中:

spring:cloud:stream:kafka:binder:brokers: localhost:9092bindings:greetings-in:destination: greetingscontentType: application/jsongreetings-out:destination: greetingscontentType: application/json

上面的配置属性配置要连接的Kafka服务器的地址,以及我们用于代码中的入站和出站流的Kafka主题。 他们俩都必须使用相同的Kafka主题!

contentType属性告诉Spring Cloud Stream在流中以String的形式发送/接收我们的消息对象。

创建消息对象

使用下面的代码创建一个简单的com.kaviddiss.streamkafka.model.Greetings类,该代码将表示我们从中读取并写入的greetings Kafka主题:

package com.kaviddiss.streamkafka.model;// lombok autogenerates getters, setters, toString() and a builder (see https://projectlombok.org/):
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;@Getter @Setter @ToString @Builder
public class Greetings {private long timestamp;private String message;
}

注意,由于Lombok批注,该类如何没有任何getter和setter。 @ToString将使用类的字段生成toString()方法,而@Builder批注将允许我们使用流畅的生成器创建Greetings对象(请参见下文)。

创建服务层以写入Kafka

让我们创建的com.kaviddiss.streamkafka.service.GreetingsService下面的代码,将写一个类Greetings对象的greetings卡夫卡话题:

package com.kaviddiss.streamkafka.service;import com.kaviddiss.streamkafka.model.Greetings;
import com.kaviddiss.streamkafka.stream.GreetingsStreams;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;@Service
@Slf4j
public class GreetingsService {private final GreetingsStreams greetingsStreams;public GreetingsService(GreetingsStreams greetingsStreams) {this.greetingsStreams = greetingsStreams;}public void sendGreeting(final Greetings greetings) {log.info("Sending greetings {}", greetings);MessageChannel messageChannel = greetingsStreams.outboundGreetings();messageChannel.send(MessageBuilder.withPayload(greetings).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build());}

@Service批注会将此类配置为Spring Bean,并通过构造函数注入GreetingsService依赖项。

@Slf4j批注将生成一个SLF4J记录器字段,可用于记录日志。

sendGreeting()方法中,我们使用注入的GreetingsStream对象发送由Greetings对象表示的消息。

创建REST API

现在,我们将创建一个REST api端点,该端点将触发使用GreetingsService Spring Bean向Kafka发送消息:

package com.kaviddiss.streamkafka.web;import com.kaviddiss.streamkafka.model.Greetings;
import com.kaviddiss.streamkafka.service.GreetingsService;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController; @RestController
public class GreetingsController {private final GreetingsService greetingsService;public GreetingsController(GreetingsService greetingsService) {this.greetingsService = greetingsService;}@GetMapping("/greetings")@ResponseStatus(HttpStatus.ACCEPTED)public void greetings(@RequestParam("message") String message) {Greetings greetings = Greetings.builder().message(message).timestamp(System.currentTimeMillis()).build();greetingsService.sendGreeting(greetings);}
}

@RestController注释告诉Spring这是一个Controller bean(MVC中的C)。 greetings()方法定义一个HTTP GET /greetings端点,该端点接受message请求参数,并将其传递给GreetingsServicesendGreeting()方法。

听问候卡夫卡主题

让我们创建一个com.kaviddiss.streamkafka.service.GreetingsListener类,该类将侦听greetings Kafka主题上的消息并将其记录在控制台上:

package com.kaviddiss.streamkafka.service;import com.kaviddiss.streamkafka.model.Greetings;
import com.kaviddiss.streamkafka.stream.GreetingsStreams;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class GreetingsListener {@StreamListener(GreetingsStreams.INPUT)public void handleGreetings(@Payload Greetings greetings) {log.info("Received greetings: {}", greetings);}
}

@Component批注类似于@Service @Component@Service @RestController定义了一个Spring Bean。

GreetingsListener有一个方法, handleGreetings()将通过云春流与每一个新的调用Greetings的消息对象greetings卡夫卡的话题。 这要感谢为handleGreetings()方法配置的@StreamListener批注。

运行应用程序

最后一个难题是由Spring Initializer自动生成的com.kaviddiss.streamkafka.StreamKafkaApplication类:

package com.kaviddiss.streamkafka;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class StreamKafkaApplication {public static void main(String[] args) {SpringApplication.run(StreamKafkaApplication.class, args);}
}

无需在此处进行任何更改。 您可以在您的IDE中将此类作为Java应用程序运行,也可以使用Spring Boot maven插件从命令行运行该应用程序:

> mvn spring-boot:run

应用程序运行后,在浏览器中转到http:// localhost:8080 / greetings?message = hello并检查您的控制台。

摘要

我希望您喜欢本教程。 随时提出任何问题并留下您的反馈。

翻译自: https://www.javacodegeeks.com/2018/03/spring-cloud-stream-kafka.html

Kafka的Spring Cloud Stream相关推荐

  1. Spring Cloud Stream 与 Kafka 整合

    重要注意事项: 1. 尽管 Spring Cloud Stream Binder 中存在 Kafka 的整合,然而Spring Kafka 和 Spring Cloud Stream Kafka 在处 ...

  2. 【本人秃顶程序员】使用Spring Cloud Stream和RabbitMQ实现事件驱动的微服务

    ←←←←←←←←←←←← 快!点关注 让我们展示如何使用Spring Cloud Stream来设计事件驱动的微服务.首先,Spring Cloud Stream首先有什么好处?因为Spring AM ...

  3. 一. spring cloud gateway集成 spring cloud stream binder kafka,实现“动态路由“刷新与加载之采坑记录

    一.前言 Spring Cloud Stream是用于构建消息驱动的微服务应用程序的框架. 本文主要介绍如何集成 Spring Cloud Stream,以 Kafka发布订阅模式(topic),实现 ...

  4. spring cloud stream kafka 处理消息

    spring cloud stream kafka <dependency><groupId>org.springframework.cloud</groupId> ...

  5. spring cloud stream kafka 动态写入不同的topic(Using dynamically bound destinations)

    引入依赖: <dependency><groupId>org.springframework.boot</groupId><artifactId>spr ...

  6. 一文了解Spring Cloud Stream体系

    点击蓝色"程序猿DD"关注我哟 加个"星标",不忘签到哦 来源:阿里巴巴中间件 Spring Cloud Stream 在 Spring Cloud 体系内用于 ...

  7. Spring Cloud Stream 体系及原理介绍

    https://mp.weixin.qq.com/s/e_pDTFmFcSqHH-uSIzNmMg Spring Cloud Stream 在 Spring Cloud 体系内用于构建高度可扩展的基于 ...

  8. Spring Cloud【Finchley】- 21 Spring Cloud Stream 构建消息驱动微服务

    文章目录 概述 添加依赖 配置文件配置RabbitMQ的地址信息 接口定义 接收方 @EnableBinding @StreamListener 测试 消费组 发送复杂对象 消息回执 代码 概述 官网 ...

  9. Spring Cloud Stream的使用(上)

    操作消息队列的另一种方法,SpringCloud是Spring的组件之一,官方定义Spring Cloud Stream,给微服务应用构建消息驱动能力的框架,下面我简称Stream,应用程序通过Inp ...

最新文章

  1. 客户资料查询传递数据格式
  2. 移柯L620模组使用MQTT协议连接移动OneNet云平台(NB-IoT专栏—进阶篇3)
  3. 图像二值化----otsu(最大类间方差法、大津算法)(二)
  4. 一键fxxk,代码修复神器拯救你
  5. asp.net弹出div层,并把弹出层上的值赋值给界面
  6. 渐进式迭代教学法--PHP
  7. python怎么画多重饼状图_Python通过matplotlib画双层饼图及环形图简单示例
  8. python函数里面引用外部变量_Python基础 变量进阶
  9. Chrome用户不喜新版:宁用其他浏览器也不要用旧版本
  10. 修炼一名软件工程师的职业水准
  11. 五年后的4.20地震
  12. 前端开源项目周报1213
  13. 万用表二极管档和三极管档的使用
  14. QQ及QQ群聊天窗口调用方法
  15. win11怎么开启休眠睡眠?
  16. 域服务器无法修改域账户密码,域用户使用Ctrl+Alt+del不能修改密码
  17. win10打开热点的时候提示我们无法设置热点
  18. office办公所有版本齐全
  19. idea设置类注释和方法注释模板
  20. 协同办公工具:在线白板初起步,在线设计已红海

热门文章

  1. maven中scope属性的
  2. JavaFX官方教程(十三)之应用效果
  3. JavaWeb的web.xml标签元素(二)
  4. js的三元表达式用来替换表格中的颜色
  5. express中获取url参数
  6. Spring AOP知识点简介
  7. 《四世同堂》金句摘抄(十八)
  8. Hibernate基本概念 (5)
  9. java反射机制的原理与简单使用
  10. React不提交表单并且获取表单中的数据