在上一篇文章里我们主要介绍了 tomcat NIO 中 poller 线程的阻塞与唤醒,根据以前文章当 poller 线程监测到连接有数据可读事件的时候,会把原始 socket 的包装对象委托到 tomcat io 线程池中处理,包括解析请求行,请求头,调用 servlet API,处理异步等等,在这里我们主要介绍 tomcat io 线程。

对于tomcat io线程我们主要介绍:

  • IO 线程 overall 流程

  • IO 线程主要涉及的类以及作用

IO线程overall流程

对于 tomcat io 线程来说,overall 调用序列图如下 :

  • SocketProcessor.run()-->

    ConnectionHandler.process()-->

    AbstractProcessorLight.process()-->

    Http11Processor.service-->

    CoyoteAdapter.service() 一直到 container 调用标准 servlet API。

  • 涉及的关键类有

    SocketProcessor,ConnectionHandler

    AbstractProcessorLight,

    Http11Processor,CoyoteAdapter

SocketProcessor的核心代码逻辑如下

protected void doRun() {NioChannel socket = socketWrapper.getSocket();SelectionKey key = socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());Poller poller = NioEndpoint.this.poller;if (poller == null) {socketWrapper.close();return;}try {int handshake = -1;try {if (key != null) {if (socket.isHandshakeComplete()) {handshake = 0;} else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||event == SocketEvent.ERROR) {handshake = -1;} else {handshake = socket.handshake(key.isReadable(), key.isWritable());event = SocketEvent.OPEN_READ;}}} catch (IOException x) {handshake = -1;if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);} catch (CancelledKeyException ckx) {handshake = -1;}if (handshake == 0) {SocketState state = SocketState.OPEN;if (event == null) {state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);} else {state = getHandler().process(socketWrapper, event);}if (state == SocketState.CLOSED) {poller.cancelledKey(key, socketWrapper);}} else if (handshake == -1 ) {poller.cancelledKey(key, socketWrapper);} else if (handshake == SelectionKey.OP_READ){socketWrapper.registerReadInterest();} else if (handshake == SelectionKey.OP_WRITE){socketWrapper.registerWriteInterest();}} catch (CancelledKeyException cx) {poller.cancelledKey(key, socketWrapper);} catch (VirtualMachineError vme) {ExceptionUtils.handleThrowable(vme);} catch (Throwable t) {log.error(sm.getString("endpoint.processing.fail"), t);poller.cancelledKey(key, socketWrapper);} finally {socketWrapper = null;event = null;if (running && !paused && processorCache != null) {processorCache.push(this);}}
}
  • 首先会处理 handshake,如果 handshake 没有问题则返回 handshake 的结果为 0。

  • 如果 handshake 过程处理正常没问题,则会通过调用 getHandler().process(socketWrapper, event) 方法从而来间接触发 ConnectionHandler 实例的 process() 方法,并返回期望原始 socket 的状态 SocketState 枚举。

  • 如果返回的 SocketState 为 CLOSED ,则调用 poller.cancelledKey() 方法,会把原始 sockte 关闭。

  • 最后会把 SocketProcessor 实例回收到缓存 processorCache 中,以便下次使用不需要重新创建对象,从而提高效率。

  • 另外 ConnectionHandler 是 global 对象,也就是说所有的连接处理均由这个对象处理。根据以前文章,该实例中有一个 Map 对象,key 为SocketWrapper 对象类型,对应的 value 为 Http11Processor 类型。也就是说为连接中的每一个请求(request)都去分配了相应处理类 Http11Processor 实例,可以保存连接上请求的状态信息(例如解析请求行,请求头等数据)。

ConnectionHandler的核心代码逻辑如下

