原文在[1],翻译的在[2],这篇博客进一步解读一些代码,以及加了一些注释,并且所有代码都已经改成python3.x:

(斜线表示原文引用)

--------------------------------------------------------------------------------------------------------------------------------

Synchronous & Asynchronous Execution 同步&异步执行

The core idea of concurrency is that a larger task can be broken down into a collection of subtasks whose operation does not depend on the other tasks and thus can be run asynchronously instead of one at a time synchronously. A switch between the two executions is known as a context switch.

A context switch in gevent done through yielding. In this case example we have two contexts which yield to each other through invoking gevent.sleep(0).

并发的核心思想是一个更大的任务可以分解成多个子任务,其运行不依赖于其他任务的集合,因此可以异步运行 ,而不是一个在时间 同步。两个执行程序间的转换是一个关联转换。

在gevent中一个关联转换可以通过 yielding 来实现.在这个例子,两个程序的转换是通过调用 gevent.sleep(0).

import geventdef foo():print('Running in foo-foo函数')gevent.sleep(0)print('Explicit context switch to foo again-foo函数')def bar():print('Explicit context to bar-bar函数')gevent.sleep(0)print('Implicit context switch back to bar-bar函数')gevent.joinall([gevent.spawn(foo),gevent.spawn(bar),
])

It is illuminating to visualize the control flow of the program or walk through it with a debugger to see the context switches as they occur.

在调解器里面清楚地看到程序在两个转换之间是怎么运行的.

The real power of gevent comes when we use it for network and IO bound functions which can be cooperatively scheduled. Gevent has taken care of all the details to ensure that your network libraries will implicitly yield their greenlet contexts whenever possible. I cannot stress enough what a powerful idiom this is. But maybe an example will illustrate.

gevent真正的能力在于我们把它用于网络和IO相关的功能会很好的合作安排.

下面这个例子其实不是太好,select用法很不靠谱(略过)

import time
import gevent
from gevent import selectstart = time.time()
tic = lambda: 'at %1.1f seconds' % (time.time() - start)def gr1():# Busy waits for a second, but we don't want to stick around...print('Started Polling: ', tic())select.select([], [], [], 2)print('Ended Polling: ', tic())def gr2():# Busy waits for a second, but we don't want to stick around...print('Started Polling: ', tic())select.select([], [], [], 2)print('Ended Polling: ', tic())def gr3():print("Hey lets do some stuff while the greenlets poll, at", tic())gevent.sleep(1)gevent.joinall([gevent.spawn(gr1),gevent.spawn(gr2),gevent.spawn(gr3),
])

A somewhat synthetic example defines a task function which is non-deterministic (i.e. its output is not guaranteed to give the same result for the same inputs). In this case the side effect of running the function is that the task pauses its execution for a random number of seconds.

一个比较综合的例子,定义一个task函数,它是不确定的(并不能保证相同的输入输出).在这种情况运行task函数的作用只是暂停其执行几秒钟的随机数.

import gevent
import random
import time
def task(pid):"""Some non-deterministic task"""gevent.sleep(random.randint(0,2)*0.001)print('Task', pid, 'done')def synchronous():for i in range(1,10):task(i)def asynchronous():threads = [gevent.spawn(task, i) for i in range(10)]gevent.joinall(threads)print('Synchronous:')
start=time.time()
synchronous()
end=time.time()
print("time interval=",end-start)
print('------------------------------------')
print('Asynchronous:')
start=time.time()
asynchronous()
end=time.time()
print("time interval=",end-start)

输出结果是:

Synchronous:
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
time interval= 0.010333538055419922
------------------------------------
Asynchronous:
Task 2 done
Task 5 done
Task 6 done
Task 8 done
Task 0 done
Task 1 done
Task 3 done
Task 4 done
Task 7 done
Task 9 done
time interval= 0.0026671886444091797
[Finished in 0.1s]

注意,这个异步的乱序在不同机子上是不一样的,仅仅在单机上是确定的。

In the synchronous case all the tasks are run sequentially, which results in the main programming blocking ( i.e. pausing the execution of the main program ) while each task executes.

在同步的情况所有任务都会顺序的运行,当每个任务执行的时候导致主程序 blocking.

The important parts of the program are the gevent.spawn which wraps up the given function inside of a Greenlet thread. The list of initialized greenlets are stored in the array threads which is passed to the gevent.joinall function which blocks the current program to run all the given greenlets. The execution will step forward only when all the greenlets terminate.

