在上一篇文章里我们主要介绍了 tomcat NIO 中 poller 线程的阻塞与唤醒,根据以前文章当 poller 线程监测到连接有数据可读事件的时候,会把原始 socket 的包装对象委托到 tomcat io 线程池中处理,包括解析请求行,请求头,调用 servlet API,处理异步等等,在这里我们主要介绍 tomcat io 线程。

对于tomcat io线程我们主要介绍:

  • IO 线程 overall 流程

  • IO 线程主要涉及的类以及作用


对于 tomcat io 线程来说,overall 调用序列图如下 :

  • SocketProcessor.run()-->




    CoyoteAdapter.service() 一直到 container 调用标准 servlet API。

  • 涉及的关键类有SocketProcessor,ConnectionHandlerAbstractProcessorLight,Http11Processor,CoyoteAdapter


protected void doRun() {    NioChannel socket = socketWrapper.getSocket();    SelectionKey key = socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());    Poller poller = NioEndpoint.this.poller;    if (poller == null) {        socketWrapper.close();        return;    }    try {        int handshake = -1;        try {            if (key != null) {                if (socket.isHandshakeComplete()) {                    handshake = 0;                } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||event == SocketEvent.ERROR) {                    handshake = -1;                } else {                    handshake = socket.handshake(key.isReadable(), key.isWritable());                    event = SocketEvent.OPEN_READ;                }            }        } catch (IOException x) {            handshake = -1;            if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);        } catch (CancelledKeyException ckx) {            handshake = -1;        }        if (handshake == 0) {            SocketState state = SocketState.OPEN;            if (event == null) {                state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);            } else {                state = getHandler().process(socketWrapper, event);            }            if (state == SocketState.CLOSED) {                poller.cancelledKey(key, socketWrapper);            }        } else if (handshake == -1 ) {            poller.cancelledKey(key, socketWrapper);        } else if (handshake == SelectionKey.OP_READ){            socketWrapper.registerReadInterest();        } else if (handshake == SelectionKey.OP_WRITE){            socketWrapper.registerWriteInterest();        }    } catch (CancelledKeyException cx) {        poller.cancelledKey(key, socketWrapper);    } catch (VirtualMachineError vme) {        ExceptionUtils.handleThrowable(vme);    } catch (Throwable t) {        log.error(sm.getString("endpoint.processing.fail"), t);        poller.cancelledKey(key, socketWrapper);    } finally {        socketWrapper = null;        event = null;        if (running && !paused && processorCache != null) {            processorCache.push(this);        }    }}
  • 首先会处理 handshake,如果 handshake 没有问题则返回 handshake 的结果为 0。

  • 如果 handshake 过程处理正常没问题,则会通过调用 getHandler().process(socketWrapper, event) 方法从而来间接触发 ConnectionHandler 实例的 process() 方法,并返回期望原始 socket 的状态 SocketState 枚举。

  • 如果返回的 SocketState 为 CLOSED ,则调用 poller.cancelledKey() 方法,会把原始 sockte 关闭。

  • 最后会把 SocketProcessor 实例回收到缓存 processorCache 中,以便下次使用不需要重新创建对象,从而提高效率。

  • 另外 ConnectionHandler 是 global 对象,也就是说所有的连接处理均由这个对象处理。根据以前文章,该实例中有一个 Map 对象,key 为SocketWrapper 对象类型,对应的 value 为 Http11Processor 类型。也就是说为连接中的每一个请求(request)都去分配了相应处理类 Http11Processor 实例,可以保存连接上请求的状态信息(例如解析请求行,请求头等数据)。


