转载:http://blog.csdn.net/gsky1986/article/details/45491515

一、Listener

Listener线程,当Server处于运行状态时,其负责监听来自客户端的连接,并使用Select模式处理Accept事件。

同时,它开启了一个空闲连接(Idle Connection)处理例程,如果有过期的空闲连接,就关闭。这个例程通过一个计时器来实现。

当select操作调用时,它可能会阻塞,这给了其它线程执行的机会。当有accept事件发生,它就会被唤醒以处理全部的事件,处理事件是进行一个doAccept的调用。

doAccept:

void doAccept(SelectionKey key) throws InterruptedException, IOException,  OutOfMemoryError {ServerSocketChannel server = (ServerSocketChannel) key.channel();SocketChannel channel;      while ((channel = server.accept()) != null) {channel.configureBlocking(false);channel.socket().setTcpNoDelay(tcpNoDelay);channel.socket().setKeepAlive(true);Reader reader = getReader();Connection c = connectionManager.register(channel);key.attach(c);  // so closeCurrentConnection can get the objectreader.addConnection(c);}}123456789101112131415

由于多个连接可能同时发起申请,所以这里采用了while循环处理。

这里最关键的是设置了新建立的socket为非阻塞,这一点是基于性能的考虑,非阻塞的方式尽可能的读取socket接收缓冲区中的数据,这一点保证了将来会调用这个socket进行接收的Reader和进行发送的Responder线程不会因为发送和接收而阻塞,如果整个通讯过程都比较繁忙,那么Reader和Responder线程的就可以尽量不阻塞在I/O上,这样可以减少线程上下文切换的次数,提高cpu的利用率。

最后,获取了一个Reader,将此连接加入Reader的缓冲队列,同时让连接管理器监视并管理这个连接的生存期。

获取Reader的方式如下:

    //最简单的负载均衡Reader getReader() {currentReader = (currentReader + 1) % readers.length;      return readers[currentReader];}12345

二、Reader

当一个新建立的连接被加入Reader的缓冲队列pendingConnections之后,Reader也被唤醒,以处理此连接上的数据接收。

      public void addConnection(Connection conn) throws InterruptedException {pendingConnections.put(conn);readSelector.wakeup();}1234

Server中配置了多个Reader线程,显然是为了提高并发服务连接的能力。

下面是Reader的主要逻辑:

while(true) {...       //取出一个连接,可能阻塞Connection conn = pendingConnections.take();       //向select注册一个读事件conn.channel.register(readSelector, SelectionKey.OP_READ, conn);...       //进行select,可能阻塞readSelector.select();...       //依次读取数据for(keys){doRead(key);}...
}12345678910111213141516

当Server还在运行时,Reader线程尽可能多地处理缓冲队列中的连接,注册每一个连接的READ事件,采用select模式来获取连接上有数据接收的通知。当有数据需要接收时,它尽最大可能读取select返回的连接上的数据,以防止Listener线程因为没有运行时间而发生饥饿(starving)。

如果Listener线程饥饿,造成的结果是并发能力急剧下降,来自客户端的新连接请求超时或无法建立。

注意在从缓冲队列中获取连接时,Reader可能会发生阻塞,因为它采用了LinkedBlockingQueue类中的take方法,这个方法在队列为空时会阻塞,这样Reader线程得以阻塞,以给其它线程执行的时间。

Reader线程的唤醒时机有两个: 
1. Listener建立了新连接,并将此连接加入1个Reader的缓冲队列; 
2. select调用返回。

在Reader的doRead调用中,其主要调用了readAndProcess方法,此方法循环处理数据,接收数据包的头部、上下文头部和真正的数据。 
这个过程中值得一提的是下面的这个channelRead方法:

  private int channelRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {    int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?channel.read(buffer) : channelIO(channel, null, buffer);    if (count > 0) {rpcMetrics.incrReceivedBytes(count);}    return count;}12345678910

channelRead会判断数据接收数组buffer中的剩余未读数据,如果大于一个临界值NIO_BUFFER_LIMIT,就采取分片的技巧来多次地读,以防止jdk对large buffer采取变为direct buffer的优化。

这一点,也许是考虑到direct buffer在建立时会有一些开销,同时在jdk1.6之前direct buffer不会被GC回收,因为它们分配在JVM的堆外的内存空间中。

到底这样优化的效果如何,没有测试,也就略过。也许是为了减少GC的负担。

在Reader读取到一个完整的RpcRequest包之后,会调用processOneRpc方法,此调用将进入业务逻辑环节。这个方法,会从接受到的数据包中,反序列化出RpcRequest的头部和数据,依此构造一个RpcRequest对象,设置客户端需要的跟踪信息(trace info),然后构造一个Call对象,如下图所示: 

此后,在Handler处理时,就以Call为单位,这是一个包含了与连接相关信息的封装对象。

有了Call对象后,将其加入Server的callQueue队列,以供Handler处理。因为采用了put方法,所以当callQueue满时(Handler忙),Reader会发生阻塞,如下所示:

callQueue.put(call);              // queue the call; maybe blocked here1

三、Handler

Handler就是根据rpc请求中的方法(Call)及参数,来调用相应的业务逻辑接口来处理请求。

一个Server中有多个Handler,对应多个业务接口,本篇不讨论具体业务逻辑。

handler的逻辑基本如下(略去异常和其它次要信息):

public void run() {SERVER.set(Server.this);ByteArrayOutputStream buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);      while (running) {        try {          final Call call = callQueue.take(); // pop the queue; maybe blocked hereCurCall.set(call);          try {            if (call.connection.user == null) {value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest, call.timestamp);} else {value = call.connection.user.doAs(...);}} catch (Throwable e) {            //略 ... }CurCall.set(null);          synchronized (call.connection.responseQueue) {responder.doRespond(call);}}123456789101112131415161718192021222324

可见,Handler从callQueue中取出一个Call,然后调用这个Server.call方法,最后调用Responder的doResponde方法将结果发送给客户端。

Server.call方法:

    public Writable call(RPC.RpcKind rpcKind, String protocol,Writable rpcRequest, long receiveTime) throws Exception {      return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,receiveTime);}12345

四、Responder

一个Server只有1个Responder线程。

此线程不断进行如下几个重要调用以和Handler协调并发送数据:

//这个wait是同步作用,具体见下面分析waitPending();
...//开始select,或许会阻塞writeSelector.select(PURGE_INTERVAL);
...//如果selectKeys有数据,就依次异步发送数据for(selectorKeys){doAsyncWrite(key);
}
...//当到达丢弃时间,会从selectedKeys构造calls,并依次丢弃for(Call call : calls) {doPurge(call, now);
}123456789101112131415

当Handler调用doRespond方法后,handler处理的结果被加入responseQueue的队尾,而不是立即发送回客户端:

    void doRespond(Call call) throws IOException {      synchronized (call.connection.responseQueue) {call.connection.responseQueue.addLast(call);        if (call.connection.responseQueue.size() == 1) {          //注意这里isHandler = true,表示可能会向select注册写事件以在Responder主循环中通过select处理数据发送proce***esponse(call.connection.responseQueue, true);}}}123456789

上面的synchronized 可以看出,responseQueue是争用资源,相应的:

Handler是生产者,将结果加入队列; 
Responder是消费者,从队列中取出结果并发送。

proce***esponse将启动Responder进行发送,首先从responseQueue中以非阻塞方式取出一个call,然后以非阻塞方式尽力发送call.rpcResponse,如果发送完毕,则返回。

当还有剩余数据未发送,将call插入队列的第一个位置,由于isHandler参数,在来自Handler的调用中传入为true,所以会唤醒writeSelector,并注册一个写事件,其中incPending()方法,是为了在向selector注册写事件时,阻塞Responder线程,后面有分析。

            call.connection.responseQueue.addFirst(call);            if (inHandler) {              // set the serve time when the response has to be sent latercall.timestamp = Time.now();incPending();              try {                // Wakeup the thread blocked on select, only then can the call // to channel.register() complete.writeSelector.wakeup();channel.register(writeSelector, SelectionKey.OP_WRITE, call);} catch (ClosedChannelException e) {                //Its ok. channel might be closed else where.done = true;} finally {decPending();}}12345678910111213141516171819

再回到Responder的主循环,看看如果向select注册了写事件会发生什么:

          //执行这句时,如果Handler调用的responder.doResonde()正在向select注册写事件,这里就会阻塞//目的很显然,是为了下句的select能获取数据并立即返回,这就减少了阻塞发生的次数waitPending();     // If a channel is being registered, wait.//这里用超时阻塞来select,是为了能够在没有数据发送时,定期唤醒,以处理长期未得到处理的CallwriteSelector.select(PURGE_INTERVAL);Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();          while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();            try {              if (key.isValid() && key.isWritable()) {                  //异步发送doAsyncWrite(key);}} catch (IOException e) {LOG.info(Thread.currentThread().getName() + ": doAsyncWrite threw exception " + e);}}12345678910111213141516171819

重点内容都做了注释,不再赘述。可以看出,既考虑同步,又考虑性能,这是值得学习的地方。

五、总结

本篇着重分析了hadoop的rpc调用中server部分,可以看出,这是一个精良的设计,考虑的很细。

  1. 关于同步: 
    Listener生产,Reader消费;Reader生产,Handler消费,Handler生产,Responder消费。 
    所以它们之间必须同步,hadoop实现中,既有利用BlockingQueue的put&take操作实现阻塞,以达到同步目的,也对争用资源使用synchronized来实现同步。

  2. 关于缓冲: 
    其中几个缓冲队列也值得关注,因为此Server的并发请求会特别多,而Handler在执行call进行业务逻辑时,会慢,所以必须建立请求和处理之间的缓冲。 
    另外,处理和发送之间也同样会出现速率不匹配的现象,同样需要缓冲。

  3. 关于线程模型: 
    Listener单线程,Reader多线程,Handler多线程,Responder单线程,为什么会这样设计?

    Listener采用select模式处理accept事件,一个客户端在一段时间内一般只建立有限次的连接,而且连接的建立是比较快的,所以单线程足够应付,建立后直接丢给Reader,从而自己很从容地应付新连接。

    Handler多线程,业务逻辑是大头,又可能牵扯计算密集或I/O密集,如果线程少,过长的业务逻辑可能就会让大部分的Handler线程阻塞,这样轻快的业务逻辑也必须排队而得不到及时处理,如果Reader收集的请求队列长时间处于满的状态,整个通讯必然恶化,所以这是典型的需要并发的时刻,这个时刻的上下文切换是必须的,不纠结,并发为重。

    Responder是单线程,显然,Responder会比较轻松,因为虽然请求很多,但经过Reader->Handler的缓冲和Handler的处理时间,上一批能发送完的结果已经发送了。Responder更多的是搜集并处理那些长结果,并通过高效select模式来获取结果并发送。 
    这里,Handler在业务逻辑调用完毕直接调用了responder.doRespond发送,是因为这是个立即返回的调用,这个调用的耗时是很少的,所以不必让Handler因为发送而阻塞,进一步充分发挥了Handler多线程的能力,降低了线程切换的机会,发挥了其多线程并发的优势,同时又为responder减负,以增强Responder单线程作战的信心。

  4. 关于锁 
    因为同步需求,所以加锁是必不可少的,synchronized方式加锁,可以显著减少这种复杂的对象之间同步的复杂性,减少错误的发生。

【可以转载,注明出处】 
【版权所有@foreach_break 博客地址http://blog.csdn.net/gsky1986】

转载于:https://blog.51cto.com/mengphilip/1641945

高性能server分析 - Hadoop的RpcServer相关推荐

  1. 专訪阿里陶辉:大规模分布式系统、高性能server设计经验分享

    http://www.csdn.net/article/2014-06-27/2820432 摘要:先后就职于在国内知名的互联网公司,眼下在阿里云弹性计算部门做架构设计与核心模块代码的编写,主要负责云 ...

  2. 使用SQL Server分析服务定位目标用户

    如何定位目标用户,在任何一个业务单元中都是一个很重要的话题,尤其在预算有限的情况下,如何获得活动的最大收益,目标用户的定位都是很重要的手段. 本文将介绍如何通过SQL Server分析服务(SSAS) ...

  3. 搜索门户 Web Server分析

    搜索门户 Web Server分析 google 招聘了不少原来工作于apache php开发团队的人才,把web服务也搞得有点花稍,版本众多,且能在一个域名下联合使用.他们认为对php和mysql的 ...

  4. Hadoop-HBASE案例分析-Hadoop学习笔记二

    之前有幸在MOOC学院抽中小象学院hadoop体验课.  这是小象学院hadoop2.X概述第八章的笔记  主要介绍HBase,一个分布式数据库的应用案例. 案例概况: 1)时间序列数据库(OpenT ...

  5. 【备忘】2017零基础自学云计算分析hadoop/storm/spark大数据开发视频教程

    day01 软件安装.Linux相关.shell     day02 自动化部署高级文本命令     day03 集群部署zookeeper     day04 并发动态大数据机制.Java反射.动态 ...

  6. 逐行分析Hadoop的HelloWorld

    2019独角兽企业重金招聘Python工程师标准>>> 学写代码的时候,我们总是先从helloworld开始写起,那么学习Hadoop,我们也必不可少的从helloworld开始,那 ...

  7. ubuntu server安装hadoop和spark,并设置集群

    安装server请看本人的上一篇博客 Ubuntu Server 20.04.2 安装 先前准备工作 创建 hadoop用户 创建用户之后,输入一下指令重启 shutdown -r now 登录 ha ...

  8. 源码解析Spring Boot2默认数据库连接池HikariCP(高性能原因分析)

    现在市面上的数据库连接池非常多,其中HikariCP被Sping Boot2选中为默认的数据库连接池,且体积仅有152kb 为何选择HikariCP? 高性能,可以PK掉其它所有连接池,这个原因就足够 ...

  9. Linux高性能server规划——处理池和线程池

    进程池和线程池 池的概念 由于server的硬件资源"充裕".那么提高server性能的一个非常直接的方法就是以空间换时间.即"浪费"server的硬件资源.以 ...

最新文章

  1. Android Log等级的介绍
  2. Java 判断密码是否是大小写字母、数字、特殊字符中的至少三种
  3. nodejs mac启动相关命令
  4. 另一个串口verilog 代码
  5. 熊猫read_csv()–将CSV文件读取到DataFrame
  6. 交换机命令---华为路由器配置
  7. HTTP协议中POST方法和GET方法有那些区别?
  8. C#中string[]数组和liststring泛型的相互转换 【转】
  9. java ioutils 写入文件_IOUtils和FileUtils的学习笔记
  10. vue + element插件Popover弹出框
  11. 数据结构 查找 的思维导图
  12. UnboundLocalError: local variable ‘count‘ referenced before assignment
  13. 夜间模式 css,网站夜间模式的实现
  14. 安卓3dtouch测试软件,不必羡慕iPhone!安卓手机可以这样实现“3D Touch”
  15. MATLAB实现最小二乘法
  16. 项目添加到服务器报错,基于github+travis自动部署vue项目到远端服务器
  17. 当AI降临教育——阳光还是风暴?
  18. 解决APP打开后闪退的问题
  19. wxPython的基础教程
  20. java模拟post发送文件

热门文章

  1. 昨天还在 for 循环里写加号拼接字符串的那个同事,今天已经不在了
  2. 高频面试题:Spring 如何解决循环依赖?
  3. 刚刚,arXiv论文数破200万!没有arXiv,就没有21世纪的科研突破
  4. 机器学习中需要了解的 5 种采样方法
  5. 深度学习必懂的13种概率分布
  6. 漫画:什么是八皇后问题?
  7. 送书!送书!送书!重要的事情说三遍
  8. python多进程的使用(导包、创建子进程、启动子进程)
  9. python文件操作(open()、write()、read()、readline()、readlines()、seek()、os)
  10. Python RE库的贪婪匹配和最小匹配