Python-协程-阻塞IO-非阻塞IO-同步IO-异步IO


一、协程

协程又称为微线程 CPU 是无法识别协程的,只能识别是线程,协程是由开发人员自己控制的。协程可以在单线程下实现并发的效果(实际计算还是串行的方式)。

如果使用线程在多个函数之间进行上下文切换,那么这个上下文的逻辑位置是保存在 CPU 中的,而协程也有上下文切换的操作,但是协程的上下文逻辑位置不是通过 CPU 保存的,所以使用协程的好处就是更少的占用了 CPU。

线程之间修改共享数据时,需要锁;而协程不需要,因为协程在线程中是串行的方式来修改数据的,所以不需要锁。

协程可以做到高并发、高扩展、低成本资源(一个 CPU 上万个协程都没问题)。

协程的缺点:因为是在单线程中,所以无法利用多核 CPU 的资源;协程如果要使用多核 CPU 的话,那么就需要先启多个进程,在每个进程下启一个线程,然后在线程下在启协程。

在单线程下实现的并发效果,就是协程。

在单线程中的协程,实现并发:

import timedef consumer(name):    print("--->starting eating baozi...")    while True:        new_baozi = yield   #返回到conn.__next__()        print("[%s] is eating baozi %s" % (name, new_baozi))        # time.sleep(1)

def producer():    r = con.__next__()  #执行生成器    r = con2.__next__()    n = 0    while n 5:        n += 1        con.send(n) #返回到conn.__next__()后,通过send发送参数给yield        con2.send(n)        print("\033[32;1m[producer]\033[0m is making baozi %s" % n)

if __name__ == '__main__':    con = consumer("c1")  #定义成生成器    con2 = consumer("c2")    p = producer()

执行结果:--->starting eating baozi...--->starting eating baozi...[c1] is eating baozi 1[c2] is eating baozi 1[producer] is making baozi 1[c1] is eating baozi 2[c2] is eating baozi 2[producer] is making baozi 2[c1] is eating baozi 3[c2] is eating baozi 3[producer] is making baozi 3[c1] is eating baozi 4[c2] is eating baozi 4[producer] is making baozi 4[c1] is eating baozi 5[c2] is eating baozi 5[producer] is making baozi 5

##可以感觉到瞬间就执行完成了,我们通过自己写的方式实现了协程并发的效果(效果只是假象)##之所以产生了并发的效果,是因为执行代码时没有任何卡顿的地方,如果使用sleep的话,就不一样了。
import timeimport queuedef consumer(name):    print("--->starting eating baozi...")    while True:        new_baozi = yield        print("[%s] is eating baozi %s" % (name, new_baozi))        # time.sleep(1)def producer():    r = con.__next__()    r = con2.__next__()    n = 0    while n 5:        n += 1        con.send(n)        con2.send(n)        time.sleep(1)       ##这里加一个sleep        print("\033[32;1m[producer]\033[0m is making baozi %s" % n)

if __name__ == '__main__':    con = consumer("c1")    con2 = consumer("c2")    p = producer()

执行结果:--->starting eating baozi...--->starting eating baozi...[c1] is eating baozi 1[c2] is eating baozi 1[producer] is making baozi 1[c1] is eating baozi 2[c2] is eating baozi 2[producer] is making baozi 2[c1] is eating baozi 3[c2] is eating baozi 3[producer] is making baozi 3[c1] is eating baozi 4[c2] is eating baozi 4[producer] is making baozi 4[c1] is eating baozi 5[c2] is eating baozi 5[producer] is making baozi 5

##加了sleep后,执行明显就卡顿了;##假如此时协程1通过CPU处理后,有一个IO(耗时)的操作(比如传数据),这个耗时需要30秒,##那么我们当前代码就得等上30秒后,才会用CPU去处理另一个协程的代码。##但是要知道,处理IO操作时是不需要CPU的,那么此时CPU有30秒时间期间没有给其他协程做计算。##实际的协程并发就是,当处理协程1时,遇到30秒的IO操作时不需要等待30秒就切换到其他协程去做计算,##这样就实现了并发的效果。

##正常情况下每个协程IO的操作时间都不同,那么我们什么时候进行上下文的切换来切换到之前的协程呢,##提前和延后都不合适,提前了IO操作还没完成,不能进行下一步计算,延后的话就对该协程造成了延迟,##所以就需要来识别IO操作什么时候完成,当完成之后立刻切换到之前的协程进行下一步CPU的计算。
greenlet模块

#之前我们是自己用yield的方式自己实现的协程;而greenlet模块是已经封装好的协程。#greenlet需要在cmd中通过 pip install greenlet 来安装

from greenlet import  greenlet

def test1():    print (12)    gr2.switch()    #切换到函数test2()    print(34)    gr2.switch()    #这里会切换到test2中的gr1.switch()位置,继续向下执行

def test2():    print (56)    #切换到test1(),这里切换到test1后不是从头开始执行函数了,    #而是从记录了gr2.switch() 的位置继续执行    gr1.switch()    print(78)

gr1 = greenlet(test1)   #启动一个协程gr2 = greenlet(test2)gr1.switch()   #调用test1开始执行,类似yield的next切换

执行结果:12563478

## 执行效果和yield类似,需要手动定义switch()才会切换
gevent模块#cmd中pip install gevent#gevent用于协程之间的自动切换

import gevent

def foo():    print ('Running in foo')    gevent.sleep(2) #gevent.sleep用来模拟IO操作,这里模拟2秒钟    print ('Explicit context switch to foo again')

def bar():    print ('Explicit context to bar')    gevent.sleep(1)    print ('Implicit Context switch back to bar')