程序重要的部分是包装起来的函数gevent.spawn , 它是Greenlet的线程. 初始化的greenlets储存在一个数组threads ,然后提交给 gevent.joinall函数,然后阻塞当前的程序去运行所有greenlets.只有当所有greenlets停止的时候程序才会继续运行.

The important fact to notice is that the order of execution in the async case is essentially random and that the total execution time in the async case is much less than the sync case. In fact the maximum time for the synchronous case to complete is when each tasks pauses for 2 seconds resulting in a 20 seconds for the whole queue. In the async case the maximum runtime is roughly 2 seconds since none of the tasks block the execution of the others.

要注意的是异步的情况程序是无序的,异步的执行时间是远少于同步的.事实上同步去完成每个任务停止2秒的话,结果是要20秒才能完成整个队列.在异步的情况最大的运行时间大概就是2秒,因为每个任务的执行都不会阻塞其他的任务.

A more common use case, fetching data from a server asynchronously, the runtime of fetch() will differ between requests given the load on the remote server.

一个更常见的情况,是从服务器上异步获取数据,请求之间 fetch() 的运行时间会给服务器带来不同的负载.

import gevent.monkey
gevent.monkey.patch_socket()
import time
import gevent
import urllib3
# import simplejson as json
from urllib import request
import urllib
import json
# resp = request.urlopen('http://www.baidu.com')# 这个是在发送http请求
def fetch(pid):response = request.urlopen('http://quan.suning.com/getSysTime.do')# resp = urllib.request.urlopen(url)ele_json = json.loads(response.read())datetime = ele_json['sysTime1']print("Process", pid, datetime)return ele_json['sysTime1']def synchronous():for i in range(1,10):fetch(i)def asynchronous():threads = []for i in range(1,10):threads.append(gevent.spawn(fetch, i))#传入参数i给函数fetchgevent.joinall(threads)print('------------------Synchronous------------------')
start=time.time()
synchronous()
end=time.time()
print("同步运行的时间间隔=",end-start)
print('------------------Asynchronous------------------')
start=time.time()
asynchronous()
end=time.time()
print("异步运行的时间间隔=",end-start)

运行结果是:

------------------Synchronous------------------
Process 1 20191126215800
Process 2 20191126215800
Process 3 20191126215800
Process 4 20191126215801
Process 5 20191126215800
Process 6 20191126215801
Process 7 20191126215801
Process 8 20191126215801
Process 9 20191126215801
同步运行的时间间隔= 0.420764684677124
------------------Asynchronous------------------
Process 6 20191126215801
Process 8 20191126215801
Process 5 20191126215801
Process 1 20191126215801
Process 2 20191126215801
Process 9 20191126215801
Process 3 20191126215801
Process 7 20191126215801
Process 4 20191126215801
异步运行的时间间隔= 0.0831596851348877
[Finished in 0.8s]

Determinism 确定性

As mentioned previously, greenlets are deterministic. Given the same inputs and they always produce the same output. For example lets spread a task across a multiprocessing pool compared to a gevent pool.

正如之前提到的,greenlets是确定性的.给相同的输入就总会提供相同的输出.例如展开一个任务来比较一个multiprocessing pool和一个gevent pool.

import timedef echo(i):time.sleep(0.001)return i# Non Deterministic Process Poolfrom multiprocessing.pool import Poolp = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]print( run1 == run2 == run3 == run4 )# Deterministic Gevent Poolfrom gevent.pool import Poolp = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]print( run1 == run2 == run3 == run4 )

这段代码中的imap_unordered表示:

不保证返回的结果顺序与进程添加的顺序一致。

[3]中提到了上面代码中的非阻塞式版本
Pool.apply_async()和Pool.map_async() 

这段代码的意思就是示范下异步的不确定性。

Even though gevent is normally deterministic, sources of non-determinism can creep into your program when you begin to interact with outside services such as sockets and files. Thus even though green threads are a form of "deterministic concurrency", they still can experience some of the same problems that POSIX threads and processes experience.

The perennial problem involved with concurrency is known as a race condition. Simply put is when two concurrent threads / processes depend on some shared resource but also attempt to modify this value. This results in resources whose values become time-dependent on the execution order. This is a problem, and in general one should very much try to avoid race conditions since they result program behavior which is globally non-deterministic.*

The best approach to this is to simply avoid all global state all times. Global state and import-time side effects will always come back to bite you!

Spawning Threads(下面开始的一些东西的意思是你可以像使用gevent一样使用Greenlet)

gevent provides a few wrappers around Greenlet initialization. Some of the most common patterns are:

gevent提供了一些Greenlet初始化的封装.部分比较常用的模块是:


import gevent
from gevent import Greenletdef foo(message, n):"""Each thread will be passed the message, and n argumentsin its initialization."""gevent.sleep(n)print(message)# Initialize a new Greenlet instance running the named function
# foo
thread1 = Greenlet.spawn(foo, "Hello", 1)# Wrapper for creating and runing a new Greenlet from the named
# function foo, with the passed arguments
thread2 = gevent.spawn(foo, "I live!", 2)# Lambda expressions
thread3 = gevent.spawn(lambda x: (x+1), 2)threads = [thread1, thread2, thread3]# Block until all threads complete.
gevent.joinall(threads)

Hello
I live!

In addition to using the base Greenlet class, you may also subclass Greenlet class and overload the _run method.

除了用Greenlet的基类,你也可以用Greenlet的子类,重载_run 方法.


from gevent import Greenletclass MyGreenlet(Greenlet):def __init__(self, message, n):Greenlet.__init__(self)self.message = messageself.n = ndef _run(self):print(self.message)gevent.sleep(self.n)g = MyGreenlet("Hi there!", 3)
g.start()
g.join()

运行结果是:

Hi there!

Greenlet State 状态

Like any other segment of code, Greenlets can fail in various ways. A greenlet may fail to throw an exception, fail to halt or consume too many system resources.

像其他编程,Greenlets会以不同的方式失败.一个greenlet可能会抛出一个异常, 失败会使程序停止或者消耗系统很多资源.

The internal state of a greenlet is generally a time-dependent parameter. There are a number of flags on greenlets which let you monitor the state of the thread

greenlet内部的状态通常是一个按时间变化的参数.以下几个状态让你可以监听线程的状态.

  • started -- Boolean, indicates whether the Greenlet has been started. 表明是否Greenlet已经开始
  • ready() -- Boolean, indicates whether the Greenlet has halted. 表明是否Greenlet已经停止
  • successful() -- Boolean, indicates whether the Greenlet has halted and not thrown an exception. 表明是否Greenlet已经停止并且没有抛出异常
  • value -- arbitrary, the value returned by the Greenlet. 任意,Greenlet返回的值
  • exception -- exception, uncaught exception instance thrown inside the greenlet  异常,greenlet内部实例没有被捕抓的异常

import geventdef win():return 'You win!'def fail():raise Exception('You fail at failing.')winner = gevent.spawn(win)
loser = gevent.spawn(fail)print(winner.started) # True
print(loser.started)  # True# Exceptions raised in the Greenlet, stay inside the Greenlet.
try:gevent.joinall([winner, loser])
except Exception as e:print('This will never be reached')print(winner.value) # 'You win!'
print(loser.value)  # Noneprint(winner.ready()) # True
print(loser.ready())  # Trueprint(winner.successful()) # True
print(loser.successful())  # False# The exception raised in fail, will not propogate outside the
# greenlet. A stack trace will be printed to stdout but it
# will not unwind the stack of the parent.print(loser.exception)# It is possible though to raise the exception again outside
# raise loser.exception
# or with
# loser.get()

实验结果:
True
True
You win!
None
True
True
True
False
You fail at failing.

Program Shutdown 程序关闭

Greenlets that fail to yield when the main program receives a SIGQUIT may hold the program's execution longer than expected. This results in so called "zombie processes" which need to be killed from outside of the Python interpreter.

当主程序接受到一个SIGQUIT的时候,Greenlets的失败可能会让程序的执行比预想中长时间.这样的结果称为"zombie processes" ,需要让Python解析器以外的程序杀掉.

A common pattern is to listen SIGQUIT events on the main program and to invoke gevent.shutdown before exit.

一个常用的模块是在主程序中监听SIGQUIT事件和退出前调用 gevent.shutdown .

import gevent
import signaldef run_forever():gevent.sleep(1000)if __name__ == '__main__':gevent.signal(signal.SIGQUIT, gevent.shutdown)#这行代码是对协程运行失败进行监听,这行代码已经无法运行了,因为不存在signal.SIGQUITthread = gevent.spawn(run_forever)thread.join()

Timeouts 超时设定

Timeouts are a constraint on the runtime of a block of code or a Greenlet.

超时是对一推代码或者一个Greenlet运行时间的一种约束.


import gevent
from gevent import Timeoutseconds = 10timeout = Timeout(seconds)
timeout.start()#下面的excetp Timeout会监听上面的timeout设定
#-------------------------------------------------
def wait():gevent.sleep(10)try:gevent.spawn(wait).join()
except Timeout:print 'Could not complete'

Or with a context manager in a with a statement.

或者是带着一个语境的管理在一个with的状态.

