目录

  • 一. 通过http服务端引出 go netpoll 多路复用
    • 端口绑定,fd初始化与开启监听
      • poll_runtime_pollServerInit
      • poll_runtime_pollOpen
      • pollDesc
      • FD
      • IO多路复用需要实现的五个函数
  • 二. 总结
    • 多路复用基础
    • 当前多路复用总结
    • 什么是水平触发/边缘触发
    • epoll为什么高效
    • 怎么实现创建epoll实例并将fd加入到epoll列表,也就是初始化过程
    • 怎么实现将goroutine挂起的
    • 怎么实现将goroutine唤醒的

一. 通过http服务端引出 go netpoll 多路复用

  1. 先说一下golang中对多路复用的支持: golang 在src/runtime/netpoll.go 调用了不同平台封装的多路复用api,例如linux环境下epoll封装的文件在src/runtime/netpoll_epoll.go中,windows环境下多路复用模型实现在src/runtime/netpoll_windows.go, 进而提供了多路复用的功能
  2. 还是以"net/http"构建http服务为例,通过net/http编写服务端时, 首先调用NewServeMux()创建多路复用器,编写对外接收请求的接口函数也就是处理器,然后调用多路复用器上的HandleFunc()方法,将接口与接口路径进行绑定,注册路由, 最后调用ListenAndServe()函数在指定端口开启监听,启动服务
func ListenAndServe(addr string, handler Handler) error {//1.创建Serverserver := &Server{Addr: addr, Handler: handler}//2.调用server下的ListenAndServe()return server.ListenAndServe()
}
  1. ListenAndServe中首先封装了一个Server结构体变量,然后调用这个Server下的ListenAndServe(),我们先不管这个Server,先看一下server下的ListenAndServe()方法,这就是多路复用初始化,注册事件等核心, ,查看该方法:
  1. 执行"net.Listen(“tcp”, addr)": 多路复用相关初始化,初始化socket,端口连接绑定,开启监听
  2. “srv.Serve(ln)”: 内部会启动一个无限for循环,循环内执行Accept(),等待接收客户端连接,这里也有多路复用相关的业务
  3. 当接收到连接后Accept()返回,继续向下执行,会启动协程处理一个一个的请求,会执行"go c.serve()"