gevent.joinall([    gevent.spawn(foo),  #spawn是生成的意思,这里生成协程foo    gevent.spawn(bar),])

执行结果:Running in fooExplicit context to bar上面两行内容相当于并发一起执行的

Implicit Context switch back to bar #这里在上2行内容后隔了1秒被执行

Explicit context switch to foo again## 这里在上1行内容后隔了1秒被执行## 通过gevent.spawn(foo)执行了协程,执行到函数def foo()中的geven.sleep后,## 就会切换到另一个协程,另一个协程执行到geven.sleep后也会切换,## 所以当前两个协程只要IO操作还没执行完成就会不断的切换来确认IO是否执行完成;## 因为def bar中只sleep(1),执行的较快,## 所以'Implicit Context switch back to bar'## 就先于'Explicit context switch to foo again'被打印出来。
import gevent

def foo():    print ('Running in foo')    gevent.sleep(2)    print ('Explicit context switch to foo again')

def bar():    print ('Explicit context to bar')    gevent.sleep(1)    print ('Implicit Context switch back to bar')

def func3():    print ('Running func3')    gevent.sleep(0) #这里设置0秒只是为了进行协程间的切换    print ('Running func3 again')

gevent.joinall([    gevent.spawn(foo),    gevent.spawn(bar),    gevent.spawn(func3),])

执行结果:Running in fooExplicit context to barRunning func3Running func3 againImplicit Context switch back to barExplicit context switch to foo again

##这里总体执行时间用了2秒左右的时间,实现了并发的效果。##协程之间的切换是轮询的,也就是串行的方式切换。##gevent模块中封装了手动切换的内容,利用手动切换的代码来实现自动切换的。
下载网页

from urllib import request

def f(url):    print('GET: %s' % url)    resp = request.urlopen(url)  #建立一个实例resp,用来请求指定的链接    data = resp.read()  #将请求的链接读取出来,并赋值给data; 这个data就是下载下来的网页    f = open('url.html','wb')    f.write(data)   #将读取的内容写入到文件中    f.close()    print('%d bytes received from %s.' % (len(data), url))

f("https://www.baidu.com")

##通过上面代码来获取www.baidu.com网页

执行结果:GET: https://www.baidu.com227 bytes received from https://www.baidu.com.

这里的内容不对,是百度设置了反爬虫。下面我们弄个其他网页


用基本的爬虫功能,爬取了网页的内容,爬虫网页肯定不是一个网页的去爬,肯定是大范围的,所以可以利用协程来进行大范围爬网页。

串行爬虫:

from urllib import requestimport time

def f(url):    print('GET: %s' % url)    resp = request.urlopen(url)    data = resp.read()    print('%d bytes received from %s.' % (len(data), url))

urls = ['https://www.python.org/',       'https://www.yahoo.com/',       'https://github.com/']

time_start = time.time()

for url in urls:    f(url)

print ("同步cost",time.time() - time_start)

执行结果:D:\python3.6.4\python.exe E:/python/代码练习/A1.pyGET: https://www.python.org/48844 bytes received from https://www.python.org/.GET: https://www.yahoo.com/529760 bytes received from https://www.yahoo.com/.GET: https://github.com/52239 bytes received from https://github.com/.异步cost 22.685065507888794

##通过串行的方式来爬取网页,可以看到时间大概是22秒
并行爬虫:from urllib import requestimport gevent,time

def f(url):    print('GET: %s' % url)    resp = request.urlopen(url)    data = resp.read()    print('%d bytes received from %s.' % (len(data), url))

async_time = time.time()#这里加了参数,启动三个协程都执行f这个函数gevent.joinall([        gevent.spawn(f, 'https://www.python.org/'),        gevent.spawn(f, 'https://www.yahoo.com/'),        gevent.spawn(f, 'https://github.com/'),])

print ('异步cost',time.time()- async_time)

执行结果:D:\python3.6.4\python.exe E:/python/代码练习/A1.pyGET: https://www.python.org/48844 bytes received from https://www.python.org/.GET: https://www.yahoo.com/530851 bytes received from https://www.yahoo.com/.GET: https://github.com/52239 bytes received from https://github.com/.异步cost 21.431119441986084

##看到时间是21秒的样子(根据网速会有波动),而且执行过程中发现用的也是串行的方式来执行的。##依然使用串行的方式来执行是因为,gevent没有识别urllib的IO操作
from urllib import requestimport gevent,time

from gevent import monkeymonkey.patch_all()#通过monkey.patch_all()可以自动识别urllib中有可能是IO的所有操作,#然后在操作之前打一个标记,实现阻塞的效果(类似gevent.sleep)的效果。#所以一旦gevent发现阻塞的效果,就会进行协程之间的切换,然后就可以实现协程并行的效果了。

def f(url):    print('GET: %s' % url)    resp = request.urlopen(url)    data = resp.read()    print('%d bytes received from %s.' % (len(data), url))

async_time = time.time()

gevent.joinall([        gevent.spawn(f, 'https://www.python.org/'), #这里加了参数,启动三个协程都执行f这个函数        gevent.spawn(f, 'https://www.yahoo.com/'),        gevent.spawn(f, 'https://github.com/'),])

print ('异步cost',time.time()- async_time)

执行结果:GET: https://www.python.org/GET: https://www.yahoo.com/GET: https://github.com/52239 bytes received from https://github.com/.48844 bytes received from https://www.python.org/.519508 bytes received from https://www.yahoo.com/.异步cost 6.967620849609375##可以看到明显执行耗费的时间变短了很多。

2、socket + 协程

server端:

import sysimport socketimport timeimport gevent

from gevent import socket, monkey

monkey.patch_all()

def server(port):    s = socket.socket()    s.bind(('0.0.0.0', port))    s.listen(500)   #最大500个连接    while True:        cli, addr = s.accept()  #等待请求;默认收到请求是交给协程,不过下面设置交给协程。        gevent.spawn(handle_request, cli)   #启动协程并调用函数,将请求连接的实例cli交给handle_request函数

def handle_request(conn):    try:        while True:            data = conn.recv(1024)            print("recv:", data)            conn.send(data)            if not data:                conn.shutdown(socket.SHUT_WR)                ##conn.shutdown将客户端关闭。socket.SHUT_WR发一个信号。                ##上一行代码可以用break代替

    except Exception as  ex:        print(ex)    finally:        conn.close()

if __name__ == '__main__':    server(8001)

client端:import socket

HOST = 'localhost'  # The remote hostPORT = 8001  # The same port as used by the servers = socket.socket(socket.AF_INET, socket.SOCK_STREAM)s.connect((HOST, PORT))while True:    msg = bytes(input(">>:"), encoding="utf8")    s.sendall(msg)    data = s.recv(1024)    # print(data)

    print('Received', data)s.close()

从执行结果来看,我们启了 3 个 client 分别向 server 发送信息,然后在 server 端可以看到通过协程同时处理了 3 个 client 的数据。

三、事件驱动

在此之前协程之间的切换,会在 IO 操作完成之前在协程之间不断的切换,但目前依然没有解决要等待 IO 操作完成之后才切换的功能。

通常,我们写服务器处理模型的程序时,有以下几种模型:

  1. 每收到一个请求,创建一个新的进程,来处理该请求;
  2. 每收到一个请求,创建一个新的线程,来处理该请求;
  3. 每收到一个请求,放入一个事件列表,让主进程通过非阻塞 I/O 方式来处理请求(就是以事件驱动的方式来处理) 上面的几种方式,各有千秋, 第(1)种方法,由于创建新的进程的开销比较大,所以,会导致服务器性能比较差,但实现比较简单。第(2)种方式,由于要涉及到线程的同步,有可能会面临死锁等问题。第(3)种方式,在写应用程序代码时,逻辑比前面两种都复杂。综合考虑各方面因素,一般普遍认为第(3)种方式是大多数网络服务器采用的方式

之前接触到 ThreadingTCPServer,这表示启动多线程 还有一个叫 ForkingTCPServer,启动多进程

现在主流的网络服务模型用的就是事件驱动

在 UI 编程中,常常要对鼠标点击进行相应,首先如何获得鼠标点击呢?

方式一:创建一个线程,该线程一直循环检测是否有鼠标点击,那么这个方式有以下几个缺点:

  1. CPU 资源浪费,可能鼠标点击的频率非常小,但是扫描线程还是会一直循环检测,这会造成很多的 CPU 资源浪费;如果扫描鼠标点击的接口是阻塞的呢(当点击鼠标时,鼠标点击的那个接线程在处于阻塞状态,假如阻塞 5 秒,那么 5 秒之内鼠标就不能在动了,只能等待阻塞完成)。
  2. 如果是堵塞的,又会出现下面这样的问题,如果我们不但要扫描鼠标点击,还要扫描键盘是否按下,由于扫描鼠标时被堵塞了,那么可能永远不会去扫描键盘;
  3. 如果一个循环需要扫描的设备非常多,这又会引来响应时间的问题;所以,该方式是非常不好的。

方式二:就是事件驱动模型

目前大部分的 UI 编程都是事件驱动模型,如很多 UI 平台都会提供 onClick()事件,这个事件就代表鼠标按下事件。事件驱动模型大体思路如下:

  1. 有一个事件(消息)队列;
  2. 鼠标按下时,往这个队列中增加一个点击事件(消息);
  3. 有个循环,不断从队列取出事件,根据不同的事件,调用不同的函数,如 onClick()、onKeyDown()(按下键盘)等;
  4. 事件(消息)一般都各自保存各自的处理函数指针,这样,每个消息都有独立的处理函数;

图中事件 1 假如是鼠标点击,那么鼠标点击一下就将该事件放入这个事件队列中;假如事件 2 是按下键盘事件,也将该事件放入事件队列中;

线程会循环的去处理事件队列中的事件。

将事件加入到事件列表,和提取事件处理相互是不影响的,事件的处理速度,并不影响事件的产生速度,这就是典型的生产者消费者模型。

比如我每秒点 10 次鼠标,但是你的处理速度是每秒 8 次,虽然你处理的慢,但是并不影响我继续点击鼠标。

事件驱动模型就是根据事件做出相应的反应,比如点下文档的'X'就关闭文档,点击 '-' 就最小化文档。

四、IO 多路复用

下面讨论的是基于 linux 的 IO

  • 用户空间与内核空间

用户空间与内核空间

现在操作系统都是采用虚拟存储器,那么对 32 位操作系统而言,它的寻址空间(虚拟存储空间)为 4G(2 的 32 次方)。操作系统的核心是内核(操作系统需要使用部分内存空间来运行,这就是内核空间),独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限(比如访问网卡、音响声卡都是通过内核访问的,而不是用户程序)。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对 linux 操作系统而言,将最高的 1G 字节(从虚拟地址 0xC0000000 到 0xFFFFFFFF),供内核使用,称为内核空间,而将较低的 3G 字节(从虚拟地址 0x00000000 到 0xBFFFFFFF),供各个进程使用,称为用户空间。

  • 进程切换

    • 进程切换就是上下文的切换
  • 进程阻塞: 正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态(比如 socket server 等不到 client 的数据就会阻塞)。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得 CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用 CPU 资源的。

  • 文件描述符 id

文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。

文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于 UNIX、Linux 这样的操作系统。

文件描述符相当于一个索引,通过索引打开真正的内容。

  • 缓存 I/O

缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。

缓存 I/O 的缺点:

数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。

打开一个文件默认不是在用户的内存空间,而是放入了内核的缓存中,然后在从内核的缓存拷贝到用户的内存空间;传数据也是一样,先是到内核缓存中,然后才会拷贝到用户的内存空间;使用内核是很耗 CPU 的,耗 CPU 是指拷贝到内存的这个指令,如果有大量数据需要从内核缓存拷贝到用户内存空间,那么就会有大量的指令会消耗 CPU 资源

访问网卡、声卡等只能通过内核实现,而用户空间是无法直接访问内核空间的,所以需要通过内核缓存的空间将内容拷贝到用户的内存空间,然后用户才可以使用。

五、IO 模式

刚才说了,对于一次 IO 访问(以 read 举例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个 read 操作发生时,它会经历两个阶段:

  1. 等待数据准备 (Waiting for the data to be ready)(就是将数据放到内核缓存中)
  2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)

正是因为这两个阶段,linux 系统产生了下面五种网络模式的方案。

  • 阻塞 I/O(blocking IO)
  • 非阻塞 I/O(nonblocking IO)
  • I/O 多路复用( IO multiplexing)
  • 信号驱动 I/O( signal driven IO)
  • 异步 I/O(asynchronous IO)

注:由于 signal driven IO 在实际中并不常用,所以我这只提及剩下的四种 IO Model。

  • 阻塞 I/O(blocking IO)

在 linux 中,默认情况下所有的 socket 都是 blocking,一个典型的读操作流程大概是这样:recve 时接收端会阻塞,直到系统接收到数据,系统接收到数据后此时也是阻塞的,会从内核缓存 copy 到用户内存,然后返回一个 OK 才是用户真正接收到了数据。

当用户进程调用了 recvfrom 这个系统调用,kernel 就开始了 IO 的第一个阶段:准备数据(对于网络 IO 来说,很多时候数据在一开始还没有到达。比如,还没有收到一个完整的 UDP 包。这个时候 kernel 就要等待足够的数据到来)。这个过程需要等待,也就是说数据被拷贝到操作系统内核的缓冲区中是需要一个过程的。而在用户进程这边,整个进程会被阻塞(当然,是进程自己选择的阻塞)。当 kernel 一直等到数据准备好了,它就会将数据从 kernel 中拷贝到用户内存,然后 kernel 返回结果,用户进程才解除 block 的状态,重新运行起来。

所以,blocking IO的特点就是在IO执行的两个阶段都被block了(等数据的阶段和从内核拷贝给用户的阶段)。
  • 非阻塞 I/O(nonblocking IO) linux 下,可以通过设置 socket 使其变为 non-blocking。当对一个 non-blocking socket 执行读操作时,流程是这个样子:当用户进程发出 read 操作时,如果 kernel 中的数据还没有准备好,那么它并不会 block 用户进程,而是立刻返回一个 error。从用户进程角度讲 ,它发起一个 read 操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个 error 时,它就知道数据还没有准备好,于是它可以再次发送 read 操作。一旦 kernel 中的数据准备好了,并且又再次收到了用户进程的 system call(receive 的动作),那么它马上就将数据拷贝到了用户内存,然后返回。
所以,nonblocking IO的特点是用户进程需要不断的主动询问kernel数据好了没有。
  • IO 多路复用 IO multiplexing 就是我们说的 select,poll,epoll,有些地方也称这种 IO 方式为 event driven IO。select/epoll 的好处就在于单个 process 就可以同时处理多个网络连接的 IO。它的基本原理就是 select,poll,epoll 这个 function 会不断的轮询所负责的所有 socket,当某个 socket 有数据到达了,就通知用户进程。

在单线程且又是阻塞模式下,是没法实现多个 IO 一起执行的,因为当接收数据时,一直没有接收到的话就会一直卡住。

在单线程下非阻塞模式下,假如此时有 10 个 IO,对这 10 个 IO 进行 for 循环来接收数据,先接收其中 2 个 IO 的数据,如果这 2 个 IO 没有接收到数据就会返回 err,此时就不会在阻塞了,然后继续进行 for 循环,此时其他 IO 如果有数据就会将数据接收过来,然后就这样不断的 receive,发现 err 就不阻塞,有数据则接收。使用非阻塞模式就可以处理多个 socket,对于用户来说就已经是并发了。但是要注意的是第一阶段不卡了,但是此时第二阶段依然会卡,如果从内核 copy 到用户内存的数据不大,则很快会 copy 完成,但是如果数据很大的话,第二阶段就会一直在 copy 数据,直到数据 copy 完成,但相应的在第二阶段卡的时间也会很久。

当用户进程调用了 select,那么整个进程会被 block,假如此时有 100 个 socket 的 IO,那么 kernel 会监视所有 select 负责的 socket,当任何一个 socket 中的数据准备好了(kernel 的数据准备好),select 就会返回。这个时候用户进程在调用 read 操作,将数据 kernelcopy 到用户进程。所以,I/O 多路复用的特点是通过一种机制 一个进程能同时等待多个文件描述符,而这些文件描述符(socket 连接)其中的任意一个进入读就绪状态,select()函数就可以返回。多路复用和阻塞模式的区别就是,阻塞模式监视一个 socket,有数据则接收;而多路复用就是可以通过 select 监视 N 个 socket,只要其中任何一个有数据,则进行 select 返回,然后 receive 数据(第二阶段数据过大的话,依然会有阻塞)。

假如此时有 10000 个 socket 连接,监视到有数据后 kernel 就会告诉返回给用户进程,但 kernel 不会告诉用户进程具体是哪个 socket 连接,所以用户就会循环着 10000 个 socket 连接,但是即使其中只有 2 个 socket 有数据,用户程序也会去循环着 10000 个 socket 连接,这就造成了大量的多余循环操作。

select

select 最早于 1983 年出现在 4.2BSD 中,它通过一个 select()系统调用来监视多个文件描述符的数组,当 select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。

select 目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。

select 的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在 Linux 上一般为 1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。

另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量 TCP 连接处于非活跃状态,但调用 select()会对所有 socket 进行一次线性扫描,所以这也浪费了一定的开销。

poll

poll 在 1986 年诞生于 System V Release 3,它和 select 在本质上没有多大差别,但是 poll 没有最大文件描述符数量的限制。

poll 和 select 同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

另外,select()和 poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行 IO 操作,那么下次调用 select()和 poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。

epoll

直到 Linux2.6 才出现了由内核直接支持的实现方法,那就是 epoll,它几乎具备了之前所说的一切优点,被公认为 Linux2.6 下性能最好的多路 I/O 就绪通知方法。

epoll 可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。

epoll 同样只告知那些就绪的文件描述符,而且当我们调用 epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去 epoll 指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。

另一个本质的改进在于 epoll 采用基于事件的就绪通知方式。在 select/poll 中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而 epoll 事先通过 epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似 callback 的回调机制,迅速激活这个文件描述符,当进程调用 epoll_wait()时便得到通知。

epool 会告诉用户进程具体哪个 socket 连接有数据了,所以用户进程不需要在将所有 socket 连接全都循环一次才发现具体哪个有数据。

Windows 不支持 epool,支持 select

  • 异步 I/O linux 下的 asynchronous IO 其实用得很少。先看一下它的流程:用户进程发起 read 操作之后,立刻就可以开始去做其它的事(不需要等待 kernel 拷贝数据到用户)。而另一方面,从 kernel 的角度,当它受到一个 asynchronous read 之后,首先它会立刻返回,所以不会对用户进程产生任何 block。然后,kernel 会等待数据准备完成,然后 kernel 主动将数据拷贝到用户内存,当这一切都完成之后,kernel 会给用户进程发送一个 signal,告诉它 read 操作完成了(没有任何阻塞)。
  • 小结
    • 同步 IO:阻塞、非阻塞、多路复用都属于同步 IO,因为他们都需要等待 kernel 到用户的数据 copy。同步 IO 都是需要用户进程去 kernel 接收数据。
    • 异步 IO:异步 I/O 不需要等待 kernel 到用户的数据 copy。异步是 kernel 主动将数据 copy 到用户内存。

异步因为实现比较复杂,所以使用的较少,使用较多的还是 epool 多路复用。

阻塞模式:

  • 原理:内核阻塞,直到接收到完整的数据后,在将数据 copy 给用户内存,copy 完成后会返回一个 OK 给当前 server 的进程,然后进程接触阻塞继续向下执行(执行代码)。
  • server 等待接收数据时,会处于阻塞的状态,当前进程不会再往下执行,除非接收到数据以后才会;假如当前有 3 个 IO,目前第 1 个 IO 处于接收数据状态,除非第 1 个 IO 内核接收完成,否则第 2 和第 3 个内核 IO 都不会开始数据的接收;内核准备好数据还要讲数据 copy 给用户内存。

非阻塞模式:

  • 原理:内核不阻塞,用户内存接收数据会阻塞(直到接收完成);多个 IO 时,内核会同时接收多个 IO 数据,用户进程会轮询的方式 read 内核是否准备好数据,如果没有准备好的话内核返回 err 给用户进程,此时用户进程会问内核其他 IO 是否准备好数据,没准备好内核返回 err 给用户进程,准备的话就将数据 copy 给用户内存。

  • server 等待接收数据,假如当前有 3 个 IO,内核的第 1 个 IO 数据没有准备好,用户进程向进程询问第 1 个 IO 数据是否准备好,内核返回 err 给用户进程,用户进程去问内核第 2 个 IO 数据是否准备好,准备好了就会将数据 copy 到用户内存,用户进程以此类推的循环去询问内核;需要不断的 read 和返回 err,开销较大。

IO 多路复用:

select:

  • 有多个 socket IO 时,通过 select 来负责所有 socket IO,然后内核会监视所有 select 负责的 socket,用户进程会循环所有 IO,当任何一个 socket IO 中的数据准备好了(相当于内核的数据准备好了)且刚好被用户进程循环到,select 就会返回 datagram ready 给用户进程(此时用户内存处于阻塞状态),用户进程将该 IO 数据 copy 到用户内存,copy 完成后返回 OK 给用户进程,告知解除用户内存,让其他 IO 在内核内存准备好的数据可以 copy 到用户内存(如果当前从内核 copy 到用户内存的数据较大的话,只能等待数据 copy 完成,也就是该阶段依然是阻塞,除非数据 copy 完成后,才会为其他 IO 的数据进行 copy 操作)。
  • 使用 select 不需要内核返回大量的 err,但是用户进程依然需要循环所有 IO,假如 10000 个 IO,其中只有 2 个 IO 数据准备好了,那么用户进程依然需要去循环这 10000 个 IO 来发现这两个已经准备好数据的 IO。select 还是存在大量循环的操作。

poll:

  • poll 和 select 基本相似,select 支持的文件描述符(相当于每个 IO 的索引地址,用户进程需要根据具体的地址来进行制定的数据操作)为 1024,poll 则没有文件描述符的限制。

epoll:

  • 不需要用户进程去循环,当内核数据准备好后会立即告知用户进程具体的哪个 socket IO 数据准备好了,且只会说一遍,不会再次告知,用户进程不需要循环所有 socket IO ,不过 epoll 的代码实现相当复杂。

异步 IO:

  • 用户进程发起 read 后就立刻开始做其他事情,不需要等内核将数据 copy 到用户;而内核接到 read 后会返回信息给用户进程,不会让用户进程产生阻塞(这样第二阶段不会阻塞),然后当内核准备好数据后,内核会主动将数据 copy 给用户内存(其他模式是用户主动从内核 copy 数据),当数据 copy 完成后内核会发送一个 signal 给用户进程,告诉用户进程 read 操作完成了(没有任何阻塞)。
  • 异步因为实现比较复杂,所以使用的较少,使用较多的还是 epool 多路复用。

各个 IO Model 的比较如下图所示:


六、select IO 多路复用

socket server 只有在非阻塞的模式下,才可以实现单线程下的多路复用

server端:

import selectimport socketimport queue

server = socket.socket()server.bind(('localhost',9000))server.listen(1000)

server.setblocking(False) #设置socket server为非阻塞模式

inputs = [server,]#程序运行时就需要监视连接,但server启动时不会有client马上或者正好连接过来,##所以这里启动server程序时监视server自己的连接#监视自己的连接后,就发现了有连接活动,然后就会阻塞状态outputs = []

readable,writeable,exceptional = select.select(inputs,outputs,inputs)#设置select## 第一个inputs是告诉操作系统监视哪些连接,任何一个有数据就会返回;## 第三个值,也就是第二个inputs用来监视被监视所有连接中,## 哪些有问题(比如100个连接其中有4个连接断开了),就会返回有问题的##将监视到第一个inputs的连接赋值给readable,将outputs(这个后面会说到)赋值给writeable,##将第二个inputs赋值给exceptional

print (readable,writeable,exceptional)  #运行server端后,在去运行client端,就会打印三个值

server.accept()  #设置非阻塞模式后,这里就不会在阻塞了,如果没有数据就会报错。

client端:

import socket

HOST = 'localhost'PORT = 9000c = socket.socket()c.connect((HOST,PORT))

while True:    msg = bytes(input(">>:"),encoding='utf-8')    c.sendall(msg)    data = c.recv(1024)

    print ('Received',data)c.close()

server端执行结果:[508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9000)>] [] []##readable是client的连接;而writeable,exceptional则为两个空列表
修改server端:

import selectimport socketimport queue

server = socket.socket()server.bind(('localhost',9000))server.listen(1000)

server.setblocking(False)

inputs = [server,]

outputs = []

readable,writeable,exceptional = select.select(inputs,outputs,inputs)

print (readable,writeable,exceptional)

for r in readable:    # print (r)    conn,addr = server.accept()    print (conn,addr)   #可以看到conn实例,和addr的client地址    print ("recv:",conn.recv(1024))    ##目前代码到这里就会报错,因为server希望收到client的数据,但是client没有发送数据;    ##server没有收到数据,也不会阻塞,这里就会报错。    ##此时就希望select可以监视client的连接,当监视到client的对应的连接发送数据了以后,    ##server再去接收数据,这样接收数据就不为空了。
修改server端:

import selectimport socketimport queue

server = socket.socket()server.bind(('localhost',9000))server.listen(1000)

server.setblocking(False)

inputs = [server,]

outputs = []

while True: #死循环用来不断的去select监视    readable,writeable,exceptional = select.select(inputs,outputs,inputs)

    print (readable,writeable,exceptional)

    for r in readable:        # print (r)

        if r is server: #这里确定通过server自己已经建立好连接,处于阻塞状态            conn,addr = server.accept()  #与client建立连接            print ("新连接:",addr)            inputs.append(conn)            ##想要实现client发数据来时,server端能知道客户端发送数据过来,            ##就需要select能够再检测这个conn            ##将conn加入到inputs中,此时inputs=[server,conn],select发现有活动的连接就会返回,            ##但此时不知道活动的连接是谁;            ##所以select会循环inputs,如果发现是conn活动就说明数据发送过来了,            ##如果是server活动说明有其他client建立了新连接。        else:  #执行这里就说明r不等于server本身的连接,而是等于conn这个连接            data = conn.recv(1024)   #接收conn连接的数据            print ("收到数据:",data)

server端执行结果:

[412, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9000)>] [] []新连接: ('127.0.0.1', 56201)[532, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9000), raddr=('127.0.0.1', 56201)>] [] []收到数据: b'123'[412, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9000)>] [] []新连接: ('127.0.0.1', 56205)[536, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9000), raddr=('127.0.0.1', 56205)>] [] []收到数据: b'456'##两次client建立连接,分别发送数据123和456,从server执行结果也可以看到两个client的连接和发送的数据;##但此时去看client端的话,两个client端都是卡主的状态。
server端:

import selectimport socketimport queue

server = socket.socket()server.bind(('localhost',9000))server.listen(1000)

server.setblocking(False)

inputs = [server,]

outputs = []

while True:    readable,writeable,exceptional = select.select(inputs,outputs,inputs)

    print (readable,writeable,exceptional)

    for r in readable:        # print (r)

        if r is server:            conn,addr = server.accept()            print ("新连接:",addr)            inputs.append(conn)        else:            data = conn.recv(1024)            print ("收到数据:",data)            #conn.send(data)            #print ("send done!")

client端:import socket

HOST = 'localhost'PORT = 9000c = socket.socket()c.connect((HOST,PORT))

while True:    msg = bytes(input(">>:"),encoding='utf-8')    c.sendall(msg)    # data = c.recv(1024)    #    # print ('Received',data)c.close()

执行结果:执行步骤:       1、启动server端;       2、启动client1,并发送数据;       3、启动client2并发送数据;       4、client1再次发送数据,

此时发现server端报错了。

server报错内容如下:Traceback (most recent call last):  File "E:/python/代码练习/A1.py", line 31, in     data = conn.recv(1024)BlockingIOError: [WinError 10035] 无法立即完成一个非阻止性套接字操作。#client1建立连接后,server端的data是用client1的连接接收数据;#client2建立连接后,此时server端用的是client2的连接接收数据;#此时server端data的conn依然是client2的连接,所以这时client1在发送数据的话,server端就会报错。
修改server端:

server = socket.socket()server.bind(('localhost',9000))server.listen(1000)

server.setblocking(False)

inputs = [server,]

outputs = []

while True:    readable,writeable,exceptional = select.select(inputs,outputs,inputs)

    print (readable,writeable,exceptional)

    for r in readable:        # print (r)

        if r is server:            conn,addr = server.accept()            print ("新连接:",addr)            inputs.append(conn)        else:            data = r.recv(1024)            #这里conn改成r,这时因为for循环时获取的是动态的活动连接,            #此时用client1发送数据,那么for循环整个inputs,            #然后发现活动的是client1的连接,所以此时r就等于client1的连接,            #然后用client1的连接来接收数据就不会出问题了。            print ("收到数据:",data)            # r.send(data)            # print ("send done!")
server端:

server = socket.socket()server.bind(('localhost',9000))server.listen(1000)

server.setblocking(False)

msg_dic = {}    #建立空字典,用于存储每个client连接发过来的数据

inputs = [server,]

outputs = []

while True:    readable,writeable,exceptional = select.select(inputs,outputs,inputs)

    print (readable,writeable,exceptional)

    for r in readable:        # print (r)

        if r is server:            conn,addr = server.accept()            print ("新连接:",addr)            inputs.append(conn)            msg_dic[conn] = queue.Queue()   #初始化一个队列,后面存要返回给客户端的数据

        else:            data = r.recv(1024)            print ("收到数据:",data)            msg_dic[r].put(data)   #将data这个数据放入字典中对应的key,也就是r是key,data是value            outputs.append(r)   #放入返回的连接队列            ##当前将r这个连接已经放入outputs中了,            ##所以下次select时会检查outputs中这个连接            ##这里并没有在接收到数据时直接发送数据回去,其实使用send也可以

    for w in writeable: #要返回给客户端的连接列表        data_to_client = msg_dic[w].get() #此时w等于r,都是一个连接实例        w.send(data_to_client)  #返回给客户端原数据

        outputs.remove(w)   #删除当前连接的数据,确保下次循环时不返回上次处理完连接的数据

