[Golang] 消费Kafka的日志提交到ElasticSearch
0x0 需求
消费Kafka的日志并写入ElasticSearch供查询
0x1 依赖库
golang版Kafka客户端 https://github.com/Shopify/sarama
golang版ElasticSearch客户端 https://github.com/elastic/go-elasticsearch
0x2 实现
总共分3部分
1、Kafka消费者
// LogJson json格式 type LogJson struct {Tag string `json:"tag"`Level string `json:"level"`File string `json:"file"`Time time.Time `json:"@timestamp"`Message string `json:"message"` }type taskProcessor interface {AddTask(key string, val []byte) }// MyConsumer 可关闭的带任务处理器的消费者 type MyConsumer struct {processor taskProcessorctx context.Context }// NewMyConsumer 构造 func NewMyConsumer(p taskProcessor, ctx context.Context) *MyConsumer {c := &MyConsumer{processor: p,ctx: ctx,}return c }// Setup 启动 func (consumer *MyConsumer) Setup(s sarama.ConsumerGroupSession) error {log.Printf("[main] consumer.Setup memberID=[%s]", s.MemberID())return nil }// Cleanup 当退出时 func (consumer *MyConsumer) Cleanup(s sarama.ConsumerGroupSession) error {log.Printf("[main] consumer.Cleanup memberID=[%s]", s.MemberID())return nil }// ConsumeClaim 消费日志 func (consumer *MyConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {for {select {case message, ok := <-claim.Messages():if !ok {return nil}js := &LogJson{}if err := json.Unmarshal(message.Value, js); nil != err {fmt.Fprintf(os.Stderr, "[MyConsumer] ConsumeClaim json.Unmarshal err=[%s] topic=[%s] key=[%s] val=[%s]\n", err.Error(), message.Topic, message.Key, string(message.Value))} else {index := fmt.Sprintf("%s-%s", message.Topic, js.Time.Format("2006.01.02"))consumer.processor.AddTask(index, message.Value)session.MarkMessage(message, "")}case <-consumer.ctx.Done():return nil}}return nil }
2、插入ElasticSearch的Worker
package elastic_workerimport ("context""encoding/json""fmt""log""runtime""sync""time""github.com/olivere/elastic" )// Config 配置 type Config struct {MaxMessage int `xml:"max_msg"` // 最大缓冲WorkerNum int `xml:"worker_number"` // 线程个数BatchSize int `xml:"batch_size"` // 每个批次最大条数TickTime int `xml:"tick_millisecond"` // 处理频率 }type task struct {key stringval []byte }// Worker 消息处理器 type Worker struct {msgQ chan *taskclient *elastic.Clientwg sync.WaitGroupconfig *Config }// NewWorker 构造 func NewWorker(client *elastic.Client, cfg *Config) *Worker {w := &Worker{client: client,config: cfg,msgQ: make(chan *task, cfg.MaxMessage),}return w }// Run 开工 func (w *Worker) Run(ctx context.Context) {// 线程数thread := w.config.WorkerNumif thread <= 0 {thread = runtime.NumCPU()}// tickertickTime := time.Duration(w.config.TickTime) * time.Millisecondif tickTime <= 0 {tickTime = time.Duration(100) * time.Millisecond}// 启动for i := 0; i < thread; i++ {w.wg.Add(1)time.Sleep(tickTime / time.Duration(thread))go func(idx int) {// 构造一个service,server可以反复使用service := w.client.Bulk()service.Refresh("wait_for")defer service.Reset()log.Printf("[elastic_worker] worker[%d] start", idx)defer w.wg.Done()// tickerticker := time.NewTicker(tickTime)defer ticker.Stop()LOOP:for {select {case <-ctx.Done():log.Printf("[elastic_worker] worker[%d] is quiting", idx)// 要把通道里的全部执行完才能退出for {if num := w.process(service); num > 0 {log.Printf("[elastic_worker] worker[%d] process batch [%d] when quiting", idx, num)} else {break LOOP}time.Sleep(tickTime)}case <-ticker.C:if num := w.process(service); num > 0 {log.Printf("[elastic_worker] worker[%d] process batch [%d] ", idx, num)}}}log.Printf("[elastic_worker] worker[%d] stop", idx)}(i)} }// AddTask 添加任务,goroutine safe func (w *Worker) AddTask(key string, val []byte) {t := &task{key: key,val: val,}w.msgQ <- t }// process 处理任务 func (w *Worker) process(service *elastic.BulkService) int {//service.Reset()// 每个批次最多w.config.BatchSize个 LOOP:for i := 0; i < w.config.BatchSize; i++ {// 有任务就加到这个批次,没任务就退出select {case m := <-w.msgQ:req := elastic.NewBulkIndexRequest().Index(m.key).Type("doc").Doc(json.RawMessage(m.val))service.Add(req)default:break LOOP}}total := service.NumberOfActions()if total > 0 {if resp, err := service.Do(context.Background()); nil != err {panic(err)} else {if resp.Errors {for _, v := range resp.Failed() {fmt.Println("service.Do failed", v)}panic("resp.Errors")}}}return total }// Close 关闭 需要外面的context关闭,和等待msgQ任务被执行完毕 func (w *Worker) Close() {w.wg.Wait()if n := len(w.msgQ); n > 0 {log.Printf("[elastic_worker] worker Close remain msg[%d]", n)} }
3、main.go
package mainimport ("context""encoding/xml""flag""fmt""io/ioutil""log""os""os/signal""runtime""strings""syscall""time""consumer""elastic_worker""github.com/Shopify/sarama""github.com/olivere/elastic" )// Consumer Consumer配置 type ConsumerConfig struct {Topic []string `xml:"topic"`Broker string `xml:"broker"`Partition int32 `xml:"partition"`Replication int16 `xml:"replication"`Group string `xml:"group"`Version string `xml:"version"` }// Config 配置 type Config struct {Consumer ConsumerConfig `xml:"consumer"`ElasticURL string `xml:"elastic_url"`Filters []string `xml:"filter"`Worker elastic_worker.Config `xml:"elastic_worker"` }var (configFile = "" // 配置路径initTopic = falselistTopic = falsedelTopic = ""cfg = &Config{}web = "" )func init() {flag.StringVar(&configFile, "config", "cfg.xml", "config file ")flag.BoolVar(&initTopic, "init", initTopic, "create topic")flag.BoolVar(&listTopic, "list", listTopic, "list topic")flag.StringVar(&delTopic, "del", delTopic, "delete topic") }var (elasticClient *elastic.Client )func main() {runtime.GOMAXPROCS(runtime.NumCPU())defer time.Sleep(time.Second)// 获取host名字hostName, err := os.Hostname()if nil != err {hostName = "[beats]"}// 加载配置if contents, err := ioutil.ReadFile(configFile); err != nil {panic(err)} else {if err = xml.Unmarshal(contents, cfg); err != nil {panic(err)}}// sarama的loggersarama.Logger = log.New(os.Stdout, fmt.Sprintf("[%s]", hostName), log.LstdFlags)// 指定kafka版本,一定要支持kafka集群version, err := sarama.ParseKafkaVersion(cfg.Consumer.Version)if err != nil {panic(err)}config := sarama.NewConfig()config.Version = versionconfig.Consumer.Offsets.Initial = sarama.OffsetOldestconfig.ClientID = hostName// 工具if tool(cfg, config) {return} else {initTopic = truetool(cfg, config)}// 启动elastic客户端urls := strings.Split(cfg.ElasticURL, ",")if cli, err := elastic.NewClient(elastic.SetURL(urls...)); err != nil {panic(err)} else {elasticClient = cli// ping检查if ret, _, err := elasticClient.Ping(urls[0]).Do(context.Background()); nil != err {panic(err)} else {log.Printf("elasticClient.Ping %+v", ret)}defer elasticClient.Stop()}// ctxctx, cancel := context.WithCancel(context.Background())// Workerworker := elastic_worker.NewWorker(elasticClient, &cfg.Worker)worker.Run(ctx)defer worker.Close()// kafka consumer clientkafkaClient, err := sarama.NewConsumerGroup(strings.Split(cfg.Consumer.Broker, ","), cfg.Consumer.Group, config)if err != nil {panic(err)}consumer := consumer.NewMyConsumer(worker, ctx)go func() {for {select {case <-ctx.Done():returndefault:err := kafkaClient.Consume(ctx, cfg.Consumer.Topic, consumer)if err != nil {log.Printf("[main] client.Consume error=[%s]", err.Error())time.Sleep(time.Second)}}}}()// os signalsigterm := make(chan os.Signal, 1)signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)//time.Sleep(time.Second * 4)sig := <-sigtermlog.Printf("[main] os sig=[%v]", sig)cancel()log.Printf("[main] cancel")if err := kafkaClient.Close(); nil != err {log.Printf("[main] kafkaClient close error=[%s]", err.Error())}log.Printf("[main] beats quit") }func tool(cfg *Config, config *sarama.Config) bool {if initTopic || listTopic || len(delTopic) > 0 {ca, err := sarama.NewClusterAdmin(strings.Split(cfg.Consumer.Broker, ","), config)if nil != err {panic(err)}if len(delTopic) > 0 { // 删除Topicif err := ca.DeleteTopic(delTopic); nil != err {panic(err)}log.Printf("delete ok topic=[%s]\n", delTopic)} else if initTopic { // 初始化Topicif detail, err := ca.ListTopics(); nil != err {panic(err)} else {for _, v := range cfg.Consumer.Topic {if d, ok := detail[v]; ok {if cfg.Consumer.Partition > d.NumPartitions {if err := ca.CreatePartitions(v, cfg.Consumer.Partition, nil, false); nil != err {panic(err)}log.Println("alter topic ok", v, cfg.Consumer.Partition)}} else {if err := ca.CreateTopic(v, &sarama.TopicDetail{NumPartitions: cfg.Consumer.Partition, ReplicationFactor: cfg.Consumer.Replication}, false); nil != err {panic(err)}log.Println("create topic ok", v)}}}}// 显示Topic列表if detail, err := ca.ListTopics(); nil != err {log.Println("ListTopics error", err)} else {for k := range detail {log.Printf("[%s] %+v", k, detail[k])}}if err := ca.Close(); nil != err {panic(err)}return true}return false }
0x3 配置文件
<?xml version="1.0" encoding="utf-8"?> <config><consumer><!-- Kafka cluster --><broker>127.0.0.1:9092</broker><!-- topic 可以配多个--><topic>top1</topic><topic>top2</topic><!-- Kafka 分组 --><group>test-group</group><!-- Kafka 版本 --><version>2.2.0</version><!-- partition 个数,开consumer个数不能超过这个 --><partition>16</partition><!-- 副本因子 --><replication>2</replication></consumer><elastic_url>http://127.0.0.1:9200</elastic_url><elastic_worker><!-- 最大缓冲 这个小点可以防止崩溃导致丢失太多--><max_msg>2048</max_msg><!-- 线程个数 --><worker_number>1</worker_number><!-- 每个批次最大数量 --><batch_size>1024</batch_size><!-- 处理频率(毫秒) --><tick_millisecond>5000</tick_millisecond></elastic_worker></config>
0x4 注意
1、如果你的ElasticSearch集群的配置足够高,你可以修改配置文件里的<worker_number>1</worker_number>给Worker开多协程,否则还是单协程性能更高一些。
2、可以适当调整<batch_size>1024</batch_size>每个批次的数量来提升写入性能。
3、如果报这个错误 EsRejectedExcutionException,说明ES性能扛不住了,需要提升配置,降低写入量。
转载于:https://www.cnblogs.com/mrblue/p/11251498.html
[Golang] 消费Kafka的日志提交到ElasticSearch相关推荐
- flink 消费 kafka offset 自动提交
flink 消费kafka 程序重启后,从原先的自动提交的点继续消费,earliest 不用再从开始消费 如果开启了checkpoint 以 checkpoint为准 ,enable.auto.com ...
- Filebeat+Kafka+ELK日志采集(五)——Elasticsearch
一.下载.安装.配置.启动: 1.下载 wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.3.2-li ...
- 使用filbeat从kafka中消费json格式日志并发送到ElasticSearch
环境 filbeat 7.10 kafka 2.1 elasticsearch 7.4.2 windows 10 需求描述 Java程序生产Json格式的日志发送到kafka中,再由filebeat从 ...
- 【Kafka】Flink 消费 kafka 部分 分区 一直不提交 offset
文章目录 1.概述 1.1 查看消费组 1.2 检测topic 1.3 内置topic 1.4 flink数据监控 1.5 提交offset指标 M.扩展 1.概述 一个环境,突然发现,Flink消费 ...
- flink消费kafka从指定时间消费offset的日志
有时生产上会按指定时间消费kafka的数据,具体日志如下: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase [] ...
- 搭建elk+logstash+kafka+filebeat日志收集平台
文章目录 前言 组件介绍 原理图 环境介绍 安装 日志收集与展示 前言 在日常的运维过程中,对系统日志和业务日志的处理比较重要,对于以后的数据分析.排查异常问题有很重的作用.今天就分享一个自己基于ka ...
- 基于kafka的日志收集
目录 一.环境准备 Ⅰ.准备好三台虚拟机用于搭建nginx和kafka集群 Ⅱ.配置静态ip地址 Ⅲ.修改主机名 Ⅳ.域名解析 编辑 Ⅴ.安装基本软件 Ⅵ.安装时间同步服务 Ⅶ.关闭防火墙 二.ng ...
- 【FLink】Flink 消费 kafka 消费组 死掉 Marking the coordinator dead for group 造成数据重复消费
文章目录 1.概述 2.源码分析 2.2 能不能设置多次提交呢? 2.3 监控日志 1.概述 首先参考几个案例: [Flink]Flink Kafka 消费卡死 消费组卡死 topic无写入 实际有数 ...
- java kafka分布式_JavaWeb项目架构之Kafka分布式日志队列
架构.分布式.日志队列,标题自己都看着唬人,其实就是一个日志收集的功能,只不过中间加了一个Kafka做消息队列罢了. kafka介绍 Kafka是由Apache软件基金会开发的一个开源流处理平台,由S ...
最新文章
- 关于css中overflow的一些理解
- device.cpp
- PDF Annotator 8中文版
- 微信公众平台开发(103) 四六级成绩查询
- imac android studio,Mac安装Android Studio的时候忘记安装Avd Manager怎么办?
- JavaScript的类型自动转换高级玩法JSFuck
- 天锐绿盾教您如何管控外接设备
- jdk、jre、jvm区别
- 【JavaWeb】(血泪踩雷史...)Token登录前后端交互及跨域问题
- java的方法覆盖与方法重载有什么异同_Java语言中方法重载与方法覆盖的异同
- 带栩字的优美古诗句_带栩字有寓意的男孩名字
- 忘了Linux服务器密码怎么办
- 计算机硕士可以入伍么,2021年下半年“征兵”已开始,大学生在校入伍好,还是毕业入伍好...
- HDP vs CDH
- [DB][Oracle]Oracle格式化数字的方法(指定小数点位数,每3位加逗号)
- 识别图书ISBN号并输出查询结果的示例
- Roblox入场教育游戏,是换道拥抱元宇宙还是新瓶装旧酒?
- Xshell建立SSH隧道连接
- win10 修改用户目录(%USERPROFILE%)位置
- 每周读书#9 - 《在路上,爱上从未有过的自己》