import gevent
from gevent import Timeouttime_to_wait = 5 # secondsclass TooLong(Exception):passwith Timeout(time_to_wait, TooLong):gevent.sleep(10)

In addition, gevent also provides timeout arguments for a variety of Greenlet and data stucture related calls. For example:

另外,gevent同时也提供timeout的参数给各种Greenlet和数据结构相关的调用.例如:

import gevent
from gevent import Timeoutdef wait():gevent.sleep(2)timer = Timeout(1).start()
thread1 = gevent.spawn(wait)#---------------上面的timer被下面的try使用,然后except补充try中的timer-------------------------------
try:thread1.join(timeout=timer)
except Timeout:print('Thread 1 timed out')# --timer = Timeout.start_new(1)#换个定时器
thread2 = gevent.spawn(wait)
#----------------------------------------------
try:thread2.get(timeout=timer)
except Timeout:print('Thread 2 timed out')# --try:gevent.with_timeout(1, wait)
except Timeout:print('Thread 3 timed out')

Thread 1 timed out
Thread 2 timed out
Thread 3 timed out

Data Structures 数据结构

Events 事件(根据[4][6]中的说法,AsyncResult不实用,可以用于协程之间的通信)

Events are a form of asynchronous communication between Greenlets.

事件是Greenlets内部一种异步通讯的形式.

import gevent
from gevent.event import AsyncResulta = AsyncResult()def setter():"""After 3 seconds set wake all threads waiting on the value ofa."""gevent.sleep(3)a.set()def waiter():"""After 3 seconds the get call will unblock."""a.get() # blockingprint 'I live!'gevent.joinall([gevent.spawn(setter),gevent.spawn(waiter),
])

A extension of the Event object is the AsyncResult which allows you to send a value along with the wakeup call. This is sometimes called a future or a deferred, since it holds a reference to a future value that can be set on an arbitrary time schedule.

Event对象的一个扩展AsyncResult,可以让你发送一个值连同唤醒调用.这样有时候调用一个将来或者一个延迟,然后它就可以保存涉及到一个将来的值可以用于任意时间表.

import gevent
from gevent.event import AsyncResult
a = AsyncResult()def setter():"""After 3 seconds set the result of a."""gevent.sleep(3)a.set('Hello!')def waiter():"""After 3 seconds the get call will unblock after the setterputs a value into the AsyncResult."""print a.get()gevent.joinall([gevent.spawn(setter),gevent.spawn(waiter),
])

Queues 队列(关于Queues的用法可以参考[5])

Queues are ordered sets of data that have the usual put / get operations but are written in a way such that they can be safely manipulated across Greenlets.

Queues是一组数据的排序,有常用的 put / get操作,但也可以以另一种方式写入,就是当他们在Greenlets之间可以安全地操作.

For example if one Greenlet grabs an item off of the queue, the same item will not grabbed by another Greenlet executing simultaneously.

import gevent
from gevent.queue import Queuetasks = Queue()#消耗队列中的数据
def worker(n):while not tasks.empty():task = tasks.get()print('Worker %s got task %s' % (n, task))gevent.sleep(0)print('Quitting time!')#放入队列中
def boss():for i in xrange(1,25):tasks.put_nowait(i)gevent.spawn(boss).join()gevent.joinall([gevent.spawn(worker, 'steve'),gevent.spawn(worker, 'john'),gevent.spawn(worker, 'nancy'),
])

例如如果一个Greenlet在队列中取出一个元素,同样的元素就不会被另一个正在执行的Greenlet取出.

实验结果:

Worker steve got task 1
Worker john got task 2
Worker nancy got task 3
Worker steve got task 4
Worker nancy got task 5
Worker john got task 6
Worker steve got task 7
Worker john got task 8
Worker nancy got task 9
Worker steve got task 10
Worker nancy got task 11
Worker john got task 12
Worker steve got task 13
Worker john got task 14
Worker nancy got task 15
Worker steve got task 16
Worker nancy got task 17
Worker john got task 18
Worker steve got task 19
Worker john got task 20
Worker nancy got task 21
Worker steve got task 22
Worker nancy got task 23
Worker john got task 24
Quitting time!
Quitting time!
Quitting time!

Queues can also block on either put or get as the need arises.

Queues也可以在 put或者get的时候阻塞,如果有必要的话.

Each of the put and get operations has a non-blocking counterpart, put_nowait and get_nowait which will not block, but instead raise either gevent.queue.Emptyor gevent.queue.Full in the operation is not possible.