client端:import socket

HOST = 'localhost'PORT = 9000c = socket.socket()c.connect((HOST,PORT))

while True:    msg = bytes(input(">>:"),encoding='utf-8')    c.sendall(msg)    data = c.recv(1024)    print ('Received',data)c.close()

server执行结果:[420, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9000)>] [] []新连接: ('127.0.0.1', 58058)[536, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9000), raddr=('127.0.0.1', 58058)>] [] []收到数据: b'hello'[] [536, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9000), raddr=('127.0.0.1', 58058)>] [][536, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9000), raddr=('127.0.0.1', 58058)>] [] []收到数据: b'aa'[] [536, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9000), raddr=('127.0.0.1', 58058)>] [][420, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9000)>] [] []新连接: ('127.0.0.1', 58073)[544, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9000), raddr=('127.0.0.1', 58073)>] [] []收到数据: b'bbb'##可以看到已经可以实现多连接并发了。
server端:

import selectimport socketimport queue

server = socket.socket()server.bind(('localhost',9000))server.listen(1000)

server.setblocking(False)

msg_dic = {}    #建立空字典,用于存储每个client连接发过来的数据

inputs = [server,]

outputs = []

while True:    readable,writeable,exceptional = select.select(inputs,outputs,inputs)

    print (readable,writeable,exceptional)

    for r in readable:        # print (r)

        if r is server:            conn,addr = server.accept()            print ("新连接:",addr)            inputs.append(conn)            msg_dic[conn] = queue.Queue()   #初始化一个队列,后面存要返回给客户端的数据

        else:            data = r.recv(1024)            print ("收到数据:",data)            msg_dic[r].put(data)    #将data这个数据放入字典中对应的key,也就是r是key,data是value            outputs.append(r)   #放入返回的连接队列            ##当前将r这个连接已经放入outputs中了,所以下次select时会检查outputs中这个连接

    for w in writeable: #要返回给客户端的连接列表        data_to_client = msg_dic[w].get() #此时w等于r,都是一个连接实例        w.send(data_to_client)  #返回给客户端原数据

        outputs.remove(w)   #删除当前连接的数据,确保下次循环时不返回上次处理完连接的数据

    for e in exceptional:   #如果连接断开,那么就需要从inputs和outpus中移除        if e in outputs:            outputs.remove(e)   #移除outputs中的连接

        inputs.remove(e)    #移除inputs中的连接

        del msg_dic[e]  #移除队列中的数据

