0x0 需求

  消费Kafka的日志并写入ElasticSearch供查询

0x1 依赖库

golang版Kafka客户端 https://github.com/Shopify/sarama

golang版ElasticSearch客户端  https://github.com/elastic/go-elasticsearch

0x2 实现

总共分3部分

1、Kafka消费者

// LogJson json格式
type LogJson struct {Tag     string    `json:"tag"`Level   string    `json:"level"`File    string    `json:"file"`Time    time.Time `json:"@timestamp"`Message string    `json:"message"`
}type taskProcessor interface {AddTask(key string, val []byte)
}// MyConsumer 可关闭的带任务处理器的消费者
type MyConsumer struct {processor taskProcessorctx       context.Context
}// NewMyConsumer 构造
func NewMyConsumer(p taskProcessor, ctx context.Context) *MyConsumer {c := &MyConsumer{processor: p,ctx:       ctx,}return c
}// Setup 启动
func (consumer *MyConsumer) Setup(s sarama.ConsumerGroupSession) error {log.Printf("[main] consumer.Setup memberID=[%s]", s.MemberID())return nil
}// Cleanup 当退出时
func (consumer *MyConsumer) Cleanup(s sarama.ConsumerGroupSession) error {log.Printf("[main] consumer.Cleanup memberID=[%s]", s.MemberID())return nil
}// ConsumeClaim 消费日志
func (consumer *MyConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {for {select {case message, ok := <-claim.Messages():if !ok {return nil}js := &LogJson{}if err := json.Unmarshal(message.Value, js); nil != err {fmt.Fprintf(os.Stderr, "[MyConsumer] ConsumeClaim json.Unmarshal err=[%s] topic=[%s] key=[%s] val=[%s]\n", err.Error(), message.Topic, message.Key, string(message.Value))} else {index := fmt.Sprintf("%s-%s", message.Topic, js.Time.Format("2006.01.02"))consumer.processor.AddTask(index, message.Value)session.MarkMessage(message, "")}case <-consumer.ctx.Done():return nil}}return nil
}

2、插入ElasticSearch的Worker

package elastic_workerimport ("context""encoding/json""fmt""log""runtime""sync""time""github.com/olivere/elastic"
)// Config 配置
type Config struct {MaxMessage int `xml:"max_msg"`          // 最大缓冲WorkerNum  int `xml:"worker_number"`    // 线程个数BatchSize  int `xml:"batch_size"`       // 每个批次最大条数TickTime   int `xml:"tick_millisecond"` // 处理频率
}type task struct {key stringval []byte
}// Worker 消息处理器
type Worker struct {msgQ   chan *taskclient *elastic.Clientwg     sync.WaitGroupconfig *Config
}// NewWorker 构造
func NewWorker(client *elastic.Client, cfg *Config) *Worker {w := &Worker{client: client,config: cfg,msgQ:   make(chan *task, cfg.MaxMessage),}return w
}// Run 开工
func (w *Worker) Run(ctx context.Context) {// 线程数thread := w.config.WorkerNumif thread <= 0 {thread = runtime.NumCPU()}// tickertickTime := time.Duration(w.config.TickTime) * time.Millisecondif tickTime <= 0 {tickTime = time.Duration(100) * time.Millisecond}// 启动for i := 0; i < thread; i++ {w.wg.Add(1)time.Sleep(tickTime / time.Duration(thread))go func(idx int) {// 构造一个service,server可以反复使用service := w.client.Bulk()service.Refresh("wait_for")defer service.Reset()log.Printf("[elastic_worker] worker[%d] start", idx)defer w.wg.Done()// tickerticker := time.NewTicker(tickTime)defer ticker.Stop()LOOP:for {select {case <-ctx.Done():log.Printf("[elastic_worker] worker[%d] is quiting", idx)// 要把通道里的全部执行完才能退出for {if num := w.process(service); num > 0 {log.Printf("[elastic_worker] worker[%d] process batch [%d] when quiting", idx, num)} else {break LOOP}time.Sleep(tickTime)}case <-ticker.C:if num := w.process(service); num > 0 {log.Printf("[elastic_worker] worker[%d] process batch [%d] ", idx, num)}}}log.Printf("[elastic_worker] worker[%d] stop", idx)}(i)}
}// AddTask 添加任务,goroutine safe
func (w *Worker) AddTask(key string, val []byte) {t := &task{key: key,val: val,}w.msgQ <- t
}// process 处理任务
func (w *Worker) process(service *elastic.BulkService) int {//service.Reset()// 每个批次最多w.config.BatchSize个
LOOP:for i := 0; i < w.config.BatchSize; i++ {// 有任务就加到这个批次,没任务就退出select {case m := <-w.msgQ:req := elastic.NewBulkIndexRequest().Index(m.key).Type("doc").Doc(json.RawMessage(m.val))service.Add(req)default:break LOOP}}total := service.NumberOfActions()if total > 0 {if resp, err := service.Do(context.Background()); nil != err {panic(err)} else {if resp.Errors {for _, v := range resp.Failed() {fmt.Println("service.Do failed", v)}panic("resp.Errors")}}}return total
}// Close 关闭 需要外面的context关闭,和等待msgQ任务被执行完毕
func (w *Worker) Close() {w.wg.Wait()if n := len(w.msgQ); n > 0 {log.Printf("[elastic_worker] worker Close remain msg[%d]", n)}
}

3、main.go

package mainimport ("context""encoding/xml""flag""fmt""io/ioutil""log""os""os/signal""runtime""strings""syscall""time""consumer""elastic_worker""github.com/Shopify/sarama""github.com/olivere/elastic"
)// Consumer Consumer配置
type ConsumerConfig struct {Topic       []string `xml:"topic"`Broker      string   `xml:"broker"`Partition   int32    `xml:"partition"`Replication int16    `xml:"replication"`Group       string   `xml:"group"`Version     string   `xml:"version"`
}// Config 配置
type Config struct {Consumer   ConsumerConfig        `xml:"consumer"`ElasticURL string                `xml:"elastic_url"`Filters    []string              `xml:"filter"`Worker     elastic_worker.Config `xml:"elastic_worker"`
}var (configFile = "" // 配置路径initTopic  = falselistTopic  = falsedelTopic   = ""cfg        = &Config{}web        = ""
)func init() {flag.StringVar(&configFile, "config", "cfg.xml", "config file ")flag.BoolVar(&initTopic, "init", initTopic, "create topic")flag.BoolVar(&listTopic, "list", listTopic, "list topic")flag.StringVar(&delTopic, "del", delTopic, "delete topic")
}var (elasticClient *elastic.Client
)func main() {runtime.GOMAXPROCS(runtime.NumCPU())defer time.Sleep(time.Second)// 获取host名字hostName, err := os.Hostname()if nil != err {hostName = "[beats]"}// 加载配置if contents, err := ioutil.ReadFile(configFile); err != nil {panic(err)} else {if err = xml.Unmarshal(contents, cfg); err != nil {panic(err)}}// sarama的loggersarama.Logger = log.New(os.Stdout, fmt.Sprintf("[%s]", hostName), log.LstdFlags)// 指定kafka版本,一定要支持kafka集群version, err := sarama.ParseKafkaVersion(cfg.Consumer.Version)if err != nil {panic(err)}config := sarama.NewConfig()config.Version = versionconfig.Consumer.Offsets.Initial = sarama.OffsetOldestconfig.ClientID = hostName// 工具if tool(cfg, config) {return} else {initTopic = truetool(cfg, config)}// 启动elastic客户端urls := strings.Split(cfg.ElasticURL, ",")if cli, err := elastic.NewClient(elastic.SetURL(urls...)); err != nil {panic(err)} else {elasticClient = cli// ping检查if ret, _, err := elasticClient.Ping(urls[0]).Do(context.Background()); nil != err {panic(err)} else {log.Printf("elasticClient.Ping %+v", ret)}defer elasticClient.Stop()}// ctxctx, cancel := context.WithCancel(context.Background())// Workerworker := elastic_worker.NewWorker(elasticClient, &cfg.Worker)worker.Run(ctx)defer worker.Close()// kafka consumer clientkafkaClient, err := sarama.NewConsumerGroup(strings.Split(cfg.Consumer.Broker, ","), cfg.Consumer.Group, config)if err != nil {panic(err)}consumer := consumer.NewMyConsumer(worker, ctx)go func() {for {select {case <-ctx.Done():returndefault:err := kafkaClient.Consume(ctx, cfg.Consumer.Topic, consumer)if err != nil {log.Printf("[main] client.Consume error=[%s]", err.Error())time.Sleep(time.Second)}}}}()// os signalsigterm := make(chan os.Signal, 1)signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)//time.Sleep(time.Second * 4)sig := <-sigtermlog.Printf("[main] os sig=[%v]", sig)cancel()log.Printf("[main] cancel")if err := kafkaClient.Close(); nil != err {log.Printf("[main] kafkaClient close error=[%s]", err.Error())}log.Printf("[main] beats quit")
}func tool(cfg *Config, config *sarama.Config) bool {if initTopic || listTopic || len(delTopic) > 0 {ca, err := sarama.NewClusterAdmin(strings.Split(cfg.Consumer.Broker, ","), config)if nil != err {panic(err)}if len(delTopic) > 0 { // 删除Topicif err := ca.DeleteTopic(delTopic); nil != err {panic(err)}log.Printf("delete ok topic=[%s]\n", delTopic)} else if initTopic { // 初始化Topicif detail, err := ca.ListTopics(); nil != err {panic(err)} else {for _, v := range cfg.Consumer.Topic {if d, ok := detail[v]; ok {if cfg.Consumer.Partition > d.NumPartitions {if err := ca.CreatePartitions(v, cfg.Consumer.Partition, nil, false); nil != err {panic(err)}log.Println("alter topic ok", v, cfg.Consumer.Partition)}} else {if err := ca.CreateTopic(v, &sarama.TopicDetail{NumPartitions: cfg.Consumer.Partition, ReplicationFactor: cfg.Consumer.Replication}, false); nil != err {panic(err)}log.Println("create topic ok", v)}}}}// 显示Topic列表if detail, err := ca.ListTopics(); nil != err {log.Println("ListTopics error", err)} else {for k := range detail {log.Printf("[%s] %+v", k, detail[k])}}if err := ca.Close(); nil != err {panic(err)}return true}return false
}

0x3 配置文件

<?xml version="1.0" encoding="utf-8"?>
<config><consumer><!-- Kafka cluster --><broker>127.0.0.1:9092</broker><!-- topic 可以配多个--><topic>top1</topic><topic>top2</topic><!-- Kafka 分组 --><group>test-group</group><!-- Kafka 版本 --><version>2.2.0</version><!-- partition 个数,开consumer个数不能超过这个 --><partition>16</partition><!-- 副本因子 --><replication>2</replication></consumer><elastic_url>http://127.0.0.1:9200</elastic_url><elastic_worker><!-- 最大缓冲 这个小点可以防止崩溃导致丢失太多--><max_msg>2048</max_msg><!-- 线程个数 --><worker_number>1</worker_number><!-- 每个批次最大数量 --><batch_size>1024</batch_size><!-- 处理频率(毫秒) --><tick_millisecond>5000</tick_millisecond></elastic_worker></config>

0x4 注意

1、如果你的ElasticSearch集群的配置足够高,你可以修改配置文件里的<worker_number>1</worker_number>给Worker开多协程,否则还是单协程性能更高一些。

2、可以适当调整<batch_size>1024</batch_size>每个批次的数量来提升写入性能。

3、如果报这个错误  EsRejectedExcutionException,说明ES性能扛不住了,需要提升配置,降低写入量。

转载于:https://www.cnblogs.com/mrblue/p/11251498.html

[Golang] 消费Kafka的日志提交到ElasticSearch相关推荐

  1. flink 消费 kafka offset 自动提交

    flink 消费kafka 程序重启后,从原先的自动提交的点继续消费,earliest 不用再从开始消费 如果开启了checkpoint 以 checkpoint为准 ,enable.auto.com ...

  2. Filebeat+Kafka+ELK日志采集(五)——Elasticsearch

    一.下载.安装.配置.启动: 1.下载 wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.3.2-li ...

  3. 使用filbeat从kafka中消费json格式日志并发送到ElasticSearch

    环境 filbeat 7.10 kafka 2.1 elasticsearch 7.4.2 windows 10 需求描述 Java程序生产Json格式的日志发送到kafka中,再由filebeat从 ...

  4. 【Kafka】Flink 消费 kafka 部分 分区 一直不提交 offset

    文章目录 1.概述 1.1 查看消费组 1.2 检测topic 1.3 内置topic 1.4 flink数据监控 1.5 提交offset指标 M.扩展 1.概述 一个环境,突然发现,Flink消费 ...

  5. flink消费kafka从指定时间消费offset的日志

    有时生产上会按指定时间消费kafka的数据,具体日志如下: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase [] ...

  6. 搭建elk+logstash+kafka+filebeat日志收集平台

    文章目录 前言 组件介绍 原理图 环境介绍 安装 日志收集与展示 前言 在日常的运维过程中,对系统日志和业务日志的处理比较重要,对于以后的数据分析.排查异常问题有很重的作用.今天就分享一个自己基于ka ...

  7. 基于kafka的日志收集

    目录 一.环境准备 Ⅰ.准备好三台虚拟机用于搭建nginx和kafka集群 Ⅱ.配置静态ip地址 Ⅲ.修改主机名 Ⅳ.域名解析 ​编辑 Ⅴ.安装基本软件 Ⅵ.安装时间同步服务 Ⅶ.关闭防火墙 二.ng ...

  8. 【FLink】Flink 消费 kafka 消费组 死掉 Marking the coordinator dead for group 造成数据重复消费

    文章目录 1.概述 2.源码分析 2.2 能不能设置多次提交呢? 2.3 监控日志 1.概述 首先参考几个案例: [Flink]Flink Kafka 消费卡死 消费组卡死 topic无写入 实际有数 ...

  9. java kafka分布式_JavaWeb项目架构之Kafka分布式日志队列

    架构.分布式.日志队列,标题自己都看着唬人,其实就是一个日志收集的功能,只不过中间加了一个Kafka做消息队列罢了. kafka介绍 Kafka是由Apache软件基金会开发的一个开源流处理平台,由S ...

最新文章

  1. 关于css中overflow的一些理解
  2. device.cpp
  3. PDF Annotator 8中文版
  4. 微信公众平台开发(103) 四六级成绩查询
  5. imac android studio,Mac安装Android Studio的时候忘记安装Avd Manager怎么办?
  6. JavaScript的类型自动转换高级玩法JSFuck
  7. 天锐绿盾教您如何管控外接设备
  8. jdk、jre、jvm区别
  9. 【JavaWeb】(血泪踩雷史...)Token登录前后端交互及跨域问题
  10. java的方法覆盖与方法重载有什么异同_Java语言中方法重载与方法覆盖的异同
  11. 带栩字的优美古诗句_带栩字有寓意的男孩名字
  12. 忘了Linux服务器密码怎么办
  13. 计算机硕士可以入伍么,2021年下半年“征兵”已开始,大学生在校入伍好,还是毕业入伍好...
  14. HDP vs CDH
  15. [DB][Oracle]Oracle格式化数字的方法(指定小数点位数,每3位加逗号)
  16. 识别图书ISBN号并输出查询结果的示例
  17. Roblox入场教育游戏,是换道拥抱元宇宙还是新瓶装旧酒?
  18. Xshell建立SSH隧道连接
  19. win10 修改用户目录(%USERPROFILE%)位置
  20. 每周读书#9 - 《在路上,爱上从未有过的自己》

热门文章

  1. 【Jsp】第二课 Servlet入门学习(一)
  2. Win10关闭Windows防火墙
  3. Dubbo-1.Zookeeper基本配置
  4. Nodejs Day07 登录+路由
  5. 深刻理解Flink的有界流和无界流
  6. 通过rel=preload进行内容预加载
  7. Linux挂载磁盘出现只读的问题
  8. screen设置翻页
  9. 【算法提高——第三讲(一)】图论
  10. 电力电子simulink练习04:三相_桥式_整流