每个putget操作都有一个对应的非阻塞的函数:put_nowaitget_nowait,但在操作中抛出gevent.queue.Empty或者gevent.queue.Full是不可能的.

In this example we have the boss running simultaneously to the workers and have a restriction on the Queue that it can contain no more than three elements. This restriction means that the put operation will block until there is space on the queue. Conversely the get operation will block if there are no elements on the queue to fetch, it also takes a timeout argument to allow for the queue to exit with the exception gevent.queue.Empty if no work can found within the time frame of the Timeout.

在这个例子中,我们有一个boss同时给工人任务,有一个限制是说队列中不能超过3个元素(这个3应该是受限制于worker数量),这个限制意味着:当队伍中没有空间,put操作会阻塞.相反的,如果队列中没有元素可取,get操作会阻塞,也可以加入一个timeout的参数来允许队列带着一个异常gevent.queue.Empty退出,如果在Timeout时间范围内没有工作.

import gevent
from gevent.queue import Queue, Emptytasks = Queue(maxsize=3)def worker(n):try:while True:task = tasks.get(timeout=1) # decrements queue size by 1print('Worker %s got task %s' % (n, task))gevent.sleep(0)except Empty:print('Quitting time!')def boss():"""Boss will wait to hand out work until a individual worker isfree since the maxsize of the task queue is 3."""for i in xrange(1,10):tasks.put(i)print('Assigned all work in iteration 1')for i in xrange(10,20):tasks.put(i)print('Assigned all work in iteration 2')gevent.joinall([gevent.spawn(boss),gevent.spawn(worker, 'steve'),gevent.spawn(worker, 'john'),gevent.spawn(worker, 'bob'),
])

Worker steve got task 1
Worker john got task 2
Worker bob got task 3
Worker steve got task 4
Worker bob got task 5
Worker john got task 6
Assigned all work in iteration 1
Worker steve got task 7
Worker john got task 8
Worker bob got task 9
Worker steve got task 10
Worker bob got task 11
Worker john got task 12
Worker steve got task 13
Worker john got task 14
Worker bob got task 15
Worker steve got task 16
Worker bob got task 17
Worker john got task 18
Assigned all work in iteration 2
Worker steve got task 19
Quitting time!
Quitting time!
Quitting time!

(下面这两段代码没什么用)

import geventclass Actor(gevent.Greenlet):def __init__(self):self.inbox = queue.Queue()Greenlet.__init__(self)def receive(self, message):"""Define in your subclass."""raise NotImplemented()def _run(self):self.running = Truewhile self.running:message = self.inbox.get()self.receive(message)

In a use case:

import gevent
from gevent.queue import Queue
from gevent import Greenletclass Pinger(Actor):def receive(self, message):print messagepong.inbox.put('ping')gevent.sleep(0)class Ponger(Actor):def receive(self, message):print messageping.inbox.put('pong')gevent.sleep(0)ping = Pinger()
pong = Ponger()ping.start()
pong.start()ping.inbox.put('start')
gevent.joinall([ping, pong])

Real World Applications

Gevent ZeroMQ

ZeroMQ is described by its authors as "a socket library that acts as a concurrency framework". It is a very powerful messaging layer for building concurrent and distributed applications.

ZeroMQ根据其作者的描述是"一个socket库作为一个并发性的框架".它是非常强大的消息传送层在建立并发性结构和分布式应用的时候.

ZeroMQ provides a variety of socket primitives, the simplest of which being a Request-Response socket pair. A socket has two methods of interest send and recv, both of which are normally blocking operations. But this is remedied by a briliant library by Travis Cline which uses gevent.socket to poll ZeroMQ sockets in a non-blocking manner. You can install gevent-zeromq from PyPi via: pip install gevent-zeromq

ZeroMQ提供了各种socket基元,最简单的就是一对Request-Response socket. 一个socket有2个有用方法 sendrecv,两者一般都会有阻塞操作.但这已经被一个作者叫Travis Cline,基于gevent 写的briliant库补救了.socket 属于 ZeroMQ sockets 是一种不会阻塞的方式.你可以安装 gevent-zeromq 从 PyPi 取到: pip install gevent-zeromq

(下面这个代码没有太大价值,留意bind和connect的成对写法)


# Note: Remember to ``pip install pyzmq gevent_zeromq``
import gevent
from gevent_zeromq import zmq# Global Context
context = zmq.Context()def server():server_socket = context.socket(zmq.REQ)server_socket.bind("tcp://127.0.0.1:5000")for request in range(1,10):server_socket.send("Hello")print('Switched to Server for ', request)# Implicit context switch occurs hereserver_socket.recv()def client():client_socket = context.socket(zmq.REP)client_socket.connect("tcp://127.0.0.1:5000")for request in range(1,10):client_socket.recv()print('Switched to Client for ', request)# Implicit context switch occurs hereclient_socket.send("World")publisher = gevent.spawn(server)
client    = gevent.spawn(client)gevent.joinall([publisher, client])