func (srv *Server) ListenAndServe() error {if srv.shuttingDown() {// 如果Server已关闭,直接返回 ErrServerClosedreturn ErrServerClosed }addr := srv.Addrif addr == "" {// 如果不指定服务器地址信息,默认以":http"作为地址信息addr = ":http" }// 创建TCPListener,接收客户端的连接请求// 第一个参数network: 可以是 tcp、tcp4、tcp6、 unix 或者 unixpacket,// 第二个参数address: 可以用主机名(hostname),但是不建议,因为这样创建的listener(监听器)最多监听主机的一个ip// 如果 address 参数的 port 为空或者"0",如"127.0.0.1:"或者"[::1]:0",将自动选择一个端口号ln, err := net.Listen("tcp", addr)  if err != nil {return err}// 调用Server.Serve()函数并返回return srv.Serve(ln)
}
func Listen(network, address string) (Listener, error) {var lc ListenConfigreturn lc.Listen(context.Background(), network, address)
}
  1. 最终会执行到ListenConfig上的Listen()方法,这就是go多路复用初始化路口,该方法中先后执行了两个重要函数:
  1. DefaultResolver.resolveAddrList是根据协议名称和地址取得 Internet 协议族地址列表
  2. listenTCP或listenUnix从地址列表中取得满足条件的地址进行实际socket创建,fd创建,监听等操作, 具体根据传入的协议族来确定
//net/dial.go
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {//resolveAddrList()根据协议名称和地址取得 Internet 协议族地址列表addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)if err != nil {return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}}sl := &sysListener{ListenConfig: *lc,network:      network,address:      address,}var l Listenerla := addrs.first(isIPv4)//从地址列表中取得满足条件的地址进行实际socket创建,fd创建,监听等操作switch la := la.(type) {case *TCPAddr:l, err = sl.listenTCP(ctx, la)case *UnixAddr:l, err = sl.listenUnix(ctx, la)default://...}//...
}
  1. 查看用来创建socket,fd,添加监听的listenTCP()函数源码:
  1. 调用internetSocket()创建连接,添加监听,端口连接绑定
  2. 完成后封装一个TCPListener结构体返回
// net/tcpsock_posix.go
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {//调用internetSocket 创建socket fd监听fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)if err != nil {return nil, err}return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}
  1. internetSocket()函数内部最终会调用到net/sock_posix.go下的一个socket()函数
  2. 在socket()函数中会:
  1. 调用sysSocket(),根据平台不同调用对应的api创建socket连接
  2. 调用setDefaultSockopts()设置socket选项
  3. 调用newFD()创建fb, 对返回的系统fd进行包装,封装为netFD结构表示,该接口描述原始 socket 的地址信息、协议类型、协议族以及 option,netFD在整个包装结构中居于用户接口的下一层
  4. 拿到fb后,开启监听,tcp会调用listenStream(),udp调用listenDatagram()
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {//...return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}// net/sock_posix.go
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {//调用各平台对应的socket api创建sockets, err := sysSocket(family, sotype, proto)if err != nil {return nil, err}//设置socket选项,设置了socket的一些属性,例如例如是否只支持ipv6if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {poll.CloseFunc(s)return nil, err}//创建fdif fd, err = newFD(s, family, sotype, net); err != nil {poll.CloseFunc(s)return nil, err}//监听,//laddr != nil && raddr == ni:如果传入了本地地址,没有传入远端地址,则认为新的socket是用来监听的if laddr != nil && raddr == nil {switch sotype {case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET://TCPif err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {fd.Close()return nil, err}return fd, nilcase syscall.SOCK_DGRAM://UDPif err := fd.listenDatagram(laddr, ctrlFn); err != nil {fd.Close()return nil, err}return fd, nil}}//发起连接,非listen socket会走到这里来if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {fd.Close()return nil, err}return fd, nil
}
  1. 在sysSocket()创建socket函数中会调用:
  1. socketFunc创建了 socket,并设置 socket 非阻塞(SOCK_NONBLOCK)以及 fork 时关闭(SOCK_CLOEXEC)标志,注意这两个标志是在 linux 内核版本 2.6.27 之后添加,在此之前的版本代码将会走到syscall.ForkLock.RLock(),主要是为了防止在 fork 时导致文件描述符泄露
//family是AF_INET或者AF_INET6,即ipv4或者ipv6
//sotype是SOCK_STREAM或者SOCK_DGRAM,即tcp或者udp
//SOCK_NONBLOCK是将socket设置为非阻塞
//SOCK_CLOEXEC是将socket设置为close-on-exec
//proto默认0
func sysSocket(family, sotype, proto int) (syscall.Handle, error) {s, err := wsaSocketFunc(int32(family), int32(sotype), int32(proto),nil, 0, windows.WSA_FLAG_OVERLAPPED|windows.WSA_FLAG_NO_HANDLE_INHERIT)if err == nil {return s, nil}syscall.ForkLock.RLock()//系统socket函数s, err = socketFunc(family, sotype, proto)if err == nil {syscall.CloseOnExec(s)}// linux内核版本低于2.6.27时,代码会走到这里,下面的内容主要是防止在fork时候导致描述符泄露// 实际上手动实现简易版SOCK_CLOEXECsyscall.ForkLock.RUnlock()if err != nil {return syscall.InvalidHandle, os.NewSyscallError("socket", err)}return s, nil
}

端口绑定,fd初始化与开启监听

  1. 以TCP连接时执行netFD下的listenStream()方法启动监听为例进行解释
  1. 检查未完成连接和已完成连接两个队列是否超出系统预设。
  2. 调用 socket bind 接口,将接收到的地址绑定到我们创建的socket上
  3. 调用 socket listen 接口。
  4. 调用fd下的init()初始化 fd。
  5. 调用 socket getsockname 接口。
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {var err error//......//完成端口和socket的关联if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {return os.NewSyscallError("bind", err)}//完成监听操作//listenFunc: 是一个方法变量,存储各种操作系统的Listen方法if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {return os.NewSyscallError("listen", err)}//内部会调用poll.FD.Init 最终调用到pollDesc.initif err = fd.init(); err != nil {return err}lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)fd.setAddr(fd.addrFunc()(lsa), nil)return nil
}
  1. 我们查看netFD下的init()初始化方法
func (fd *netFD) init() error {//TODO 该方法errcall, err := fd.pfd.Init(fd.net, true)//...return err
}
func (fd *FD) Init(net string, pollable bool) (string, error) {if initErr != nil {return "", initErr}switch net {case "file":fd.kind = kindFilecase "console":fd.kind = kindConsolecase "dir":fd.kind = kindDircase "pipe":fd.kind = kindPipecase "tcp", "tcp4", "tcp6","udp", "udp4", "udp6","ip", "ip4", "ip6","unix", "unixgram", "unixpacket":fd.kind = kindNetdefault:return "", errors.New("internal error: unknown network type " + net)}fd.isFile = fd.kind != kindNetvar err errorif pollable {//TODO err = fd.pd.init(fd)}if logInitFD != nil {logInitFD(net, fd, err)}if err != nil {return "", err}if pollable && useSetFileCompletionNotificationModes {// We do not use events, so we can skip them always.flags := uint8(syscall.FILE_SKIP_SET_EVENT_ON_HANDLE)if net == "tcp" {flags |= syscall.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS}err := syscall.SetFileCompletionNotificationModes(fd.Sysfd, flags)if err == nil && flags&syscall.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS != 0 {fd.skipSyncNotif = true}}switch net {case "udp", "udp4", "udp6":ret := uint32(0)flag := uint32(0)size := uint32(unsafe.Sizeof(flag))err := syscall.WSAIoctl(fd.Sysfd, syscall.SIO_UDP_CONNRESET, (*byte)(unsafe.Pointer(&flag)), size, nil, 0, &ret, nil, 0)if err != nil {return "wsaioctl", err}}fd.rop.mode = 'r'fd.wop.mode = 'w'fd.rop.fd = fdfd.wop.fd = fdfd.rop.runtimeCtx = fd.pd.runtimeCtxfd.wop.runtimeCtx = fd.pd.runtimeCtxreturn "", nil
}
  1. 最终会调用到pollDesc下的init()方法,该方法中执行:
  1. 执行poll_runtime_pollServerInit:通过该函数最终调用到netpollinit(),封装一个epoll文件描述符实例epollevent,并且使用sync.Once封装保证程序中只会创建一个
  2. 执行poll_runtime_pollOpen: 调用alloc()初始化总大小约为 4KB的pollDesc结构体,调用netpollopen(),将可读,可写,对端断开,边缘触发 的监听事件注册到epollevent中
//创建epoll实例,并把listener fd加入到监听队列
func (pd *pollDesc) init(fd *FD) error {// 调用到runtime.poll_runtime_pollServerInit// 如果epoll没有初始化,那就执行初始化操作,使用sync.Once确保只执行一次// 如果你在程序中启动了多个监听服务,那么只需要将listen_fd加入此epoll即可serverInit.Do(runtime_pollServerInit)// 调用到runtime.poll_runtime_pollOpen// Sysfd为系统的fd// 构建pd,并返回// 将fd加入到epollctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))//把真正初始化完成的pollDesc实例复制给当前pollDescpd.runtimeCtx=ctx...return nil
}

poll_runtime_pollServerInit

func poll_runtime_pollServerInit() {netpollGenericInit()
}//根据不同的系统平台调用特定的netpollinit创建epoll实例
func netpollGenericInit() {if atomic.Load(&netpollInited) == 0 {lock(&netpollInitLock)if netpollInited == 0 {//TODO netpollinit()atomic.Store(&netpollInited, 1)}unlock(&netpollInitLock)}
}//netpollinit 会创建一个epoll实例,然后把epoll fd 复制给epfd
//后续 listener 以及他的accept的所有sockets有关的epoll的操作都基于这个全局的epfd
func netpollinit() {epfd = epollcreate1(_EPOLL_CLOEXEC)if epfd < 0 {epfd = epollcreate(1024)if epfd < 0 {println("runtime: epollcreate failed with", -epfd)throw("runtime: netpollinit failed")}closeonexec(epfd)}r, w, errno := nonblockingPipe()if errno != 0 {println("runtime: pipe failed with", -errno)throw("runtime: pipe failed")}ev := epollevent{events: _EPOLLIN,}*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRderrno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)if errno != 0 {println("runtime: epollctl failed with", -errno)throw("runtime: epollctl failed")}netpollBreakRd = uintptr(r)netpollBreakWr = uintptr(w)
}//事件的结构
type epollevent struct {events uint32  // 事件类型 _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET 等data   [8]byte // 额外数据
}

poll_runtime_pollOpen

func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {//初始化总大小约为 4KB的pollDesc结构体pd := pollcache.alloc()lock(&pd.lock)//重置pd的属性值//...unlock(&pd.lock)var errno int32//调用netpollopen()向epoll实例epfd加入新的轮询事件监听文件描述符的可读和可写状态errno = netpollopen(fd, pd)return pd, int(errno)
}//netpollopen 注册fd到epoll实例
//注意这里使用的epoll的ET模式,同时会利用万能指针把pollDesc保存到epollevent第一个8为字节数组data里
func netpollopen(fd uintptr, pd *pollDesc) int32 {var ev epollevent// 可读 | 可写 | 对端断开 | 边缘触发ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET//存放user data,后面读写均会用到pollDesc*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd//注册epoll事件return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
  1. pollcache下的alloc()初始化总大小约为 4KB的pollDesc结构体链表,pollCache的链表头如果为空,那么初始化首节点,首节点是一个pollDesc的链表头,每次调用该结构体都会返回链表头还没有被使用的pollDesc,初始化过程中会调用runtime.persistentalloc为这些数据结构分配不会被GC回收的内存,确保这些结构只能被epoll和kqueue在内核空间去引用
  2. 后续每次调用这个方法会先判断链表是否已经分配了,如果是,则解决返回pollDesc,这种方式有效提高了netpoller的吞吐量
  3. runntime会在关闭pollDesc时调用runtime.pollCache.free释放内存

pollDesc

  1. 上方执行pollcache下的alloc(),在第一次调用时会初始化总大小约为 4KB的pollDesc结构体,pollDesc 结构体会使用 link 字段串联成链表存储在 runtime.pollCache 中
  2. 查看pollDesc内部属性,其中有两个二进制的原子信号量,rg, wg,分别用来阻塞读G和写G
  1. rg值可能为: pdRead、pdWait、goroutine地址或nil(0)
  2. wg值可能为: pdRead、pdWait、goroutine地址或nil(0)
type pollDesc struct {link *pollDesc // 链表指针fd   uintptr   // 文件描述符指针atomicInfo atomic.Uint32rg atomic.Uintptr // 是否可读信号量,保存了用户态操作pollDesc的读协程地址wg atomic.Uintptr // 是否可写信号量,保存了用户态操作pollDesc写协程地址lock    mutex     // 互斥锁closing bool //是否正在关闭。表示了当前 pollDesc 是否正在关闭中。当为 true 时,意味着下一次 I/O 操作会失败user    uint32 rseq    uintptrrt      timer     // 读定时器,防止读超时rd      int64     // 可读的截止日期wseq    uintptr wt      timer     // 写定时器,防止写超时wd      int64     // 可写的截止日期self    *pollDesc
}
  1. rg, wg状态解释:
//1. pdWait是一个临时状态,当一个G来读或写这个未就绪的fd时,rg/wg会先被置为pdWait,之后有三种可能:
// a、此G成功的commit, 将rg/wg设置为G的指针,此时可以说,此G已经成功的park在了这个fd上
// b、并行的其他线程, 在commit之前,有可能将rg/wg置为了pdReady,G的park失败。
// c、并行的其他线程, 在commit之前,有超时/关闭的操作,于是rg/wg被置为0,G的park失败
pdWait  uintptr = 2//2. pointer此G已经成功的park在了这个fd上,之后:
//如果有就绪通知时,rg/wg会被置为pdReady
//如果有timeout/close操作时,rg/wg会被置为0,此时这个G就会被唤醒
pointer//3.pdReady表示此pd的 i/o 已经就绪,或者 timeout/closed。
//对应的G被唤醒,来进行读或写操作,完成之后,rg/wg的状态从pdReady切换到0,表示消费了这个通知,将状态Reset为0。
pdReady uintptr = 1//4. 状态流转顺序为:0 --> pdWait --> G pointer --> pdReady --> 0
//举个例子:当我们在用户态协程调用read阻塞时rg就被设置为该读协程,当内核态epoll_wait检测read就绪后就会通过rg找到这个协程让后恢复运行。rg,wg默认是0,rg为pdReady表示读就绪,可以将协程恢复,为pdWait表示读阻塞,协程将要被挂起。wg也是如此。

FD

  1. FD是 Go 中通用的文件描述符类型,net 包和 os 包用FD来表示网络连接或者文件,FD提供了用户接口层到 runtime 之间逻辑处理
type FD struct {// 读写锁fdmu fdMutex// 系统文件描述符Sysfd int// I/O poller.pd pollDesc// 用于在一次函数调用中读、写多个非连续缓冲区,这里主要是写iovecs *[]syscall.Iovec// 关闭文件时的信号量csema uint32// 如果此文件已设置为阻止模式,则为非零值isBlocking uint32// TCP或UDPIsStream bool// 读取到0字节时是否为错误,对于基于消息的基础socket而言为falseZeroReadIsEOF bool// 是否系统中真实文件还是socket连接isFile bool
}

IO多路复用需要实现的五个函数

  1. 上方通过执行netpollinit()初始化了epoll,实际上多路复用模块都要实现以下五个函数,这五个函数构成一个虚拟的接口, runtime/netpoll.go是一个代理,针对不同的平台会去加载不同的实现,比如以Linux下的epoll为例,会去调用runtime/netpoll_epoll.go下的方法
//初始化网络轮询器,通过 sync.Once 和 netpollInited 变量保证函数只会调用一次
func netpollinit()
//将 fd 加入到 epfd,pd 放入 event.data 使用边缘触发(ET)模式
func netpollopen(fd uintptr, pd *pollDesc) int32//启动网络轮询器,delta 参数为设置超时时间
//delta < 0 则无限阻塞,delta == 0则不会阻塞,delta > 0 则阻塞指定的纳秒
//返回值为已经就绪的goroutine列表
func netpoll(delta int64) gList//可以中断epoll_wait调用,如果它处于阻塞状态的话。
//计时器向前修改时间后会通过该函数中断网络轮询器
func netpollBreak()//判断文件描述符是否可以被轮询器使用
func netpollIsPollDescriptor(fd uintptr) boolfunc netpollIsPollDescriptor(fd uintptr) bool {return fd == uintptr(epfd) || fd == netpollBreakRd || fd == netpollBreakWr
}

二. 总结

多路复用基础

  1. 首先知道一下五种io模型有个概念

Blocking IO: 阻塞IO
NoneBlockin IO: 非阻塞IO
IO multiplexing (redis6实际应用的io) : IO多路复用
signal driven IO: 信号驱动IO
asynchronous IO: 异步IO

  1. 通过BIO,NIO 解释多路复用是怎么一步步演变出来的
  1. BIO网络通信时会小于accept()阻塞等待客户端连接,调用read()阻塞等待客户端请求,会产生两个阻塞,假设一个客户端没有执行完毕,会造成其它请求一直阻塞等待,解决这个问题,可以通过多线程的方式,新建线程处理每个客户端请求,但是操作系统中用户态不能直接开辟线程,需要调用内核来创建,会有用户态到内核态的上下文切换,十分耗费资源,所以提出NIO非阻塞式IO进行通信
  2. NIO网络通信中,将多个socket连接放入一个连接容器中,以Java I/O框架为例通过一个线程使用Java的Selector来同时监控多个通道,遍历这个连接容器拿到每个连接,然后调用read方法判断客户端是否传输数据,出现两个问题,要遍历所有连接,假设多个连接中只有少量的几个连接有请求数据,也要遍历一遍,问题二,调用read方法判断客户端是否有传输数据,遍历是在用户态进行的,调用内核态的read()方法,虽然read()不阻塞,但是涉及到用户态到内核态的切换,每遍历调用一次就要切换一次开销较大
  1. 进而提出了IO多路复用,
  1. 将请求封装为文件描述符FD(文件描述符可以是套接字、管道等 I/O 设备等),将多个请求的文件描述符保存到一个集合中
  2. 创建一个事件集合()event set)将需要监听的文件描述符添加到事件集合中,在Linux中,可以使用epoll_create函数创建一个epoll实例,并使用epoll_ctl函数将需要监听的文件描述符添加到epoll实例中
  3. 调用select、poll或epoll等系统调用,将事件集合传递给内核,并等待事件的发生,在Linux中.可以使用epoll_wait函数等待事件的发生,并将发生事件的文件描述符及其事件类型返回给应用程序。
  4. 当发送事件后根据返回的事件类型,进行相应的处理,例如:如果是读事件,就读取数据并进行处理,如果是写事件,就写入数据并进行处理,如果是连接事件,就接受连接并进行处理
  1. 多路复用的优点是:
  1. 将Socket请求, 管道等 I/O等封装为FD文件描述符,内部通过专门的线程去处理,减少线程的创建与销毁,减少资源浪费
  2. 以Linux为例,通过select、poll 或 epoll 等系统调用,将监听文件描述符集合传递给内核,直接放入Linux内核上,不再出现用户态到内核态的切换,直接从内核态获取结果,内核是非阻塞的,也减少资源浪费
  1. Linux中IO多路复用通过select, poll, epoll 三大函数实现,又被称为event driver IO事件驱动IO, 一个进程同时等待多个文件描述符也就是socket套接字,socket连接,其中任意一个进入就绪状态,select函数就可以返回,相当于监听到了事件开始执行
  2. 将socket对应的fd注册到epoll, 通过epoll来监听socket上的消息,整个过程只在调用select, poll, epoll这三个函数时才会阻塞,收发客户消息是不会阻塞的,整个进程或线程都被充分利用起来了
  3. epoll采用的是事件驱动机制,不是select、poll的轮询机制,epoll是非阻塞的,执行流程是:
  1. 调用epoll_create()创建一个epoll实例文件描述符
  2. 封装epoll_event ,设置关注的事件
  3. 调用epoll_ctl()添加绑定了事件的文件描述符,文件描述符在epoll中以红黑树的结构存储,用于快速查找就绪的描述符已经
  4. 调用epoll_wait()等待,内核会遍历所有已注册的文件描述符和事件类型,检查是否有事件已经就绪,如果有就绪的事件,内核会将它们添加到就绪队列中,并返回给应用程序。解除阻塞,应用程序可以通过遍历就绪队列,获取已经就绪的文件描述符及其对应的事件类型,并执行相应的回调函数来处理事件

当前多路复用总结

  1. 首先golang在net/http下对多路复用进行了支持,提供了五个函数,
//初始化网络轮询器
netpollinit()
//将 fd 加入到 epfd,pd 放入 event.data 使用边缘触发(ET)模式
netpollopen()
//启动网络轮询器
netpoll()
//中断epoll_wait调用
netpollBreak()
//判断文件描述符是否可以被轮询器使用
netpollIsPollDescriptor()
  1. golang 在src/runtime/netpoll.go中针对不同平台进行了指定实现,例如linux环境下epoll封装的文件在src/runtime/netpoll_epoll.go中,windows环境下多路复用模型实现在src/runtime/netpoll_windows.go,进而提供了多路复用功能
  2. 我们还是以通过"net/http"构建http服务为例,通过net/http编写服务端时, 首先调用NewServeMux()创建多路复用器,编写对外接收请求的接口函数也就是处理器,然后调用多路复用器上的HandleFunc()方法,将接口与接口路径进行绑定,注册路由, 最后调用ListenAndServe()函数在指定端口开启监听,启动服务,该函数内部会封装一个Server结构体变量,调用Server的ListenAndServe()方法,查看该方法:
  1. 执行"net.Listen(“tcp”, addr)": 多路复用相关初始化,初始化socket,端口连接绑定,开启监听
  2. “srv.Serve(ln)”: 内部会启动一个无限for循环,循环内执行Accept(),等待接收客户端连接,这里也有多路复用相关的业务
  3. 当接收到连接后Accept()返回,继续向下执行,会启动协程处理一个一个的请求,会执行"go c.serve()"
  1. 底层针对多路复用提供了pollDesc 和 FD两个结构体
  1. FD:表示文件描述符的一个类型, net/http 底层通过使用系统调用和文件描述符来进行网络 I/O 操作,FD实现了 io.Reader 和 io.Writer 接口,用于读取和写入网络数据
  2. pollDesc 用来管理这些文件描述符,持有FD,并向操作系统注册对它们的读、写、错误等事件的监听。
  3. 首先Server 对象使用 net.ListenAndServe 方法来启动服务时,net/http 底层会使用 net.Listen() 函数创建一个 TCP 监听器,并返回一个 net.Listener 对象。这个对象包含了一个文件描述符 FD,它表示监听器对应的底层网络连接
  4. 客户端建立连接时,net/http 底层会使用 accept() 函数接受连接,并创建一个新的 FD 对象来表示这个连接。这些连接的 FD 对象会与监听器中的 FD 关联起来,将其加入到 Server 的文件描述符集合中。
  5. 初始化时创建的 FD 对象和接收到请求创建的 FD 对象在作用上是相似的,但是它们的生命周期和使用方式是不同。初始化时创建的 FD 对象是为了监听底层网络连接而存在的,而接收到请求创建的 FD 对象则是为了处理单独的请求而存在的,这个 FD 对象会在请求处理结束后被销毁,并且在下次请求到来时重新创建,保证每次请求都使用一个全新的 FD 对象,确保请求间的隔离性和安全性
  6. 当 Serve 方法开始监听网络事件后,如果有数据可读或可写,就会触发 pollDesc 中的 runner 函数。这时候,runner 会调用 FD 的相关方法来处理网络 I/O 操作,例如读取请求数据、发送响应数据等。
  7. 在这个过程中,pollDesc 和 FD 扮演了底层 I/O 的角色,负责具体实现具体的 I/O 操作。通过 pollDesc 和 FD 的配合,net/http 能够实现对网络 I/O 的有效控制和处理
  1. 当前只关注初始化,在编写http服务时,会调用http.ListenAndServe() 监听指定端口,启动服务,该函数内最终会执行到"net.Listen(“tcp”, addr)"进行多路复用相关初始化,初始化socket,端口连接绑定,开启监听,在这个Listen()函数中
  1. 执行"DefaultResolver.resolveAddrList"根据协议名称和地址取得 Internet 协议族地址列表,
  2. 根据协议执行listenTCP或listenUnix 进行实际socket创建,fd创建,监听等操作, 具体根据传入的协议族来确定
  3. 最后封装一个TCPListener结构体返回
  1. 查看第二步骤listenTCP或listenUnix函数,会初始化多路复用epoll相关业务,以TCP为例,通过listenTCP()函数最终会调用到net/sock_posix.go下的一个socket(),在socket()函数中会:
  1. 调用sysSocket(),根据平台不同调用对应的api创建socket连接
  2. 调用setDefaultSockopts()设置socket选项
  3. 调用newFD()创建fb, 对返回的系统fd进行包装,封装为netFD结构表示,该接口描述原始 socket 的地址信息、协议类型、协议族以及 option,netFD在整个包装结构中居于用户接口的下一层
  4. 拿到fb后,开启监听,tcp会调用listenStream(),udp调用listenDatagram()
  1. 查看listenStream函数,内部重点关注调用了FD.Init()—>pollDesc.init(),最终会调用到pollDesc.init(),在该函数中
  1. 执行poll_runtime_pollServerInit:通过该函数最终调用到netpollinit(),封装一个epoll文件描述符实例epollevent,并且使用sync.Once封装保证程序中只会创建一个
  2. 执行poll_runtime_pollOpen: 调用alloc()初始化总大小约为 4KB的pollDesc结构体,调用netpollopen(),将可读,可写,对端断开,边缘触发 的监听事件注册到epollevent中
  1. pollDesc是与 epoll 交互最重要的结构之一,可以理解为与 epoll 之间的桥梁:通过alloc()函数,在第一次调用时会初始化总大小约为 4KB的pollDesc结构体,pollDesc 结构体会使用 link 字段串联成链表存储在 runtime.pollCache 中,pollDesc中有两个比较重要的属性rg, wg分别用来阻塞读G和写G, 默认是0,rg为pdReady表示读就绪,可以将协程恢复,为pdWait表示读阻塞,协程将要被挂起。wg也是如此
  2. 自此多路复用器epoll初始化并添加事件成功,Listen函数返回TCPListener结构体变量

什么是水平触发/边缘触发

  1. Level_triggered(水平触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用 epoll_wait()时,它还会通知你在上没读写完的文件描述符上继续读写,当然如果你一直不去读写,它会一直通知你!!!
  2. Edge_triggered(边缘触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait()时,它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你!!!

epoll为什么高效

  1. select/poll是需要将所有用户态的fd拷贝到内核态,每次调用select都需要将线程加入到所有监视socket的等待队列,数量巨大时这个效率比较慢。当其中监控的socket有数据到达的时候,还需要将线程从这些监控的socket队列中移除。并且返回之后,还需要将内核空间的数据(包括fd)拷贝到用户空间,然后还需要将所有的fd遍历一遍,对isset的fd进行处理
  2. epoll呢,epoll_create时创建内核高速cache区:就是建立连续的物理内存页,然后在之上建立slab层,简单的说就是物理上分配好你想要的大小的内存对象,每次使用时都是使用空闲的已分配好的对象、内核cache中建立个红黑树来存储通过epoll_ctl添加进来的fd,这些fd其实已经在内核态了,当你再次调用epoll_wait时,不需要再拷贝进内核态、内核cache中再建立就绪链表存储所有就绪的fd。epoll_ctl 时将fd添加到红黑树中(若存在则不添加),当fd有数据到达的时候,然后将这个fd添加到就绪列链表中。 epoll_wait时返回就绪链表里面的数据就可以了,所以这里只需要将就绪链表的数据从内核太拷贝到用户态

怎么实现创建epoll实例并将fd加入到epoll列表,也就是初始化过程

  1. 以原生构建原生http服务为例,在编写http服务时,会调用http.ListenAndServe() 监听指定端口,启动服务,查看该接口源码,内部会调用ListenAndServe,最终会调用到ListenConfig结构体上的一个Listen()方法, 该方法中
  1. DefaultResolver.resolveAddrList是根据协议名称和地址取得 Internet 协议族地址列表
  2. listenTCP或listenUnix从地址列表中取得满足条件的地址进行实际socket创建,fd创建,监听等操作, 具体根据传入的协议族来确定
    3, 最后封装一个TCPListener结构体返回
  1. 查看第二步骤listenTCP或listenUnix函数,会初始化多路复用epoll相关业务,以TCP为例,通过listenTCP()函数最终会调用到net/sock_posix.go下的一个socket(),在socket()函数中会:
  1. 调用sysSocket(),根据平台不同调用对应的api创建socket连接
  2. 调用setDefaultSockopts()设置socket选项
  3. 调用newFD()创建fb, 对返回的系统fd进行包装,封装为netFD结构表示,该接口描述原始 socket 的地址信息、协议类型、协议族以及 option,netFD在整个包装结构中居于用户接口的下一层
  4. 拿到fb后,开启监听,tcp会调用listenStream(),udp调用listenDatagram()
  1. 查看listenStream函数,内部重点关注调用了FD.Init()—>pollDesc.init(),最终会调用到pollDesc.init(),在该函数中
  1. 执行poll_runtime_pollServerInit:通过该函数最终调用到netpollinit(),封装一个epoll文件描述符实例epollevent,并且使用sync.Once封装保证程序中只会创建一个
  2. 执行poll_runtime_pollOpen: 调用alloc()初始化总大小约为 4KB的pollDesc结构体,调用netpollopen(),将可读,可写,对端断开,边缘触发 的监听事件注册到epollevent中

怎么实现将goroutine挂起的

  1. 还是以搭建原生http服务为例,执行http.ListenAndServe() 监听指定端口,启动服务,最终会调用到"net.Listen(“tcp”, addr) "进行多路复用的始化, 内部会进行一系列的初始化,例如netFD.init()–>poll.FD.init()–>poll.pollDesc.init()初始化创建一个pollDesc结构体变量,将可读,可写,对端断开,边缘触发 的监听事件注册到epollevent中
  2. 初始化动作执行完毕后,会拿到一个TCPListener,调用Accept()等待接收监听到的网络连接
  1. 查看Listener.Accept源码,内部会调用到netFD的accept方法,调用FD的Accept(),查看FD.Accept方法
  2. 该函数中存在一个for循环,循环内会调用 linux 系统的 accept() 接收新连接,此处的accept()会直接返回,并且会附带返回一个err变量,如果 err == nil 则表示正常建立新连接,
  3. 当err不为空,并且等于syscall.EAGAIN时,表示没有数据可读,执行pollDesc下的waitRead方法阻塞goroutine
  4. 查看pollDesc下 waitRead方法源码,最终会调用到一个netpollblock 函数, 检查 pollDesc 对象的 rg/wg 信号量是否处于 pdReady 状态,如果不是就绪状态调用gopark()挂起当前 goroutine,并且在gopark时,会传递一个netpollblockcommit()回调函数
  5. 查看 gopark()源码,实现了挂起协程的效果,实际是基于GMP模型,在 Go 的运行时系统中,每个 goroutine 都有一个对应的 M和 P,当一个 goroutine 调用 gopark 函数时,它会将自己从 M 中移除,并标记为等待状态,最终会加入到网络轮询器(netpoller)的等待队列中,然后进入休眠状态
  1. 当接收到连接后,代码由accept()位置继续向下执行,开始Read或Write读写数据, 我们还是以原生http为例, 当accept返回后,会分装一个Conn连接变量, Conn是一个接口,内部提供了读写数据的方法,
  2. 查看Conn.Read方法, 内部会调用 netFD.Read --> FD.Read–>最后使用 syscall.Read 也就是Linux 的系统提供的读取函数完成数据读取,
  1. 查看syscall.Read会附带返回一个err, 如果为nil表示读到了数据正常返回,如果不为nil并且syscall.EAGAIN表示当前没有期待的 I/O 事件发生,也就是 socket 不可读,执行fd.pd.waitRead,挂起当前协程
  2. pollDesc.waitRead内部最终会调用到netpollblock(),与Accept相同,如果没有事件,会执行gopark()挂起协程等待事件
  3. pollDesc.waitWrite 写数据的内部实现原理和 pollDesc.waitRead 是一样的,都是基于 poll.runtime_pollWait --> runtime.poll_runtime_pollWait–>netpollblock()—>gopark()挂起协程等待事件
  1. 当接收到读写事件,会唤醒协程,由阻塞位置继续向下执行, 那么是怎么唤醒的,通过调用runtime.netpoll()返回一组已经准备就绪的 Goroutine开始执行,方法中会基于非阻塞模式调用 epollwait 方法,获取到就绪事件队列 events,然后遍历事件队列,调用 netpollready 方法将对应的 loop goroutine 添加到 gList 中返回给上层用于执行唤醒操作
  2. 什么时候会调用 runtime.netpoll:
  1. Go scheduler调度器运行的核心方法 runtime.schedule() 里会调用一个 runtime.findrunable() 方法,该方法中调用了 runtime.netpoll获取可运行的 goroutine
  2. go项目在运行时,会单独启动一个sysmon监控线程,用来做抢占式调度等操作, 在循环执行过程中检查距离上一次 runtime.netpoll 被调用是否超过了 10ms,若是则会去调用它拿到可运行的 goroutine 列表并通过调用 injectglist 把 g 列表放入全局调度队列或者当前 P 本地调度队列等待被执行

怎么实现将goroutine唤醒的

  1. 当接收到读写事件,会唤醒协程,由阻塞位置继续向下执行, 那么是怎么唤醒的,通过调用runtime.netpoll()返回一组已经准备就绪的 Goroutine开始执行,方法中会基于非阻塞模式调用 epollwait 方法,获取到就绪事件队列 events,然后遍历事件队列,调用 netpollready 方法将对应的 loop goroutine 添加到 gList 中返回给上层用于执行唤醒操作
  2. 什么时候会调用 runtime.netpoll:
  1. Go scheduler调度器运行的核心方法 runtime.schedule() 里会调用一个 runtime.findrunable() 方法,该方法中调用了 runtime.netpoll获取可运行的 goroutine
  2. go项目在运行时,会单独启动一个sysmon监控线程,用来做抢占式调度等操作, 在循环执行过程中检查距离上一次 runtime.netpoll 被调用是否超过了 10ms,若是则会去调用它拿到可运行的 goroutine 列表并通过调用 injectglist 把 g 列表放入全局调度队列或者当前 P 本地调度队列等待被执行

go 进阶 多路复用支持: 一. netpoller 初始化相关推荐

  1. Spring进阶教程之在ApplicationContext初始化完成后重定义Bean

    之前遇到一个很有意思的问题:我需要批量重定义特定类型的由Spring容器托管的Bean.具体体现在,我有很多控制器类(Controller)和校验器类(Validator),我希望他们都是多例(Pro ...

  2. http.ListenAndServe

    创建WWW服务实现HTTP通信大致可分为两个阶段:注册路由.监听启动 服务端创建Socket监听指定端口,等待客户端请求到来. 监听Socket接受客户端请求并建立连接以获取客户端Socket,服务端 ...

  3. php worker类,Workerman进阶之Worker类-id属性研究

    Workerman进阶之Worker类->id属性研究 龙行    PHP    2019-5-20    1815    0评论 先来看看官方给的例子 如果id===0 设置定时器 use W ...

  4. Qt编写物联网管理平台38-多种数据库支持

    一.前言 本系统设计之初就要求支持多种不同的数据库,比如sqlite.mysql.postgres.sqlserver等,甚至包括国产数据库比如人大金仓kingbase等,(由于现在国产化的大力推进, ...

  5. 学习笔记:C++初阶【C++入门、类和对象、C/C++内存管理、模板初阶、STL简介、string、vector、list、stack、queueu、模板进阶、C++的IO流】

    文章目录 前言 一.C++入门 1. C++关键字 2.命名空间 2.1 C语言缺点之一,没办法很好地解决命名冲突问题 2.2 C++提出了一个新语法--命名空间 2.2.1 命名空间概念 2.2.2 ...

  6. 如何根据C编程语言标准初始化结构

    我想初始化一个struct元素,拆分为声明和初始化. 这就是我所拥有的: typedef struct MY_TYPE {bool flag;short int value;double stuff; ...

  7. pytorch默认初始化_“最全PyTorch分布式教程”来了!

    前言 本文对使用pytorch进行分布式训练(单机多卡)的过程进行了详细的介绍,附加实际代码,希望可以给正在看的你提供帮助.本文分三个部分展开,分别是: 先验知识 使用过程框架 代码解析 若想学习分布 ...

  8. apache2.4.6支不支持jsp_Spring Boot中文参考指南(2.1.6)50、Kotlin 支持

    上一篇[49.4.测试你的自动配置] 下一篇[52.启用生产就绪功能] 英文原文:https://docs.spring.io/spring-boot/docs/2.1.6.RELEASE/refer ...

  9. 复制初始化和直接初始化

    string str("12345"); string str = "12345"; 在写代码时忽然想到这个两个有啥区别呢,其实这个还是c++基础薄弱的原因 于 ...

最新文章

  1. Docker核心技术之容器与镜像深入了解
  2. MongoDB 安装配置
  3. UA OPTI544 量子光学7 2-level system approximation的Density Matrix模型
  4. 全球与中国抗生素软膏市场运营现状十四五及前景规划分析报告2021-2027年版
  5. FPGA Quartus Prime 16.1安装及破解
  6. 一些网站github等无法连接服务器的解决办法
  7. python加入中小学课程_【python即将进入中学课堂,编程从小抓起,竟然在这几点上应验了】- 环球网校...
  8. 结合swiper使用图片懒加载
  9. java缓存管理器_使用@EnableCaching的Spring Boot默认缓存管理器
  10. jsp页面数据与action数据交互 使用导航图语言和set注入
  11. React Mixin
  12. 三菱MX Component通信应用
  13. Internet Explorer 包含五个预定义区域
  14. python爬虫requests库的使用及python正则表达式的使用
  15. Programming Ruby 读书笔记(六)
  16. 揭开手机app中摇一摇的神秘面纱
  17. python实现牛顿法_牛顿法和最速下降法的Python实现
  18. 振弦传感器及核心VM系列振弦采集模块
  19. npm 创建第一个Angular项目
  20. 我用Python告诉你武汉房价

热门文章

  1. 高斯消元法的算法介绍
  2. win10搜索功能不能用解决方案
  3. 开始使用MarkDown
  4. Linux 时间、时区设置
  5. Python的格式化输出(炒鸡详细)
  6. dir-612b虚拟服务器,D-Link DIR 612B路由器设置上网教程
  7. 历数金融危机 摘自http://www.ftchinese.com/sc/index.jsp
  8. PHP Redis的运用篇(一)
  9. html语言制作折线图,html5 canvas 实现简单绘制折线图
  10. MySQL表结构的管理