client端:import socket

HOST = 'localhost'PORT = 9000c = socket.socket()c.connect((HOST,PORT))

while True:    msg = bytes(input(">>:"),encoding='utf-8')    c.sendall(msg)    data = c.recv(1024)    print ('Received',data)c.close()
  • 之前讲的有些算是底层的东西,如果要了解 epoll 底层的东西可以访问连接:http://www.cnblogs.com/alex3714/articles/5248247.html
  • Windows 不支持 epoll
封装好的epoll(不是底层代码)这里封装好的代码在linux中会自动使用epoll,但在Windows中因为不支持epoll会自动使用select。

server端:

import selectorsimport socket

sel = selectors.DefaultSelector()   #生成一个select对象

def accept(sock, mask):    conn, addr = sock.accept()  # 建立连接    print('accepted', conn, 'from', addr)    conn.setblocking(False)     #将连接设置为非阻塞模式    sel.register(conn, selectors.EVENT_READ, read)    ## 不立刻收数据,将新建立的连接(conn)注册到sel这个对象中;    ## 新连接活动,再一次while循环后,如果数据发过来就回调read函数;    ## accept函数执行完成后,回到第while循环中的callback(key.fileobj, mask)

def read(conn, mask):    data = conn.recv(1000)  # 接收数据    if data:    #如果有数据        print('echoing', repr(data), 'to', conn)        conn.send(data)  # 返回数据    else:   #如果没有数据,表示client断开连接了        print('closing', conn)        sel.unregister(conn)    #取消注册(从相关的列表中删除链接信息)        conn.close()    #关闭连接

