玩转Kafka—Spring整合Kafka

1 新建Spring Boot项目,增加依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.20</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.76</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency>
</dependencies>

2 项目结构

3 代码

3.1 配置文件和Kafka服务器所需配置

application.properties

server.port=8080
#制定kafka代理地址
spring.kafka.bootstrap-servers=8.131.57.161:9092
#消息发送失败重试次数
spring.kafka.producer.retries=0
#每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
#每次批量发送消息的缓冲区大小
spring.kafka.producer.buffer-memory=335554432
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer# 指定默认消费者group id
spring.kafka.consumer.group-id=user-log-group
spring.kafka.consumer.bootstrap-servers=8.131.57.161:9092
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

Kafka服务器所需配置,server.properties文件

# 33行左右 0.0.0.0代表允许外部端口连接
listeners=PLAINTEXT://0.0.0.0:9092
# 36行左右 ip代表外部代理地址
advertised.listeners=PLAINTEXT://8.131.57.161:9092

3.2 生产者和实体类代码

Student.java

/*** @desc: 实体类* @author: YanMingXin* @create: 2021/11/20-12:43**/
@Data
@Accessors(chain = true)
@NoArgsConstructor
@AllArgsConstructor
public class Student implements Serializable {private String id;private String name;private String context;}

StudentService.java

/*** @desc: 接口* @author: YanMingXin* @create: 2021/11/20-12:43**/
public interface StudentService {void stuSayHello(Student student);
}

StudentServiceImpl.java

/*** @desc: 接口实现类* @author: YanMingXin* @create: 2021/11/20-12:43**/
@Service
public class StudentServiceImpl implements StudentService {@Autowiredprivate KafkaTemplate kafkaTemplate;/*** topic*/private static final String STU_TOPIC = "stu.sayHello";@Overridepublic void stuSayHello(Student student) {Student stu = new Student("1", "zs", "Hello Ls.");kafkaTemplate.send(STU_TOPIC, JSON.toJSONString(stu));}
}

3.3 消费者代码

MyKafkaListener.java

/*** @desc: 消费者监听* @author: YanMingXin* @create: 2021/11/20-12:44**/
@Component
public class MyKafkaListener {/*** topic*/private static final String STU_TOPIC = "stu.sayHello";@KafkaListener(topics = {STU_TOPIC})public void stuTopicConsumer(ConsumerRecord consumerRecord) {Optional kafkaMsg = Optional.ofNullable(consumerRecord.value());if (kafkaMsg.isPresent()) {Object msg = kafkaMsg.get();System.err.println(msg);}}
}

3.4 测试

@SpringBootTest
class SpKafkaApplicationTests {@Autowiredprivate StudentService studentService;@Testvoid contextLoads() throws Exception{for (int i = 0; i < 900000; i++) {studentService.stuSayHello(new Student());}}
}

玩转Kafka—Golang整合Kafka

几个常见的Go整合Kafka客户端工具:我们本次使用的是Shopify

  • Shopify:https://github.com/Shopify/sarama

  • Big Data Open Source Security:https://github.com/stealthly/go_kafka_client

  • OptioPay:https://github.com/optiopay/kafka

    https://github.com/nuance/kafka

    https://github.com/jdamick/kafka.go

  • Confluent:https://github.com/confluentinc/confluent-kafka-go

    Docs: http://docs.confluent.io/current/clients/index.html

  • Travis Bischel: https://pkg.go.dev/github.com/twmb/kafka-go/pkg/kgo

ps:配置go get代理(类似于Maven配置阿里云镜像)教程:

https://goproxy.io/zh/docs/getting-started.html

1 新建go modules

2 项目结构

3 生产者代码

KakaProducer.go

package mainimport ("fmt""github.com/Shopify/sarama""time"
)//消息生产者
func main() {//获取配置类config := sarama.NewConfig() //配置类实例(指针类型)config.Producer.RequiredAcks = sarama.WaitForAll //代理需要的确认可靠性级别(默认为WaitForLocal)config.Producer.Partitioner = sarama.NewRandomPartitioner  //生成用于选择要发送消息的分区的分区(默认为散列消息键)。config.Producer.Return.Successes = true //如果启用,成功传递的消息将在成功通道(默认禁用)。//获取客户端对象client, err := sarama.NewSyncProducer([]string{"8.131.57.161:9092"}, config)if err != nil {//获取客户端失败fmt.Println("producer close, err:", err)return}//延迟执行,类似于栈,等到其他代码都执行完毕后再执行defer client.Close()//一直循环for {//获取Message对象msg := &sarama.ProducerMessage{}//设置topicmsg.Topic = "go_kafka"//设置Message值msg.Value = sarama.StringEncoder("this is a good test, my message is good")//发送消息,返回pid、片偏移pid, offset, err := client.SendMessage(msg)//发送失败if err != nil {fmt.Println("send message failed,", err)return}//打印返回结果fmt.Printf("pid:%v offset:%v\n", pid, offset)//线程休眠下time.Sleep(10 * time.Second)}
}

4 消费者代码

KafkaConsumer.go

