背景:

  当业务在同一时间出现高并发的时候,这个时候我们不想无限的增加服务器,但是又想提高吞吐量。这时可以考虑使用消息异步处理,进行消峰填谷;同时还可以降低耦合度。常见的消息中间件有kafka,rabbitMQ,activeMQ,rocketMQ。其中性能最好的,吞吐量最高的是以kafka为代表,下面介绍kafka用法。kafka详细原理介绍,参考kafka系列:https://www.cnblogs.com/wangzhuxing/category/1351802.html。

一、引入依赖

<!--kafka支持-->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

二、配置yml

spring:kafka:     # 指定kafka 代理地址,可以多个bootstrap-servers: 47.52.199.52:9092template:    # 指定默认topic iddefault-topic: producerlistener:   # 指定listener 容器中的线程数,用于提高并发量concurrency: 5consumer:group-id: myGroup # 指定默认消费者group idclient-id: 200max-poll-records: 200auto-offset-reset: earliest # 最早未被消费的offsetproducer:batch-size: 1000 # 每次批量发送消息的数量retries: 3client-id: 200

三、生成者使用示例

package com.example.demo.kafka;import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import java.util.concurrent.ExecutionException;@Component
public class Producer {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;/*** 发送消息到kafka*/public RecordMetadata sendChannelMess(String topic, String message) {ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic,message);RecordMetadata recordMetadata = null;try {recordMetadata = future.get().getRecordMetadata();} catch (InterruptedException|ExecutionException e) {e.printStackTrace();System.out.println("发送失败");}System.out.println("发送成功");System.out.println("partition:"+recordMetadata.partition());System.out.println("offset:"+recordMetadata.offset());System.out.println("topic:"+recordMetadata.topic());return recordMetadata;}
}

四、消费者使用示例

package com.example.demo.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.util.List;@Component
public class Consumer {/*** 有消息就读取,只读取消息value*/@KafkaListener(topics = {"test13"})public void receiveMessage(String message){//收到通道的消息之后执行秒杀操作
        System.out.println(message);}/*** 有消息就读取,批量读取消息value*/@KafkaListener(topics = "test12")public void onMessage(List<String> crs) {for(String str : crs){System.out.println("test12:" + str);}}/*** 有消息就读取,读取消息topic,offset,key,value等信息*/@KafkaListener(topics = "test14")public void listenT1(ConsumerRecord<?, ?> cr){System.out.println("listenT1收到消息,topic:>>>" + cr.topic() + "  offset:>>" + cr.offset()+ "  key:>>" + cr.key() + "  value:>>" + cr.value());}
}

转载于:https://www.cnblogs.com/wangzhuxing/p/10186666.html

springboot系列八、springboot整合kafka相关推荐

  1. SpringBoot入门建站全系列(二十八)整合Kafka做日志监控

    SpringBoot入门建站全系列(二十八)整合Kafka做日志监控 一.概述 Apache Kafka是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端 ...

  2. kafka maven 依赖_SpringBoot入门建站全系列(二十八)整合Kafka做日志监控

    SpringBoot入门建站全系列(二十八)整合Kafka做日志监控 一.概述 Apache Kafka是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端 ...

  3. SpringBoot系列之canal和kafka实现异步实时更新

    SpringBoot系列之canal和kafka实现异步实时更新 实验开发环境 JDK 1.8 SpringBoot2.2.1 Maven 3.2+ 开发工具 IntelliJ IDEA smartG ...

  4. spring boot 2.x 系列 —— spring boot 整合 kafka

    文章目录 一.kafka的相关概念: 1.主题和分区 2.分区复制 3. 生产者 4. 消费者 5.broker和集群 二.项目说明 1.1 项目结构说明 1.2 主要依赖 二. 整合 kafka 2 ...

  5. SpringBoot系列3-----Docker SpringBoot与数据访问 启动原理 自定义starters

    五.Docker 1.简介 Docker是一个开源的应用容器引擎:是一个轻量级容器技术 Docker支持将软件编译成一个镜像:然后在镜像中各种软件做好配置,将镜像发布出去,其他使用者可以直接使用这个镜 ...

  6. 【Springboot系列】Springboot接管所有Controller,magic-api源码阅读

    系列文章地址:Spring Boot学习大纲,可以留言自己想了解的技术点 最近在项目中使用了一个第三方的包 magic-api,节省了很多的时间,整体来说就是只用写sql就好了,不用写service, ...

  7. 搭建大型分布式服务(二十二)SpringBoot 如何优雅地整合多个kafka数据源?

    系列文章目录 文章目录 系列文章目录 前言 一.本文要点 二.开发环境 三.创建项目 四.修改项目 五.测试一下 六.小结 前言 在日常开发当中,经常会遇到需要消费的topic不在同一个kafka集群 ...

  8. kafka 安装使用 /springboot整合kafka /消息投递机制以及存储策略 /副本处理机制

    一.背景 1.基本信息 Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写.Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流 ...

  9. SpringBoot 系列教程(八十五):Spring Boot使用MD5加盐验签Api接口之前后端分离架构设计

    加密算法参考: 浅谈常见的七种加密算法及实现 加密算法参考: 加密算法(DES,AES,RSA,MD5,SHA1,Base64)比较和项目应用 目的: 通过对API接口请求报文签名,后端进行验签处理, ...

最新文章

  1. 改变人生的32句励志名言
  2. android 计算执行时间
  3. java short 后缀_自学java的新手问个问题,为什么写个代码中的int能自动转
  4. Python 3.8 已发布,现在是切换至新版本的好时机吗?
  5. Spring Cloud微服务下的权限架构调研
  6. 【Python】:用python做下百度2014笔试题
  7. Logistic回归小结
  8. MPLS ××× 的基本配置(二)
  9. 【光学】基于matlab圆孔衍射动态模拟【含Matlab源码 795期】
  10. Ant 执行 YUICompressor
  11. navicat+for+mysql10.0.11简体中文破解版
  12. 微信小程序-map地图标签的初级使用, 拥有图标,气泡,地图本身无法缩放移动需要点击跳转第三方地图平台
  13. c8500刷机 转帖
  14. 大学物理笔记——第二章质点动力学
  15. 【预测模型】预测某地区未来 10-20 年按年龄划分的人口结构(PDE模型)
  16. 图像翻译三部曲:pix2pix, pix2pixHD, vid2vid
  17. sas 结果导出到excel 打不开解决办法
  18. 吴恩达 (Andrew Ng) 是一个怎样的人
  19. 删除UltraISO(软碟通)卸载后的遗留文件“isoshl64.dll”
  20. その他トランザクションコード

热门文章

  1. 破解前端面试系列(3):如何搞定纸上代码环节?
  2. XStream xml与javabean之间的互转
  3. centos 5 6安装本地yum源
  4. 在mac下安装matplotlib,xlrd
  5. VS2010安装HTML5插件
  6. zoj 3386 Trick or Treat 三分 求最大值的 最小值
  7. Marshal类的简单使用
  8. SQL server 2008 数据库企业版安装教程图解 (转载)
  9. sample solution
  10. blockboard vs canvas