在上一篇文章里我们主要介绍了 tomcat NIO 中 poller 线程的阻塞与唤醒,根据以前文章当 poller 线程监测到连接有数据可读事件的时候,会把原始 socket 的包装对象委托到 tomcat io 线程池中处理,包括解析请求行,请求头,调用 servlet API,处理异步等等,在这里我们主要介绍 tomcat io 线程。

对于tomcat io线程我们主要介绍:

  • IO 线程 overall 流程

  • IO 线程主要涉及的类以及作用

IO线程overall流程

对于 tomcat io 线程来说,overall 调用序列图如下 :

  • SocketProcessor.run()-->

    ConnectionHandler.process()-->

    AbstractProcessorLight.process()-->

    Http11Processor.service-->

    CoyoteAdapter.service() 一直到 container 调用标准 servlet API。

  • 涉及的关键类有SocketProcessor,ConnectionHandlerAbstractProcessorLight,Http11Processor,CoyoteAdapter

SocketProcessor的核心代码逻辑如下

protected void doRun() {    NioChannel socket = socketWrapper.getSocket();    SelectionKey key = socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());    Poller poller = NioEndpoint.this.poller;    if (poller == null) {        socketWrapper.close();        return;    }    try {        int handshake = -1;        try {            if (key != null) {                if (socket.isHandshakeComplete()) {                    handshake = 0;                } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||event == SocketEvent.ERROR) {                    handshake = -1;                } else {                    handshake = socket.handshake(key.isReadable(), key.isWritable());                    event = SocketEvent.OPEN_READ;                }            }        } catch (IOException x) {            handshake = -1;            if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);        } catch (CancelledKeyException ckx) {            handshake = -1;        }        if (handshake == 0) {            SocketState state = SocketState.OPEN;            if (event == null) {                state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);            } else {                state = getHandler().process(socketWrapper, event);            }            if (state == SocketState.CLOSED) {                poller.cancelledKey(key, socketWrapper);            }        } else if (handshake == -1 ) {            poller.cancelledKey(key, socketWrapper);        } else if (handshake == SelectionKey.OP_READ){            socketWrapper.registerReadInterest();        } else if (handshake == SelectionKey.OP_WRITE){            socketWrapper.registerWriteInterest();        }    } catch (CancelledKeyException cx) {        poller.cancelledKey(key, socketWrapper);    } catch (VirtualMachineError vme) {        ExceptionUtils.handleThrowable(vme);    } catch (Throwable t) {        log.error(sm.getString("endpoint.processing.fail"), t);        poller.cancelledKey(key, socketWrapper);    } finally {        socketWrapper = null;        event = null;        if (running && !paused && processorCache != null) {            processorCache.push(this);        }    }}
  • 首先会处理 handshake,如果 handshake 没有问题则返回 handshake 的结果为 0。

  • 如果 handshake 过程处理正常没问题,则会通过调用 getHandler().process(socketWrapper, event) 方法从而来间接触发 ConnectionHandler 实例的 process() 方法,并返回期望原始 socket 的状态 SocketState 枚举。

  • 如果返回的 SocketState 为 CLOSED ,则调用 poller.cancelledKey() 方法,会把原始 sockte 关闭。

  • 最后会把 SocketProcessor 实例回收到缓存 processorCache 中,以便下次使用不需要重新创建对象,从而提高效率。

  • 另外 ConnectionHandler 是 global 对象,也就是说所有的连接处理均由这个对象处理。根据以前文章,该实例中有一个 Map 对象,key 为SocketWrapper 对象类型,对应的 value 为 Http11Processor 类型。也就是说为连接中的每一个请求(request)都去分配了相应处理类 Http11Processor 实例,可以保存连接上请求的状态信息(例如解析请求行,请求头等数据)。

ConnectionHandler的核心代码逻辑如下