package mainimport ("fmt""github.com/Shopify/sarama""strings""sync""time"
)var (wg sync.WaitGroup //同步等待组//在类型上,它是一个结构体。一个WaitGroup的用途是等待一个goroutine的集合执行完成。//主goroutine调用了Add()方法来设置要等待的goroutine的数量。//然后,每个goroutine都会执行并且执行完成后调用Done()这个方法。//与此同时,可以使用Wait()方法来阻塞,直到所有的goroutine都执行完成。
)func main() {//获取消费者对象 可以设置多个IP地址和端口号,使用逗号进行分割consumer, err := sarama.NewConsumer(strings.Split("8.131.57.161:9092", ","), nil)//获取失败if err != nil {fmt.Println("Failed to start consumer: %s", err)return}//对该topic进行监听partitionList, err := consumer.Partitions("go_kafka")if err != nil {fmt.Println("Failed to get the list of partitions: ", err)return}//打印分区fmt.Println(partitionList)//获取分区和片偏移for partition := range partitionList {pc, err := consumer.ConsumePartition("go_kafka", int32(partition), sarama.OffsetNewest)if err != nil {fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)return}//延迟执行defer pc.AsyncClose()//启动多线程go func(pc sarama.PartitionConsumer) {wg.Add(1)//获得message的信息for msg := range pc.Messages() {fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))fmt.Println()}wg.Done()}(pc)}//线程休眠time.Sleep(10 * time.Second)wg.Wait()consumer.Close()
}

5 测试



参考文章:https://www.cnblogs.com/angelyan/p/10800739.html

欢迎关注公众号

玩转Kafka—SpringGo整合Kafka相关推荐

  1. 2021年大数据Spark(四十九):Structured Streaming 整合 Kafka

    目录 整合 Kafka 说明 Kafka特定配置 ​​​​​​​KafkaSoure 1.消费一个Topic数据 2.消费多个Topic数据 3.消费通配符匹配Topic数据 ​​​​​​​Kafka ...

  2. 2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用

    目录 整合Kafka 0-10-开发使用 原理 1.Direct方式 2.简单的并行度1 : 1 ​​​​​​​API 注意 ​​​​​​​代码实现-自动提交偏移量到默认主题 ​​​​​​​代码实现- ...

  3. SpringBoot实战(十四)之整合KafKa

    本人今天上午参考了不少博文,发现不少博文不是特别好,不是因为依赖冲突问题就是因为版本问题. 于是我结合相关的博文和案例,自己改写了下并参考了下,于是就有了这篇文章.希望能够给大家帮助,少走一些弯路. ...

  4. SparkStreaming整合Kafka(Offset保存在zookeeper上,Spark2.X + kafka0.10.X)

    先来一段到处都有的原理(出处到处都有,就不注明了) Streaming和Kafka整合有两种方式--Receiver和Direct,简单理解为:Receiver方式是通过zookeeper来连接kaf ...

  5. SpringBoot整合kafka(实现producer和consumer)

    转载自 SpringBoot整合kafka(实现producer和consumer) 在Windows环境下安装运行Kafka:https://www.jianshu.com/p/d64798e81f ...

  6. springboot 整合 kafka demo 顺便看一下源码

    大家好,我是烤鸭: 今天分享下 springboot 整合 kafka. 1.  环境参数: windows + kafka_2.11-2.3.0 + zookeeper-3.5.6 + spring ...

  7. springboot整合kafka_springboot整合kafka实现消息的发送消费

    如下是springboot整合kafka的一个案例,方便需要的小伙伴. 启动kafka Server cd 到kafka的bin目录下:前提是启动zk./kafka-server-start.sh / ...

  8. 超详细!一文告诉你 SparkStreaming 如何整合 Kafka !附代码可实践

    来源 | Alice菌 责编 | Carol 封图 |  CSDN 下载于视觉中国 相信很多小伙伴已经接触过 SparkStreaming 了,理论就不讲太多了,今天的内容主要是为大家带来的是 Spa ...

  9. kafka专题:kafka单机和集群安装详情,Spring Boot如何整合Kafka

    文章目录 1. kafka单机安装 1.1 server.properties核心配置 2. kafka集群安装 2.1 kafka集群可视化界面 3. springboot如何整合kafka 4. ...

最新文章

  1. 一文看懂.NET的各种变体
  2. angular 与 highcharts 结合使用
  3. 实例讲解Oracle数据库设置默认表空间问题
  4. 计算机争夺战作文,电脑争夺战作文600字
  5. java map赋值给model_Map,Model,ModelMap使用方法
  6. VS code 使用 Remote-SSH 进行python远程开发
  7. 好用!一键生成数据库文档,这个开源的文档生成工具值得了解
  8. 顺序图中页面、窗口等为什么当做类处理?
  9. 周鸿祎:有的人在25岁时就死了,但在75岁时才被埋葬
  10. Repeater使用方法---基础数据绑定+多级嵌套
  11. python k线合成_在VNPY中策略中,使用分钟线合成日K线
  12. 怎么修改数据库服务器名字,如何更改数据库服务器名字
  13. 学生适合做什么html网站,学生个人网页制作html
  14. 金山文字 职称计算机,职称计算机:金山文字2005考试大纲
  15. 步进电机弯道加速度、起跳速度、单轴加速度设置方法
  16. 青海大学市计算机科学与技术,曹腾飞 - 青海大学 - 计算机技术与应用系
  17. 让面试官哑口无言的JS奇葩知识,你遇到过吗?
  18. Blender插件开发:用fake-bpy-module提供代码补全
  19. 网络创业者莱恩-福克斯:最后一分钟的奇迹
  20. 7-60 有志者,事竟成

热门文章

  1. 品牌对比 | 特步 VS 李宁
  2. 解放双手,Windows Admin Center简化服务器管理
  3. 搭建kafka集群并使用springboot 整合
  4. 使用STL给选手打分
  5. h0156.国王的金矿
  6. 数据挖掘 第四篇:OLS回归分析
  7. 2048【浙江工商大学oj】
  8. 2014中国信用卡报告
  9. 软件与硬件之间的交互流程
  10. UltraISO 制作U盘启动盘