Server-side programming (Linux epoll model)

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket
import select
import Queue

# Create the server socket
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Allow the address to be reused
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# IP address and port
server_address = ("127.0.0.1", 8888)
# Bind the address
serversocket.bind(server_address)
# Listen, with a backlog of 10
serversocket.listen(10)
print "Server started, listening on:", server_address
# Put the server socket into non-blocking mode
serversocket.setblocking(False)
# Poll timeout in seconds
timeout = 10
# Create the epoll object; events to monitor are registered on it
epoll = select.epoll()
# Register the listening socket's fd for read events
epoll.register(serversocket.fileno(), select.EPOLLIN)
# Per-connection message queues, keyed by connection object
message_queues = {}
# Map from file descriptor to socket object, format {fd: object}
fd_to_socket = {serversocket.fileno(): serversocket, }

while True:
    print "Waiting for active connections......"
    # Poll the registered events; returns [(fd, event), ...]
    events = epoll.poll(timeout)
    if not events:
        print "epoll timed out with no active connections, polling again......"
        continue
    print "Got", len(events), "new events, handling them......"
    for fd, event in events:
        socket = fd_to_socket[fd]
        # If the active socket is the server socket, there is a new connection
        if socket == serversocket:
            connection, address = serversocket.accept()
            print "New connection:", address
            # Put the new connection into non-blocking mode
            connection.setblocking(False)
            # Register the new connection's fd for read events
            epoll.register(connection.fileno(), select.EPOLLIN)
            # Remember the fd -> socket mapping for the new connection
            fd_to_socket[connection.fileno()] = connection
            # One message queue per connection, keyed by the connection object
            message_queues[connection] = Queue.Queue()
        # Hang-up event
        elif event & select.EPOLLHUP:
            print 'client close'
            # Unregister the client fd from epoll
            epoll.unregister(fd)
            # Close the client socket
            fd_to_socket[fd].close()
            # Forget everything about the closed client
            del fd_to_socket[fd]
        # Readable event
        elif event & select.EPOLLIN:
            # Receive data
            data = socket.recv(1024)
            if data:
                print "Received data:", data, "from client:", socket.getpeername()
                # Queue the data for the corresponding client
                message_queues[socket].put(data)
                # Switch this fd to the write-event set so the reply can be sent
                epoll.modify(fd, select.EPOLLOUT)
            else:
                # An empty read means the peer closed the connection: clean it up
                epoll.unregister(fd)
                fd_to_socket[fd].close()
                del fd_to_socket[fd]
        # Writable event
        elif event & select.EPOLLOUT:
            try:
                # Get the queued message for this client
                msg = message_queues[socket].get_nowait()
            except Queue.Empty:
                print socket.getpeername(), " queue empty"
                # Switch the fd back to read events
                epoll.modify(fd, select.EPOLLIN)
            else:
                print "Sending data:", msg, "to client:", socket.getpeername()
                # Send the data
                socket.send(msg)

# Unregister the server fd from epoll
epoll.unregister(serversocket.fileno())
# Close the epoll object
epoll.close()
# Close the server socket
serversocket.close()

Client

import socket
import threading
import time
import random


def client_con():
    sock = socket.socket()
    # Port 8004 matches the minimal pre-fork server at the end of this post;
    # change it to 8888 to test the epoll server above.
    sock.connect(("127.0.0.1", 8004))
    data = "client data send"
    sock.send(data)
    data = sock.recv(1024)
    print(data)


def start(times):
    for i in range(times):
        t = threading.Thread(target=client_con)
        t.start()


if __name__ == "__main__":
    start(10)

Simplified pseudocode (a runnable sketch follows the pseudocode)
1. Create and initialize the listening object sockserver
2. Add sockserver to the read-event set watched by the system (r.append(sockserver))

while True:
    # call the system select() to get the objects whose events fired
    r_ready, _, _ = select.select(r, [], [], 1)
    if the triggered event is a new connection request:
        handle the connection request and add the new connection to the watched list r
    elif the triggered event is a read event:
        handle the read event
    elif the triggered event is a write event:
        handle the write event
    else:  # errors and other events
        handle the other events
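
To make the pseudocode concrete, here is a minimal runnable sketch of the same select-based loop. The port (9000) and the echo behaviour are illustrative assumptions, not part of the original text.

import select
import socket

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("127.0.0.1", 9000))
server.listen(10)
server.setblocking(False)

read_set = [server]                               # "r" in the pseudocode
while True:
    readable, _, _ = select.select(read_set, [], [], 1)
    for sock in readable:
        if sock is server:                        # new connection request
            conn, addr = server.accept()
            conn.setblocking(False)
            read_set.append(conn)
        else:                                     # readable client socket
            data = sock.recv(1024)
            if data:
                sock.send(data)                   # echo the data back
            else:                                 # client closed the connection
                read_set.remove(sock)
                sock.close()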

gunicorn execution flow

The flow is:
1. Read the configuration file
2. Spawn the worker processes according to the configuration
3. Manage those worker processes

Pseudocode (a minimal runnable sketch follows):
Read and parse the configuration file
Based on the configuration:
while True:
    spawn child worker processes
    if a worker dies or the worker count exceeds the configured number, handle it
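
A minimal sketch of this master loop, assuming a spawn_worker() helper that forks a child and a fixed worker count; a fuller version appears at the end of this post.

import os
import time

NUM_WORKERS = 2
WORKERS = {}

def spawn_worker():
    pid = os.fork()
    if pid != 0:                     # master: remember the child
        WORKERS[pid] = time.time()
        return
    # child: stand in for real worker logic, then exit
    time.sleep(60)
    os._exit(0)

while True:
    while len(WORKERS) < NUM_WORKERS:             # spawn until the configured count is reached
        spawn_worker()
    pid, status = os.waitpid(-1, os.WNOHANG)      # reap dead children without blocking
    if pid:
        WORKERS.pop(pid, None)
    time.sleep(1)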

Overview of how gunicorn works:
A complete HTTP request
1. The server accepts a new connection request
2. The server reads the data sent with the request
3. The received data is parsed
4. The parsed data is processed and a result is produced
5. The result data is sent back to the client
6. The request is complete

Of these, gunicorn performs only steps 1, 2, 3 and 5; step 4 is executed by the configured framework, such as Django.

A diagram of gunicorn's working principle appears in the original post (not reproduced here).

Walkthrough of a WSGI request

# Import make_server from the wsgiref module:
from wsgiref.simple_server import make_server


def application(environ, start_response):
    start_response('200 OK', [('Content-Type', 'text/html')])
    return '<h1>Hello, web!</h1>'


# Create a server: empty host, port 8000, handler function application:
httpd = make_server('', 8000, application)
print "Serving HTTP on port 8000..."
# Start listening for HTTP requests:
httpd.serve_forever()
WSGI is a general gateway interface; any server that supports this interface can be used, so it is worth walking through the source of the packages above. In that source, nagle_algorithm refers to Nagle's algorithm, which delays small writes until enough data has accumulated so that tiny packets are not sent one by one. The handler used here processes requests in a blocking fashion.
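
As a hedged illustration of the Nagle behaviour mentioned above, this is how a connected socket can disable it via TCP_NODELAY (the address is an assumption for the example, matching the wsgiref server above):

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("127.0.0.1", 8000))
# TCP_NODELAY=1 disables Nagle's algorithm, so small writes go out immediately
# instead of being buffered until more data has accumulated.
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.close()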

Brief analysis of the make_server source

def make_server(
    host, port, app, server_class=WSGIServer, handler_class=WSGIRequestHandler
):
    """Create a new WSGI server listening on `host` and `port` for `app`"""
    # Initialize the server class
    server = server_class((host, port), handler_class)
    server.set_app(app)
    return server

By default server_class=WSGIServer and handler_class=WSGIRequestHandler; both can be overridden, as in the sketch below.
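
As a hedged sketch, the QuietHandler class below is a hypothetical example of overriding handler_class to silence per-request logging (Python 3 shown):

from wsgiref.simple_server import make_server, WSGIRequestHandler, WSGIServer

class QuietHandler(WSGIRequestHandler):
    def log_message(self, format, *args):
        pass                      # drop the default per-request log line

def app(environ, start_response):
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return [b'ok']

httpd = make_server('', 8000, app,
                    server_class=WSGIServer, handler_class=QuietHandler)
httpd.handle_request()            # serve exactly one request, then return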

Here is WSGIServer:

class WSGIServer(HTTPServer):

    """BaseHTTPServer that implements the Python WSGI protocol"""

    application = None

    def server_bind(self):
        """Override server_bind to store the server name."""
        HTTPServer.server_bind(self)
        self.setup_environ()

    def setup_environ(self):
        # Set up base environment
        env = self.base_environ = {}
        env['SERVER_NAME'] = self.server_name
        env['GATEWAY_INTERFACE'] = 'CGI/1.1'
        env['SERVER_PORT'] = str(self.server_port)
        env['REMOTE_HOST'] = ''
        env['CONTENT_LENGTH'] = ''
        env['SCRIPT_NAME'] = ''

    def get_app(self):
        return self.application

    def set_app(self, application):
        self.application = application

WSGIServer inherits from HTTPServer:

class HTTPServer(socketserver.TCPServer):

    allow_reuse_address = 1    # Seems to make sense in testing environment

    def server_bind(self):
        """Override server_bind to store the server name."""
        socketserver.TCPServer.server_bind(self)
        host, port = self.socket.getsockname()[:2]
        self.server_name = socket.getfqdn(host)
        self.server_port = port

HTTPServer inherits from socketserver.TCPServer, so when make_server is called,
initializing httpd ends up running TCPServer's __init__ method:

def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
    """Constructor.  May be extended, do not override."""
    BaseServer.__init__(self, server_address, RequestHandlerClass)
    self.socket = socket.socket(self.address_family,
                                self.socket_type)
    if bind_and_activate:
        try:
            self.server_bind()
            self.server_activate()
        except:
            self.server_close()
            raise

BaseServer mainly defines three methods (a short usage sketch follows):
# initialization
- __init__(server_address, RequestHandlerClass)
# server loop that keeps accepting requests
- serve_forever(poll_interval=0.5)
# handle a single request
- handle_request() # if you do not use serve_forever()
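
A short usage sketch of these entry points, using Python 3's socketserver (in Python 2 the module is named SocketServer); the echo handler and the port are assumptions for illustration:

import socketserver

class EchoHandler(socketserver.BaseRequestHandler):
    def handle(self):
        data = self.request.recv(1024)   # self.request is the client socket
        self.request.sendall(data)

# __init__(server_address, RequestHandlerClass)
server = socketserver.TCPServer(("127.0.0.1", 9001), EchoHandler)
# serve_forever() loops forever; handle_request() would serve exactly one request
server.serve_forever()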

When execution reaches self.server_bind(), the server_bind method overridden in WSGIServer runs: it first calls HTTPServer's server_bind() and then executes self.setup_environ().
httpd.serve_forever() calls the method of the same name defined in BaseServer:

def serve_forever(self, poll_interval=0.5):
    """Handle one request at a time until shutdown.

    Polls for shutdown every poll_interval seconds. Ignores
    self.timeout. If you need to do periodic tasks, do them in
    another thread.
    """
    self.__is_shut_down.clear()
    try:
        # XXX: Consider using another file descriptor or connecting to the
        # socket to wake this up instead of polling. Polling reduces our
        # responsiveness to a shutdown request and wastes cpu at all other
        # times.
        with _ServerSelector() as selector:
            selector.register(self, selectors.EVENT_READ)

            while not self.__shutdown_request:
                ready = selector.select(poll_interval)
                if ready:
                    self._handle_request_noblock()

                self.service_actions()
    finally:
        self.__shutdown_request = False
        self.__is_shut_down.set()

When a request arrives, self._handle_request_noblock() is called:

def _handle_request_noblock(self):
    """Handle one request, without blocking.

    I assume that selector.select() has returned that the socket is
    readable before this function was called, so there should be no risk of
    blocking in get_request().
    """
    try:
        request, client_address = self.get_request()
    except OSError:
        return
    if self.verify_request(request, client_address):
        try:
            self.process_request(request, client_address)
        except:
            self.handle_error(request, client_address)
            self.shutdown_request(request)
    else:
        self.shutdown_request(request)

Once the request is in, self.process_request(request, client_address) is called:

def process_request(self, request, client_address):
    """Call finish_request.

    Overridden by ForkingMixIn and ThreadingMixIn.
    """
    self.finish_request(request, client_address)
    self.shutdown_request(request)

This in turn calls self.finish_request(request, client_address):

def finish_request(self, request, client_address):
    """Finish one request by instantiating RequestHandlerClass."""
    self.RequestHandlerClass(request, client_address, self)

Here self.RequestHandlerClass is the WSGIRequestHandler we passed in through make_server.

Now let's analyze WSGIRequestHandler.
WSGIRequestHandler inherits from BaseHTTPRequestHandler and mainly defines the handle method:

class WSGIRequestHandler(BaseHTTPRequestHandler):

    def handle(self):
        """Handle a single HTTP request"""

        self.raw_requestline = self.rfile.readline(65537)
        if len(self.raw_requestline) > 65536:
            self.requestline = ''
            self.request_version = ''
            self.command = ''
            self.send_error(414)
            return

        if not self.parse_request():  # An error code has been sent, just exit
            return

        # Avoid passing the raw file object wfile, which can do partial
        # writes (Issue 24291)
        stdout = BufferedWriter(self.wfile)
        try:
            handler = ServerHandler(
                self.rfile, stdout, self.get_stderr(), self.get_environ()
            )
            handler.request_handler = self      # backpointer for logging
            handler.run(self.server.get_app())
        finally:
            stdout.detach()

BaseHTTPRequestHandler inherits from socketserver.StreamRequestHandler, which mainly defines the two methods setup() and finish():

class StreamRequestHandler(BaseRequestHandler):

    def setup(self):
        self.connection = self.request
        if self.timeout is not None:
            self.connection.settimeout(self.timeout)
        if self.disable_nagle_algorithm:
            self.connection.setsockopt(socket.IPPROTO_TCP,
                                       socket.TCP_NODELAY, True)
        # Wrap the socket in readable/writable file objects
        self.rfile = self.connection.makefile('rb', self.rbufsize)
        self.wfile = self.connection.makefile('wb', self.wbufsize)

    def finish(self):
        if not self.wfile.closed:
            try:
                self.wfile.flush()
            except socket.error:
                # A final socket error may have occurred here, such as
                # the local error ECONNABORTED.
                pass
        self.wfile.close()
        self.rfile.close()

StreamRequestHandler inherits from BaseRequestHandler, which is defined as follows:

class BaseRequestHandler:

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass

When a handler is instantiated, it runs self.setup(), then self.handle(), and finally self.finish().
So WSGIRequestHandler supplies its own handle method and relies on StreamRequestHandler's setup() and finish(); the request-handling helpers in BaseHTTPRequestHandler are not used. A small sketch of this lifecycle follows.
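
A minimal, self-contained sketch of that lifecycle; the fake request object is only for demonstration:

from socketserver import BaseRequestHandler

class TracingHandler(BaseRequestHandler):
    def setup(self):
        print("setup: wrap the raw request", self.request)
    def handle(self):
        print("handle: talk to", self.client_address)
    def finish(self):
        print("finish: flush and close")

# Constructing the handler is enough to drive setup() -> handle() -> finish() once.
TracingHandler(request="fake-socket", client_address=("127.0.0.1", 0), server=None)
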
Inside WSGIRequestHandler's handle method, the key lines are:

handler = ServerHandler(self.rfile, stdout, self.get_stderr(), self.get_environ())
handler.request_handler = self      # backpointer for logging
handler.run(self.server.get_app())

The ServerHandler class inherits from SimpleHandler, which in turn inherits from BaseHandler,
so executing handler.run(self.server.get_app()) calls the run method defined in BaseHandler:

def run(self, application):
    """Invoke the application"""
    try:
        self.setup_environ()
        self.result = application(self.environ, self.start_response)
        self.finish_response()
    except:
        try:
            self.handle_error()
        except:
            # If we get an error handling an error, just give up already!
            self.close()
            raise   # ...and let the actual server figure it out.

def setup_environ(self):
    """Set up the environment for one request"""
    env = self.environ = self.os_environ.copy()
    self.add_cgi_vars()

    env['wsgi.input']        = self.get_stdin()
    env['wsgi.errors']       = self.get_stderr()
    env['wsgi.version']      = self.wsgi_version
    env['wsgi.run_once']     = self.wsgi_run_once
    env['wsgi.url_scheme']   = self.get_scheme()
    env['wsgi.multithread']  = self.wsgi_multithread
    env['wsgi.multiprocess'] = self.wsgi_multiprocess

    if self.wsgi_file_wrapper is not None:
        env['wsgi.file_wrapper'] = self.wsgi_file_wrapper

    if self.origin_server and self.server_software:
        env.setdefault('SERVER_SOFTWARE', self.server_software)

def start_response(self, status, headers, exc_info=None):
    """'start_response()' callable as specified by PEP 3333"""
    if exc_info:
        try:
            if self.headers_sent:
                # Re-raise original exception if headers sent
                raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
        finally:
            exc_info = None        # avoid dangling circular ref
    elif self.headers is not None:
        raise AssertionError("Headers already set!")

    self.status = status
    self.headers = self.headers_class(headers)
    status = self._convert_string_type(status, "Status")
    assert len(status) >= 4, "Status must be at least 4 characters"
    assert status[:3].isdigit(), "Status message must begin w/3-digit code"
    assert status[3] == " ", "Status message must have a space after code"

    if __debug__:
        for name, val in headers:
            name = self._convert_string_type(name, "Header name")
            val = self._convert_string_type(val, "Header value")
            assert not is_hop_by_hop(name), "Hop-by-hop headers not allowed"

    return self.write

def finish_response(self):
    """Send any iterable data, then close self and the iterable

    Subclasses intended for use in asynchronous servers will
    want to redefine this method, such that it sets up callbacks
    in the event loop to iterate over the data, and to call
    'self.close()' once the response is finished.
    """
    try:
        if not self.result_is_file() or not self.sendfile():
            for data in self.result:
                # This calls the _write method in SimpleHandler
                self.write(data)
            self.finish_content()
    finally:
        self.close()

At this point, one WSGI request has been handled end to end.
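
To make the contract concrete, here is a small self-contained sketch that plays the server's role by hand: it builds a minimal environ, passes a start_response callable to the application, and collects the result. The helper name fake_server_call is an assumption for illustration.

def application(environ, start_response):
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return [b'Hello, web!']

def fake_server_call(app):
    environ = {'REQUEST_METHOD': 'GET', 'PATH_INFO': '/'}   # minimal environ
    collected = {}
    def start_response(status, headers, exc_info=None):
        collected['status'] = status
        collected['headers'] = headers
    body = b''.join(app(environ, start_response))
    return collected['status'], collected['headers'], body

print(fake_server_call(application))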

Entry point
The run() function in ./app/wsgiapp.py

Main class
WSGIApplication(Application)
Its parent class Application lives in ./app/base.py and does three things:
1. Checks the configured location of the WSGI application and initializes it
2. Loads the corresponding configuration file
3. Most importantly, provides the following run function

def run(self):
    try:
        Arbiter(self).run()         # run the Arbiter once initialization and configuration are done
    except RuntimeError as e:
        print("\nError: %s\n" % e, file=sys.stderr)
        sys.stderr.flush()
        sys.exit(1)

Main class
Arbiter
Located in arbiter.py

Execution starts from its run() method:
def run(self):"Main master loop."         #主入口函数self.start()util._setproctitle("master [%s]" % self.proc_name)    #self.proc_name 配置文件中的进程名称配置try:self.manage_workers()              #管理workers  while True:self.maybe_promote_master()sig = self.SIG_QUEUE.pop(0) if len(self.SIG_QUEUE) else None   #当当前SIG_QUEUE列表中,是否有呆处理的信号if sig is None:                                                #如果没有需要处理事务则读取select中fdself.sleep()                                               #监测管道是否有数据可读self.murder_workers()                                      #杀死闲置的workerself.manage_workers()continueif sig not in self.SIG_NAMES:self.log.info("Ignoring unknown signal: %s", sig)continuesigname = self.SIG_NAMES.get(sig)                              #获取配置的信号处理函数handler = getattr(self, "handle_%s" % signame, None)           #获取对应的handler处理函数if not handler:                                                     self.log.error("Unhandled signal: %s", signame)continueself.log.info("Handling signal: %s", signame)handler()                                                      #执行该注册函数self.wakeup()                                                  #向管道中写入值except StopIteration:self.halt()except KeyboardInterrupt:self.halt()except HaltServer as inst:self.halt(reason=inst.reason, exit_status=inst.exit_status)except SystemExit:raiseexcept Exception:self.log.info("Unhandled exception in main loop",exc_info=True)self.stop(False)if self.pidfile is not None:self.pidfile.unlink()sys.exit(-1)

run() first calls the start method:

    def start(self):"""\Initialize the arbiter. Start listening and set pidfile if needed."""self.log.info("Starting gunicorn %s", __version__)   if 'GUNICORN_PID' in os.environ:self.master_pid = int(os.environ.get('GUNICORN_PID'))      #获取主pidself.proc_name = self.proc_name + ".2"                     self.master_name = "Master.2"self.pid = os.getpid()                                  #获取当前进程pidif self.cfg.pidfile is not None:pidname = self.cfg.pidfile                          #配置文件中设置的pidfile名称if self.master_pid != 0:pidname += ".2"self.pidfile = Pidfile(pidname)                     #创建pid文件self.pidfile.create(self.pid)self.cfg.on_starting(self)                              #配置on_starting的回调函数self.init_signals()                                     #初始化管道  会重新定向执行文件的输出if not self.LISTENERS:fds = Nonelisten_fds = systemd.listen_fds()if listen_fds:self.systemd = Truefds = range(systemd.SD_LISTEN_FDS_START,systemd.SD_LISTEN_FDS_START + listen_fds)elif self.master_pid:fds = []for fd in os.environ.pop('GUNICORN_FD').split(','):fds.append(int(fd))self.LISTENERS = sock.create_sockets(self.cfg, self.log, fds)  #创建监听的对象 listeners_str = ",".join([str(l) for l in self.LISTENERS])self.log.debug("Arbiter booted")self.log.info("Listening at: %s (%s)", listeners_str, self.pid)self.log.info("Using worker: %s", self.cfg.worker_class_str)# check worker class requirementsif hasattr(self.worker_class, "check_config"):                   #对于选择不同的并发模式  查看是否有check_config属性self.worker_class.check_config(self.cfg, self.log)self.cfg.when_ready(self)                                       #在配置中注册当配置文件加载完成后, 执行的回调函数  (配置文件中有)

One of the more important calls here is
self.init_signals()

    def init_signals(self):"""\Initialize master signal handling. Most of the signalsare queued. Child signals only wake up the master."""# close old PIPEif self.PIPE:[os.close(p) for p in self.PIPE]# initialize the pipeself.PIPE = pair = os.pipe()for p in pair:util.set_non_blocking(p)util.close_on_exec(p)self.log.close_on_exec()# initialize all signals[signal.signal(s, self.signal) for s in self.SIGNALS]          #注册信号函数   当该信号触发时  会触发self.singal函数signal.signal(signal.SIGCHLD, self.handle_chld)                #注册子进程被杀死时, 父进程的处理函数self.handle_chiddef signal(self, sig, frame):if len(self.SIG_QUEUE) < 5:               self.SIG_QUEUE.append(sig)                                 #添加self.SIG_QUEUE列表中self.wakeup()                                              #向管道中发送消息

Signal handlers are registered in the master, so the master manages its child processes entirely through signals. The concrete handlers are shown next, followed by a simplified sketch of the pattern.

def handle_ttin(self):
    """\
    SIGTTIN handling.
    Increases the number of workers by one.
    """
    self.num_workers += 1                                    # add one worker
    self.manage_workers()

def handle_ttou(self):
    """\
    SIGTTOU handling.
    Decreases the number of workers by one.
    """
    if self.num_workers <= 1:                                # never go below one worker
        return
    self.num_workers -= 1                                    # remove one worker
    self.manage_workers()

def handle_usr1(self):
    """\
    SIGUSR1 handling.
    Kill all workers by sending them a SIGUSR1
    """
    self.log.reopen_files()
    self.kill_workers(signal.SIGUSR1)

def handle_usr2(self):                                       # reload the configuration and start a new master without stopping service
    """\
    SIGUSR2 handling.
    Creates a new master/worker set as a slave of the current
    master without affecting old workers. Use this to do live
    deployment with the ability to backout a change.
    """
    self.reexec()
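
A simplified sketch of the signal-plus-self-pipe pattern described above (not gunicorn's actual code): the handlers only enqueue the signal and write a byte so the sleeping loop wakes up. Python 3 shown.

import os
import select
import signal

SIG_QUEUE = []
PIPE = os.pipe()

def on_signal(sig, frame):
    SIG_QUEUE.append(sig)
    os.write(PIPE[1], b'.')                        # wake up the sleeping master loop

signal.signal(signal.SIGTTIN, on_signal)           # e.g. "add one worker"
signal.signal(signal.SIGTTOU, on_signal)           # e.g. "remove one worker"

while True:
    ready, _, _ = select.select([PIPE[0]], [], [], 1.0)
    if ready:
        os.read(PIPE[0], 1)                        # drain the wakeup byte
    while SIG_QUEUE:
        print("handling signal", SIG_QUEUE.pop(0))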

After self.start() has completed, run() calls self.manage_workers() and then enters the while True loop already shown above: it sleeps on the wakeup pipe, murders timed-out workers, re-runs manage_workers(), and dispatches any queued signal to its handle_* method.

Here self.manage_workers() keeps the number of worker processes exactly equal to the configured number, no more and no fewer.

def manage_workers(self):
    """\
    Maintain the number of workers by spawning or killing
    as required.
    """
    if len(self.WORKERS.keys()) < self.num_workers:           # fewer workers than configured: spawn up to the configured number
        self.spawn_workers()

    workers = self.WORKERS.items()
    workers = sorted(workers, key=lambda w: w[1].age)         # sort the workers by age
    while len(workers) > self.num_workers:                    # more workers than configured: kill the surplus
        (pid, _) = workers.pop(0)
        self.kill_worker(pid, signal.SIGTERM)                 # kill one surplus worker

    active_worker_count = len(workers)
    if self._last_logged_active_worker_count != active_worker_count:
        self._last_logged_active_worker_count = active_worker_count
        self.log.debug("{0} workers".format(active_worker_count),
                       extra={"metric": "gunicorn.workers",
                              "value": active_worker_count,
                              "mtype": "gauge"})

self.spawn_workers() creates the worker processes:

def spawn_workers(self):
    """\
    Spawn new workers as needed.

    This is where a worker process leaves the main loop
    of the master process.
    """

    for i in range(self.num_workers - len(self.WORKERS.keys())):   # how many workers are missing compared with the configuration
        self.spawn_worker()                                        # fork one child process
        time.sleep(0.1 * random.random())

self.spawn_worker() forks a single worker process:

def spawn_worker(self):
    self.worker_age += 1
    worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                               self.app, self.timeout / 2.0,
                               self.cfg, self.log)                  # instantiate the configured worker class
    self.cfg.pre_fork(self, worker)                                 # pre_fork hook registered in the configuration
    pid = os.fork()                                                 # fork the child process
    if pid != 0:
        worker.pid = pid
        self.WORKERS[pid] = worker
        return pid

    # Process Child
    worker.pid = os.getpid()                                        # pid of the child process
    try:
        util._setproctitle("worker [%s]" % self.proc_name)
        self.log.info("Booting worker with pid: %s", worker.pid)
        self.cfg.post_fork(self, worker)                            # post_fork hook, run in the child right after the fork
        worker.init_process()                                       # initialize and run the worker in the child
        sys.exit(0)
    except SystemExit:
        raise
    except AppImportError as e:
        self.log.debug("Exception while loading the application",
                       exc_info=True)
        print("%s" % e, file=sys.stderr)
        sys.stderr.flush()
        sys.exit(self.APP_LOAD_ERROR)
    except:
        self.log.exception("Exception in worker process"),
        if not worker.booted:
            sys.exit(self.WORKER_BOOT_ERROR)
        sys.exit(-1)
    finally:
        self.log.info("Worker exiting (pid: %s)", worker.pid)
        try:
            worker.tmp.close()
            self.cfg.worker_exit(self, worker)
        except:
            self.log.warning("Exception during worker exit:\n%s",
                             traceback.format_exc())

Here worker_class is the worker class configured for the worker processes in the configuration file.
gunicorn offers six worker_class options:
1.sync
2.eventlet - Requires eventlet >= 0.9.7
3.gevent - Requires gevent >= 0.13
4.tornado - Requires tornado >= 0.2
5.gthread - Python 2 requires the futures package to be installed
6.gaiohttp - Requires Python 3.4 and aiohttp >= 0.21.5
Of these, 1 (sync) handles requests in a blocking fashion. A hedged configuration example is shown below.
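
A hedged example of choosing a worker class through a gunicorn configuration file; the application module myproject.wsgi:application is an assumption for illustration.

# Hypothetical gunicorn.conf.py
# Run with: gunicorn -c gunicorn.conf.py myproject.wsgi:application
bind = "127.0.0.1:8000"
workers = 4
worker_class = "sync"      # or "eventlet", "gevent", "tornado", "gthread"
timeout = 30
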
Let's briefly analyze the sync worker first.
SyncWorker inherits from base.Worker:


class Worker(object):

    SIGNALS = [getattr(signal, "SIG%s" % x)
               for x in "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()]

    PIPE = []

    def __init__(self, age, ppid, sockets, app, timeout, cfg, log):
        """\
        This is called pre-fork so it shouldn't do anything to the
        current process. If there's a need to make process wide
        changes you'll want to do that in ``self.init_process()``.
        """
        self.age = age
        self.pid = "[booting]"
        self.ppid = ppid
        self.sockets = sockets
        self.app = app
        self.timeout = timeout
        self.cfg = cfg
        self.booted = False
        self.aborted = False
        self.reloader = None

        self.nr = 0
        jitter = randint(0, cfg.max_requests_jitter)
        self.max_requests = cfg.max_requests + jitter or MAXSIZE
        self.alive = True
        self.log = log
        self.tmp = WorkerTmp(cfg)                                   # create the temporary heartbeat file

    def __str__(self):
        return "<Worker %s>" % self.pid

    def notify(self):
        """\
        Your worker subclass must arrange to have this method called
        once every ``self.timeout`` seconds. If you fail in accomplishing
        this task, the master process will murder your workers.
        """
        self.tmp.notify()                                           # touch the temporary file's metadata

    def run(self):
        """\
        This is the mainloop of a worker process. You should override
        this method in a subclass to provide the intended behaviour
        for your particular evil schemes.
        """
        raise NotImplementedError()

    def init_process(self):
        """\
        If you override this method in a subclass, the last statement
        in the function should be to call this method with
        super(MyWorkerClass, self).init_process() so that the ``run()``
        loop is initiated.
        """

        # set environment variables                                 # this runs in the child process
        if self.cfg.env:                                            # export the configured environment variables
            for k, v in self.cfg.env.items():
                os.environ[k] = v

        util.set_owner_process(self.cfg.uid, self.cfg.gid,
                               initgroups=self.cfg.initgroups)      # set the owner of the process

        # Reseed the random number generator
        util.seed()

        # For waking ourselves up
        self.PIPE = os.pipe()                                       # open the wakeup pipe
        for p in self.PIPE:
            util.set_non_blocking(p)                                # make both ends non-blocking
            util.close_on_exec(p)                                   # and close-on-exec

        # Prevent fd inheritance
        [util.close_on_exec(s) for s in self.sockets]
        util.close_on_exec(self.tmp.fileno())

        self.wait_fds = self.sockets + [self.PIPE[0]]               # file descriptors the worker waits on

        self.log.close_on_exec()

        self.init_signals()                                         # register the worker's signal handlers

        # start the reloader
        if self.cfg.reload:                                         # reload the worker when source files change
            def changed(fname):
                self.log.info("Worker reloading: %s modified", fname)
                self.alive = False
                self.cfg.worker_int(self)
                time.sleep(0.1)
                sys.exit(0)

            reloader_cls = reloader_engines[self.cfg.reload_engine]  # reload engine chosen in the configuration
            self.reloader = reloader_cls(extra_files=self.cfg.reload_extra_files,
                                         callback=changed)
            self.reloader.start()

        self.load_wsgi()                                            # load the WSGI callable
        self.cfg.post_worker_init(self)                             # post_worker_init callback from the configuration

        # Enter main run loop
        self.booted = True
        self.run()                                                  # the worker's run loop

    def load_wsgi(self):
        try:
            self.wsgi = self.app.wsgi()                             # the WSGI callable
        except SyntaxError as e:
            if self.cfg.reload == 'off':
                raise

            self.log.exception(e)

            # fix from PR #1228
            # storing the traceback into exc_tb will create a circular reference.
            # per https://docs.python.org/2/library/sys.html#sys.exc_info warning,
            # delete the traceback after use.
            try:
                exc_type, exc_val, exc_tb = sys.exc_info()
                self.reloader.add_extra_file(exc_val.filename)

                tb_string = six.StringIO()
                traceback.print_tb(exc_tb, file=tb_string)
                self.wsgi = util.make_fail_app(tb_string.getvalue())
            finally:
                del exc_tb

    def init_signals(self):
        # reset signaling
        [signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS]
        # init new signaling
        signal.signal(signal.SIGQUIT, self.handle_quit)
        signal.signal(signal.SIGTERM, self.handle_exit)
        signal.signal(signal.SIGINT, self.handle_quit)
        signal.signal(signal.SIGWINCH, self.handle_winch)
        signal.signal(signal.SIGUSR1, self.handle_usr1)
        signal.signal(signal.SIGABRT, self.handle_abort)

        # Don't let SIGTERM and SIGUSR1 disturb active requests
        # by interrupting system calls
        if hasattr(signal, 'siginterrupt'):  # python >= 2.6
            signal.siginterrupt(signal.SIGTERM, False)
            signal.siginterrupt(signal.SIGUSR1, False)

        if hasattr(signal, 'set_wakeup_fd'):
            signal.set_wakeup_fd(self.PIPE[1])

    def handle_usr1(self, sig, frame):                              # reopen the log files
        self.log.reopen_files()

    def handle_exit(self, sig, frame):                              # exit
        self.alive = False

    def handle_quit(self, sig, frame):
        self.alive = False
        # worker_int callback
        self.cfg.worker_int(self)                                   # stop this worker process
        time.sleep(0.1)
        sys.exit(0)

    def handle_abort(self, sig, frame):
        self.alive = False
        self.cfg.worker_abort(self)                                 # abort this worker process
        sys.exit(1)

self.tmp = WorkerTmp(cfg)
self.tmp.notify()
The mechanism is a temporary file whose attributes the worker changes periodically. The master checks the time of the most recent attribute change on each worker's temporary file; if a timeout is configured and a worker has not touched its file within that time, the worker is judged dead and the master kills it. A simplified sketch of the idea follows.
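
A simplified sketch of that heartbeat idea (details such as the spinner value differ from gunicorn's WorkerTmp):

import os
import tempfile
import time

fd, path = tempfile.mkstemp(prefix="hb-worker-")
spinner = 0

def notify():
    # Worker side: flip the file mode so the file's ctime is updated.
    global spinner
    spinner = (spinner + 1) % 2
    os.fchmod(fd, 0o644 if spinner else 0o600)

def is_alive(timeout):
    # Master side: the worker is presumed dead if it has not notified recently.
    return time.time() - os.stat(path).st_ctime < timeout

notify()
print(is_alive(timeout=30))    # True right after a heartbeat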

The structure of the sync worker class is as follows:

class SyncWorker(base.Worker):

    def accept(self, listener):
        client, addr = listener.accept()                    # accept the incoming connection
        client.setblocking(1)                               # put the new connection into blocking mode
        util.close_on_exec(client)
        self.handle(listener, client, addr)                 # handle the connection

    def wait(self, timeout):
        try:
            self.notify()
            ret = select.select(self.wait_fds, [], [], timeout)
            if ret[0]:
                if self.PIPE[0] in ret[0]:                  # distinguish pipe read events from incoming requests
                    os.read(self.PIPE[0], 1)
                return ret[0]

        except select.error as e:
            if e.args[0] == errno.EINTR:
                return self.sockets
            if e.args[0] == errno.EBADF:
                if self.nr < 0:
                    return self.sockets
                else:
                    raise StopWaiting
            raise

    def is_parent_alive(self):
        # If our parent changed then we shut down.
        if self.ppid != os.getppid():
            self.log.info("Parent changed, shutting down: %s", self)
            return False
        return True

    def run_for_one(self, timeout):
        listener = self.sockets[0]
        while self.alive:
            self.notify()

            # Accept a connection. If we get an error telling us
            # that no connection is waiting we fall down to the
            # select which is where we'll wait for a bit for new
            # workers to come give us some love.
            try:
                self.accept(listener)
                # Keep processing clients until no one is waiting. This
                # prevents the need to select() for every client that we
                # process.
                continue

            except EnvironmentError as e:
                if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
                                   errno.EWOULDBLOCK):
                    raise

            if not self.is_parent_alive():
                return

            try:
                self.wait(timeout)
            except StopWaiting:
                return

    def run_for_multiple(self, timeout):
        while self.alive:
            self.notify()

            try:
                ready = self.wait(timeout)
            except StopWaiting:
                return

            if ready is not None:
                for listener in ready:
                    if listener == self.PIPE[0]:
                        continue

                    try:
                        self.accept(listener)               # accept and handle the request
                    except EnvironmentError as e:
                        if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
                                           errno.EWOULDBLOCK):
                            raise

            if not self.is_parent_alive():
                return

    def run(self):
        # if no timeout is given the worker will never wait and will
        # use the CPU for nothing. This minimal timeout prevent it.
        timeout = self.timeout or 0.5

        # self.socket appears to lose its blocking status after
        # we fork in the arbiter. Reset it here.
        for s in self.sockets:
            s.setblocking(0)

        if len(self.sockets) > 1:                           # there may be several listening sockets plus the pipe
            self.run_for_multiple(timeout)
        else:
            self.run_for_one(timeout)

    def handle(self, listener, client, addr):
        req = None
        try:
            if self.cfg.is_ssl:                             # is an SSL certificate configured?
                client = ssl.wrap_socket(client, server_side=True,
                                         **self.cfg.ssl_options)

            parser = http.RequestParser(self.cfg, client)
            req = six.next(parser)
            self.handle_request(listener, req, client, addr)    # handle the parsed request
        except http.errors.NoMoreData as e:
            self.log.debug("Ignored premature client disconnection. %s", e)
        except StopIteration as e:
            self.log.debug("Closing connection. %s", e)
        except ssl.SSLError as e:
            if e.args[0] == ssl.SSL_ERROR_EOF:
                self.log.debug("ssl connection closed")
                client.close()
            else:
                self.log.debug("Error processing SSL request.")
                self.handle_error(req, client, addr, e)
        except EnvironmentError as e:
            if e.errno not in (errno.EPIPE, errno.ECONNRESET):
                self.log.exception("Socket error processing request.")
            else:
                if e.errno == errno.ECONNRESET:
                    self.log.debug("Ignoring connection reset")
                else:
                    self.log.debug("Ignoring EPIPE")
        except Exception as e:
            self.handle_error(req, client, addr, e)
        finally:
            util.close(client)

    def handle_request(self, listener, req, client, addr):
        environ = {}
        resp = None
        try:
            self.cfg.pre_request(self, req)                 # pre_request hook from the configuration
            request_start = datetime.now()
            resp, environ = wsgi.create(req, client, addr,  # build the response object and the WSGI environ
                                        listener.getsockname(), self.cfg)
            # Force the connection closed until someone shows
            # a buffering proxy that supports Keep-Alive to
            # the backend.
            resp.force_close()
            self.nr += 1
            if self.nr >= self.max_requests:
                self.log.info("Autorestarting worker after current request.")
                self.alive = False
            respiter = self.wsgi(environ, resp.start_response)   # call the configured WSGI application
            try:
                if isinstance(respiter, environ['wsgi.file_wrapper']):
                    resp.write_file(respiter)
                else:
                    for item in respiter:
                        resp.write(item)                    # write the application's output back to the client
                resp.close()
                request_time = datetime.now() - request_start
                self.log.access(resp, req, environ, request_time)
            finally:
                if hasattr(respiter, "close"):
                    respiter.close()
        except EnvironmentError:
            # pass to next try-except level
            six.reraise(*sys.exc_info())
        except Exception:
            if resp and resp.headers_sent:
                # If the requests have already been sent, we should close the
                # connection to indicate the error.
                self.log.exception("Error handling request")
                try:
                    client.shutdown(socket.SHUT_RDWR)
                    client.close()
                except EnvironmentError:
                    pass
                raise StopIteration()
            raise
        finally:
            try:
                self.cfg.post_request(self, req, environ, resp)
            except Exception:
                self.log.exception("Exception in post_request hook")

When worker.init_process() is called, it runs the init_process() method in base.Worker,
and init_process() finishes by calling self.run().
For the sync worker this means SyncWorker.run(),
which then executes run_for_multiple() or run_for_one().
Each accepted request is handed to the configured WSGI callable, and the data it returns is written back to the client.
That completes the basic gunicorn workflow.

Finally, here is a minimal implementation that demonstrates the principle:

#coding:utf-8
import os
import sys
import socket
import time
import traceback
import errno
import signal


class Worker(object):
    def __init__(self, sock):
        self.sock = sock

    def accept(self):
        client, addr = self.sock.accept()
        client.setblocking(True)
        self.handle(client, addr)

    def init_process(self):
        self.sock.setblocking(False)
        while True:
            try:
                time.sleep(1)
                self.accept()
                continue
            except Exception as e:
                msg = traceback.format_exc()
                with open("sub_" + str(os.getpid()) + ".txt", "a") as f:
                    f.write(msg + "\n")
                if hasattr(e, "errno"):
                    if e.errno not in (errno.EAGAIN, errno.ECONNABORTED, errno.EWOULDBLOCK):
                        msg = traceback.format_exc()
                else:
                    raise

    def handle(self, client, addr):
        data = client.recv(1024)
        pid = os.getpid()
        data += str(pid)
        # print("receive:{} pid:{}".format(data, pid))
        client.send("back:" + data)
        client.close()


class Server(object):
    def __init__(self):
        self.port = ("127.0.0.1", 8004)
        self.sock = socket.socket()
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(self.port)
        self.sock.setblocking(False)
        self.sock.listen(5)
        self.WORKERS = {}

    def run(self):
        self.init_signals()
        for i in range(2):
            self.spawn_worker()
            print(i)
        # self.spawn_worker()
        for k in self.WORKERS:
            print(k, self.WORKERS[k])
        while True:
            import time
            time.sleep(3)
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
                print("kill  pid: {}, status: {}".format(pid, status))
            except os.error:
                print("error")

    def init_signals(self):
        signal.signal(signal.SIGTTIN, self.incr_one)
        signal.signal(signal.SIGTTOU, self.decr_one)

    def incr_one(self, signo, frame):
        self.spawn_worker()
        for k in self.WORKERS:
            print(k, self.WORKERS[k])

    def decr_one(self, signo, frame):
        for k in self.WORKERS:
            os.kill(k, signal.SIGKILL)
            break

    def spawn_worker(self):
        worker = Worker(self.sock)
        pid = os.fork()
        if pid != 0:
            worker.pid = pid
            self.WORKERS[pid] = worker
            return pid

        worker.pid = os.getpid()
        worker.init_process()
        sys.exit(0)


if __name__ == "__main__":
    server = Server()
    server.run()
