Heka采集系统Output插件开发
2019独角兽企业重金招聘Python工程师标准>>>
1、插件接口实现:
注册插件实例
func init() {pipeline.RegisterPlugin("HttpOutput", func() interface{} {return new(HttpOutput)})
}
内部数据初始化
func (o *HttpOutput) ConfigStruct() interface{} {return &HttpOutputConfig{HttpTimeout: 0,Headers: make(http.Header),Method: "POST",}
}func (o *HttpOutput) Init(config interface{}) (err error) {o.HttpOutputConfig = config.(*HttpOutputConfig)if o.url, err = url.Parse(o.Address); err != nil {return fmt.Errorf("Can't parse URL '%s': %s", o.Address, err.Error())}if o.url.Scheme != "http" && o.url.Scheme != "https" {return errors.New("`address` must contain an absolute http or https URL.")}o.Method = strings.ToUpper(o.Method)if o.Method != "POST" && o.Method != "GET" && o.Method != "PUT" {return errors.New("HTTP Method must be POST, GET, or PUT.")}if o.Method != "GET" {o.sendBody = true}o.client = new(http.Client)if o.HttpTimeout > 0 {o.client.Timeout = time.Duration(o.HttpTimeout) * time.Millisecond}if o.Username != "" || o.Password != "" {o.useBasicAuth = true}if o.url.Scheme == "https" {transport := &http.Transport{}if transport.TLSClientConfig, err = tcp.CreateGoTlsConfig(&o.Tls); err != nil {return fmt.Errorf("TLS init error: %s", err.Error())}o.client.Transport = transport}return
}
实例运行
func (o *HttpOutput) Run(or pipeline.OutputRunner, h pipeline.PluginHelper) (err error) {if or.Encoder() == nil {return errors.New("Encoder must be specified.")}var (e erroroutBytes []byte)inChan := or.InChan()for pack := range inChan {// tob, _ := json.Marshal(pack.Message)// fmt.Println(string(tob))outBytes, e = or.Encode(pack)pack.Recycle()if e != nil {or.LogError(e)continue}if outBytes == nil {continue}if e = o.request(or, outBytes); e != nil {or.LogError(e)}}return
}
一般实例Run接口中的实现基本类似,获取or.InChan,循环读取,调用Encode,回收pack,然后将encode的数据发送到输出源;
2、以Mongodb作为输出源为例,代码如下:
package mongoimport ("encoding/json""errors""fmt""github.com/mozilla-services/heka/pipeline""labix.org/v2/mgo""log""runtime""runtime/debug""strings""time"
)type MongoOutput struct {*MongoOutputConfigmgoSession *mgo.SessionlogMsgChan chan *logMessage
}type ServerId map[string]stringtype MongoOutputConfig struct {Address []stringLogMsgChSize intMgoWriterCount intSrvIds ServerIdUsername string `toml:"username"`Password string `toml:"password"`
}type logMessage struct {ServerId string `json:"Hostname"`ClientIp string `json:"remote_addr"`Host string `json:"host"`Cookie string `json:"http_cookie"`Referer string `json:"http_referer"`URI string `json:"request_uri"`Timestamp int64 `json:"timestamp"`UserAgent string `json:"http_user_agent"`
}//data transferred from agent.
type mgoMessage struct {Url string `json:"url"`UA string `json:"ua"`Referer string `json:"referer"`Cookie string `json:"cookie"`Ip string `json:"ip"`TimeStamp int64 `json:"timestamp"`
}func (o *MongoOutput) ConfigStruct() interface{} {return &MongoOutputConfig{Address: []string{"127.0.0.1:27017"},LogMsgChSize: 10000,MgoWriterCount: runtime.NumCPU(),SrvIds: make(ServerId),}
}func (o *MongoOutput) Init(config interface{}) (err error) {o.MongoOutputConfig = config.(*MongoOutputConfig)//todo: check address validity// if o.url, err = url.Parse(o.Address); err != nil {// return fmt.Errorf("Can't parse URL '%s': %s", o.Address, err.Error())// }//o.logMsgChan = make(chan *logMessage, o.LogMsgChSize)mgoInfo := mgo.DialInfo{Addrs: o.Address, Timeout: time.Second}o.mgoSession, err = mgo.DialWithInfo(&mgoInfo)if err != nil {log.Printf("initialize MongoOutput failed, %s", err.Error())return err}return
}func WriteDataToMgo(mo *MongoOutput) {defer func() {if err, ok := recover().(error); ok {log.Println("WARN: panic in %v", err)log.Println(string(debug.Stack()))}}()//srvlog.Printf("WriteDataToMgo key:%s", key)sessionCopy := mo.mgoSession.Copy()defer sessionCopy.Close()db := sessionCopy.DB("admin")var msg mgoMessagevar err errorfor logMsg := range mo.logMsgChan {if logMsg.ClientIp == "" || logMsg.UserAgent == "" {continue}keyName := mo.SrvIds[logMsg.ServerId]//fmt.Printf("%s, %s", logMsg.ServerId, keyName)if keyName == "" {log.Printf("no invalid mongo key for %s", logMsg.ServerId)continue}year, month, day := time.Now().Date()collName := fmt.Sprintf("%s_%d_%d_%d", keyName, year, month, day)coll := db.C(collName)msg.Url = fmt.Sprintf("http://%s%s", logMsg.Host, logMsg.URI)if logMsg.UserAgent != "-" {msg.UA = logMsg.UserAgent} else {msg.UA = ""}if logMsg.Referer != "-" {msg.Referer = logMsg.Referer} else {msg.Referer = ""}if logMsg.Cookie != "-" {msg.Cookie = logMsg.Cookie} else {msg.Cookie = ""}msg.Ip = logMsg.ClientIpmsg.TimeStamp = logMsg.Timestampif err != nil {log.Println(err.Error())continue}//fmt.Println("MongoOutput-119-%v", msg)err = coll.Insert(msg)if err != nil {log.Println(err.Error())continue}}
}func (o *MongoOutput) Run(or pipeline.OutputRunner, h pipeline.PluginHelper) (err error) {if or.Encoder() == nil {return errors.New("Encoder must be specified.")}var (e erroroutBytes []byte)inChan := or.InChan()for i := 0; i < o.MgoWriterCount; i++ {go WriteDataToMgo(o)}for pack := range inChan {outBytes, e = or.Encode(pack)pack.Recycle()if e != nil {or.LogError(e)continue}if outBytes == nil {continue}subStrs := strings.Split(string(outBytes), "\n")//fmt.Printf("MongoOutput:len of subStrs:%d", len(subStrs))if len(subStrs) == 3 {logMsg := &logMessage{}//fmt.Printf("MongoOutput:%s\n", subStrs[1])e = json.Unmarshal([]byte(subStrs[1]), logMsg)if e != nil {log.Printf("MongoOutput-%s", err.Error())continue}//fmt.Printf("MongoOutput-165-%v", logMsg)o.logMsgChan <- logMsg}//fmt.Println("MongoOutput:", string(outBytes))}return
}func init() {pipeline.RegisterPlugin("MongoOutput", func() interface{} {return new(MongoOutput)})
}
以上代码已经运行在实际环境中,核心逻辑是将pack Encode后,解析返回数据,转换为Mongo存储的格式,然后将数据传递到mo.logMsgChan,Mongo写routine持续从该channel缓冲中读取数据,将数据写入mongodb中;
3、编译插件到Heka系统,介绍两种方式:
1)参考官网: http://hekad.readthedocs.org/en/v0.8.2/installing.html#build-include-externals
2)在调用Heka系统的build.sh后,会产生一个build目录,cd build/heka目录,发现目录结构符合标准的go工程目录,在该目录下建立install文件,install文件如下:
#!/usr/bin/env bashif [ ! -f install ]; then
echo 'install must be run within its container folder' 1>&2
exit 1
fiCURDIR=`pwd`
OLDGOPATH="$GOPATH"
export GOPATH="$OLDGOPATH:$CURDIR"gofmt -w src#go install hekad
go install github.com/mozilla-services/heka/cmd/hekad
#go install github.com/mozilla-services/heka/cmd/heka-flood
#go install github.com/mozilla-services/heka/cmd/heka-inject
#go install github.com/mozilla-services/heka/cmd/heka-logstreamer
#go install github.com/mozilla-services/heka/cmd/heka-cat
#go install github.com/mozilla-services/heka/cmd/heka-sbmgr
#go install github.com/mozilla-services/heka/cmd/heka-sbmgrload
#go test github.com/mozilla-services/heka/plugins/mongoexport GOPATH="$OLDGOPATH"echo 'finished'
每次在添加完插件后,需要在hekad/main.go文件import中添加一行代码:
_ "github.com/mozilla-services/heka/plugins/mongo"
ok之后,编译./install即可!
以上如有疑问,请email到cxwshawn@yeah.net
转载于:https://my.oschina.net/shawnChen/blog/361152
Heka采集系统Output插件开发相关推荐
- VSCode插件开发全攻略(六)开发调试技巧
更多文章请戳VSCode插件开发全攻略系列目录导航. 前言 在介绍完一些比较简单的内容点之后,我觉得有必要先和大家介绍一些开发中遇到的一些细节问题以及技巧,特别是后面一章节将要介绍WebView的知识 ...
- 基于FPGA多通道数据采集系统verilog设计
本设计实现多通道数据采集系统,该系统包括多通道数据采集和数据传输,使用verilog语言设计. 本设计实现功能:采集8路16位的AD数据,并发送到串口助手. 该设计架构图如下: 顶层模块代码如下: m ...
- Heka:Go编写,来自Mozilla,高效、灵活的插件式数据挖掘工具(转)
转自:http://www.csdn.net/article/2013-05-02/2815116-introduce-from-mozilla-heka-go 摘要:一直崇尚开源的Mozilla近日 ...
- Eclipse 插件开发 向导
阅读目录 最近由于特殊需要,开始学习插件开发. 下面就直接弄一个简单的插件吧! 1 新建一个插件工程 2 创建自己的插件名字,这个名字最好特殊一点,一遍融合到eclipse的时候,不会发生冲突. 3 ...
- JavaScript学习笔记(四)——jQuery插件开发与发布
jQuery插件就是以jQuery库为基础衍生出来的库,jQuery插件的好处是封装功能,提高了代码的复用性,加快了开发速度,现在网络上开源的jQuery插件非常多,随着版本的不停迭代越来越稳定好用, ...
- S03_CH03_AXI_DMA_OV7725摄像头采集系统
S03_CH03_AXI_DMA_OV7725摄像头采集系统 3.1概述 本课程讲解如何搭建基于DMA的图形系统,方案原理如下. 摄像头采样图像数据后通过DMA送入到DDR,在PS部分产生DMA接收中 ...
- Android 插件开发实现
Android插件开发 老生常谈问题 1 Android类加载 PathClassLoader DexClassLoader 两者都继承自BaseDexClassLoader 但是两者又有区别 Pat ...
- 在滴滴云 DC2 云服务器上搭建 ELK 日志采集系统
前段时间大学同学聚会,一个在读的博士同学谈到他们实验室做实验时,物理服务器需要向老师申请且组内同学都共用那些机器.由于运行一些大数据和人工智能的项目比较耗费资源,且对资源环境的隔离要求比较高,因而很多 ...
- 干货 | 携程机票Sketch插件开发实践
作者简介 尹正波,携程机票研发部前端工程师,专注设计和开发的交叉领域,用系统和工具改进设计体验和交付. Sketch 是伴随移动应用程序崛起而流行的 UI 设计工具.2014年 Sketch V3 增 ...
- 基于PYNQ的AD采集系统
基于PYNQ的AD采集系统 系统概述 AN706的控制 SPI通讯 AXI4-LITE总线 打包IP核 建立PYNQ工程 编写SDK程序 上板验证 代码下载地址 系统概述 打算用PYNQ-Z2开发板做 ...
最新文章
- P2261 [CQOI2007]余数求和
- C++11中override的使用
- 阿里Java面试题剖析:为什么使用消息队列?消息队列有什么优点和缺点?
- mysql数据被截断_有关Mysql数据截断问题的处理方法
- 读写应用程序数据-NSUserDefault、对象归档(NSKeyedArchiver)、文件操作
- Django 学习笔记之一 环境搭建
- Android 关于ZXing的使用
- ziplist之详细分析
- 查询计算机系统安装日期,windows10系统查询系统安装日期方法介绍
- c语言课程设计物业,C语言课程设计报告--物业管理系统.doc
- 考研数学常见的函数图像
- 拼刀刀店铺后台的参数anti-content逆向分析
- matlab 矩阵左右乘除
- 拼多多“出海”的三个考验?
- 《简单的逻辑学》阅读笔记(思维导图)
- oracle数据库基本命令使用汇总
- 风华贴片电容命名规则
- 图文笔记,带你走进《未来简史》(26-30)
- 学生老师的家教服务平台小程序制作
- 微机原理——汇编语言