为什么我们需要EventMachine?

我们通常说的Ruby解释器里的Ruby线程是Green Thread:即程序里面的线程不会真正映射到操作系统的线程,而是由语言运行平台自身来调度,并且这种线程的调度不是并行的。

关于Ruby的并发问题这里有一个权威的解释:http://www.igvita.com/2008/11/13/concurrency-is-a-myth-in-ruby

这篇文章提到造成这种情况的原因主要是由于Ruby解释器和VM中GIL(Global Interpreter Lock,如下图所示)的存在,使得Ruby始终无法真正的享受多核带来的好处,尽管在Ruby1.9的解释器已经能够使用多个系统级别的线程,但是GIL为了保证我们代码的线程安全,只允许同一时刻运行一个单一的线程。当然,事情并不是绝对的,下图最右的JRuby则把线程调度的工作交给了JVM从而实现任务的并发执行。

所以,基于Ruby的复杂应用大多采用了这样的一种策略:使用推迟(defer)并发(parallelize)的方法来处理程序中的网络I/O部分,而不是引入线程到应用程序中。

EventMachine 就是一个基于Reactor设计模式的、用于网络编程和并发编程的框架。Reactor模式描述了一种服务处理器——它接受事件并将其分发给已注册的事件处理。这种模式的好处就是清晰的分离了时间分发和处理事件的应用程序逻辑,而不需引入多线程来把代码复杂化。

EventMachine 本身提供了操作方便的网络套接字和隐藏底层操作的编程接口,这使得EM在CloudFoundry中被广泛的使用着。接下来,我们将对其机制进行一个简单的说明。

Reactor Pattern

“The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.”——wiki

p.s. 这样的工作方式有些类似于Observer Pattern,但后者只监听一个固定主题的消息。

上述定义可以用下面的图示来描述,其中灰色的部分就是Reactor:

Demultiplexer:是单进程阻塞式的主事件循环(event loop)。只要它没有被阻塞,它就能够将请求交给event dispatcher。

Dispatcher:负责event handler的注册和取消注册,并将来自Demultiplexer的请求交给关联的event handler。

Event handler:是最终处理请求的部分。

1、一个最简单基于EM的HttpServer的例子

require 'rubygems'
require 'eventmachine'class Echo < EM::Connection  def receive_data(data)  send_data(data)  end
endEM.run do  EM.start_server("0.0.0.0", 10000, Echo)
end

在另一个窗口,输入hello,服务器返回hello:

telnet localhost 10000Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello

仔细看一下上面的程式:EM帮我们启动了一个server,监听端口10000,而Echo实例(继承了Connection,用来处理连接)则重写了receive_data方法来实现服务逻辑。

而EM.run实际上就是启动了Reactor,它会一直运行下去直到stop被调用之后EM#run之后的代码才会被执行到。Echo类的实例实际上是与一个File Descriptor注册在了一起(Linux把一切设备都视作文件,包括socket),一旦该fd上有事件发生,Echo的实例就会被调用来处理相应事件。

在CloudFoundry中,组件的启动大多是从EM.run开始的,并且出于在多路复用I/O操作时提高效率和资源利用率的考虑,CF组件往往会在EM.run之前先调用EM.epoll (EM默认使用的select调用),比如:

EM.epollEM.run {...NATS.start(:uri => @nats_uri) do# do somethingend...}

2、EM定时器(Timer)

EM中有两种定时器,add_timer添加的是一次性定时器,add_periodic_timer添加的是周期性定时器。

require 'eventmachine'
EM.run dop = EM::PeriodicTimer.new(1) doputs "Tick ..."endEM::Timer.new(5) doputs "BOOM"p.cancelendEM::Timer.new(8) doputs "The googles, they do nothing"EM.stopend
end#输出:
Tick...
Tick...
Tick...
Tick...
BOOM
The googles, they do nothing

细节:我们在第一个EM::PeriodicTimer代码块中,传入了另外一个代码块:puts “Tick”。这里实际上告诉了 EM 每隔1秒触发一个事件,然后才调用puts代码块,这里的puts代码块就是回调


3、推迟和并发处理

EM#defer和EM#next_tick发挥作用的地方分别是:1、长任务应该放到后台运行;2、一旦这些任务被转移到后台,Reactor能够立刻回来工作。。

EM#defer方法

负责把一个代码块(block)调度到EM的线程池中执行(这里固定提供了20个线程),而defer的Callback参数指定的方法将会在主线程(即Reactor线程)中执行,并接收 后台线程的返回值作为Callback块的参数。

require 'eventmachine'
require 'thread'EM.run doEM.add_timer(2) doputs "Main #{Thread.current}"EM.stop_event_loopendEM.defer doputs "Defer #{Thread.current}"end
endDefer #<Thread:0x7fa871e33e08>
#两秒后
Main #<Thread:0x7fa87449b370> 

执行示意图如下:

EM#defer+Callback的用法:

require 'rubygems'
require 'eventmachine'EM.run doop = proc do2+2endcallback = proc do |count|puts "2 + 2 == #{count}"EM.stopendEM.defer(op, callback)
end# the return value of op is passed to callback
#2 + 2 == 4

EM#next_tick方法

负责将一个代码块调度到Reactor的下一次迭代中执行,执行任务的是Reactor主线程。所以,next_tick部分的代码不会立刻执行到,具体的调度是由EM完成的。

require 'eventmachine'  EM.run do  EM.add_periodic_timer(1) do  puts "Hai"  end EM.add_timer(5) do  EM.next_tick do  EM.stop_event_loop  end  end
end 

这里Reactor执行的过程用是同步的,所以太长的Reactor任务会长时间阻塞Reactor进程。EventMachine中有一个最基本原则我们必须记住:Never block the Reactor!

正是由于上述原因,next_tick的一个很常见的用法是递归的调用方式,将一个长的任务分配到Reactor的不同迭代周期去执行。

正常的循环代码:

 n = 0while n < 1000  do_somethingn += 1end

使用next_tick来处理:

require 'rubygems'
require 'eventmachine'EM.run don = 0  do_work = proc{  if n < 1000  do_somethingn += 1 EM.next_tick(do_work)elseEM.stopend}EM.next_tick(do_work)
end

next_tick中的block执行如红色的Task 1所示:

如上图所示那样,next_tick使单进程的Reactor给其他任务运行的机会——我们不想阻塞住Reactor,但我们也不愿引入Ruby线程,所以才有了这种方法。

next_tick在CloudFoundry中应用非常广泛,比如下面Router启动的一部分代码:

  # Setup a start sweeper to make sure we have a consistent view of the world.EM.next_tick do# Announce our existenceNATS.publish('router.start', @hello_message)# Don't let the messages pile up if we are in a reconnecting stateEM.add_periodic_timer(START_SWEEPER) dounless NATS.client.reconnecting?NATS.publish('router.start', @hello_message)endendend

与next_tick同样作用的方法还有 EM.schedule,后者会不停地判断当前线程是不是Reactor线程。

next_tick还有个用处:当你通过defer方法把一个代码端调度到线程池中执行,然后又需要在主线程中使用EM::HttpClient来发一个出站连接,这时你就可以在前面的代码段里使用next_tick创建这个连接。

4、EM提供的轻量级的并发机制

EvenMachine内置了两钟轻量级的并发处理机制:Deferrables和SpawnedProcesses。

EM::Deferrable

如果在一个类中include了EM::Deferrable,就可以把Callback和Errback关联到这个类的实例。

一旦执行条件被触发,Callback和Errback会按照与实例关联的顺序执行起来。

对应实例的#set_deferred_status方法就用来负责触发机制:

当该方法的参数是:succeeded,则触发callbacks;而如果参数是:failed,则触发errbacks。触发之后,这些回调将会在主线程立即得到执行。当然你还可以在回调中(callbacks和errbacks)再次调用#set_deferred_status,改变状态。

require 'eventmachine'class MyDeferrableinclude EM::Deferrabledef go(str)puts "Go #{str} go"end
endEM.run dodf = MyDeferrable.newdf.callback do |x|df.go(x)EM.stopendEM.add_timer(1) dodf.set_deferred_status :succeeded, "SpeedRacer"end
end#1s 之后:
Go SpeedRacer go

EM::SpawnedProcess

这个方法的设计思想是:允许我们创建一个进程,把一个代码段绑定到这个进程上。然后我们就可以在某个时刻,让spawned实例被#notify方法触发,从而执行关联好的代码段。

它与Deferrable的不同之处就在于,这个block并不会立刻被执行到。

require 'rubygems'
require 'eventmachine'EM.run dos = EM.spawn do |val|puts "Received #{val}"endEM.add_timer(1) dos.notify "hello"endEM.add_periodic_timer(1) doputs "Periodic"endEM.add_timer(3) doEM.stopend
end#1s之后同时输出前两个,第二秒后输出Periodic
Periodic
Received hello
Periodic

注意这两种机制的使用方式是不一样的:一个是作为内部类include 进去进而使用其定义的callback 来执行的;而另一个是直接使用spawn 实例通过notify 来触发代码块来执行的。

5、使用EM简化网络编程

网络编程的简单化是EM的一大特色。拿前面的Echo 举例子,我们通过十分简洁的代码就可以实现一个HttpServer 的功能。

我们其实还有更简洁的方式来完成这个工作,比如使用module :

require 'eventmachine'module Echodef receive_data(data)send_data(data)end
endEM.run doEM.start_server("0.0.0.0", 10000, Echo)
end

以及直接使用block :

require 'eventmachine'EM.run doEM.start_server("0.0.0.0", 10000) do |srv|def srv.receive_data(data)send_data(data)endend
end

事实上,每次你新建一个连接,一个新的包含了你代码的匿名类就会被创建。理论上,不同的连接不能互相交换信息,这一点很重要。不过EM 实际上在设计时以及解决这个问题:

require 'rubygems'
require 'eventmachine'class Pass < EM::Connectionattr_accessor :a, :bdef receive_data(data)send_data "#{@a} #{data.chomp} #{b}"end
endEM.run doEM.start_server("127.0.0.1", 10000, Pass) do |conn|conn.a = "Goodbye"conn.b = "world"end
end

通过给start_server 添加一个块,EM 会把把Pass 的实例传进去(这个操作发生时实例已经被初始化但是客户端数据还没收到)。这样我们可以用这种方法为每个实例set 值。

下面我们 使用EventMachine 建立一个客户端,这是非常简单的事情:

require 'rubygems'
require 'eventmachine'class Connector < EM::Connectiondef post_initputs "Getting /"send_data "GET / HTTP/1.1\r\nHost: MagicBob\r\n\r\n"enddef receive_data(data)puts "Received #{data}"puts "Received #{data.length} bytes"end
endEM.run doEM.connect('127.0.0.1', 10000, Connector)
end

除了使用EM#connect 之外,客户端同服务器端的代码是一样的。其实EM::Connection 类中还有很多有用的方法等着你实现:

post_init    当实例创建好,连接还没有完全建立的时候调用。一般用来做初始化
connection_completed    连接完全建立好的时候调用
receive_data(data)    当收到另一端的数据时调用。数据是成块接收的
unbind    当客户端断开连接的时候调用

此外,还有#close_connection#close_connection_after_writing 这两个方法供用户断开连接。

下面给出一个更完整的例子,设置最大连接次数:

require 'rubygems'
require 'eventmachine'module LineCounterMaxLinesPerConnection = 10def post_initputs "Received a new connection"@data_received = ""@line_count = 0enddef receive_data data@data_received << datawhile @data_received.slice!( /^[^\n]*[\n]/m )@line_count += 1send_data "received #{@line_count} lines so far\r\n"@line_count == MaxLinesPerConnection and close_connection_after_writingendendendEventMachine::run {host,port = "192.168.0.100", 8090EventMachine::start_server host, port, LineCounterputs "Now accepting connections on address #{host}, port #{port}..."EventMachine::add_periodic_timer( 10 ) { $stderr.write "*" }}

6、EventMachine的并发处理能力测试

“基于ruby事件驱动的服务器非常适合轻量级的请求,但对于长时间的请求,则性能不佳”。我们下面的例子将告诉你这样的认识其实是不对的。(需要用eventmachine_httpserver来处理http请求和发送响应)

require 'rubygems'
require 'eventmachine'
require 'evma_httpserver'class Handler  < EventMachine::Connectioninclude EventMachine::HttpServerdef process_http_requestresp = EventMachine::DelegatedHttpResponse.new( self )sleep 2 # Simulate a 2s long running requestresp.status = 200resp.content = "Hello World!"resp.send_responseend
endEventMachine::run {EventMachine::start_server("0.0.0.0", 8080, Handler)puts "Listening..."
}# Benchmarking results:
#
# > ab -c 5 -n 10 "http://127.0.0.1:8080/"
# > Concurrency Level:      5
# > Time taken for tests:   20.6246 seconds
# > Complete requests:      10

这是一个最简单的HTTPserver,我们通过ab(ApacheBench)测试:并发数设置为5(-c 5),请求数设置为10(-n10)。耗时略大于20秒。正如上面所说

,Reactor同步地处理每个请求,相当于并发数设置为1。因此,10个请求,每个请求耗时2秒

require 'rubygems'
require 'eventmachine'
require 'evma_httpserver'class Handler  < EventMachine::Connectioninclude EventMachine::HttpServerdef process_http_requestresp = EventMachine::DelegatedHttpResponse.new( self )# Block which fulfills the requestoperation = proc dosleep 2 # simulate a 2s long running requestresp.status = 200resp.content = "Hello World!"end# Callback block to execute once the request is fulfilledcallback = proc do |res|resp.send_responseend# Let the thread pool (20 Ruby threads) handle requestEM.defer(operation, callback)end
endEventMachine::run {EventMachine::start_server("0.0.0.0", 8081, Handler)puts "Listening..."
}

好了,现在我们使用EM的线程池子来“并发”处理请求。结果还是10个请求,并发数设置为5。总共耗时仅仅4秒有余。就是这样,我们的这个server并

发地处理了10个请求。我们还可以通过这个方法来验证下线程池中的线程数量。

前面讲过的Deferable机制其实是以一种没有线程开销的情况下实现并发处理的方法。这种机制的一个典型场景就是,你在一个server中需要去请求另外一个server。

require 'rubygems'
require 'eventmachine'
require 'evma_httpserver'class Handler  < EventMachine::Connectioninclude EventMachine::HttpServerdef process_http_requestresp = EventMachine::DelegatedHttpResponse.new( self )# query our threaded server (max concurrency: 20). this part is deferablehttp = EM::Protocols::HttpClient.request(:host=>"localhost",:port=>8081,:request=>"/")# once download is complete, send it to clienthttp.callback do |r|resp.status = 200resp.content = r[:content]resp.send_responseendend
endEventMachine::run {EventMachine::start_server("0.0.0.0", 8082, Handler)puts "Listening..."
}# Benchmarking results:
#
# > ab -c 20 -n 40 "http://127.0.0.1:8082/"
# > Concurrency Level:      20
# > Time taken for tests:   4.41321 seconds
# > Complete requests:      40

从测试结果我们可以看到,这个server在4s多的时间里处理了40个请求(因为并发量是20,前面监听8081的服务器sleep是2s)。这就是EM的魅力:

当你的工作推迟或者阻塞在socket上,Reactor循环将继续处理其他的请求。当Deferred的工作完成之后,产生一个成功的信息并由reactor返回响应。

参考资料:

EventMachine Introduction:http://everburning.com/news/eventmachine-introductions/

EM官方tutorials:https://github.com/eventmachine/eventmachine/wiki/Tutorials

以及这篇著名的博客:http://www.igvita.com/2008/05/27/ruby-eventmachine-the-speed-demon/

Research on EventMachine相关推荐

  1. 各大知名企业的Research展示

    大公司為了要拉開彼此的差距, 除了專注於目前的產品外, 都會為了未來做準備, 而這些研究通常都會做一個 Research 的專區來呈現成果, 如下述列表: Google Research Yahoo! ...

  2. MediaPipe:Google Research 开源的跨平台多媒体机器学习模型应用框架

    作者 | MediaPipe 团队 来源 | TensorFlow(ID:tensorflowers) [导读]我爱计算机视觉(aicvml)CV君推荐道:"虽然它是出自Google Res ...

  3. Google Research进军蛋白质结构预测:为Pfam数据库新增680万标注数据

      视学算法报道   编辑:LRS [新智元导读]用深度学习模型来预测蛋白质的结构和功能已经取得了不小的进展,但还缺乏优质的数据.最近Google开源了一个模型ProtENN,提供了680万条蛋白质结 ...

  4. Genome Research | TransBorrow:通过借用不同拼接工具的拼接结果来引导完成转录组拼接

    今天给大家介绍的是山东大学(威海)柳军涛课题组和沙特阿卜杜拉国王科技大学(KAUST)高欣教授课题组(http://sfb.kaust.edu.sa)发表在Genome Research的一篇文章,& ...

  5. Fundamental Research:根系分泌物通过调控土壤微生物影响碳周转的机理

    写在前面 根际,是一个物理,化学,生物作用都非常活跃的一个区域.在这里,植物是中坚力量,根系分泌物作为植物在根际的化学信号,能源物质和土壤微生物密切相关.从植物的角度来讲,根际可以保护植物抵御不同逆境 ...

  6. Bio-protocol与Cell Research达成合作:共同提升科研的可重复性

    2021年11月,Bio-protocol与被誉为"国刊之光" Cell Research期刊正式达成合作,以期促进生命科学研究的透明化和可重复性. 由于科学实验的复杂性,重复实验 ...

  7. Water Research:南科大夏雨+唐圆圆-台风对深圳沿海微塑料和微塑圈的影响

    台风扰动重置沿海微塑料分布并统一微塑料生物膜组成 Typhoon-induced turbulence redistributed microplastics in coastal areas and ...

  8. Water Research:南土所褚海燕组揭示冰川源水体和沉积物细菌群落显著分异

    Water Research: 中科院南京土壤所褚海燕组揭示冰川源水体和沉积物细菌群落显著分异 冰川源高寒河流共存水体和沉积物细菌群落环境驱动机制显著不同 Co-existing water and ...

  9. 香港中文大学Center for Gut Microbiota Research招聘启事

    香港中文大学Center for Gut Microbiota Research招聘启事 一.香港中文大学Center for Gut Microbiota Research中心简介 该中心致力于各种 ...

最新文章

  1. android学汇资料总整理
  2. 【DP】【线段树】基站选址(luogu 2605/金牌导航 数据结构优化DP-2)
  3. java excel处理框架,Java三方—-excel框架之POI的使用一
  4. java中的垃圾回收机
  5. 控制台没有消息循环_【干货】思科设备报错消息汇总大全~
  6. (转)HDOJ 4006 The kth great number(优先队列)
  7. JS表格插件(学习使用)
  8. 校园网客户端没有linux版,Linux版校园网客户端公测中(已添加GUI输入窗口)njit-clent...
  9. pyv8的使用基本方法
  10. 仿直播礼物涂鸦/屏幕礼物涂鸦动画
  11. 修改Android模拟器的IMEI号
  12. 关于网络存储技术和存储的协议
  13. elastix2.5vtigercrm5.2.1来电弹屏和点击呼叫的配置
  14. Windows10 关闭传递优化
  15. SaaS软件真的需要客户运营吗?
  16. 怎样和控制欲很强的家人相处-不受他人影响
  17. 今年的情人节,给心爱的她一个不一样的礼物吧
  18. Android ItemTouchHelper实现RecyclerView交互动画
  19. overleaf模板导入后中文无法识别问题解决方案
  20. TSC打印机打印条形码和二维码,java实现方式

热门文章

  1. UV-VIS与DRS紫外漫反射
  2. 从无知无畏到虚心借鉴——我的信息技术之路之四
  3. MFS2603AMDA0AD MFS2613AMBA0AD低功耗ASIL B安全系统基础芯片
  4. android tbs 内核加载失败_腾讯TBS初始化失败,加载失败问题(踩坑记录 64位手机无法加载x5)...
  5. 相见恨晚的68句话——每句话可以品半辈子,你有同感吗?
  6. matlab直线拟合的程序,MATLAB最小二乘法拟合直线的程序
  7. Matlab画阶跃函数
  8. iframe IE 透明BUG allowtransparency=”true”
  9. 最值得关注的八家创业公司
  10. 机器学习赛事(四):快来一起挖掘幸福感