golang基础-WaitGroup、kafka消费者
WaitGroup
WaitGroup在go语言中,用于线程同步,单从字面意思理解,wait等待的意思,group组、团队的意思,WaitGroup就是指等待一组,等待一个系列执行完成后才会继续向下执行。
package mainimport ("fmt""sync""time"
)func main() {wg := sync.WaitGroup{}for i := 0; i < 10; i++ {wg.Add(1)go calc(&wg, i)}wg.Wait()fmt.Println("all goroutine finish")
}
func calc(w *sync.WaitGroup, i int) {fmt.Println("calc:", i)time.Sleep(time.Second)w.Done()
}
输出如下:
PS E:\golang\go_pro\src\safly> go run waitGroup.go
calc: 0
calc: 1
calc: 4
calc: 2
calc: 3
calc: 9
calc: 6
calc: 7
calc: 5
calc: 8
all goroutine finish
PS E:\golang\go_pro\src\safly>
kafka消费者
以下博客是通过生产者创建、发送消息至kafka
博客链接
现在我们站在消费者的角度,来进行收取消息
package mainimport ("fmt""strings""sync""github.com/Shopify/sarama"
)var (wg sync.WaitGroup
)func main() {//创建消费者consumer, err := sarama.NewConsumer(strings.Split("192.168.11.48:9092", ","), nil)if err != nil {fmt.Println("Failed to start consumer: %s", err)return}//设置分区partitionList, err := consumer.Partitions("nginx_log")if err != nil {fmt.Println("Failed to get the list of partitions: ", err)return}fmt.Println(partitionList)//循环分区for partition := range partitionList {pc, err := consumer.ConsumePartition("nginx_log", int32(partition), sarama.OffsetNewest)if err != nil {fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)return}defer pc.AsyncClose()go func(pc sarama.PartitionConsumer) {wg.Add(1)for msg := range pc.Messages() {fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))fmt.Println()}wg.Done()}(pc)}//time.Sleep(time.Hour)wg.Wait()consumer.Close()
}
接下来我们测试上面的消费者示例代码,在进行测试前我们需要如下的准备工作
1、启动zookeeper
2、启动kafka
3、创立生产者topic
PS E:\develop\kafka\kafka_2.12-1.0.0> .\bin\windows\kafka-console-consumer.bat --topic nginx_log --zookeeper 127.0.0.1 2
181
4、执行生产者发送消息至kafka代码
5、执行消费者代码程序
第4步的代码如下:
package mainimport ("fmt""github.com/Shopify/sarama"
)func main() {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAllconfig.Producer.Partitioner = sarama.NewRandomPartitionerconfig.Producer.Return.Successes = truemsg := &sarama.ProducerMessage{}msg.Topic = "nginx_log"msg.Value = sarama.StringEncoder("this is a good test, my message is good")client, err := sarama.NewSyncProducer([]string{"192.168.11.28:9092"}, config)if err != nil {fmt.Println("producer close, err:", err)return}defer client.Close()pid, offset, err := client.SendMessage(msg)if err != nil {fmt.Println("send message failed,", err)return}fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
然后最后看效果图如下:
golang基础-WaitGroup、kafka消费者相关推荐
- [Golang] kafka集群搭建和golang版生产者和消费者
一.kafka集群搭建 至于kafka是什么我都不多做介绍了,网上写的已经非常详尽了. (没安装java环境的需要先安装 yum -y install java-1.8.0-openjdk*) 1. ...
- kafka基础篇(四)——kafka消费者客户端
一.入门程序 先上代码,从代码入手,讲解kafka消费者客户端的细节. public class HelloKafkaConsumer {public static void main(String[ ...
- kafka基础入门(4):kafka消费者
kafka消费者 kafka消费方式 kafka采用pull(拉)模式,consumer从broker中拉取数据 pull模式的不足:如果Kafka没有数据,消费者可能会陷入循环中,一直返回空数据 k ...
- golang基础面试题总结
golang基础面试题总结 前言:由于正在准备之后的实习面试,故总结了一部分golang语言基础的问题,回答全为自己组织的语言,若有错各位大佬可及时指出,大家共同进步,谢谢. 1.go中怎样实现安全读 ...
- Kafka消费者APi
Kafka客户端从集群中消费消息,并透明地处理kafka集群中出现故障服务器,透明地调节适应集群中变化的数据分区.也和服务器交互,平衡均衡消费者. public class KafkaConsumer ...
- Kafka 3.0 源码笔记(3)-Kafka 消费者的核心流程源码分析
文章目录 前言 1. 消费者负载均衡的实现 2. 源码分析 2.1 KafkaConsumer 的初始化 2.2 KafkaConsumer 的消息拉取 2.2.1 消息拉取的准备及入口 2.2.2 ...
- Kafka快速入门(Kafka消费者)
Kafka 消费者 1. Kafka 消费方式 2 Kafka 消费者工作流程 2.1 消费者总体工作流程 2.2 消费者组原理 Consumer Group(CG):消费者组,由多个consumer ...
- kafka 消费者详解
前言 读完本文,你将了解到如下知识点: kafka 的消费者 和 消费者组 如何正确使用 kafka consumer 常用的 kafka consumer 配置 消费者 和 消费者组 什么是消费者? ...
- 我是如何将一个老系统的kafka消费者服务的性能提升近百倍的
大家好,又见面了~ kafka作为一种高吞吐量的分布式发布订阅消息系统,在业务系统中被广泛的使用. 如果问你,如何提高kafka队列中的消息消费速度呢? 答案很简单,topic多分几个分片,然后使用消 ...
最新文章
- Nginx卡在登录页面不断跳转如何解决?(登不进登录页面)ip_hash机制(还是没解决)
- Android系统移植与驱动开发概述
- 注意力机制BAM和CBAM详细解析(附代码)
- java代理机制简单实现
- Schwarz导数与凹凸性
- New Adventure----GUI Design Studio
- js基础——function类型
- 软考中级–软件设计师考试大纲
- CoreOS部署及应用
- 四川大学计算机学院2020推免公示,2020四川大学计算机学院推免夏令营通知
- 从零开始设计RISC-V处理器——指令系统
- 上班族程序员怎么减肥
- 计算机网络 sci期刊,计算机方向的sci期刊有哪些
- 迪杰斯算法c语言,欧博体育APP-欧博体育APP
- 定义一个长方形类,定义 求周长和面积的方法,然后定义一个测试了Test,进行测试
- linux 操作系统:setenv
- window docker 找不到原先所有镜像和容器
- 普通电脑可以装苹果系统吗?Windows电脑装Mac系统
- R语言查看版本 R包查看版本
- 三轴机械臂/三自由度四足单腿DH正逆运动学及matlab验证
热门文章
- Weka算法Classifier-tree-J48源代码分析(一个)基本数据结构和算法
- 剑指Offer之旋转数组中的最小数字(题8)
- 《计算复杂性:现代方法》——第0章 记 号 约 定 0.1 对象的字符串表示
- Android四大组件之——Activity(一)定义、状态和后退栈(图文详解)
- 技术能力与真不是几年经验成正比的
- [转]Resource for Windows Phone 7
- 什么是BETA,RC,ALPHA版 - 软件命名规范
- 被投毒的管道:研究员探索CI环境中的攻击方法
- 2021奥斯汀 Pwn2Own黑客大赛落幕,Master of Pwn 诞生
- Django 聚合(译)