Switched to Server for  1
Switched to Client for  1
Switched to Server for  2
Switched to Client for  2
Switched to Server for  3
Switched to Client for  3
Switched to Server for  4
Switched to Client for  4
Switched to Server for  5
Switched to Client for  5
Switched to Server for  6
Switched to Client for  6
Switched to Server for  7
Switched to Client for  7
Switched to Server for  8
Switched to Client for  8
Switched to Server for  9
Switched to Client for  9

Simple Telnet Servers(常见工作场景不需要自己去搭建这样的Telnet服务器的)


# On Unix: Access with ``$ nc 127.0.0.1 5000``
# On Window: Access with ``$ telnet 127.0.0.1 5000``from gevent.server import StreamServerdef handle(socket, address):socket.send("Hello from a telnet!\n")for i in range(5):socket.send(str(i) + '\n')socket.close()server = StreamServer(('127.0.0.1', 5000), handle)
server.serve_forever()

WSGI Servers

Gevent provides two WSGI servers for serving content over HTTP. Henceforth called wsgi and pywsgi:

Gevent提供2个WSGI服务器用于HTTP.称为wsgipywsgi:

  • gevent.wsgi.WSGIServer
  • gevent.pywsgi.WSGIServer

In earlier versions of gevent before 1.0.x, gevent used libevent instead of libev. Libevent included a fast HTTP server which was used by gevent's wsgi server.

在1.0x前的gevent版本,gevent用libevent代替libev.Libevent包含一个快的HTTP服务用于gevent的wsgi服务器.

In gevent 1.0.x there is no http server included. Instead gevent.wsgi it is now an alias for the pure Python server in gevent.pywsgi.

在gevent1.0.x这里没有http服务器包含,gevent.wsgi现在已经是纯Python服务器gevent.pywsgi的别名.

Streaming Servers 流式服务器

If you are using gevent 1.0.x, this section does not apply

如果你用的是gevent1.0.x,这部分是不适用的.

For those familiar with streaming HTTP services, the core idea is that in the headers we do not specify a length of the content. We instead hold the connection open and flush chunks down the pipe, prefixing each with a hex digit indicating the length of the chunk. The stream is closed when a size zero chunk is sent.

和那些流式HTTP服务器相似,核心意见是在headers中,我们不指定内容的长度.我们用保持连接打到管道接收缓冲块,在每一块的前缀用十六进制表明块的长度,当块的长度是0发送的时候,流就会关闭.

实验结果:
HTTP/1.1 200 OK
Content-Type: text/plain
Transfer-Encoding: chunked

8
<p>Hello

9
World</p>

0

The above HTTP connection could not be created in wsgi because streaming is not supported. It would instead have to buffered.

以上的HTTP连接,不能用于创建wsgi,因为流是不支持的,我们用缓冲的方法.

from gevent.wsgi import WSGIServerdef application(environ, start_response):status = '200 OK'body = '<p>Hello World</p>'headers = [('Content-Type', 'text/html')]start_response(status, headers)return [body]WSGIServer(('', 8000), application).serve_forever()

Using pywsgi we can however write our handler as a generator and yield the result chunk by chunk.

使用pywsgi我们把处理当作一个发生器,一块接一块的返回结果.

from gevent.pywsgi import WSGIServerdef application(environ, start_response):status = '200 OK'headers = [('Content-Type', 'text/html')]start_response(status, headers)yield "<p>Hello"yield "World</p>"WSGIServer(('', 8000), application).serve_forever()

But regardless, performance on Gevent servers is phenomenal compared to other Python servers. libev is a very vetted technology and its derivative servers are known to perform well at scale.

To benchmark, try Apache Benchmark ab or see this Benchmark of Python WSGI Servers for comparison with other servers.

但是不管怎样,gevent跟其他python服务器对比是非凡的.libev是一个非常vetted的技术,和它派生出来的服务器已被认可是表现良好的.

用基准问题测试,尝试 Apache Benchmark ab 看这个 Benchmark of Python WSGI Servers 和其他服务的比较.

$ ab -n 10000 -c 100 http://127.0.0.1:8000/

Long Polling(下面这个代码在现实场景需要拆分成两份分别运行)

