WaitGroup

WaitGroup在go语言中,用于线程同步,单从字面意思理解,wait等待的意思,group组、团队的意思,WaitGroup就是指等待一组,等待一个系列执行完成后才会继续向下执行。

package mainimport ("fmt""sync""time"
)func main() {wg := sync.WaitGroup{}for i := 0; i < 10; i++ {wg.Add(1)go calc(&wg, i)}wg.Wait()fmt.Println("all goroutine finish")
}
func calc(w *sync.WaitGroup, i int) {fmt.Println("calc:", i)time.Sleep(time.Second)w.Done()
}

输出如下:

PS E:\golang\go_pro\src\safly> go run waitGroup.go
calc: 0
calc: 1
calc: 4
calc: 2
calc: 3
calc: 9
calc: 6
calc: 7
calc: 5
calc: 8
all goroutine finish
PS E:\golang\go_pro\src\safly>

kafka消费者

以下博客是通过生产者创建、发送消息至kafka 
博客链接

现在我们站在消费者的角度,来进行收取消息

package mainimport ("fmt""strings""sync""github.com/Shopify/sarama"
)var (wg sync.WaitGroup
)func main() {//创建消费者consumer, err := sarama.NewConsumer(strings.Split("192.168.11.48:9092", ","), nil)if err != nil {fmt.Println("Failed to start consumer: %s", err)return}//设置分区partitionList, err := consumer.Partitions("nginx_log")if err != nil {fmt.Println("Failed to get the list of partitions: ", err)return}fmt.Println(partitionList)//循环分区for partition := range partitionList {pc, err := consumer.ConsumePartition("nginx_log", int32(partition), sarama.OffsetNewest)if err != nil {fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)return}defer pc.AsyncClose()go func(pc sarama.PartitionConsumer) {wg.Add(1)for msg := range pc.Messages() {fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))fmt.Println()}wg.Done()}(pc)}//time.Sleep(time.Hour)wg.Wait()consumer.Close()
}

接下来我们测试上面的消费者示例代码,在进行测试前我们需要如下的准备工作 
1、启动zookeeper 
2、启动kafka 
3、创立生产者topic

PS E:\develop\kafka\kafka_2.12-1.0.0> .\bin\windows\kafka-console-consumer.bat --topic nginx_log --zookeeper 127.0.0.1 2
181

4、执行生产者发送消息至kafka代码 
5、执行消费者代码程序

第4步的代码如下:

package mainimport ("fmt""github.com/Shopify/sarama"
)func main() {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAllconfig.Producer.Partitioner = sarama.NewRandomPartitionerconfig.Producer.Return.Successes = truemsg := &sarama.ProducerMessage{}msg.Topic = "nginx_log"msg.Value = sarama.StringEncoder("this is a good test, my message is good")client, err := sarama.NewSyncProducer([]string{"192.168.11.28:9092"}, config)if err != nil {fmt.Println("producer close, err:", err)return}defer client.Close()pid, offset, err := client.SendMessage(msg)if err != nil {fmt.Println("send message failed,", err)return}fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

然后最后看效果图如下: 

golang基础-WaitGroup、kafka消费者相关推荐

  1. [Golang] kafka集群搭建和golang版生产者和消费者

    一.kafka集群搭建 至于kafka是什么我都不多做介绍了,网上写的已经非常详尽了. (没安装java环境的需要先安装 yum -y install java-1.8.0-openjdk*) 1. ...

  2. kafka基础篇(四)——kafka消费者客户端

    一.入门程序 先上代码,从代码入手,讲解kafka消费者客户端的细节. public class HelloKafkaConsumer {public static void main(String[ ...

  3. kafka基础入门(4):kafka消费者

    kafka消费者 kafka消费方式 kafka采用pull(拉)模式,consumer从broker中拉取数据 pull模式的不足:如果Kafka没有数据,消费者可能会陷入循环中,一直返回空数据 k ...

  4. golang基础面试题总结

    golang基础面试题总结 前言:由于正在准备之后的实习面试,故总结了一部分golang语言基础的问题,回答全为自己组织的语言,若有错各位大佬可及时指出,大家共同进步,谢谢. 1.go中怎样实现安全读 ...

  5. Kafka消费者APi

    Kafka客户端从集群中消费消息,并透明地处理kafka集群中出现故障服务器,透明地调节适应集群中变化的数据分区.也和服务器交互,平衡均衡消费者. public class KafkaConsumer ...

  6. Kafka 3.0 源码笔记(3)-Kafka 消费者的核心流程源码分析

    文章目录 前言 1. 消费者负载均衡的实现 2. 源码分析 2.1 KafkaConsumer 的初始化 2.2 KafkaConsumer 的消息拉取 2.2.1 消息拉取的准备及入口 2.2.2 ...

  7. Kafka快速入门(Kafka消费者)

    Kafka 消费者 1. Kafka 消费方式 2 Kafka 消费者工作流程 2.1 消费者总体工作流程 2.2 消费者组原理 Consumer Group(CG):消费者组,由多个consumer ...

  8. kafka 消费者详解

    前言 读完本文,你将了解到如下知识点: kafka 的消费者 和 消费者组 如何正确使用 kafka consumer 常用的 kafka consumer 配置 消费者 和 消费者组 什么是消费者? ...

  9. 我是如何将一个老系统的kafka消费者服务的性能提升近百倍的

    大家好,又见面了~ kafka作为一种高吞吐量的分布式发布订阅消息系统,在业务系统中被广泛的使用. 如果问你,如何提高kafka队列中的消息消费速度呢? 答案很简单,topic多分几个分片,然后使用消 ...

最新文章

  1. Nginx卡在登录页面不断跳转如何解决?(登不进登录页面)ip_hash机制(还是没解决)
  2. Android系统移植与驱动开发概述
  3. 注意力机制BAM和CBAM详细解析(附代码)
  4. java代理机制简单实现
  5. Schwarz导数与凹凸性
  6. New Adventure----GUI Design Studio
  7. js基础——function类型
  8. 软考中级–软件设计师考试大纲
  9. CoreOS部署及应用
  10. 四川大学计算机学院2020推免公示,2020四川大学计算机学院推免夏令营通知
  11. 从零开始设计RISC-V处理器——指令系统
  12. 上班族程序员怎么减肥
  13. 计算机网络 sci期刊,计算机方向的sci期刊有哪些
  14. 迪杰斯算法c语言,欧博体育APP-欧博体育APP
  15. 定义一个长方形类,定义 求周长和面积的方法,然后定义一个测试了Test,进行测试
  16. linux 操作系统:setenv
  17. window docker 找不到原先所有镜像和容器
  18. 普通电脑可以装苹果系统吗?Windows电脑装Mac系统
  19. R语言查看版本 R包查看版本
  20. 三轴机械臂/三自由度四足单腿DH正逆运动学及matlab验证

热门文章

  1. Weka算法Classifier-tree-J48源代码分析(一个)基本数据结构和算法
  2. 剑指Offer之旋转数组中的最小数字(题8)
  3. 《计算复杂性:现代方法》——第0章 记 号 约 定 0.1 对象的字符串表示
  4. Android四大组件之——Activity(一)定义、状态和后退栈(图文详解)
  5. 技术能力与真不是几年经验成正比的
  6. [转]Resource for Windows Phone 7
  7. 什么是BETA,RC,ALPHA版 - 软件命名规范
  8. 被投毒的管道:研究员探索CI环境中的攻击方法
  9. 2021奥斯汀 Pwn2Own黑客大赛落幕,Master of Pwn 诞生
  10. Django 聚合(译)