private final Map connections = new ConcurrentHashMap<>();private final RecycledProcessors recycledProcessors = new RecycledProcessors(this);public SocketState process(SocketWrapperBase wrapper, SocketEvent status) {    if (getLog().isDebugEnabled()) {        getLog().debug(sm.getString("abstractConnectionHandler.process",                wrapper.getSocket(), status));    }    if (wrapper == null) {        return SocketState.CLOSED;    }    S socket = wrapper.getSocket();    Processor processor = connections.get(socket);    if (getLog().isDebugEnabled()) {        getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",                processor, socket));    }    if (SocketEvent.TIMEOUT == status && (processor == null || !processor.isAsync() || !processor.checkAsyncTimeoutGeneration())) {        return SocketState.OPEN;    }    if (processor != null) {        getProtocol().removeWaitingProcessor(processor);    } else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) {        return SocketState.CLOSED;    }    ContainerThreadMarker.set();    try {        if (processor == null) {            String negotiatedProtocol = wrapper.getNegotiatedProtocol();            if (negotiatedProtocol != null && negotiatedProtocol.length() > 0) {                UpgradeProtocol upgradeProtocol =                        getProtocol().getNegotiatedProtocol(negotiatedProtocol);                if (upgradeProtocol != null) {                    processor = upgradeProtocol.getProcessor(                            wrapper, getProtocol().getAdapter());                } else if (negotiatedProtocol.equals("http/1.1")) {                    // Explicitly negotiated the default protocol.                    // Obtain a processor below.                } else {                    if (getLog().isDebugEnabled()) {                        getLog().debug(sm.getString(                            "abstractConnectionHandler.negotiatedProcessor.fail",                            negotiatedProtocol));                    }                    return SocketState.CLOSED;                }            }        }        if (processor == null) {            processor = recycledProcessors.pop();            if (getLog().isDebugEnabled()) {                getLog().debug(sm.getString("abstractConnectionHandler.processorPop",                        processor));            }        }        if (processor == null) {            processor = getProtocol().createProcessor();            register(processor);        }        processor.setSslSupport(                wrapper.getSslSupport(getProtocol().getClientCertProvider()));        connections.put(socket, processor);        SocketState state = SocketState.CLOSED;        do {            state = processor.process(wrapper, status);            if (state == SocketState.UPGRADING) {                UpgradeToken upgradeToken = processor.getUpgradeToken();                ByteBuffer leftOverInput = processor.getLeftoverInput();                if (upgradeToken == null) {                    UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c");                    if (upgradeProtocol != null) {                        processor = upgradeProtocol.getProcessor(                                wrapper, getProtocol().getAdapter());                        wrapper.unRead(leftOverInput);                        connections.put(socket, processor);                    } else {                        if (getLog().isDebugEnabled()) {                            getLog().debug(sm.getString("abstractConnectionHandler.negotiatedProcessor.fail","h2c"));                        }                        return SocketState.CLOSED;                    }                } else {                    HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();                    release(processor);                    processor = getProtocol().createUpgradeProcessor(wrapper, upgradeToken);                    if (getLog().isDebugEnabled()) {                        getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate",processor, wrapper));                    }                    wrapper.unRead(leftOverInput);                    wrapper.setUpgraded(true);                    connections.put(socket, processor);                    if (upgradeToken.getInstanceManager() == null) {                        httpUpgradeHandler.init((WebConnection) processor);                    } else {                        ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);                        try {                            httpUpgradeHandler.init((WebConnection) processor);                        } finally {                            upgradeToken.getContextBind().unbind(false, oldCL);                        }                    }                    if (httpUpgradeHandler instanceof InternalHttpUpgradeHandler) {                        if (((InternalHttpUpgradeHandler) httpUpgradeHandler).hasAsyncIO()) {                            state = SocketState.LONG;                        }                    }                }            }        } while ( state == SocketState.UPGRADING);        if (state == SocketState.LONG) {            longPoll(wrapper, processor);            if (processor.isAsync()) {                getProtocol().addWaitingProcessor(processor);            }        } else if (state == SocketState.OPEN) {            connections.remove(socket);            release(processor);            wrapper.registerReadInterest();        } else if (state == SocketState.SENDFILE) {            // Sendfile in progress. If it fails, the socket will be            // closed. If it works, the socket either be added to the            // poller (or equivalent) to await more data or processed            // if there are any pipe-lined requests remaining.        } else if (state == SocketState.UPGRADED) {            // Don't add sockets back to the poller if this was a            // non-blocking write otherwise the poller may trigger            // multiple read events which may lead to thread starvation            // in the connector. The write() method will add this socket            // to the poller if necessary.            if (status != SocketEvent.OPEN_WRITE) {                longPoll(wrapper, processor);            }        } else if (state == SocketState.SUSPENDED) {            // Don't add sockets back to the poller.            // The resumeProcessing() method will add this socket            // to the poller.        } else {            connections.remove(socket);            if (processor.isUpgrade()) {                UpgradeToken upgradeToken = processor.getUpgradeToken();                HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();                InstanceManager instanceManager = upgradeToken.getInstanceManager();                if (instanceManager == null) {                    httpUpgradeHandler.destroy();                } else {                    ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);                    try {                        httpUpgradeHandler.destroy();                    } finally {                        try {                            instanceManager.destroyInstance(httpUpgradeHandler);                        } catch (Throwable e) {                            ExceptionUtils.handleThrowable(e);                            getLog().error(sm.getString("abstractConnectionHandler.error"), e);                        }                        upgradeToken.getContextBind().unbind(false, oldCL);                    }                }            } else {                release(processor);            }        }        return state;    } catch(java.net.SocketException e) {        getLog().debug(sm.getString("abstractConnectionHandler.socketexception.debug"), e);    } catch (java.io.IOException e) {        getLog().debug(sm.getString("abstractConnectionHandler.ioexception.debug"), e);    } catch (ProtocolException e) {        getLog().debug(sm.getString("abstractConnectionHandler.protocolexception.debug"), e);    }    catch (OutOfMemoryError oome) {        getLog().error(sm.getString("abstractConnectionHandler.oome"), oome);    } catch (Throwable e) {        ExceptionUtils.handleThrowable(e);        getLog().error(sm.getString("abstractConnectionHandler.error"), e);    } finally {        ContainerThreadMarker.clear();    }    connections.remove(socket);    release(processor);    return SocketState.CLOSED;}
  • 该实例中有一个 Map 对象,key 为原始 scoket 的包装对象 SocketWrapper 类型,value 为 Http11Processor 类型。为每个连接中的请求都分配处理类 Http11Processor 实例,可以保存连接上的请求状态信息(例如解析请求行,请求头等数据)。
  • 该实例有 recycledProcessors 对象,用来保持已经回收的 Http11Processor 实例,避免下次使用重新创建对象,提高效率。
  • 该实例核心方法为 process() 方法,代码比较多,这里总结关键点。
  • 该实例寻找 Map 中原始 socket 的包装对象关联的连接处理类 Http11Processor 实例,如果没有则创建新实例,并将 socket 包装对象与其关联到 Map 里,下次同一个连接中有数据的时候可以重用,而且也可以保存连接状态信息在 Http11Processor 实例中。
  • 该实例调用 Http11Processor 实例的 process 方法并返回 SocketState。
  • 如果返回的状态是代表 upgrade 协议(例如websocket连接等),则处理 upgrade 协议,这里不对 upgrade 协议详细展开。
  • 如果回的状态为 SocketState.LONG ,则代表要么是数据(请求行/请求头)没有解析完(因为 client 端没有发送完请求行/请求头数据),要么是执行了 servlet 的异步请求。
  • 对于 SocketState.LONG 的返回值调用如下:

    protected void longPoll(SocketWrapperBase> socket, Processor processor) {if (!processor.isAsync()) {// This is currently only used with HTTP// Either://  - this is an upgraded connection//  - the request line/headers have not been completely//    read          socket.registerReadInterest();     }}//SocketWrapperpublic void registerReadInterest() {      getPoller().add(this, SelectionKey.OP_READ);}
  • longPoll() 方法在 servlet 非异步请求(请求行/请求头数据没有解析完)的情况,对 socket 包装对象注册 OP_READ 事件,并添加到 poller 线程的事件队列里,让 poller 线程继续监听 client 端可读事件。
  • 对于 SocketState.OPEN 的返回值调用:

    connections.remove(socket);release(processor);wrapper.registerReadInterest();
  • SocketState.OPEN 一般代表 servlet API 调用正常,返回 OPEN 表示该连接为长连接,不关闭原始 socket 。所以在 Map中会去移除 socket 和Http11Processor 的对应关系,来释放当前 Http11Processor 实例以便后续重用。由于是长连接,所以和异步处理方式一样,对 socket 包装对象注册 OP_READ 事件,并添加到 poller 线程事件队列中,让 poller 线程继续监听 client 端可读事件。
  • 在最后的 else 分支中代表返回的期望状态为 CLOSED ,表示该连接需要关闭,则在 Map 中移除 socket 和 Http11Processor 的对应关系,然后会释放当前 Http11Processor 实例以便后续重用。根据上面 ConnectionHanlder 的分析,如果返回的 SocketState 枚举的结果为 CLOSED,则会去调用 poller.cancelledKey() 方法,从而把原始 socket 关闭。

目前先写到这里,下一篇文章里我们继续介绍 tomcat io 线程涉及的其他关键类。

io 错误: socket closed_Tomcat NIO(9)IO线程Overall流程和关键类相关推荐

  1. Tomcat NIO(9)-IO线程-Overall流程和关键类

    在上一篇文章里我们主要介绍了 tomcat NIO 中 poller 线程的阻塞与唤醒,根据以前文章当 poller 线程监测到连接有数据可读事件的时候,会把原始 socket 的包装对象委托到 to ...

  2. 详解磁盘IO、网络IO、零拷贝IO、BIO、NIO、AIO、IO多路复用(select、poll、epoll)

    文章很长,但是很用心! 文章目录 1. 什么是I/O 2. 磁盘IO 3. 网络IO 4. IO中断与DMA 5. 零拷贝IO 6. BIO 7. NIO 8. IO多路复用 8.1 select 8 ...

  3. 死磕Java之NIO与IO

    死磕Java之NIO与IO 当学习Java NIO与IO时,你是否会有这样的想法:什么时候使用NIO,什么使用IO呢?本篇文章将会分析两者的不同,它们的用例,以及和影响代码的设计. 01 NIO与IO ...

  4. io读取一个文件再写入socket技术_JAVA中IO与NIO面试题

    BIO.NIO有什么区别? BIO:Block IO 同步阻塞式 IO,就是我们平常使用的传统 IO,它的特点是模式简单使用方便,并发处理能力低. NIO:New IO 同步非阻塞 IO,是传统 IO ...

  5. 【NIO】IO多路复用

    上节,我们讲解了阻塞和非阻塞,我们今天讲第三种IO模型,这就是IO多路复用. 引入多路复用的原因 实际上,在服务端与客户端一对一通信的时候,同步阻塞式地读写并不会有太大的问题,最典型的就是两个对等机器 ...

  6. java基础巩固-宇宙第一AiYWM:为了维持生计,四大基础之OS_Part_2整起~IO们那些事【包括五种IO模型:(BIO、NIO、IO多路复用、信号驱动、AIO);零拷贝、事件处理及并发等模型】

    PART0.前情提要: 通常用户进程的一个完整的IO分为两个阶段(IO有内存IO.网络IO和磁盘IO三种,通常我们说的IO指的是后两者!):[操作系统和驱动程序运行在内核空间,应用程序运行在用户空间, ...

  7. IO、文件、NIO【三】

    http://blog.csdn.net/silentbalanceyh/article/details/5252285 (这一个章节将讲到Java里面比较重要的一个章节,这里说一句抱歉,因为最近换工 ...

  8. IO、文件、NIO【草案三】

    (这一个章节将讲到Java里面比较重要的一个章节,这里说一句抱歉,因为最近换工作的原因,一直没有时间继续书写教程,不过接下来我会一直坚持写下去的哈,希望大家能够支持.这个章节主要涉及到常用的文件读写, ...

  9. 17.IO、文件、NIO【草案三】

    (这一个章节将讲到Java里面比较重要的一个章节,这里说一句抱歉,因为最近换工作的原因,一直没有时间继续书写教程,不过接下来我会一直坚持写下去的哈,希望大家能够支持.这个章节主要涉及到常用的文件读写, ...

最新文章

  1. kattle的java安装,Kettle自定义JDK版本(附Linux下安装部署步骤)
  2. python定义一个字典、存储雇员号和姓名_【一点资讯】python后端开发工程师考证试题...
  3. step-by-step多文件WEB批量上传(swfupload)的完美解决方案
  4. php数据库连接程序,常用的数据库连接程序
  5. SQLSTATE[HY000] [2013] Lost connection to MySQL...
  6. 关于eclipse反编译插件不起作用问题的解决
  7. 《推荐系统实践》算法纯享(附代码链接)(一)—— 评价指标篇
  8. 11月全国程序员平均工资出炉,网友:我丢了同行的脸
  9. ccxprocess用不用自启_你电脑上开机自启的软件都有哪些?
  10. 计算机主板显卡型号怎么看,如何查看电脑显卡?4个方法教会你
  11. 计算机键盘怎么打字,用电脑键盘打字的小技巧 怎么用电脑键盘打字
  12. 【论文阅读】TimbreTron : A WaveNet (Cycle GAN(CQT(audio ))) pipeline for musical timbre transfer
  13. 南京大学交叉培养计算机与金融招生人数,教务处组织召开计算机与金融工程实验班师生见面会...
  14. 查看kafka的版本
  15. 应用程序如何从X86快速换到ARM架构 | 瑞迅科技工控一体机解读
  16. 视觉SLAM十四讲:运动方程
  17. 在做竞品分析时遇到的一些坑
  18. 图的邻接矩阵和邻接表
  19. 城南花未开,老程已不在;
  20. Android-圆形头像

热门文章

  1. Python 数据结构与算法——二叉搜索树的实现
  2. 数组指针 vs 指针数组
  3. 隐藏网络计算机,如何在网络中隐藏自己的计算机名称
  4. python 干什么工作具有明显优势-科多大数据告诉你Python为什么这么牛?学习python有什么优势?...
  5. python与excel-再见Excel!我开源了一款与Python深度集成的神器级IDE
  6. 一张图学会python高清图-一张图让你学会Python
  7. 如何自学python知乎-初次接触python,怎么样系统的自学呢?
  8. android 4实例分析,OpenGL Shader实例分析(4)闪光效果
  9. win10一直正在检查更新_win10一直存在的烦人问题,终于被彻底解决!你会选择更新么?...
  10. 面试题 02.01. 移除重复节点(链表删除操作模板)