golang实现自己的事件驱动

众所周知,go中的异步操作都已经封装在了运行时的过程中,有关socket的网络的异步操作都封装到了go的netpoll中,从而简化了编程形式。本文也就根据evio库总结而来。

golang跨平台库

如何编写golang的跨平台库,现在主流的方式如下:

evserver
│   go.mod
│   main.go
│
└───show
│   │   show.go
│   │   show_darwin.go
│   │   show_linux.go

通过创建一个package,然后通过命名文件后缀为_{平台后缀}.go的形式,在不同平台运行的时候会自动编译对应的代码。

main.go代码

package mainimport ("evserver/show"
)func main() {show := show.GetDefault()show.ShowHello()
}

show.go

package showtype Show interface {ShowHello()
}

show_linux.go

package showimport "fmt"type osDefaulter struct {}func GetDefault() osDefaulter{return osDefaulter{}
}func(s osDefaulter) ShowHello(){fmt.Println("linux show")
}

show_darwin.go

package showimport "fmt"type osDefaulter struct {}func GetDefault() osDefaulter{return osDefaulter{}
}func(s osDefaulter) ShowHello(){fmt.Println("darwin show")
}

常见的设计的方式就是通过在package中定义一个接口,然后通过不同平台的后缀文件去调用不同的方式去实现,从而完成package统一的对外提供服务的方式,当然跨平台的库也可以有另外一种方式,即如下:

func ShowHello() string {if runtime.GOOS == "windows" {return "windows hello"} else {return "other paltform hello"}
}

但是这种方式针对简单的跨平台性能还可以,针对复杂的跨平台的功能就对代码侵入比较严重。

https://techblog.steelseries.com/2014/04/08/multi-platform-development-go.html

https://blog.gopheracademy.com/advent-2013/day-23-multi-platform-applications/

golang事件驱动-Kqueue

总体的代码目录如下:

evserver
│   go.mod
│   main.go
│
└───poll
│   │   poller.go
│   │   poller_darwin.go
│   │   poller_linux.go
main.go
package mainimport ("evserver/poll""fmt"
)func main() {IP := "127.0.0.1"Port := 6667s := &poll.Server{Ip:IP,Port: Port,}s.Init()s.Data = func(c *poll.Conn, in []byte) (out []byte) {out = inout = append(out, []byte("back")...)return}fmt.Printf(" running in %s:%d\n", IP, Port)poll.LoopRun(s)
}
poller.go
package pollimport ("log""net""syscall"
)const READ_FLAG  = 1
const WRITE_FLAG  = 2type Conn struct {fd         int              // file descriptorlnidx      int              // listener index in the server lns listout        []byte           // write buffersa         syscall.Sockaddr // remote socket addressreuse      bool             // should reuse input bufferopened     bool             // Connection opened event firedctx        interface{}      // user-defined contextloop       *loop            // Connected loop
}func (c *Conn) Context() interface{}       { return c.ctx }
func (c *Conn) SetContext(ctx interface{}) { c.ctx = ctx }
func (c *Conn) Wake() {//if c.loop != nil {//  c.loop.poll.Trigger(c)//}
}type loop struct {idx     int            // loop index in the server loops listpoll    *Poll // epoll or kqueuepacket  []byte         // read packet bufferfdConns map[int]*Conn  // loop Connections fd -> Conn
}type Server struct {Ip stringPort intfd intData func(c *Conn, in []byte)(out []byte)
}func (s *Server)Init()  {fd ,err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, syscall.IPPROTO_TCP)if err != nil {panic(err)}var serverAddr [4]byteip := s.IpIP := net.ParseIP(ip)if IP == nil {log.Fatal("Unable to process IP: ", ip)}copy(serverAddr[:], IP[12:16])if err = syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1); err != nil {syscall.Close(fd)panic(err)}err = syscall.Bind(fd, &syscall.SockaddrInet4{Port: s.Port, Addr:serverAddr})if err != nil {panic(err)}err = syscall.Listen(fd, 1024)if err != nil {panic(err)}s.fd = fd
}func LoopRun(s *Server){l := &loop{poll:OpenPoll(),packet:  make([]byte, 0xFFFF),fdConns: make(map[int]*Conn),}l.poll.AddRead(s.fd)err := l.poll.Wait(func(fd int, note interface{}) error {if fd == s.fd {err := LoopAccept(s, l, fd)if err != nil {panic(err)}} else {c := l.fdConns[fd]flag := note.(int)if flag == READ_FLAG {LoopRead(s, l, c)} else {LoopWrite(s, l, c)}}return nil})if err != nil {panic(err)}
}func LoopAccept(s *Server, l *loop, fd int) error{nfd, sa, err := syscall.Accept(fd)if err != nil {if err != nil {if err == syscall.EAGAIN {return nil}return err}}if err := syscall.SetNonblock(nfd, true); err != nil {return err}c := &Conn{fd: nfd, sa: sa, loop: l}c.out = []byte{}l.fdConns[c.fd] = cl.poll.AddReadWrite(c.fd)return nil
}func LoopRead(s *Server, l *loop, c *Conn)error {var in []byten, err := syscall.Read(c.fd, l.packet)if n == 0 || err != nil {if err == syscall.EAGAIN {return nil}return LoopClose(s, l, c)}in = l.packet[:n]if !c.reuse {in = append([]byte{}, in...)}if s.Data != nil {out := s.Data(c, in)if len(out) > 0 {c.out = append(c.out[:0], out...)}}if len(c.out) != 0 {l.poll.ModReadWrite(c.fd)}return nil
}func LoopWrite(s *Server, l *loop, c *Conn)error {if c == nil {return nil}if c.out == nil || len(c.out) == 0 {return nil}n, err := syscall.Write(c.fd, c.out)if err != nil {if err == syscall.EAGAIN {return nil}return LoopClose(s, l, c)}if n == len(c.out) {// release the connection output page if it goes over page size,// otherwise keep reusing existing page.if cap(c.out) > 4096 {c.out = nil} else {c.out = c.out[:0]}} else {c.out = c.out[n:]}if len(c.out) == 0{l.poll.ModRead(c.fd)}return nil
}func LoopClose(s *Server, l *loop, c *Conn)error{delete(l.fdConns, c.fd)err := syscall.Close(c.fd)return err
}
poller_darwin.go
package pollimport ("syscall"
)// Poll ...
type Poll struct {fd      intchanges []syscall.Kevent_t
}// OpenPoll ...
func OpenPoll() *Poll {l := new(Poll)p, err := syscall.Kqueue()if err != nil {panic(err)}l.fd = p_, err = syscall.Kevent(l.fd, []syscall.Kevent_t{{Ident:  0,Filter: syscall.EVFILT_USER,Flags:  syscall.EV_ADD | syscall.EV_CLEAR,}}, nil, nil)if err != nil {panic(err)}return l
}// Close ...
func (p *Poll) Close() error {return syscall.Close(p.fd)
}// Wait ...
func (p *Poll) Wait(iter func(fd int, note interface{}) error) error {events := make([]syscall.Kevent_t, 128)for {n, err := syscall.Kevent(p.fd, p.changes, events, nil)if err != nil && err != syscall.EINTR {return err}p.changes = p.changes[:0]for i := 0; i < n; i++ {if fd := int(events[i].Ident); fd != 0 {var flag intif events[i].Filter == syscall.EVFILT_READ {flag = READ_FLAG} else {flag = WRITE_FLAG}if err := iter(fd, flag); err != nil {return err}}}}
}// AddRead ...
func (p *Poll) AddRead(fd int) {p.changes = append(p.changes,syscall.Kevent_t{Ident: uint64(fd), Flags: syscall.EV_ADD, Filter: syscall.EVFILT_READ,},)
}// AddReadWrite ...
func (p *Poll) AddReadWrite(fd int) {p.changes = append(p.changes,syscall.Kevent_t{Ident: uint64(fd), Flags: syscall.EV_ADD, Filter: syscall.EVFILT_READ,},syscall.Kevent_t{Ident: uint64(fd), Flags: syscall.EV_ADD, Filter: syscall.EVFILT_WRITE,},)
}// ModRead ...
func (p *Poll) ModRead(fd int) {p.changes = append(p.changes, syscall.Kevent_t{Ident: uint64(fd), Flags: syscall.EV_DELETE, Filter: syscall.EVFILT_WRITE,})
}// ModReadWrite ...
func (p *Poll) ModReadWrite(fd int) {p.changes = append(p.changes, syscall.Kevent_t{Ident: uint64(fd), Flags: syscall.EV_ADD, Filter: syscall.EVFILT_WRITE,})
}// ModDetach ...
func (p *Poll) ModDetach(fd int) {p.changes = append(p.changes,syscall.Kevent_t{Ident: uint64(fd), Flags: syscall.EV_DELETE, Filter: syscall.EVFILT_READ,},syscall.Kevent_t{Ident: uint64(fd), Flags: syscall.EV_DELETE, Filter: syscall.EVFILT_WRITE,},)
}
运行测试
import socketsock = socket.socket()
sock.connect(("127.0.0.1", 6667))print(sock.send(b"data123121231232"))
print(sock.recv(4096))

此时运行程序go run main.go和测试客户端程序。

此时就可以看出简单的事件驱动的过程。

压测

将main.go进行修改,修改为http的处理方式。

package mainimport ("evserver/poll""fmt""strconv""strings""time"
)func main() {IP := "127.0.0.1"Port := 6667s := &poll.Server{Ip:IP,Port: Port,}s.Init()s.Data = func(c *poll.Conn, in []byte) (out []byte) {if in == nil {return}data := in//if noparse && bytes.Contains(data, []byte("\r\n\r\n")) {//  // for testing minimal single packet request -> response.//  out = appendresp(nil, "200 OK", "", res)// return//}// process the pipelinevar req requestfor {leftover, err := parsereq(data, &req)if err != nil {// bad thing happenedout = appendresp(out, "500 Error", "", err.Error()+"\n")break} else if len(leftover) == len(data) {// request not ready, yetbreak}// handle the requestout = appendhandle(out, &req)data = leftover}return}res = "Hello World!\r\n"fmt.Printf(" running in %s:%d\n", IP, Port)poll.LoopRun(s)
}var res stringtype request struct {proto, method stringpath, query   stringhead, body    stringremoteAddr    string
}// appendhandle handles the incoming request and appends the response to
// the provided bytes, which is then returned to the caller.
func appendhandle(b []byte, req *request) []byte {return appendresp(b, "200 OK", "", res)
}// appendresp will append a valid http response to the provide bytes.
// The status param should be the code plus text such as "200 OK".
// The head parameter should be a series of lines ending with "\r\n" or empty.
func appendresp(b []byte, status, head, body string) []byte {b = append(b, "HTTP/1.1"...)b = append(b, ' ')b = append(b, status...)b = append(b, '\r', '\n')b = append(b, "Server: error\r\n"...)b = append(b, "Date: "...)b = time.Now().AppendFormat(b, "Mon, 02 Jan 2006 15:04:05 GMT")b = append(b, '\r', '\n')if len(body) > 0 {b = append(b, "Content-Length: "...)b = strconv.AppendInt(b, int64(len(body)), 10)b = append(b, '\r', '\n')}b = append(b, head...)b = append(b, '\r', '\n')if len(body) > 0 {b = append(b, body...)}return b
}// parsereq is a very simple http request parser. This operation
// waits for the entire payload to be buffered before returning a
// valid request.
func parsereq(data []byte, req *request) (leftover []byte, err error) {sdata := string(data)var i, s intvar top stringvar clen intvar q = -1// method, path, proto linefor ; i < len(sdata); i++ {if sdata[i] == ' ' {req.method = sdata[s:i]for i, s = i+1, i+1; i < len(sdata); i++ {if sdata[i] == '?' && q == -1 {q = i - s} else if sdata[i] == ' ' {if q != -1 {req.path = sdata[s:q]req.query = req.path[q+1 : i]} else {req.path = sdata[s:i]}for i, s = i+1, i+1; i < len(sdata); i++ {if sdata[i] == '\n' && sdata[i-1] == '\r' {req.proto = sdata[s:i]i, s = i+1, i+1break}}break}}break}}if req.proto == "" {return data, fmt.Errorf("malformed request")}top = sdata[:s]for ; i < len(sdata); i++ {if i > 1 && sdata[i] == '\n' && sdata[i-1] == '\r' {line := sdata[s : i-1]s = i + 1if line == "" {req.head = sdata[len(top)+2 : i+1]i++if clen > 0 {if len(sdata[i:]) < clen {break}req.body = sdata[i : i+clen]i += clen}return data[i:], nil}if strings.HasPrefix(line, "Content-Length:") {n, err := strconv.ParseInt(strings.TrimSpace(line[len("Content-Length:"):]), 10, 64)if err == nil {clen = int(n)}}}}// not enough datareturn data, nil
}

此时运行go run main.go,并将前文编写的原生的http和evio的http进行对比。同样进行wrk进行测试。

原生http

 wrk -t8 -c200 -d60s --latency  http://127.0.0.1:8000
Running 1m test @ http://127.0.0.1:80008 threads and 200 connectionsThread Stats   Avg      Stdev     Max   +/- StdevLatency     4.60ms    6.67ms 229.63ms   93.36%Req/Sec     7.41k     1.73k   14.17k    72.51%Latency Distribution50%    2.86ms75%    4.06ms90%    7.82ms99%   35.12ms3537128 requests in 1.00m, 431.78MB readSocket errors: connect 0, read 27, write 0, timeout 0
Requests/sec:  58849.98
Transfer/sec:      7.18MB

evio的http(开启3个loop)

wrk -t8 -c200 -d60s --latency  http://127.0.0.1:7979
Running 1m test @ http://127.0.0.1:79798 threads and 200 connectionsThread Stats   Avg      Stdev     Max   +/- StdevLatency     2.19ms  730.09us  36.98ms   93.56%Req/Sec    11.39k     1.37k   17.97k    68.33%Latency Distribution50%    2.02ms75%    2.37ms90%    2.68ms99%    4.24ms5444990 requests in 1.00m, 540.05MB read
Requests/sec:  90669.19
Transfer/sec:      8.99MB

自己实现的http

wrk -t8 -c200 -d60s --latency  http://127.0.0.1:6667
Running 1m test @ http://127.0.0.1:66678 threads and 200 connectionsThread Stats   Avg      Stdev     Max   +/- StdevLatency     2.32ms    1.38ms  47.39ms   95.27%Req/Sec    11.06k     1.98k   31.58k    74.97%Latency Distribution50%    2.04ms75%    2.41ms90%    2.90ms99%    6.85ms5286393 requests in 1.00m, 529.36MB read
Requests/sec:  87953.74
Transfer/sec:      8.81MB
原生http evio处理http 手工实现http
Qps 58849.98 90669.19 87953.74

总结

本文主要就是梳理了一下go的跨平台的主流方式,并简单实现了有关mac的kqueue的过程,后续大家有兴趣可自行开发事件驱动的框架,当前比较火热的是evio,gev和gnet。由于本人才疏学浅,如有错误请批评指正。

golang-实现自己的事件驱动相关推荐

  1. Golang 1.14 发布 | 云原生生态周报 Vol. 39

    作者 | 陈俊.何淋波.李鹏.宋净超 业界要闻 Golang 1.14 发布 Golang Release 了 1.14 版本.该版本包含生产级别 go module,改进 defer 性能,以及 G ...

  2. Golang中用到的的Websocket库

    翻译自:How to Use Websockets in Golang 微信公众号:运维开发故事,作者:wanger 在不刷新页面的情况下发送消息并获得即时响应是我们认为理所当然的事情.但在过去,启用 ...

  3. [转]Golang中goroutine的调度器详解

    Go调度器原理浅析 来源:https://www.douban.com/note/300631999/ goroutine是golang的一大特色,或者可以说是最大的特色吧(据我了解),这篇文章主要翻 ...

  4. golang常见面试题总结

    前言 golang面经主要参考知乎大佬总结的面试题:https://zhuanlan.zhihu.com/p/519979757 计算机网络参考:https://blog.csdn.net/justl ...

  5. Golang优秀开源项目汇总(持续更新。。。)

    Golang优秀开源项目汇总(持续更新...) 我把这个汇总放在github上了, 后面更新也会在github上更新. https://github.com/hackstoic/golang-open ...

  6. golang面试问题汇总(陆续更新)

    由于本人在准备找golang的春招实习,所以开此贴方便记录 图床有些问题,最近准备搭建一个个人博客,到时候会放出链接 由于近期看到百度文库等抄袭搬运猖獗,故只放出部分内容,如需全部请私聊. golan ...

  7. 事件驱动库 libev 使用详解

    C/C++Linux服务器开发/后台架构师知识体系 libev 是一个通过 C 语言编写的,高性能的事件循环库,支持多种事件类型,与此类似的事件循环库还有 libevent.libubox 等,在此详 ...

  8. libgo高性能网络服务器,【开源】gnet: 一个轻量级且高性能的 Golang 网络库

    ![](https://ask.qcloudimg.com/http-save/1303222/sipe2g9n9h.png) # Github 主页 [https://github.com/panj ...

  9. 服务器系统goha,推荐一个轻量级且高性能的 Golang 网络库:gnet-Go语言中文社区...

    image Github 主页 博客原文 欢迎大家围观~~,目前还在持续更新,感兴趣的话可以 star 一下暗中观察哦. 简介 gnet 是一个基于 Event-Loop 事件驱动的高性能和轻量级网络 ...

最新文章

  1. IntelliJ IDEA版本和junit版本不适配
  2. 浓烟滚滚!某市联通集体断网,谁的锅?
  3. TensorFlow模型保存和加载方法
  4. 理解 | 理解a: float=10
  5. 解决Android 启动模拟器是出现“Failed to allocate memory: 8”错误提示
  6. JQuery中.css()与.addClass()设置样式的区别
  7. PCA(主成分分析)思想及实现
  8. Longformer:超越RoBERTa,为长文档而生的预训练模型
  9. filebeat 解析日志 并发送到Elasticsearch
  10. 基于libpcan库can总线操作的Barrett 机械手控制及腕部六维力传感器驱动
  11. codeforces gym 100187M Heaviside Function
  12. 2021-01-11
  13. Compile warning: Embedded binary's NSExtensionActivationRule is TRUEPREDICATE
  14. python编写摇骰子游戏_python摇骰子猜大小的小游戏
  15. 牛客网SQL实战二刷 | Day1
  16. 一个网站的pv代表什么?
  17. 卷积神经网络(CNN)网络结构及模型原理介绍
  18. 【数据结构】赫夫曼树
  19. vue中,静态书写select的option选项时如何设置默认选中项
  20. 本地前后端联调跳过cas sso单点登录

热门文章

  1. OpenAI 以 10 亿美元出售「灵魂」,网友热评不再「Open」
  2. 想学Python?那这套教程再适合你不过了!!
  3. 程序员在地铁写代码遭疯狂吐槽!网友:装什么装
  4. ICLR 2020被爆半数审稿人无相关领域经验,同行评审制度在垮塌?
  5. Python快速入门,你想要的就在这里了!
  6. 双十一报名截止,决赛在即!AI Challenger2018极客峰会免费抢票!
  7. 【重磅】吴恩达又一项目Landing.ai曝光,这一次,他是要给传统制造业狠狠开刀!
  8. ICCV2017 | 一文详解GAN之父Ian Goodfellow 演讲《生成对抗网络的原理与应用》(附完整PPT)
  9. Spring Boot + Redis 实现各种操作,写得太好了吧!
  10. IDEA 解决 Java8 的数据流问题,用过的都说好!!!