io 错误: socket closed_Tomcat NIO(9)IO线程Overall流程和关键类
对于tomcat io线程我们主要介绍:
IO 线程 overall 流程
IO 线程主要涉及的类以及作用
IO线程overall流程
对于 tomcat io 线程来说,overall 调用序列图如下 :
SocketProcessor.run()-->
ConnectionHandler.process()-->
AbstractProcessorLight.process()-->
Http11Processor.service-->
CoyoteAdapter.service() 一直到 container 调用标准 servlet API。
- 涉及的关键类有SocketProcessor,ConnectionHandlerAbstractProcessorLight,Http11Processor,CoyoteAdapter
SocketProcessor的核心代码逻辑如下
protected void doRun() { NioChannel socket = socketWrapper.getSocket(); SelectionKey key = socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector()); Poller poller = NioEndpoint.this.poller; if (poller == null) { socketWrapper.close(); return; } try { int handshake = -1; try { if (key != null) { if (socket.isHandshakeComplete()) { handshake = 0; } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||event == SocketEvent.ERROR) { handshake = -1; } else { handshake = socket.handshake(key.isReadable(), key.isWritable()); event = SocketEvent.OPEN_READ; } } } catch (IOException x) { handshake = -1; if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x); } catch (CancelledKeyException ckx) { handshake = -1; } if (handshake == 0) { SocketState state = SocketState.OPEN; if (event == null) { state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ); } else { state = getHandler().process(socketWrapper, event); } if (state == SocketState.CLOSED) { poller.cancelledKey(key, socketWrapper); } } else if (handshake == -1 ) { poller.cancelledKey(key, socketWrapper); } else if (handshake == SelectionKey.OP_READ){ socketWrapper.registerReadInterest(); } else if (handshake == SelectionKey.OP_WRITE){ socketWrapper.registerWriteInterest(); } } catch (CancelledKeyException cx) { poller.cancelledKey(key, socketWrapper); } catch (VirtualMachineError vme) { ExceptionUtils.handleThrowable(vme); } catch (Throwable t) { log.error(sm.getString("endpoint.processing.fail"), t); poller.cancelledKey(key, socketWrapper); } finally { socketWrapper = null; event = null; if (running && !paused && processorCache != null) { processorCache.push(this); } }}
首先会处理 handshake,如果 handshake 没有问题则返回 handshake 的结果为 0。
如果 handshake 过程处理正常没问题,则会通过调用 getHandler().process(socketWrapper, event) 方法从而来间接触发 ConnectionHandler 实例的 process() 方法,并返回期望原始 socket 的状态 SocketState 枚举。
如果返回的 SocketState 为 CLOSED ,则调用 poller.cancelledKey() 方法,会把原始 sockte 关闭。
最后会把 SocketProcessor 实例回收到缓存 processorCache 中,以便下次使用不需要重新创建对象,从而提高效率。
另外 ConnectionHandler 是 global 对象,也就是说所有的连接处理均由这个对象处理。根据以前文章,该实例中有一个 Map 对象,key 为SocketWrapper 对象类型,对应的 value 为 Http11Processor 类型。也就是说为连接中的每一个请求(request)都去分配了相应处理类 Http11Processor 实例,可以保存连接上请求的状态信息(例如解析请求行,请求头等数据)。
ConnectionHandler的核心代码逻辑如下
private final Map connections = new ConcurrentHashMap<>();private final RecycledProcessors recycledProcessors = new RecycledProcessors(this);public SocketState process(SocketWrapperBase wrapper, SocketEvent status) { if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.process", wrapper.getSocket(), status)); } if (wrapper == null) { return SocketState.CLOSED; } S socket = wrapper.getSocket(); Processor processor = connections.get(socket); if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet", processor, socket)); } if (SocketEvent.TIMEOUT == status && (processor == null || !processor.isAsync() || !processor.checkAsyncTimeoutGeneration())) { return SocketState.OPEN; } if (processor != null) { getProtocol().removeWaitingProcessor(processor); } else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) { return SocketState.CLOSED; } ContainerThreadMarker.set(); try { if (processor == null) { String negotiatedProtocol = wrapper.getNegotiatedProtocol(); if (negotiatedProtocol != null && negotiatedProtocol.length() > 0) { UpgradeProtocol upgradeProtocol = getProtocol().getNegotiatedProtocol(negotiatedProtocol); if (upgradeProtocol != null) { processor = upgradeProtocol.getProcessor( wrapper, getProtocol().getAdapter()); } else if (negotiatedProtocol.equals("http/1.1")) { // Explicitly negotiated the default protocol. // Obtain a processor below. } else { if (getLog().isDebugEnabled()) { getLog().debug(sm.getString( "abstractConnectionHandler.negotiatedProcessor.fail", negotiatedProtocol)); } return SocketState.CLOSED; } } } if (processor == null) { processor = recycledProcessors.pop(); if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.processorPop", processor)); } } if (processor == null) { processor = getProtocol().createProcessor(); register(processor); } processor.setSslSupport( wrapper.getSslSupport(getProtocol().getClientCertProvider())); connections.put(socket, processor); SocketState state = SocketState.CLOSED; do { state = processor.process(wrapper, status); if (state == SocketState.UPGRADING) { UpgradeToken upgradeToken = processor.getUpgradeToken(); ByteBuffer leftOverInput = processor.getLeftoverInput(); if (upgradeToken == null) { UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c"); if (upgradeProtocol != null) { processor = upgradeProtocol.getProcessor( wrapper, getProtocol().getAdapter()); wrapper.unRead(leftOverInput); connections.put(socket, processor); } else { if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.negotiatedProcessor.fail","h2c")); } return SocketState.CLOSED; } } else { HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler(); release(processor); processor = getProtocol().createUpgradeProcessor(wrapper, upgradeToken); if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate",processor, wrapper)); } wrapper.unRead(leftOverInput); wrapper.setUpgraded(true); connections.put(socket, processor); if (upgradeToken.getInstanceManager() == null) { httpUpgradeHandler.init((WebConnection) processor); } else { ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null); try { httpUpgradeHandler.init((WebConnection) processor); } finally { upgradeToken.getContextBind().unbind(false, oldCL); } } if (httpUpgradeHandler instanceof InternalHttpUpgradeHandler) { if (((InternalHttpUpgradeHandler) httpUpgradeHandler).hasAsyncIO()) { state = SocketState.LONG; } } } } } while ( state == SocketState.UPGRADING); if (state == SocketState.LONG) { longPoll(wrapper, processor); if (processor.isAsync()) { getProtocol().addWaitingProcessor(processor); } } else if (state == SocketState.OPEN) { connections.remove(socket); release(processor); wrapper.registerReadInterest(); } else if (state == SocketState.SENDFILE) { // Sendfile in progress. If it fails, the socket will be // closed. If it works, the socket either be added to the // poller (or equivalent) to await more data or processed // if there are any pipe-lined requests remaining. } else if (state == SocketState.UPGRADED) { // Don't add sockets back to the poller if this was a // non-blocking write otherwise the poller may trigger // multiple read events which may lead to thread starvation // in the connector. The write() method will add this socket // to the poller if necessary. if (status != SocketEvent.OPEN_WRITE) { longPoll(wrapper, processor); } } else if (state == SocketState.SUSPENDED) { // Don't add sockets back to the poller. // The resumeProcessing() method will add this socket // to the poller. } else { connections.remove(socket); if (processor.isUpgrade()) { UpgradeToken upgradeToken = processor.getUpgradeToken(); HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler(); InstanceManager instanceManager = upgradeToken.getInstanceManager(); if (instanceManager == null) { httpUpgradeHandler.destroy(); } else { ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null); try { httpUpgradeHandler.destroy(); } finally { try { instanceManager.destroyInstance(httpUpgradeHandler); } catch (Throwable e) { ExceptionUtils.handleThrowable(e); getLog().error(sm.getString("abstractConnectionHandler.error"), e); } upgradeToken.getContextBind().unbind(false, oldCL); } } } else { release(processor); } } return state; } catch(java.net.SocketException e) { getLog().debug(sm.getString("abstractConnectionHandler.socketexception.debug"), e); } catch (java.io.IOException e) { getLog().debug(sm.getString("abstractConnectionHandler.ioexception.debug"), e); } catch (ProtocolException e) { getLog().debug(sm.getString("abstractConnectionHandler.protocolexception.debug"), e); } catch (OutOfMemoryError oome) { getLog().error(sm.getString("abstractConnectionHandler.oome"), oome); } catch (Throwable e) { ExceptionUtils.handleThrowable(e); getLog().error(sm.getString("abstractConnectionHandler.error"), e); } finally { ContainerThreadMarker.clear(); } connections.remove(socket); release(processor); return SocketState.CLOSED;}
- 该实例中有一个 Map 对象,key 为原始 scoket 的包装对象 SocketWrapper 类型,value 为 Http11Processor 类型。为每个连接中的请求都分配处理类 Http11Processor 实例,可以保存连接上的请求状态信息(例如解析请求行,请求头等数据)。
- 该实例有 recycledProcessors 对象,用来保持已经回收的 Http11Processor 实例,避免下次使用重新创建对象,提高效率。
- 该实例核心方法为 process() 方法,代码比较多,这里总结关键点。
- 该实例寻找 Map 中原始 socket 的包装对象关联的连接处理类 Http11Processor 实例,如果没有则创建新实例,并将 socket 包装对象与其关联到 Map 里,下次同一个连接中有数据的时候可以重用,而且也可以保存连接状态信息在 Http11Processor 实例中。
- 该实例调用 Http11Processor 实例的 process 方法并返回 SocketState。
- 如果返回的状态是代表 upgrade 协议(例如websocket连接等),则处理 upgrade 协议,这里不对 upgrade 协议详细展开。
- 如果回的状态为 SocketState.LONG ,则代表要么是数据(请求行/请求头)没有解析完(因为 client 端没有发送完请求行/请求头数据),要么是执行了 servlet 的异步请求。
对于 SocketState.LONG 的返回值调用如下:
protected void longPoll(SocketWrapperBase> socket, Processor processor) {if (!processor.isAsync()) {// This is currently only used with HTTP// Either:// - this is an upgraded connection// - the request line/headers have not been completely// read socket.registerReadInterest(); }}//SocketWrapperpublic void registerReadInterest() { getPoller().add(this, SelectionKey.OP_READ);}
- longPoll() 方法在 servlet 非异步请求(请求行/请求头数据没有解析完)的情况,对 socket 包装对象注册 OP_READ 事件,并添加到 poller 线程的事件队列里,让 poller 线程继续监听 client 端可读事件。
对于 SocketState.OPEN 的返回值调用:
connections.remove(socket);release(processor);wrapper.registerReadInterest();
- SocketState.OPEN 一般代表 servlet API 调用正常,返回 OPEN 表示该连接为长连接,不关闭原始 socket 。所以在 Map中会去移除 socket 和Http11Processor 的对应关系,来释放当前 Http11Processor 实例以便后续重用。由于是长连接,所以和异步处理方式一样,对 socket 包装对象注册 OP_READ 事件,并添加到 poller 线程事件队列中,让 poller 线程继续监听 client 端可读事件。
在最后的 else 分支中代表返回的期望状态为 CLOSED ,表示该连接需要关闭,则在 Map 中移除 socket 和 Http11Processor 的对应关系,然后会释放当前 Http11Processor 实例以便后续重用。根据上面 ConnectionHanlder 的分析,如果返回的 SocketState 枚举的结果为 CLOSED,则会去调用 poller.cancelledKey() 方法,从而把原始 socket 关闭。
目前先写到这里,下一篇文章里我们继续介绍 tomcat io 线程涉及的其他关键类。
io 错误: socket closed_Tomcat NIO(9)IO线程Overall流程和关键类相关推荐
- Tomcat NIO(9)-IO线程-Overall流程和关键类
在上一篇文章里我们主要介绍了 tomcat NIO 中 poller 线程的阻塞与唤醒,根据以前文章当 poller 线程监测到连接有数据可读事件的时候,会把原始 socket 的包装对象委托到 to ...
- 详解磁盘IO、网络IO、零拷贝IO、BIO、NIO、AIO、IO多路复用(select、poll、epoll)
文章很长,但是很用心! 文章目录 1. 什么是I/O 2. 磁盘IO 3. 网络IO 4. IO中断与DMA 5. 零拷贝IO 6. BIO 7. NIO 8. IO多路复用 8.1 select 8 ...
- 死磕Java之NIO与IO
死磕Java之NIO与IO 当学习Java NIO与IO时,你是否会有这样的想法:什么时候使用NIO,什么使用IO呢?本篇文章将会分析两者的不同,它们的用例,以及和影响代码的设计. 01 NIO与IO ...
- io读取一个文件再写入socket技术_JAVA中IO与NIO面试题
BIO.NIO有什么区别? BIO:Block IO 同步阻塞式 IO,就是我们平常使用的传统 IO,它的特点是模式简单使用方便,并发处理能力低. NIO:New IO 同步非阻塞 IO,是传统 IO ...
- 【NIO】IO多路复用
上节,我们讲解了阻塞和非阻塞,我们今天讲第三种IO模型,这就是IO多路复用. 引入多路复用的原因 实际上,在服务端与客户端一对一通信的时候,同步阻塞式地读写并不会有太大的问题,最典型的就是两个对等机器 ...
- java基础巩固-宇宙第一AiYWM:为了维持生计,四大基础之OS_Part_2整起~IO们那些事【包括五种IO模型:(BIO、NIO、IO多路复用、信号驱动、AIO);零拷贝、事件处理及并发等模型】
PART0.前情提要: 通常用户进程的一个完整的IO分为两个阶段(IO有内存IO.网络IO和磁盘IO三种,通常我们说的IO指的是后两者!):[操作系统和驱动程序运行在内核空间,应用程序运行在用户空间, ...
- IO、文件、NIO【三】
http://blog.csdn.net/silentbalanceyh/article/details/5252285 (这一个章节将讲到Java里面比较重要的一个章节,这里说一句抱歉,因为最近换工 ...
- IO、文件、NIO【草案三】
(这一个章节将讲到Java里面比较重要的一个章节,这里说一句抱歉,因为最近换工作的原因,一直没有时间继续书写教程,不过接下来我会一直坚持写下去的哈,希望大家能够支持.这个章节主要涉及到常用的文件读写, ...
- 17.IO、文件、NIO【草案三】
(这一个章节将讲到Java里面比较重要的一个章节,这里说一句抱歉,因为最近换工作的原因,一直没有时间继续书写教程,不过接下来我会一直坚持写下去的哈,希望大家能够支持.这个章节主要涉及到常用的文件读写, ...
最新文章
- kattle的java安装,Kettle自定义JDK版本(附Linux下安装部署步骤)
- python定义一个字典、存储雇员号和姓名_【一点资讯】python后端开发工程师考证试题...
- step-by-step多文件WEB批量上传(swfupload)的完美解决方案
- php数据库连接程序,常用的数据库连接程序
- SQLSTATE[HY000] [2013] Lost connection to MySQL...
- 关于eclipse反编译插件不起作用问题的解决
- 《推荐系统实践》算法纯享(附代码链接)(一)—— 评价指标篇
- 11月全国程序员平均工资出炉,网友:我丢了同行的脸
- ccxprocess用不用自启_你电脑上开机自启的软件都有哪些?
- 计算机主板显卡型号怎么看,如何查看电脑显卡?4个方法教会你
- 计算机键盘怎么打字,用电脑键盘打字的小技巧 怎么用电脑键盘打字
- 【论文阅读】TimbreTron : A WaveNet (Cycle GAN(CQT(audio ))) pipeline for musical timbre transfer
- 南京大学交叉培养计算机与金融招生人数,教务处组织召开计算机与金融工程实验班师生见面会...
- 查看kafka的版本
- 应用程序如何从X86快速换到ARM架构 | 瑞迅科技工控一体机解读
- 视觉SLAM十四讲:运动方程
- 在做竞品分析时遇到的一些坑
- 图的邻接矩阵和邻接表
- 城南花未开,老程已不在;
- Android-圆形头像
热门文章
- Python 数据结构与算法——二叉搜索树的实现
- 数组指针 vs 指针数组
- 隐藏网络计算机,如何在网络中隐藏自己的计算机名称
- python 干什么工作具有明显优势-科多大数据告诉你Python为什么这么牛?学习python有什么优势?...
- python与excel-再见Excel!我开源了一款与Python深度集成的神器级IDE
- 一张图学会python高清图-一张图让你学会Python
- 如何自学python知乎-初次接触python,怎么样系统的自学呢?
- android 4实例分析,OpenGL Shader实例分析(4)闪光效果
- win10一直正在检查更新_win10一直存在的烦人问题,终于被彻底解决!你会选择更新么?...
- 面试题 02.01. 移除重复节点(链表删除操作模板)