Go语言学习之11 日志收集系统kafka库实战
本节主要内容:
1. 日志收集系统设计
2. 日志客户端开发
1. 项目背景
a. 每个系统都有日志,当系统出现问题时,需要通过日志解决问题
b. 当系统机器比较少时,登陆到服务器上查看即可满足
c. 当系统机器规模巨大,登陆到机器上查看几乎不现实
2. 解决方案
a. 把机器上的日志实时收集,统一的存储到中心系统
b. 然后再对这些日志建立索引,通过搜索即可以找到对应日志
c. 通过提供界面友好的web界面,通过web即可以完成日志搜索
3. 面临的问题
a. 实时日志量非常大,每天几十亿条
b. 日志准实时收集,延迟控制在分钟级别
c. 能够水平可扩展
4. 业界方案ELK
日志收集系统架构
该方案问题:
a. 运维成本高,每增加一个日志收集,都需要手动修改配置
b. 监控缺失,无法准确获取logstash的状态
c. 无法做定制化开发以及维护
6. 日志收集系统设计
各组件介绍:
a. Log Agent,日志收集客户端,用来收集服务器上的日志
b. Kafka,高吞吐量的分布式队列,linkin开发,apache顶级开源项目
c. ES,elasticsearch,开源的搜索引擎,提供基于http restful的web接口
d. Hadoop,分布式计算框架,能够对大量数据进行分布式处理的平台
7. kafka应用场景
1. 异步处理, 把非关键流程异步化,提高系统的响应时间和健壮性
2. 应用解耦,通过消息队列
3. 流量削峰3. 流量削峰
8. zookeeper应用场景
1. 服务注册&服务发现
2. 配置中心
3. 分布式锁
- Zookeeper是强一致的
- 多个客户端同时在Zookeeper上创建相同znode,只有一个创建成功
9. 安装kafka
见博客:https://www.cnblogs.com/xuejiale/p/10505391.html
10. log agent设计
11. log agent流程
11. kafka示例
先导入第三方包:
github.com/Shopify/sarama
我的kafka和ZooKeeper都安装在Linux(Centos6.5,ip: 192.168.30.136)上:
![](/assets/blank.gif)
![](/assets/blank.gif)
1 package main 2 3 import ( 4 "fmt" 5 "time" 6 "github.com/Shopify/sarama" 7 ) 8 9 func main() { 10 11 config := sarama.NewConfig() 12 config.Producer.RequiredAcks = sarama.WaitForAll 13 config.Producer.Partitioner = sarama.NewRandomPartitioner 14 config.Producer.Return.Successes = true 15 16 client, err := sarama.NewSyncProducer([]string{"192.168.30.136:9092"}, config) 17 if err != nil { 18 fmt.Println("producer close, err:", err) 19 return 20 } 21 22 defer client.Close() 23 for { 24 msg := &sarama.ProducerMessage{} 25 msg.Topic = "nginx_log" 26 msg.Value = sarama.StringEncoder("this is a good test, my message is good") 27 28 pid, offset, err := client.SendMessage(msg) 29 if err != nil { 30 fmt.Println("send message failed,", err) 31 return 32 } 33 34 fmt.Printf("pid:%v offset:%v\n", pid, offset) 35 time.Sleep(time.Second) 36 } 37 }
kafka示例
注意:Shopify/sarama的同步/异步producer,https://www.jianshu.com/p/666d2604e8f8
Windows启动程序往Linux上的kafka发送数据:
Linux上的kafka接收数据:
再来看一个kafka生产和消费示例:
![](/assets/blank.gif)
![](/assets/blank.gif)
1 package main 2 3 import ( 4 "fmt" 5 "github.com/Shopify/sarama" 6 ) 7 8 func main() { 9 // 新建一个arama配置实例 10 config := sarama.NewConfig() 11 // WaitForAll waits for all in-sync replicas to commit before responding. 12 config.Producer.RequiredAcks = sarama.WaitForAll 13 // NewRandomPartitioner returns a Partitioner which chooses a random partition each time. 14 config.Producer.Partitioner = sarama.NewRandomPartitioner 15 config.Producer.Return.Successes = true 16 17 // new producer 18 client, err := sarama.NewSyncProducer([]string{"192.168.30.136:9092"}, config) 19 if err != nil { 20 fmt.Println("producer close, err:", err) 21 return 22 } 23 defer client.Close() 24 25 // new message 26 msg := &sarama.ProducerMessage{} 27 msg.Topic = "food" 28 msg.Key = sarama.StringEncoder("fruit") 29 msg.Value = sarama.StringEncoder("apple") 30 31 // send message 32 pid, offset, err := client.SendMessage(msg) 33 if err != nil { 34 fmt.Println("send message failed,", err) 35 return 36 } 37 fmt.Printf("pid: %v, offset:%v\n", pid, offset) 38 39 // new message 40 msg2 := &sarama.ProducerMessage{} 41 msg2.Topic = "food" 42 msg2.Key = sarama.StringEncoder("fruit") 43 msg2.Value = sarama.StringEncoder("orange") 44 45 // send message 46 pid2, offset2, err := client.SendMessage(msg2) 47 if err != nil { 48 fmt.Println("send message failed,", err) 49 return 50 } 51 fmt.Printf("pid2: %v, offset2:%v\n", pid2, offset2) 52 53 fmt.Println("Produce success.") 54 }
produce
![](/assets/blank.gif)
![](/assets/blank.gif)
1 package main 2 3 import ( 4 "sync" 5 "github.com/Shopify/sarama" 6 "fmt" 7 ) 8 9 var wg sync.WaitGroup 10 11 func main() { 12 consumer, err := sarama.NewConsumer([]string{"192.168.30.136:9092"}, nil) 13 if err != nil { 14 fmt.Println("consumer connect error:", err) 15 return 16 } 17 fmt.Println("connnect success...") 18 defer consumer.Close() 19 20 partitions, err := consumer.Partitions("food") 21 if err != nil { 22 fmt.Println("geet partitions failed, err:", err) 23 return 24 } 25 26 for _, p := range partitions { 27 partitionConsumer, err := consumer.ConsumePartition("food", p, sarama.OffsetOldest) 28 if err != nil { 29 fmt.Println("partitionConsumer err:", err) 30 continue 31 } 32 wg.Add(1) 33 go func(){ 34 for m := range partitionConsumer.Messages() { 35 fmt.Printf("key: %s, text: %s, offset: %d\n", string(m.Key), string(m.Value), m.Offset) 36 } 37 wg.Done() 38 }() 39 } 40 wg.Wait() 41 42 fmt.Println("Consumer success.") 43 }
consumer
12. tailf组件使用
先导入第三方包:
github.com/hpcloud/tail
![](/assets/blank.gif)
![](/assets/blank.gif)
1 package main 2 3 import ( 4 "fmt" 5 "github.com/hpcloud/tail" 6 "time" 7 ) 8 func main() { 9 filename := "F:\\Go\\project\\src\\go_dev\\logCollect\\tailf\\my.log" 10 tails, err := tail.TailFile(filename, tail.Config{ 11 ReOpen: true, 12 Follow: true, 13 //Location: &tail.SeekInfo{Offset: 0, Whence: 2}, 14 MustExist: false, 15 Poll: true, 16 }) 17 if err != nil { 18 fmt.Println("tail file err:", err) 19 return 20 } 21 var msg *tail.Line 22 var ok bool 23 for { 24 msg, ok = <-tails.Lines 25 if !ok { 26 fmt.Printf("tail file close reopen, filename:%s\n", tails.Filename) 27 time.Sleep(100 * time.Millisecond) 28 continue 29 } 30 fmt.Println("msg:", msg) 31 } 32 }
tailf示例代码
my.log文件内容(unix格式):
在Windows上,当我的上面代码里日志文件(my.log)为Windows格式,代码执行结果如下:
当时用notepade++将文件格式转换为Unix格式,执行代码结果如下:
注意:最后一行必须有换行符,否则该行无法读取。
13. 配置文件库使用
先导入第三方包:
github.com/astaxie/beego/config
1) 初始化配置库
iniconf, err := NewConfig("ini", "testini.conf") if err != nil {log.Fatal(err) }
2) 读取配置项
String(key string) string Int(key string) (int, error) Int64(key string) (int64, error) Bool(key string) (bool, error) Float(key string) (float64, error)
例如:
iniconf.String("server::listen_ip") iniconf.Int("server::listen_port")[server] listen_ip = "0.0.0.0" listen_port = 8080[logs] log_level=debug log_path=./logs/logagent.log[collect] log_path=/home/work/logs/nginx/access.log topic=nginx_log
![](/assets/blank.gif)
![](/assets/blank.gif)
1 package main 2 3 import ( 4 "fmt" 5 "github.com/astaxie/beego/config" 6 ) 7 8 func main() { 9 conf, err := config.NewConfig("ini", "./logcollect.conf") 10 if err != nil { 11 fmt.Println("new config failed, err:", err) 12 return 13 } 14 15 port, err := conf.Int("server::listen_port") 16 if err != nil { 17 fmt.Println("read server:port failed, err:", err) 18 return 19 } 20 21 fmt.Println("Port:", port) 22 log_level := conf.String("log::log_level") 23 if err != nil { 24 fmt.Println("read log_level failed, ", err) 25 return 26 } 27 fmt.Println("log_level:", log_level) 28 29 log_path := conf.String("log::log_path") 30 fmt.Println("log_path:", log_path) 31 }
config示例代码
配置文件内容:
[server] listen_ip = "0.0.0.0" listen_port = 8080[log] log_level=debug log_path=./logs/logagent.log[collect] log_path=/home/work/logs/nginx/access.log topic=nginx_log
执行结果:
14. 日志库的使用
先导入第三方包:
github.com/astaxie/beego/logs
1) 配置log组件
config := make(map[string]interface{}) config["filename"] = "./logs/logcollect.log" config["level"] = logs.LevelDebugconfigStr, err := json.Marshal(config) if err != nil {fmt.Println("marshal failed, err:", err)return }
2) 初始化日志组件
logs.SetLogger(“file”, string(configStr))
![](/assets/blank.gif)
![](/assets/blank.gif)
1 package main 2 3 import ( 4 "encoding/json" 5 "fmt" 6 "github.com/astaxie/beego/logs" 7 ) 8 9 func main() { 10 config := make(map[string]interface{}) 11 config["filename"] = "./logcollect.log" 12 config["level"] = logs.LevelDebug 13 14 configStr, err := json.Marshal(config) 15 if err != nil { 16 fmt.Println("marshal failed, err:", err) 17 return 18 } 19 20 logs.SetLogger(logs.AdapterFile, string(configStr)) 21 22 logs.Debug("this is a test, my name is %s", "stu01") 23 logs.Trace("this is a trace, my name is %s", "stu02") 24 logs.Warn("this is a warn, my name is %s", "stu03") 25 }
logs示例
15. 日志收集项目整体实现
开发环境为Windows系统,go version go1.12.1 windows/amd64, kafka_2.11-2.0.0,zookeeper-3.4.12。
先实现了一个demo,V1版本:
(1)代码结构图
(2)代码地址见本人github:https://github.com/XJL635438451/logCollectProject/tree/master
(3)如何运行
1)先安装 go, kafka,zookeeper;
2)先启动 zookeeper,然后启动kafka,下面是启动的命令;
启动ZK .\zkServer.cmd启动kafka F:\Go\project\src\module\kafka_2.11-2.0.0>.\bin\windows\kafka-server-start.bat .\config\server.properties创建topic F:\Go\project\src\module\kafka_2.11-2.0.0>.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkaTest启动生产者: F:\Go\project\src\module\kafka_2.11-2.0.0>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic kafkaTest启动消费者: F:\Go\project\src\module\kafka_2.11-2.0.0>.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic nginx_log --from-beginning
3)如果自己不行写日志文件,可以运行代码中的 writeLogTest/log.go,然后运行 main.exe (如果自己修改了代码还需要重新编译);
4)可以起一个kafka的consumer来查看日志是否写入到了kafka,方法就是上面的启动生产者命令,如果正常就可以看到日志一直在kafka中刷新。
转载于:https://www.cnblogs.com/xuejiale/p/10657989.html
Go语言学习之11 日志收集系统kafka库实战相关推荐
- go语言日志收集系统
0.项目地址 完整项目的GitHub地址 https://github.com/taw19960426/learning-go-language/tree/main/go-log-collect 一. ...
- 号称下一代日志收集系统!来看看它有多强
点击下方公众号「关注」和「星标」 回复"1024"获取独家整理的学习资料! 关于日志收集.处理.分析的方案,其实是很多,常见的就是ELK组合,即:Elasticsearch + L ...
- 一起来解读分布式日志收集系统:Facebook Scribe
1.分布式日志收集系统:背景介绍 许多公司的平台每天会产生大量的日志(一般为流式数据,如,搜索引擎的pv,查询等),处理这些日志需要特定的日志系统,一般而言,这些系统需要具有以下特征: (1) 构建应 ...
- 分布式日志收集系统:Facebook Scribe
转载于博主新浪微博:http://weibo.com/freshairbrucewoo. 欢迎大家相互交流,共同提高技术. 以下是我在公司内部分享的关于分布式日志收集系统的PPT内容,现在与大家分享, ...
- python分布式日志收集系统_分布式日志收集系统Scribe原理
1.分布式日志收集系统:背景介绍 许多公司的平台每天会产生大量的日志(一般为流式数据,如,搜索引擎的pv,查询等),处理这些日志需要特定的日志系统,一般而言,这些系统需要具有以下特征: (1) 构建应 ...
- 系统调用日志收集系统
1,系统调用日志收集系统的意义. 系统调用是用户获取系统服务的唯一入口,因此对系统调用的安全调用直接关系到系统 的安全,假如有用户恶意不断地调用系统调用,将会导致系统负载增加,所以如果能收集到时谁 调 ...
- 借鉴开源框架自研日志收集系统
踏浪无痕 岂安科技高级架构师 十余年数据研发经验,擅长数据处理领域工作,如爬虫.搜索引擎.大数据应用高并发等.担任过架构师,研发经理等岗位.曾主导开发过大型爬虫,搜索引擎及大数据广告DMP系统目前负责 ...
- python分布式日志收集系统_Go实现海量日志收集系统(一)
项目背景 每个系统都有日志,当系统出现问题时,需要通过日志解决问题 当系统机器比较少时,登陆到服务器上查看即可满足 当系统机器规模巨大,登陆到机器上查看几乎不现实 当然即使是机器规模不大,一个系统通常 ...
- 探秘Hadoop生态12:分布式日志收集系统Flume
这位大侠,这是我的公众号:程序员江湖. 分享程序员面试与技术的那些事. 干货满满,关注就送. 在具体介绍本文内容之前,先给大家看一下Hadoop业务的整体开发流程: 从Hadoop的业务开发流程 ...
最新文章
- 都是CPU松动若的祸
- QT的QMutexLocker类的使用
- valgrind 常见错误提示信息
- 捷信达登录信息代码133_报名!广州40所热门民办初中学费均价38678,有学校涨幅133%...
- mysql数据库之事务与存储过程
- SparkStructured报错:Failed to send RPC xxx to /xxx:34744: ClosedChannelException Lost executor
- 如何在应用内设计一份调查?
- 超全面的权限系统设计方案!
- MySQL 基础一(B站黑马程序员MySQL教程笔记)
- 2021.10.13 向日葵黑屏驱动卸载方法
- 以pdf转cad为例,所有格式之间任意转换
- Nvidia风扇速度自动调节工具推荐
- 如何看误差累积分布图
- JavaScript 通过注册表的形式调用搜狗输入法的手写输入功能
- android 4.4 zygote 开机速度,一种安卓系统快速开机的方法及装置的制造方法
- 临江仙·送钱穆父 | 苏轼
- python求组合数c(m、n)编程题_c语言编程问题,计算出从n 个不同元素中取出m 个元素(m≤n)的组合数。编写程序...
- Activity启动另一个Activity并返回的完整生命周期
- 耳麦没声音,耳麦不能说话
- <<视觉问答>>2021:Check It Again: Progressive Visual Question Answering via Visual Entailment
热门文章
- 【二叉树】牛客网:二叉树的镜像
- 【计算机网络笔记】计算机网络定义分类
- 50行代码实现的一个最简单的基于 DirectShow 的视频播放器
- python困境_学习 Python 编程的三种困境
- jquery 选择器 逗号
- IDEA离线使用本地maven仓库
- C++ 点(.)操作符和箭头(->)操作符
- Python3 GUI编程: 自带图形库 tkinter 学习教程
- java翻译数字串并打印_如何使用Java翻译字符串?
- JAVA→封装类Wrapper、字符串String及其方法、==与equals()、正则表达式、StringBuilder与StringBuffer、内嵌类