private final Map<S,Processor> connections = new ConcurrentHashMap<>();
private final RecycledProcessors recycledProcessors = new RecycledProcessors(this);public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {if (getLog().isDebugEnabled()) {getLog().debug(sm.getString("abstractConnectionHandler.process",wrapper.getSocket(), status));}if (wrapper == null) {return SocketState.CLOSED;}S socket = wrapper.getSocket();Processor processor = connections.get(socket);if (getLog().isDebugEnabled()) {getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",processor, socket));}if (SocketEvent.TIMEOUT == status && (processor == null || !processor.isAsync() || !processor.checkAsyncTimeoutGeneration())) {return SocketState.OPEN;}if (processor != null) {getProtocol().removeWaitingProcessor(processor);} else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) {return SocketState.CLOSED;}ContainerThreadMarker.set();try {if (processor == null) {String negotiatedProtocol = wrapper.getNegotiatedProtocol();if (negotiatedProtocol != null && negotiatedProtocol.length() > 0) {UpgradeProtocol upgradeProtocol =getProtocol().getNegotiatedProtocol(negotiatedProtocol);if (upgradeProtocol != null) {processor = upgradeProtocol.getProcessor(wrapper, getProtocol().getAdapter());} else if (negotiatedProtocol.equals("http/1.1")) {// Explicitly negotiated the default protocol.// Obtain a processor below.} else {if (getLog().isDebugEnabled()) {getLog().debug(sm.getString("abstractConnectionHandler.negotiatedProcessor.fail",negotiatedProtocol));}return SocketState.CLOSED;}}}if (processor == null) {processor = recycledProcessors.pop();if (getLog().isDebugEnabled()) {getLog().debug(sm.getString("abstractConnectionHandler.processorPop",processor));}}if (processor == null) {processor = getProtocol().createProcessor();register(processor);}processor.setSslSupport(wrapper.getSslSupport(getProtocol().getClientCertProvider()));connections.put(socket, processor);SocketState state = SocketState.CLOSED;do {state = processor.process(wrapper, status);if (state == SocketState.UPGRADING) {UpgradeToken upgradeToken = processor.getUpgradeToken();ByteBuffer leftOverInput = processor.getLeftoverInput();if (upgradeToken == null) {UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c");if (upgradeProtocol != null) {processor = upgradeProtocol.getProcessor(wrapper, getProtocol().getAdapter());wrapper.unRead(leftOverInput);connections.put(socket, processor);} else {if (getLog().isDebugEnabled()) {getLog().debug(sm.getString("abstractConnectionHandler.negotiatedProcessor.fail","h2c"));}return SocketState.CLOSED;}} else {HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();release(processor);processor = getProtocol().createUpgradeProcessor(wrapper, upgradeToken);if (getLog().isDebugEnabled()) {getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate",processor, wrapper));}wrapper.unRead(leftOverInput);wrapper.setUpgraded(true);connections.put(socket, processor);if (upgradeToken.getInstanceManager() == null) {httpUpgradeHandler.init((WebConnection) processor);} else {ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);try {httpUpgradeHandler.init((WebConnection) processor);} finally {upgradeToken.getContextBind().unbind(false, oldCL);}}if (httpUpgradeHandler instanceof InternalHttpUpgradeHandler) {if (((InternalHttpUpgradeHandler) httpUpgradeHandler).hasAsyncIO()) {state = SocketState.LONG;}}}}} while ( state == SocketState.UPGRADING);if (state == SocketState.LONG) {longPoll(wrapper, processor);if (processor.isAsync()) {getProtocol().addWaitingProcessor(processor);}} else if (state == SocketState.OPEN) {connections.remove(socket);release(processor);wrapper.registerReadInterest();} else if (state == SocketState.SENDFILE) {// Sendfile in progress. If it fails, the socket will be// closed. If it works, the socket either be added to the// poller (or equivalent) to await more data or processed// if there are any pipe-lined requests remaining.} else if (state == SocketState.UPGRADED) {// Don't add sockets back to the poller if this was a// non-blocking write otherwise the poller may trigger// multiple read events which may lead to thread starvation// in the connector. The write() method will add this socket// to the poller if necessary.if (status != SocketEvent.OPEN_WRITE) {longPoll(wrapper, processor);}} else if (state == SocketState.SUSPENDED) {// Don't add sockets back to the poller.// The resumeProcessing() method will add this socket// to the poller.} else {connections.remove(socket);if (processor.isUpgrade()) {UpgradeToken upgradeToken = processor.getUpgradeToken();HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();InstanceManager instanceManager = upgradeToken.getInstanceManager();if (instanceManager == null) {httpUpgradeHandler.destroy();} else {ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);try {httpUpgradeHandler.destroy();} finally {try {instanceManager.destroyInstance(httpUpgradeHandler);} catch (Throwable e) {ExceptionUtils.handleThrowable(e);getLog().error(sm.getString("abstractConnectionHandler.error"), e);}upgradeToken.getContextBind().unbind(false, oldCL);}}} else {release(processor);}}return state;} catch(java.net.SocketException e) {getLog().debug(sm.getString("abstractConnectionHandler.socketexception.debug"), e);} catch (java.io.IOException e) {getLog().debug(sm.getString("abstractConnectionHandler.ioexception.debug"), e);} catch (ProtocolException e) {getLog().debug(sm.getString("abstractConnectionHandler.protocolexception.debug"), e);}catch (OutOfMemoryError oome) {getLog().error(sm.getString("abstractConnectionHandler.oome"), oome);} catch (Throwable e) {ExceptionUtils.handleThrowable(e);getLog().error(sm.getString("abstractConnectionHandler.error"), e);} finally {ContainerThreadMarker.clear();}connections.remove(socket);release(processor);return SocketState.CLOSED;
}
  • 该实例中有一个 Map 对象,key 为原始 scoket 的包装对象 SocketWrapper 类型,value 为 Http11Processor 类型。为每个连接中的请求都分配处理类 Http11Processor 实例,可以保存连接上的请求状态信息(例如解析请求行,请求头等数据)。

  • 该实例有 recycledProcessors 对象,用来保持已经回收的 Http11Processor 实例,避免下次使用重新创建对象,提高效率。

  • 该实例核心方法为 process() 方法,代码比较多,这里总结关键点。

  • 该实例寻找 Map 中原始 socket 的包装对象关联的连接处理类 Http11Processor 实例,如果没有则创建新实例,并将 socket 包装对象与其关联到 Map 里,下次同一个连接中有数据的时候可以重用,而且也可以保存连接状态信息在 Http11Processor 实例中。

  • 该实例调用 Http11Processor 实例的 process 方法并返回 SocketState。

  • 如果返回的状态是代表 upgrade 协议(例如websocket连接等),则处理 upgrade 协议,这里不对 upgrade 协议详细展开。

  • 如果回的状态为 SocketState.LONG ,则代表要么是数据(请求行/请求头)没有解析完(因为 client 端没有发送完请求行/请求头数据),要么是执行了 servlet 的异步请求。

  • 对于 SocketState.LONG 的返回值调用如下:

    protected void longPoll(SocketWrapperBase<?> socket, Processor processor) {
    if (!processor.isAsync()) {
    // This is currently only used with HTTP
    // Either:
    //  - this is an upgraded connection
    //  - the request line/headers have not been completely
    //    readsocket.registerReadInterest();}
    }
    //SocketWrapper
    public void registerReadInterest() {getPoller().add(this, SelectionKey.OP_READ);
    }
    
  • longPoll() 方法在 servlet 非异步请求(请求行/请求头数据没有解析完)的情况,对 socket 包装对象注册 OP_READ 事件,并添加到 poller 线程的事件队列里,让 poller 线程继续监听 client 端可读事件。

  • 对于 SocketState.OPEN 的返回值调用:

    connections.remove(socket);
    release(processor);
    wrapper.registerReadInterest();
    
  • SocketState.OPEN 一般代表 servlet API 调用正常,返回 OPEN 表示该连接为长连接,不关闭原始 socket 。所以在 Map中会去移除 socket 和Http11Processor 的对应关系,来释放当前 Http11Processor 实例以便后续重用。由于是长连接,所以和异步处理方式一样,对 socket 包装对象注册 OP_READ 事件,并添加到 poller 线程事件队列中,让 poller 线程继续监听 client 端可读事件。

  • 在最后的 else 分支中代表返回的期望状态为 CLOSED ,表示该连接需要关闭,则在 Map 中移除 socket 和 Http11Processor 的对应关系,然后会释放当前 Http11Processor 实例以便后续重用。根据上面 ConnectionHanlder 的分析,如果返回的 SocketState 枚举的结果为 CLOSED,则会去调用 poller.cancelledKey() 方法,从而把原始 socket 关闭。

目前先写到这里,下一篇文章里我们继续介绍 tomcat io 线程涉及的其他关键类。

Tomcat NIO(9)-IO线程-Overall流程和关键类相关推荐

  1. io 错误: socket closed_Tomcat NIO(9)IO线程Overall流程和关键类

    在上一篇文章里我们主要介绍了 tomcat NIO 中 poller 线程的阻塞与唤醒,根据以前文章当 poller 线程监测到连接有数据可读事件的时候,会把原始 socket 的包装对象委托到 to ...

  2. 【JavaSE8 高级编程 IO/NIO】IO入门系列①之抽象基类节点流转换流 2019_8_16

    IO输入输出 IO 实现体系概述 [文档级] ①IO基石 四抽象基类 [IS,OS / R,W]抽象基类简述 子类及其实现接口 字节(FIS,OIS)字符(BR,ISR)读 字节(FOS,OOS,PS ...

  3. Netty开发的基本流程及关键类说明

  4. 详解 Tomcat 的连接数与线程池

    原文出处:编程迷思 前言 在使用tomcat时,经常会遇到连接数.线程数之类的配置问题,要真正理解这些概念,必须先了解Tomcat的连接器(Connector). 在前面的文章 详解Tomcat配置文 ...

  5. 详解tomcat的连接数与线程池

    前言 在使用tomcat时,经常会遇到连接数.线程数之类的配置问题,要真正理解这些概念,必须先了解Tomcat的连接器(Connector). 在前面的文章 详解Tomcat配置文件server.xm ...

  6. tomcat线程释放时间_详解tomcat的连接数与线程池

    前言 在使用tomcat时,经常会遇到连接数.线程数之类的配置问题,要真正理解这些概念,必须先了解Tomcat的连接器(Connector). 在前面的文章 详解Tomcat配置文件server.xm ...

  7. NETGEAR拒绝连接请求_详解 Tomcat 的连接数与线程池

    点击上方蓝色字体,选择"标星公众号" 优质文章,第一时间送达 关注公众号后台回复pay或mall获取实战项目资料视频 点击此链接:多套SpringCloud/SpringBoot实 ...

  8. tomcat的连接数与线程池

    在使用tomcat时,经常会遇到连接数.线程数之类的配置问题,要真正理解这些概念,必须先了解Tomcat的连接器(Connector). Connector的主要功能,是接收连接请求,创建Reques ...

  9. java基础巩固-宇宙第一AiYWM:为了维持生计,四大基础之OS_Part_2整起~IO们那些事【包括五种IO模型:(BIO、NIO、IO多路复用、信号驱动、AIO);零拷贝、事件处理及并发等模型】

    PART0.前情提要: 通常用户进程的一个完整的IO分为两个阶段(IO有内存IO.网络IO和磁盘IO三种,通常我们说的IO指的是后两者!):[操作系统和驱动程序运行在内核空间,应用程序运行在用户空间, ...

最新文章

  1. LabVIEW图像特征与机器视觉概念(理论篇—4)
  2. SpringBoot引用lombok让代码更简洁
  3. php ci hooks,CI框架 -- 核心文件 之 Hooks.php
  4. iOS开发使用半透明模糊效果方法整理
  5. 网络适配器无法建立连接_「图」KB4515384更新令人抓狂:网络适配器又无法启用...
  6. 快速傅立叶变换_FFT
  7. 设计模式和java实现
  8. 换了马甲也能认出“你” | 有了这个数据集,AI有望揪出变种勒索软件
  9. html如何修改字体黑体,css如何设置黑体样式?
  10. java区间并集_区间并集求解算法实现
  11. 天津大学matlab软件许可,天津大学《MATLAB基础和应用》课程教学大纲.PDF
  12. 机器博弈:非零和博弈下的叶值表剪枝
  13. c语言 异或 浮点数存储 分支结构
  14. 手把手教你升级Keil MDK的ARM编译器
  15. javapoet使用
  16. 第13章 项目合同管理
  17. 【机器人工具箱学习笔记】第七章 机械臂运动学
  18. 千年人参多少钱,如果真的有的话
  19. phpMyAdmin 尝试连接到 MySQL 服务器,但服务器拒绝连接.您应该检查配置文件中的
  20. apple pay集成_如何将Google Pay集成到您现有的Android应用中

热门文章

  1. win下如何用cmd转换记事本的编码格式
  2. day25.creat创建操作
  3. 想进入IT行业,该从哪里开始学习
  4. 正则验证,验证网站URL正则验证
  5. 什么样的企业是负责任的企业
  6. 【LeetCode】49.字母异位词分组 (三种解法开拓思路,java实现)
  7. java comparable null_Java中Comparable和Comparator你知多少?
  8. JIT引擎触发RowHammer可行性研究
  9. 国家公务员录用考试的一般流程
  10. 自建exchange邮箱怎么发送超大附件?