Tomcat NIO(9)-IO线程-Overall流程和关键类
在上一篇文章里我们主要介绍了 tomcat NIO 中 poller 线程的阻塞与唤醒,根据以前文章当 poller 线程监测到连接有数据可读事件的时候,会把原始 socket 的包装对象委托到 tomcat io 线程池中处理,包括解析请求行,请求头,调用 servlet API,处理异步等等,在这里我们主要介绍 tomcat io 线程。
对于tomcat io线程我们主要介绍:
IO 线程 overall 流程
IO 线程主要涉及的类以及作用
IO线程overall流程
对于 tomcat io 线程来说,overall 调用序列图如下 :
SocketProcessor.run()-->
ConnectionHandler.process()-->
AbstractProcessorLight.process()-->
Http11Processor.service-->
CoyoteAdapter.service() 一直到 container 调用标准 servlet API。
涉及的关键类有
SocketProcessor,ConnectionHandler
AbstractProcessorLight,
Http11Processor,CoyoteAdapter
SocketProcessor的核心代码逻辑如下
protected void doRun() {NioChannel socket = socketWrapper.getSocket();SelectionKey key = socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());Poller poller = NioEndpoint.this.poller;if (poller == null) {socketWrapper.close();return;}try {int handshake = -1;try {if (key != null) {if (socket.isHandshakeComplete()) {handshake = 0;} else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||event == SocketEvent.ERROR) {handshake = -1;} else {handshake = socket.handshake(key.isReadable(), key.isWritable());event = SocketEvent.OPEN_READ;}}} catch (IOException x) {handshake = -1;if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);} catch (CancelledKeyException ckx) {handshake = -1;}if (handshake == 0) {SocketState state = SocketState.OPEN;if (event == null) {state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);} else {state = getHandler().process(socketWrapper, event);}if (state == SocketState.CLOSED) {poller.cancelledKey(key, socketWrapper);}} else if (handshake == -1 ) {poller.cancelledKey(key, socketWrapper);} else if (handshake == SelectionKey.OP_READ){socketWrapper.registerReadInterest();} else if (handshake == SelectionKey.OP_WRITE){socketWrapper.registerWriteInterest();}} catch (CancelledKeyException cx) {poller.cancelledKey(key, socketWrapper);} catch (VirtualMachineError vme) {ExceptionUtils.handleThrowable(vme);} catch (Throwable t) {log.error(sm.getString("endpoint.processing.fail"), t);poller.cancelledKey(key, socketWrapper);} finally {socketWrapper = null;event = null;if (running && !paused && processorCache != null) {processorCache.push(this);}}
}
首先会处理 handshake,如果 handshake 没有问题则返回 handshake 的结果为 0。
如果 handshake 过程处理正常没问题,则会通过调用 getHandler().process(socketWrapper, event) 方法从而来间接触发 ConnectionHandler 实例的 process() 方法,并返回期望原始 socket 的状态 SocketState 枚举。
如果返回的 SocketState 为 CLOSED ,则调用 poller.cancelledKey() 方法,会把原始 sockte 关闭。
最后会把 SocketProcessor 实例回收到缓存 processorCache 中,以便下次使用不需要重新创建对象,从而提高效率。
另外 ConnectionHandler 是 global 对象,也就是说所有的连接处理均由这个对象处理。根据以前文章,该实例中有一个 Map 对象,key 为SocketWrapper 对象类型,对应的 value 为 Http11Processor 类型。也就是说为连接中的每一个请求(request)都去分配了相应处理类 Http11Processor 实例,可以保存连接上请求的状态信息(例如解析请求行,请求头等数据)。
ConnectionHandler的核心代码逻辑如下
private final Map<S,Processor> connections = new ConcurrentHashMap<>();
private final RecycledProcessors recycledProcessors = new RecycledProcessors(this);public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {if (getLog().isDebugEnabled()) {getLog().debug(sm.getString("abstractConnectionHandler.process",wrapper.getSocket(), status));}if (wrapper == null) {return SocketState.CLOSED;}S socket = wrapper.getSocket();Processor processor = connections.get(socket);if (getLog().isDebugEnabled()) {getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",processor, socket));}if (SocketEvent.TIMEOUT == status && (processor == null || !processor.isAsync() || !processor.checkAsyncTimeoutGeneration())) {return SocketState.OPEN;}if (processor != null) {getProtocol().removeWaitingProcessor(processor);} else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) {return SocketState.CLOSED;}ContainerThreadMarker.set();try {if (processor == null) {String negotiatedProtocol = wrapper.getNegotiatedProtocol();if (negotiatedProtocol != null && negotiatedProtocol.length() > 0) {UpgradeProtocol upgradeProtocol =getProtocol().getNegotiatedProtocol(negotiatedProtocol);if (upgradeProtocol != null) {processor = upgradeProtocol.getProcessor(wrapper, getProtocol().getAdapter());} else if (negotiatedProtocol.equals("http/1.1")) {// Explicitly negotiated the default protocol.// Obtain a processor below.} else {if (getLog().isDebugEnabled()) {getLog().debug(sm.getString("abstractConnectionHandler.negotiatedProcessor.fail",negotiatedProtocol));}return SocketState.CLOSED;}}}if (processor == null) {processor = recycledProcessors.pop();if (getLog().isDebugEnabled()) {getLog().debug(sm.getString("abstractConnectionHandler.processorPop",processor));}}if (processor == null) {processor = getProtocol().createProcessor();register(processor);}processor.setSslSupport(wrapper.getSslSupport(getProtocol().getClientCertProvider()));connections.put(socket, processor);SocketState state = SocketState.CLOSED;do {state = processor.process(wrapper, status);if (state == SocketState.UPGRADING) {UpgradeToken upgradeToken = processor.getUpgradeToken();ByteBuffer leftOverInput = processor.getLeftoverInput();if (upgradeToken == null) {UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c");if (upgradeProtocol != null) {processor = upgradeProtocol.getProcessor(wrapper, getProtocol().getAdapter());wrapper.unRead(leftOverInput);connections.put(socket, processor);} else {if (getLog().isDebugEnabled()) {getLog().debug(sm.getString("abstractConnectionHandler.negotiatedProcessor.fail","h2c"));}return SocketState.CLOSED;}} else {HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();release(processor);processor = getProtocol().createUpgradeProcessor(wrapper, upgradeToken);if (getLog().isDebugEnabled()) {getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate",processor, wrapper));}wrapper.unRead(leftOverInput);wrapper.setUpgraded(true);connections.put(socket, processor);if (upgradeToken.getInstanceManager() == null) {httpUpgradeHandler.init((WebConnection) processor);} else {ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);try {httpUpgradeHandler.init((WebConnection) processor);} finally {upgradeToken.getContextBind().unbind(false, oldCL);}}if (httpUpgradeHandler instanceof InternalHttpUpgradeHandler) {if (((InternalHttpUpgradeHandler) httpUpgradeHandler).hasAsyncIO()) {state = SocketState.LONG;}}}}} while ( state == SocketState.UPGRADING);if (state == SocketState.LONG) {longPoll(wrapper, processor);if (processor.isAsync()) {getProtocol().addWaitingProcessor(processor);}} else if (state == SocketState.OPEN) {connections.remove(socket);release(processor);wrapper.registerReadInterest();} else if (state == SocketState.SENDFILE) {// Sendfile in progress. If it fails, the socket will be// closed. If it works, the socket either be added to the// poller (or equivalent) to await more data or processed// if there are any pipe-lined requests remaining.} else if (state == SocketState.UPGRADED) {// Don't add sockets back to the poller if this was a// non-blocking write otherwise the poller may trigger// multiple read events which may lead to thread starvation// in the connector. The write() method will add this socket// to the poller if necessary.if (status != SocketEvent.OPEN_WRITE) {longPoll(wrapper, processor);}} else if (state == SocketState.SUSPENDED) {// Don't add sockets back to the poller.// The resumeProcessing() method will add this socket// to the poller.} else {connections.remove(socket);if (processor.isUpgrade()) {UpgradeToken upgradeToken = processor.getUpgradeToken();HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();InstanceManager instanceManager = upgradeToken.getInstanceManager();if (instanceManager == null) {httpUpgradeHandler.destroy();} else {ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);try {httpUpgradeHandler.destroy();} finally {try {instanceManager.destroyInstance(httpUpgradeHandler);} catch (Throwable e) {ExceptionUtils.handleThrowable(e);getLog().error(sm.getString("abstractConnectionHandler.error"), e);}upgradeToken.getContextBind().unbind(false, oldCL);}}} else {release(processor);}}return state;} catch(java.net.SocketException e) {getLog().debug(sm.getString("abstractConnectionHandler.socketexception.debug"), e);} catch (java.io.IOException e) {getLog().debug(sm.getString("abstractConnectionHandler.ioexception.debug"), e);} catch (ProtocolException e) {getLog().debug(sm.getString("abstractConnectionHandler.protocolexception.debug"), e);}catch (OutOfMemoryError oome) {getLog().error(sm.getString("abstractConnectionHandler.oome"), oome);} catch (Throwable e) {ExceptionUtils.handleThrowable(e);getLog().error(sm.getString("abstractConnectionHandler.error"), e);} finally {ContainerThreadMarker.clear();}connections.remove(socket);release(processor);return SocketState.CLOSED;
}
该实例中有一个 Map 对象,key 为原始 scoket 的包装对象 SocketWrapper 类型,value 为 Http11Processor 类型。为每个连接中的请求都分配处理类 Http11Processor 实例,可以保存连接上的请求状态信息(例如解析请求行,请求头等数据)。
该实例有 recycledProcessors 对象,用来保持已经回收的 Http11Processor 实例,避免下次使用重新创建对象,提高效率。
该实例核心方法为 process() 方法,代码比较多,这里总结关键点。
该实例寻找 Map 中原始 socket 的包装对象关联的连接处理类 Http11Processor 实例,如果没有则创建新实例,并将 socket 包装对象与其关联到 Map 里,下次同一个连接中有数据的时候可以重用,而且也可以保存连接状态信息在 Http11Processor 实例中。
该实例调用 Http11Processor 实例的 process 方法并返回 SocketState。
如果返回的状态是代表 upgrade 协议(例如websocket连接等),则处理 upgrade 协议,这里不对 upgrade 协议详细展开。
如果回的状态为 SocketState.LONG ,则代表要么是数据(请求行/请求头)没有解析完(因为 client 端没有发送完请求行/请求头数据),要么是执行了 servlet 的异步请求。
对于 SocketState.LONG 的返回值调用如下:
protected void longPoll(SocketWrapperBase<?> socket, Processor processor) { if (!processor.isAsync()) { // This is currently only used with HTTP // Either: // - this is an upgraded connection // - the request line/headers have not been completely // readsocket.registerReadInterest();} } //SocketWrapper public void registerReadInterest() {getPoller().add(this, SelectionKey.OP_READ); }
longPoll() 方法在 servlet 非异步请求(请求行/请求头数据没有解析完)的情况,对 socket 包装对象注册 OP_READ 事件,并添加到 poller 线程的事件队列里,让 poller 线程继续监听 client 端可读事件。
对于 SocketState.OPEN 的返回值调用:
connections.remove(socket); release(processor); wrapper.registerReadInterest();
SocketState.OPEN 一般代表 servlet API 调用正常,返回 OPEN 表示该连接为长连接,不关闭原始 socket 。所以在 Map中会去移除 socket 和Http11Processor 的对应关系,来释放当前 Http11Processor 实例以便后续重用。由于是长连接,所以和异步处理方式一样,对 socket 包装对象注册 OP_READ 事件,并添加到 poller 线程事件队列中,让 poller 线程继续监听 client 端可读事件。
在最后的 else 分支中代表返回的期望状态为 CLOSED ,表示该连接需要关闭,则在 Map 中移除 socket 和 Http11Processor 的对应关系,然后会释放当前 Http11Processor 实例以便后续重用。根据上面 ConnectionHanlder 的分析,如果返回的 SocketState 枚举的结果为 CLOSED,则会去调用 poller.cancelledKey() 方法,从而把原始 socket 关闭。
目前先写到这里,下一篇文章里我们继续介绍 tomcat io 线程涉及的其他关键类。
Tomcat NIO(9)-IO线程-Overall流程和关键类相关推荐
- io 错误: socket closed_Tomcat NIO(9)IO线程Overall流程和关键类
在上一篇文章里我们主要介绍了 tomcat NIO 中 poller 线程的阻塞与唤醒,根据以前文章当 poller 线程监测到连接有数据可读事件的时候,会把原始 socket 的包装对象委托到 to ...
- 【JavaSE8 高级编程 IO/NIO】IO入门系列①之抽象基类节点流转换流 2019_8_16
IO输入输出 IO 实现体系概述 [文档级] ①IO基石 四抽象基类 [IS,OS / R,W]抽象基类简述 子类及其实现接口 字节(FIS,OIS)字符(BR,ISR)读 字节(FOS,OOS,PS ...
- Netty开发的基本流程及关键类说明
- 详解 Tomcat 的连接数与线程池
原文出处:编程迷思 前言 在使用tomcat时,经常会遇到连接数.线程数之类的配置问题,要真正理解这些概念,必须先了解Tomcat的连接器(Connector). 在前面的文章 详解Tomcat配置文 ...
- 详解tomcat的连接数与线程池
前言 在使用tomcat时,经常会遇到连接数.线程数之类的配置问题,要真正理解这些概念,必须先了解Tomcat的连接器(Connector). 在前面的文章 详解Tomcat配置文件server.xm ...
- tomcat线程释放时间_详解tomcat的连接数与线程池
前言 在使用tomcat时,经常会遇到连接数.线程数之类的配置问题,要真正理解这些概念,必须先了解Tomcat的连接器(Connector). 在前面的文章 详解Tomcat配置文件server.xm ...
- NETGEAR拒绝连接请求_详解 Tomcat 的连接数与线程池
点击上方蓝色字体,选择"标星公众号" 优质文章,第一时间送达 关注公众号后台回复pay或mall获取实战项目资料视频 点击此链接:多套SpringCloud/SpringBoot实 ...
- tomcat的连接数与线程池
在使用tomcat时,经常会遇到连接数.线程数之类的配置问题,要真正理解这些概念,必须先了解Tomcat的连接器(Connector). Connector的主要功能,是接收连接请求,创建Reques ...
- java基础巩固-宇宙第一AiYWM:为了维持生计,四大基础之OS_Part_2整起~IO们那些事【包括五种IO模型:(BIO、NIO、IO多路复用、信号驱动、AIO);零拷贝、事件处理及并发等模型】
PART0.前情提要: 通常用户进程的一个完整的IO分为两个阶段(IO有内存IO.网络IO和磁盘IO三种,通常我们说的IO指的是后两者!):[操作系统和驱动程序运行在内核空间,应用程序运行在用户空间, ...
最新文章
- LabVIEW图像特征与机器视觉概念(理论篇—4)
- SpringBoot引用lombok让代码更简洁
- php ci hooks,CI框架 -- 核心文件 之 Hooks.php
- iOS开发使用半透明模糊效果方法整理
- 网络适配器无法建立连接_「图」KB4515384更新令人抓狂:网络适配器又无法启用...
- 快速傅立叶变换_FFT
- 设计模式和java实现
- 换了马甲也能认出“你” | 有了这个数据集,AI有望揪出变种勒索软件
- html如何修改字体黑体,css如何设置黑体样式?
- java区间并集_区间并集求解算法实现
- 天津大学matlab软件许可,天津大学《MATLAB基础和应用》课程教学大纲.PDF
- 机器博弈:非零和博弈下的叶值表剪枝
- c语言 异或 浮点数存储 分支结构
- 手把手教你升级Keil MDK的ARM编译器
- javapoet使用
- 第13章 项目合同管理
- 【机器人工具箱学习笔记】第七章 机械臂运动学
- 千年人参多少钱,如果真的有的话
- phpMyAdmin 尝试连接到 MySQL 服务器,但服务器拒绝连接.您应该检查配置文件中的
- apple pay集成_如何将Google Pay集成到您现有的Android应用中