import gevent
from gevent.queue import Queue, Empty
from gevent.pywsgi import WSGIServer
import simplejson as jsondata_source = Queue()def producer():while True:data_source.put_nowait('Hello World')gevent.sleep(1)def ajax_endpoint(environ, start_response):status = '200 OK'headers = [('Content-Type', 'application/json')]try:datum = data_source.get(timeout=5)except Empty:datum = []start_response(status, headers)return json.dumps(datum)gevent.spawn(producer)WSGIServer(('', 8000), ajax_endpoint).serve_forever()

Websockets(下面这个实验可以用flask-socketio复现)

Websocket example which requires gevent-websocket.(pip install gevent-websocket)

Websocket例子需要 gevent-websocket.

# Simple gevent-websocket server
import json
import randomfrom gevent import pywsgi, sleep
from geventwebsocket.handler import WebSocketHandlerclass WebSocketApp(object):'''Send random data to the websocket'''def __call__(self, environ, start_response):ws = environ['wsgi.websocket']x = 0while True:data = json.dumps({'x': x, 'y': random.randint(1, 5)})ws.send(data)x += 1sleep(0.5)server = pywsgi.WSGIServer(("", 10000), WebSocketApp(),handler_class=WebSocketHandler)
server.serve_forever()

HTML Page:

<html><head><title>Minimal websocket application</title><script type="text/javascript" src="jquery.min.js"></script><script type="text/javascript">$(function() {// Open up a connection to our servervar ws = new WebSocket("ws://localhost:10000/");// What do we do when we get a message?ws.onmessage = function(evt) {$("#placeholder").append('<p>' + evt.data + '</p>')}// Just update our conn_status field with the connection statusws.onopen = function(evt) {$('#conn_status').html('<b>Connected</b>');}ws.onerror = function(evt) {$('#conn_status').html('<b>Error</b>');}ws.onclose = function(evt) {$('#conn_status').html('<b>Closed</b>');}});</script></head><body><h1>WebSocket Example</h1><div id="conn_status">Not Connected</div><div id="placeholder" style="width:600px;height:300px;"></div></body>
</html>

Chat Server

The final motivating example, a realtime chat room. This example requires Flask ( but not neccesarily so, you could use Django, Pyramid, etc ). The corresponding Javascript and HTML files can be found here.

最后一个激励的例子,一个实时的聊天室.这个例子需要Flask(但不是必须的,你可以用Django, Pyramid等等).相应的Javascript 和 HTML 文件可以在这里找到.

自己註釋和解讀後的代碼在這裏:

https://github.com/appleyuchi/minichat

# Micro gevent chatroom.
# ----------------------from flask import Flask, render_template, requestfrom gevent import queue
from gevent.pywsgi import WSGIServerimport simplejson as jsonapp = Flask(__name__)
app.debug = Truerooms = {'topic1': Room(),'topic2': Room(),
}users = {}class Room(object):def __init__(self):self.users = set()self.messages = []def backlog(self, size=25):return self.messages[-size:]def subscribe(self, user):self.users.add(user)def add(self, message):for user in self.users:print useruser.queue.put_nowait(message)self.messages.append(message)class User(object):def __init__(self):self.queue = queue.Queue()@app.route('/')
def choose_name():return render_template('choose.html')@app.route('/<uid>')
def main(uid):return render_template('main.html',uid=uid,rooms=rooms.keys())@app.route('/<room>/<uid>')
def join(room, uid):user = users.get(uid, None)if not user:users[uid] = user = User()active_room = rooms[room]active_room.subscribe(user)print 'subscribe', active_room, usermessages = active_room.backlog()return render_template('room.html',room=room, uid=uid, messages=messages)@app.route("/put/<room>/<uid>", methods=["POST"])
def put(room, uid):user = users[uid]room = rooms[room]message = request.form['message']room.add(':'.join([uid, message]))return ''@app.route("/poll/<uid>", methods=["POST"])
def poll(uid):try:msg = users[uid].queue.get(timeout=10)except queue.Empty:msg = []return json.dumps(msg)if __name__ == "__main__":http = WSGIServer(('', 5000), app)http.serve_forever()

Reference:

[1]http://sdiehl.github.com/gevent-tutorial/

[2]https://www.cnblogs.com/bjdxy/archive/2012/11/27/2790854.html

[3]https://www.jianshu.com/p/b1934ff22b06

[4]https://blog.51cto.com/rfyiamcool/1538367

[5]https://blog.csdn.net/Croyance_M/article/details/100605415

[6]https://yq.aliyun.com/articles/402343

gevent-tutorial翻译和解读相关推荐

  1. DS/ML:《Top 19 Skills You Need to Know in 2023 to Be a Data Scientist,2023年成为数据科学家需要掌握的19项技能》翻译与解读

    DS/ML:<Top 19 Skills You Need to Know in 2023 to Be a Data Scientist,2023年成为数据科学家需要掌握的19项技能>翻译 ...

  2. LLMs:《Building LLM applications for production构建用于生产的LLM应用程序》翻译与解读

    LLMs:<Building LLM applications for production构建用于生产的LLM应用程序>翻译与解读 LLMs:构建用于生产的LLM应用程序的挑战与案例经验 ...

  3. Arcface v1 论文翻译与解读

    神罗Noctis 2019-10-13 16:14:39  543  收藏 4 展开 论文地址:http://arxiv.org/pdf/1801.07698v1.pdf 最新版本v3的论文翻译:Ar ...

  4. AI:Algorithmia《2021 enterprise trends in machine learning 2021年机器学习的企业趋势》翻译与解读

    AI:Algorithmia<2021 enterprise trends in machine learning 2021年机器学习的企业趋势>翻译与解读 目录 <2021 ent ...

  5. Paper:《NÜWA: Visual Synthesis Pre-training for Neural visUal World creAtion,女娲:用于神经视觉世界创造的视觉》翻译与解读

    Paper:<NÜWA: Visual Synthesis Pre-training for Neural visUal World creAtion,女娲:用于神经视觉世界创造的视觉>翻 ...

  6. AI:《DEEP LEARNING’S DIMINISHING RETURNS—深度学习的收益递减》翻译与解读

    AI:<DEEP LEARNING'S DIMINISHING RETURNS-深度学习的收益递减>翻译与解读 导读:深度学习的收益递减.麻省理工学院的 Neil Thompson 和他的 ...

  7. AI:Algorithmia《2020 state of enterprise machine learning—2020年企业机器学习状况》翻译与解读

    AI:Algorithmia<2020 state of enterprise machine learning-2020年企业机器学习状况>翻译与解读 目录 <2020 state ...

  8. Paper:《Spatial Transformer Networks》的翻译与解读

    Paper:<Spatial Transformer Networks>的翻译与解读 目录 <Spatial Transformer Networks>的翻译与解读 Abstr ...

  9. Paper:《A Few Useful Things to Know About Machine Learning—关于机器学习的一些有用的知识》翻译与解读

    Paper:<A Few Useful  Things to  Know About  Machine  Learning-关于机器学习的一些有用的知识>翻译与解读 目录 <A Fe ...

  10. Paper:《First Order Motion Model for Image Animation》翻译与解读

    Paper:<First Order Motion Model for Image Animation>翻译与解读 目录 <First Order Motion Model for ...

最新文章

  1. logstash解析系统的messages日志
  2. 32.C#--方法中使用out参数做登录判断
  3. weka源码编译步骤
  4. 通过 SSH 端口转发实现异地内网服务器互通
  5. 瓜子二手车在 Dubbo 版本升级、多机房方案方面的思考和实践
  6. css布局方式_网页布局都有哪种?一般都用什么布局?
  7. 删除vs的调试其他软件的功能
  8. 报名啦!旷视研究院解读COCO 2017物体检测夺冠算法 | 吃瓜社
  9. 数据结构与算法:十大排序算法之归并排序
  10. checksum建立的索引
  11. C# 判断字符串中文汉字
  12. js页面跳转并传值问题
  13. 复习用vue写tabbar
  14. 获取iv和encryptedData
  15. 「网站收录查询」百度收录批量查询工具主要有哪些?
  16. Robotframework(三)常用API介绍
  17. Docker资源控制的Cgroup--CPU权重等--Block IO、bps和iops 的限制详细操作
  18. PHP生成图形验证码
  19. sflow-rt 3.0 安装
  20. 页面布局整理汇总,让你彻底搞明白多种布局的关系

热门文章

  1. [转载]Unix 高手的另外 10 个习惯
  2. JS:a=b=c,a=?
  3. 计算机系统维护技术txt,计算机系统维护技术A卷
  4. python面向对象类_python:面向对象(类)
  5. axios 最全 请求拦截器 响应拦截器 配置公共请求头 超时时间 以及get delete post put 四种请求传参方式
  6. oracle ctl file constantnull,Oracle 的一些导入和导出方法
  7. 函的红头文件格式制作_Excel实现批量制作年会邀请函,这个方法,你一定不能错过...
  8. Tapestry框架概述
  9. 使用百度UEditor
  10. linux定时备份mysql数据库文件脚本