sock = socket.socket()sock.bind(('localhost', 10000))sock.listen(100)sock.setblocking(False)sel.register(sock, selectors.EVENT_READ, accept)## 将写好的socket这个server注册到sel对象中,让其监听socket## 只要来一个新连接就会回调函数accept(不是在这调用,这只是定义)

while True:    #根据系统调用epoll或select;默认是阻塞模式,有活动连接就返回活动的连接列表    events = sel.select()    for key, mask in events:    #循环活动的连接列表        callback = key.data     #callback相当于定义函数(不是调用)        callback(key.fileobj, mask)     #调用函数传参数        ## key.fileobj是文件句柄,相当于还没有建立好连接的实例

client端:import socket

HOST = 'localhost'PORT = 10000c = socket.socket()c.connect((HOST,PORT))

while True:    msg = bytes(input(">>:"),encoding='utf-8')    c.sendall(msg)    data = c.recv(1024)    print ('Received',data)c.close()

server执行结果:

accepted 472, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 53817)> from ('127.0.0.1', 53817)echoing b'111' to 472, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 53817)>accepted 476, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 53822)> from ('127.0.0.1', 53822)echoing b'22' to 476, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 53822)>echoing b'1111111' to 472, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 53817)>echoing b'222222222' to 476, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 53822)>##多个client并发,发送数据
server端:import selectorsimport socket

sel = selectors.DefaultSelector()

def accept(sock, mask):    conn, addr = sock.accept()  # 建立连接    print('accepted', conn, 'from', addr)    conn.setblocking(False)     #将连接设置为非阻塞模式    sel.register(conn, selectors.EVENT_READ, read)

def read(conn, mask):    data = conn.recv(1000)    if data:        print('echoing', repr(data), 'to', conn)        conn.send(data)    else:        print('closing', conn)        sel.unregister(conn)        conn.close()

sock = socket.socket()sock.bind(('localhost', 10000))sock.listen(100)sock.setblocking(False)sel.register(sock, selectors.EVENT_READ, accept)

while True:    events = sel.select()    for key, mask in events:        callback = key.data        callback(key.fileobj, mask)

client端:import socketimport sys

messages = [ b'This is the message. ',             b'It will be sent ',             b'in parts.',             ]  #定义3条数据server_address = ('localhost', 10000)

# Create a TCP/IP socketsocks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),          socket.socket(socket.AF_INET, socket.SOCK_STREAM),          socket.socket(socket.AF_INET, socket.SOCK_STREAM),          socket.socket(socket.AF_INET, socket.SOCK_STREAM),          ] #定义4个client去连接server;  这里可以定义N个client

# Connect the socket to the port where the server is listeningprint('connecting to %s port %s' % server_address)for s in socks:    s.connect(server_address)   #4个client去连接server

for message in messages:

    # Send messages on both sockets    for s in socks:        print('%s: sending "%s"' % (s.getsockname(), message) )        #s.getsockname()是客户端的地址(是服务器返回的)        s.send(message) #4个链接发送数据,一共3条数据,4个client一共发送了12次

    # Read responses on both sockets    for s in socks:        data = s.recv(1024) #发完数据后就接收数据。        print( '%s: received "%s"' % (s.getsockname(), data) )        if not data:    #如果没有数据            print('closing socket', s.getsockname() )            ## 打印客户端要关闭了

server端执行结果:accepted 428, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 54652)> from ('127.0.0.1', 54652)accepted 436, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 54653)> from ('127.0.0.1', 54653)accepted 440, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 54654)> from ('127.0.0.1', 54654)accepted 444, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 54655)> from ('127.0.0.1', 54655)echoing b'This is the message. ' to 428, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 54652)>echoing b'This is the message. ' to 440, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 54654)>echoing b'This is the message. ' to 436, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 54653)>echoing b'This is the message. ' to 444, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 54655)>echoing b'It will be sent ' to 428, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 54652)>echoing b'It will be sent ' to 440, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 54654)>echoing b'It will be sent ' to 436, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 54653)>echoing b'It will be sent ' to 444, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 54655)>echoing b'in parts.' to 428, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 54652)>echoing b'in parts.' to 440, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 54654)>echoing b'in parts.' to 436, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 54653)>echoing b'in parts.' to 444, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 54655)>closing 440, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 54654)>closing 436, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 54653)>closing 444, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 54655)>closing 428, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000), raddr=('127.0.0.1', 54652)>
修改client:

import socketimport sys

messages = [ b'This is the message. ',             b'It will be sent ',             b'in parts.',             ]server_address = ('localhost', 10000)

socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(4)]#定义4个client去连接server;  这里可以定义N个client

print('connecting to %s port %s' % server_address)for s in socks:    s.connect(server_address)

for message in messages:

    for s in socks:        print('%s: sending "%s"' % (s.getsockname(), message) )

        s.send(message)

    for s in socks:        data = s.recv(1024)        print( '%s: received "%s"' % (s.getsockname(), data) )        if not data:            print('closing socket', s.getsockname() )

执行结果与上面代码相同,只是定义4个client方式变了
  • 在 linux 中运行,linux 默认默认支持 socket 连接为 1024,需要修改一下
  • 这里最大不止 65535,也可以改成 100000 等数字,不是按照端口数量来限制的
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(3000)]#client的代码修改一下,修改client链接数量(默认linux支持1024)

linux使用epoll,最后执行很快

python gevent模块 下载_Python协程阻塞IO非阻塞IO同步IO异步IO相关推荐

  1. python gevent模块 下载_Python中的多任务,并行,并发,多线程,多进程,协程区别...

    多任务 CPU承担了所有的计算任务.一个CPU在一个时间切片里只能运行一个程序.当我们想同时运行多于一个程序的时候,就是多任务,例如同时运行微信,QQ,浏览器等等.多任务的目的是提升程序的执行效率,更 ...

  2. python从零开始到放弃_Python 协程从零开始到放弃

    0x00 前言 很久以前就听说 Python 的 async/await 很厉害,但是直到现在都没有用过,一直都在用多线程模型来解决各种问题.最近看到隔壁的 Go 又很火,所以决定花时间研究下 Pyt ...

  3. python xlrd模块下载_python xlrd模块介绍

    转载自:http://www.cnblogs.com/lhj588/archive/2012/01/06/2314181.html 一.安装xlrd模块 到python官网下载 二.使用介绍 1.导入 ...

  4. python os模块下载_Python OS模块目录文件处理

    Python编程语言优势特点比较突出,在Python语言中,有一种标准模块叫OS模块,Python OS模块包含普遍的操作系统功能,如果你希望你的程序能够与平台无关的话,这个模块尤为重要,它允许一个程 ...

  5. python gevent模块 下载_【python安全攻防】包、模块、类、对象

    终于又到了一周一度的整理博客的时间了,博主平时课余时间看书,周末统一整理,坚持周更真是爱了爱了 - 今天要说的是python面向对象这一部分的内容,今天这是基础篇的第二篇,也是最后一篇. 说来基础篇还 ...

  6. python re模块下载_python re模块详解

    1.正则表达式基础 1.1正则表达式概念 正则表达式并不是Python的一部分.正则表达式是用于处理字符串的强大工具,拥有自己独特的语法以及一个独立的处理引擎,效率上可能不如str自带的方法,但功能十 ...

  7. python paramiko模块下载_Python自动化运维实战:使用Python管理网络设备

    现在,我们已经知道如何在不同的操作系统中使用和安装Python以及如何使用EVE-NG搭建网络拓扑.在本章中,我们将学习如何使用目前常用的网络自动化库自动完成各种网络任务.Python可以在不同的网络 ...

  8. python pymysql模块下载_Python Pymysql模块

    Python Pymysql的使用 Pymysql的作用 简单来说:pymsql是Python中操作MySQL的模块,就是让我们通过python来实现对数据库的操作的 (1)pymysql模块的下载 ...

  9. python re模块下载_python: re模块

    在Python中可以使用正则表达式, Python提供re模块,包含所有正则表达式的功能.由于Python的字符串本身也用\转义,所以要特别注意:在字符串的前面加上 r 的前缀, 就不用考虑转义的问题 ...

最新文章

  1. Spark SQL与外部数据源的操作(Spark SQL ——> CSV/JSON/Parquet/hive/mysql)
  2. STM32添加项目所需要的工程文件
  3. 一个小度科技就估值200亿,百度现在被低估了吗?
  4. MFC获得当前应用程序目录的GetCurrentDirectory()和GetModuleFileName()函数
  5. 双非计算机专业考研西安交通大学,2020双非一战计算机专硕初试403经验贴
  6. MySQL Connector/C++入门教程(上)
  7. Java 面向对象编程 tricks
  8. 记录:pycharm的强大之处之两个文件代码的比对
  9. Go语言核心之美 3.5-JSON
  10. 程序员应该学习的一些数学知识
  11. 千呼万唤始出来,犹抱琵琶半遮面,揭开ArrayList的扩容机制的神秘面纱
  12. 关于PS课程中字体部分的学习总结
  13. 关于iTunes connect审核“Missing or invalid signature”的问题
  14. H5 前端模板 JS 用法
  15. 网易云音乐、QQ音乐等映射网络路径(下载选SQ或者hires即可 其他太大了……)
  16. Linux服务器绑定mac地址,linux下绑定mac地址
  17. 论AI小游戏是怎么练成的——『寻物大作战』原理揭秘
  18. 图片获取、格式转换与后台存储
  19. AirServer 7.2Mac 官方原版 完美激活
  20. 安川机器人焊枪切换设定方法_安川机器人焊枪使用注意事项

热门文章

  1. 南京农业大学计算机保研率,2016中国大学保研率排名出炉 江苏11所高校入百强...
  2. 怎么选?阿里P7 or 副处级干部?
  3. 真正开源的MongoDB的替代品,MangoDB!
  4. 还在忍受限速网盘?来搭建一套自己的私有网盘!
  5. 新年不宕机就等它了!戴尔官网高效编程电脑OptiPlex 直降2500,低至3099!
  6. 超有趣的几个Linux小命令
  7. 16进制ff转化为二进制_3秒钟快速转换十六进制为二进制
  8. CSS样式----文字样式
  9. 微信商城小程序操作为产品增加颜色尺寸长度等多规格内容
  10. numpy批量iou