private final Map connections = new ConcurrentHashMap<>();private final RecycledProcessors recycledProcessors = new RecycledProcessors(this);public SocketState process(SocketWrapperBase wrapper, SocketEvent status) {    if (getLog().isDebugEnabled()) {        getLog().debug(sm.getString("abstractConnectionHandler.process",                wrapper.getSocket(), status));    }    if (wrapper == null) {        return SocketState.CLOSED;    }    S socket = wrapper.getSocket();    Processor processor = connections.get(socket);    if (getLog().isDebugEnabled()) {        getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",                processor, socket));    }    if (SocketEvent.TIMEOUT == status && (processor == null || !processor.isAsync() || !processor.checkAsyncTimeoutGeneration())) {        return SocketState.OPEN;    }    if (processor != null) {        getProtocol().removeWaitingProcessor(processor);    } else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) {        return SocketState.CLOSED;    }    ContainerThreadMarker.set();    try {        if (processor == null) {            String negotiatedProtocol = wrapper.getNegotiatedProtocol();            if (negotiatedProtocol != null && negotiatedProtocol.length() > 0) {                UpgradeProtocol upgradeProtocol =                        getProtocol().getNegotiatedProtocol(negotiatedProtocol);                if (upgradeProtocol != null) {                    processor = upgradeProtocol.getProcessor(                            wrapper, getProtocol().getAdapter());                } else if (negotiatedProtocol.equals("http/1.1")) {                    // Explicitly negotiated the default protocol.                    // Obtain a processor below.                } else {                    if (getLog().isDebugEnabled()) {                        getLog().debug(sm.getString(                            "abstractConnectionHandler.negotiatedProcessor.fail",                            negotiatedProtocol));                    }                    return SocketState.CLOSED;                }            }        }        if (processor == null) {            processor = recycledProcessors.pop();            if (getLog().isDebugEnabled()) {                getLog().debug(sm.getString("abstractConnectionHandler.processorPop",                        processor));            }        }        if (processor == null) {            processor = getProtocol().createProcessor();            register(processor);        }        processor.setSslSupport(                wrapper.getSslSupport(getProtocol().getClientCertProvider()));        connections.put(socket, processor);        SocketState state = SocketState.CLOSED;        do {            state = processor.process(wrapper, status);            if (state == SocketState.UPGRADING) {                UpgradeToken upgradeToken = processor.getUpgradeToken();                ByteBuffer leftOverInput = processor.getLeftoverInput();                if (upgradeToken == null) {                    UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c");                    if (upgradeProtocol != null) {                        processor = upgradeProtocol.getProcessor(                                wrapper, getProtocol().getAdapter());                        wrapper.unRead(leftOverInput);                        connections.put(socket, processor);                    } else {                        if (getLog().isDebugEnabled()) {                            getLog().debug(sm.getString("abstractConnectionHandler.negotiatedProcessor.fail","h2c"));                        }                        return SocketState.CLOSED;                    }                } else {                    HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();                    release(processor);                    processor = getProtocol().createUpgradeProcessor(wrapper, upgradeToken);                    if (getLog().isDebugEnabled()) {                        getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate",processor, wrapper));                    }                    wrapper.unRead(leftOverInput);                    wrapper.setUpgraded(true);                    connections.put(socket, processor);                    if (upgradeToken.getInstanceManager() == null) {                        httpUpgradeHandler.init((WebConnection) processor);                    } else {                        ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);                        try {                            httpUpgradeHandler.init((WebConnection) processor);                        } finally {                            upgradeToken.getContextBind().unbind(false, oldCL);                        }                    }                    if (httpUpgradeHandler instanceof InternalHttpUpgradeHandler) {                        if (((InternalHttpUpgradeHandler) httpUpgradeHandler).hasAsyncIO()) {                            state = SocketState.LONG;                        }                    }                }            }        } while ( state == SocketState.UPGRADING);        if (state == SocketState.LONG) {            longPoll(wrapper, processor);            if (processor.isAsync()) {                getProtocol().addWaitingProcessor(processor);            }        } else if (state == SocketState.OPEN) {            connections.remove(socket);            release(processor);            wrapper.registerReadInterest();        } else if (state == SocketState.SENDFILE) {            // Sendfile in progress. If it fails, the socket will be            // closed. If it works, the socket either be added to the            // poller (or equivalent) to await more data or processed            // if there are any pipe-lined requests remaining.        } else if (state == SocketState.UPGRADED) {            // Don't add sockets back to the poller if this was a            // non-blocking write otherwise the poller may trigger            // multiple read events which may lead to thread starvation            // in the connector. The write() method will add this socket            // to the poller if necessary.            if (status != SocketEvent.OPEN_WRITE) {                longPoll(wrapper, processor);            }        } else if (state == SocketState.SUSPENDED) {            // Don't add sockets back to the poller.            // The resumeProcessing() method will add this socket            // to the poller.        } else {            connections.remove(socket);            if (processor.isUpgrade()) {                UpgradeToken upgradeToken = processor.getUpgradeToken();                HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();                InstanceManager instanceManager = upgradeToken.getInstanceManager();                if (instanceManager == null) {                    httpUpgradeHandler.destroy();                } else {                    ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);                    try {                        httpUpgradeHandler.destroy();                    } finally {                        try {                            instanceManager.destroyInstance(httpUpgradeHandler);                        } catch (Throwable e) {                            ExceptionUtils.handleThrowable(e);                            getLog().error(sm.getString("abstractConnectionHandler.error"), e);                        }                        upgradeToken.getContextBind().unbind(false, oldCL);                    }                }            } else {                release(processor);            }        }        return state;    } catch(java.net.SocketException e) {        getLog().debug(sm.getString("abstractConnectionHandler.socketexception.debug"), e);    } catch (java.io.IOException e) {        getLog().debug(sm.getString("abstractConnectionHandler.ioexception.debug"), e);    } catch (ProtocolException e) {        getLog().debug(sm.getString("abstractConnectionHandler.protocolexception.debug"), e);    }    catch (OutOfMemoryError oome) {        getLog().error(sm.getString("abstractConnectionHandler.oome"), oome);    } catch (Throwable e) {        ExceptionUtils.handleThrowable(e);        getLog().error(sm.getString("abstractConnectionHandler.error"), e);    } finally {        ContainerThreadMarker.clear();    }    connections.remove(socket);    release(processor);    return SocketState.CLOSED;}
  • 该实例中有一个 Map 对象,key 为原始 scoket 的包装对象 SocketWrapper 类型,value 为 Http11Processor 类型。为每个连接中的请求都分配处理类 Http11Processor 实例,可以保存连接上的请求状态信息(例如解析请求行,请求头等数据)。
  • 该实例有 recycledProcessors 对象,用来保持已经回收的 Http11Processor 实例,避免下次使用重新创建对象,提高效率。
  • 该实例核心方法为 process() 方法,代码比较多,这里总结关键点。
  • 该实例寻找 Map 中原始 socket 的包装对象关联的连接处理类 Http11Processor 实例,如果没有则创建新实例,并将 socket 包装对象与其关联到 Map 里,下次同一个连接中有数据的时候可以重用,而且也可以保存连接状态信息在 Http11Processor 实例中。
  • 该实例调用 Http11Processor 实例的 process 方法并返回 SocketState。
  • 如果返回的状态是代表 upgrade 协议(例如websocket连接等),则处理 upgrade 协议,这里不对 upgrade 协议详细展开。
  • 如果回的状态为 SocketState.LONG ,则代表要么是数据(请求行/请求头)没有解析完(因为 client 端没有发送完请求行/请求头数据),要么是执行了 servlet 的异步请求。
  • 对于 SocketState.LONG 的返回值调用如下:

    protected void longPoll(SocketWrapperBase> socket, Processor processor) {if (!processor.isAsync()) {// This is currently only used with HTTP// Either://  - this is an upgraded connection//  - the request line/headers have not been completely//    read          socket.registerReadInterest();     }}//SocketWrapperpublic void registerReadInterest() {      getPoller().add(this, SelectionKey.OP_READ);}
  • longPoll() 方法在 servlet 非异步请求(请求行/请求头数据没有解析完)的情况,对 socket 包装对象注册 OP_READ 事件,并添加到 poller 线程的事件队列里,让 poller 线程继续监听 client 端可读事件。
  • 对于 SocketState.OPEN 的返回值调用:

  • SocketState.OPEN 一般代表 servlet API 调用正常,返回 OPEN 表示该连接为长连接,不关闭原始 socket 。所以在 Map中会去移除 socket 和Http11Processor 的对应关系,来释放当前 Http11Processor 实例以便后续重用。由于是长连接,所以和异步处理方式一样,对 socket 包装对象注册 OP_READ 事件,并添加到 poller 线程事件队列中,让 poller 线程继续监听 client 端可读事件。
  • 在最后的 else 分支中代表返回的期望状态为 CLOSED ,表示该连接需要关闭,则在 Map 中移除 socket 和 Http11Processor 的对应关系,然后会释放当前 Http11Processor 实例以便后续重用。根据上面 ConnectionHanlder 的分析,如果返回的 SocketState 枚举的结果为 CLOSED,则会去调用 poller.cancelledKey() 方法,从而把原始 socket 关闭。

目前先写到这里,下一篇文章里我们继续介绍 tomcat io 线程涉及的其他关键类。

