Understanding Tomcat in Depth (9): Source Code Analysis of the Request Process


Preface

Before we know it, this is already the ninth article in our in-depth Tomcat series. In the eighth article we analyzed Tomcat's connector: the Connector, Http11Protocol, Http11ConnectionHandler, JIoEndpoint, Acceptor, and the other classes and components that make it up. We stopped at the Acceptor's run method, because the code that follows is tightly bound to the request process, and that code is complex enough to deserve an article of its own. So without further ado, today we'll trace how a request to http://localhost:8080 travels through Tomcat and reaches a Servlet.

Sequence Diagram

First, here is a sequence diagram I drew; this article basically follows it. The diagram spans 47 levels of calls and became blurry when uploaded to Jianshu, so I put the image on GitHub where you can see it more clearly.

Sequence diagram (click to view)


For this analysis, I'll start Tomcat first and then follow a request through the source.

1. Analyzing JIoEndpoint

When we analyzed the connector last time, we met a diligent employee that actually does the work: JIoEndpoint. It is created together with the Http11Protocol object, so the two can be said to depend on each other. JIoEndpoint also holds an Http11ConnectionHandler, the protocol's connection handler, which is a static inner class of Http11Protocol. Http11ConnectionHandler in turn depends on Http11Protocol, so the three form a cycle: Http11Protocol depends on JIoEndpoint, Http11ConnectionHandler depends on Http11Protocol, and JIoEndpoint depends on Http11ConnectionHandler. Handling HTTP BIO connections is mainly done by JIoEndpoint and two of its inner classes (it has four inner classes and one inner interface in total): Acceptor and SocketProcessor. Below is the class structure of JIoEndpoint:
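In case the class diagram does not come through here, the following is a minimal sketch of those mutual references. The fields and bodies are abridged and purely illustrative, not the actual Tomcat 7 source:

    // Abridged sketch of the mutual references described above (illustrative only).
    class Http11Protocol {
        JIoEndpoint endpoint;                          // the protocol drives a JIoEndpoint
        Http11ConnectionHandler cHandler;              // its static inner connection handler

        static class Http11ConnectionHandler {
            Http11Protocol proto;                      // the handler points back to its protocol
        }
    }

    class JIoEndpoint {
        Http11Protocol.Http11ConnectionHandler handler;    // accepted sockets are handed to this

        class Acceptor implements Runnable {                // accepts connections from the ServerSocket
            @Override public void run() { /* ServerSocket.accept() loop */ }
        }

        class SocketProcessor implements Runnable {         // processes one accepted socket
            @Override public void run() { /* handler.process(socket, status) */ }
        }
    }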

We already analyzed Acceptor in the previous article on the connector: it is the thread that accepts HTTP requests, pulling new connections off the ServerSocket.

SocketProcessor is where today's analysis starts. As we learned in the eighth article, after the Acceptor accepts a connection it hands the socket over to a SocketProcessor for the real processing (a rough reminder of that handoff is sketched below). So let's analyze this class.
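Before diving in, here is a small, self-contained sketch of that accept-and-dispatch pattern. The class and method names (AcceptorSketch, handle) are made up for illustration; this is not the actual JIoEndpoint/Acceptor code:

    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Minimal sketch of the Acceptor -> worker-pool handoff that JIoEndpoint performs
    // in BIO mode. Names are illustrative, not the actual Tomcat identifiers.
    public class AcceptorSketch {
        public static void main(String[] args) throws IOException {
            ExecutorService workers = Executors.newFixedThreadPool(10);
            try (ServerSocket server = new ServerSocket(8080)) {
                while (true) {
                    Socket socket = server.accept();           // the Acceptor blocks here
                    workers.execute(() -> handle(socket));     // a worker runs "SocketProcessor.run()"
                }
            }
        }

        private static void handle(Socket socket) {
            // In Tomcat this is SocketProcessor.run(): SSL handshake, then
            // Http11ConnectionHandler.process(...) parses and services the request.
            try { socket.close(); } catch (IOException ignored) { }
        }
    }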

2. Analyzing SocketProcessor

This class is an inner class that implements Runnable. Recall that when the connector starts, it creates a thread pool, and it is that pool which executes these tasks. Let's look at the implementation:

 /**
     * This class is the equivalent of the Worker, but will simply use in an
     * external Executor thread pool.
     */
    protected class SocketProcessor implements Runnable {

        protected SocketWrapper<Socket> socket = null;
        protected SocketStatus status = null;

        public SocketProcessor(SocketWrapper<Socket> socket) {
            if (socket==null) throw new NullPointerException();
            this.socket = socket;
        }

        public SocketProcessor(SocketWrapper<Socket> socket, SocketStatus status) {
            this(socket);
            this.status = status;
        }

        @Override
        public void run() {
            boolean launch = false;
            synchronized (socket) {
                try {
                    SocketState state = SocketState.OPEN;

                    try {
                        // SSL handshake
                        serverSocketFactory.handshake(socket.getSocket()); // the default implementation does nothing
                    } catch (Throwable t) {
                        ExceptionUtils.handleThrowable(t);
                        if (log.isDebugEnabled()) {
                            log.debug(sm.getString("endpoint.err.handshake"), t);
                        }
                        // Tell to close the socket
                        state = SocketState.CLOSED;
                    }

                    if ((state != SocketState.CLOSED)) {// open
                        if (status == null) { // status == null
                            state = handler.process(socket, SocketStatus.OPEN);// AbstractProtocol.process(); state becomes CLOSED
                        } else { // handler == Http11Protocol$Http11ConnectionHandler
                            state = handler.process(socket,status); // state = closed
                        }
                    }
                    if (state == SocketState.CLOSED) {
                        // Close socket
                        if (log.isTraceEnabled()) {
                            log.trace("Closing socket:"+socket);
                        }
                        countDownConnection();// decrement the connection count
                        try {
                            socket.getSocket().close(); // close the socket
                        } catch (IOException e) {
                            // Ignore
                        }
                    } else if (state == SocketState.OPEN ||
                            state == SocketState.UPGRADING  ||
                            state == SocketState.UPGRADED){
                        socket.setKeptAlive(true);
                        socket.access();
                        launch = true; // only then does the finally block resubmit the socket
                    } else if (state == SocketState.LONG) {
                        socket.access();
                        waitingRequests.add(socket);// long poll: park the socket in the waiting queue
                    }
                } finally {
                    if (launch) {
                        try {
                            getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN));
                        } catch (RejectedExecutionException x) {
                            log.warn("Socket reprocessing request was rejected for:"+socket,x);
                            try {
                                //unable to handle connection at this time
                                handler.process(socket, SocketStatus.DISCONNECT);
                            } finally {
                                countDownConnection();
                            }


                        } catch (NullPointerException npe) {
                            if (running) {
                                log.error(sm.getString("endpoint.launch.fail"),
                                        npe);
                            }
                        }
                    }
                }
            }
            socket = null; // request finished
            // Finish up this request
        }

    }

This class has two fields: a SocketWrapper, which as the name suggests wraps the Socket, and a SocketStatus, the state of that socket. Let's walk through the logic of its run method:

  1. First handle the socket's SSL handshake. The default DefaultServerSocketFactory implementation is empty and does nothing.
  2. Check the socket status. If it is null, process the socket as OPEN by calling Http11ConnectionHandler's process method.
  3. Http11ConnectionHandler's process method returns a SocketState, and the subsequent logic depends on it. If it is CLOSED, decrement the connection count and close the socket. If it is OPEN (or an upgrade state), the finally block resubmits the socket to the thread pool. If it is LONG (long poll), the socket is added to a ConcurrentLinkedQueue to be handled by the AsyncTimeout thread (which ultimately hands it back to a SocketProcessor).

As you can see, this method is not very complicated, and the main logic clearly lives in step 2, so let's step into that process method.

3. Analyzing Http11ConnectionHandler's process Method

Calling handler.process actually lands in the process method of Http11ConnectionHandler's parent class, AbstractConnectionHandler. It is a template method; let's look at it:

      public SocketState process(SocketWrapper<S> socket,
                SocketStatus status) {
            Processor<S> processor = connections.remove(socket.getSocket()); // connections caches processors of long-lived (Comet/async/upgraded) sockets

            if (status == SocketStatus.DISCONNECT && processor == null) { // disconnecting and no processor is associated with this connection
                //nothing more to be done endpoint requested a close
                //and there are no object associated with this connection
                return SocketState.CLOSED;
            }

            socket.setAsync(false);// not async

            try {
                if (processor == null) {
                    processor = recycledProcessors.poll();// nothing cached for this socket, try the recycled-processor ConcurrentLinkedQueue
                }
                if (processor == null) {
                    processor = createProcessor(); // still nothing, create a new processor
                }

                initSsl(socket, processor); // set the SSL attributes (null by default, configurable)

                SocketState state = SocketState.CLOSED;
                do {
                    if (status == SocketStatus.DISCONNECT &&
                            !processor.isComet()) {
                        // Do nothing here, just wait for it to get recycled
                        // Don't do this for Comet we need to generate an end
                        // event (see BZ 54022)
                    } else if (processor.isAsync() ||
                            state == SocketState.ASYNC_END) {
                        state = processor.asyncDispatch(status); // async dispatch
                    } else if (processor.isComet()) {
                        state = processor.event(status); // Comet event processing
                    } else if (processor.isUpgrade()) {
                        state = processor.upgradeDispatch(); // dispatch for an upgraded protocol
                    } else {
                        state = processor.process(socket); // default path: AbstractHttp11Processor.process
                    }
    
                    if (state != SocketState.CLOSED && processor.isAsync()) {
                        state = processor.asyncPostProcess();
                    }

                    if (state == SocketState.UPGRADING) {
                        // Get the UpgradeInbound handler
                        UpgradeInbound inbound = processor.getUpgradeInbound();
                        // Release the Http11 processor to be re-used
                        release(socket, processor, false, false);
                        // Create the light-weight upgrade processor
                        processor = createUpgradeProcessor(socket, inbound);
                        inbound.onUpgradeComplete();
                    }
                } while (state == SocketState.ASYNC_END ||
                        state == SocketState.UPGRADING);

                if (state == SocketState.LONG) {
                    // In the middle of processing a request/response. Keep the
                    // socket associated with the processor. Exact requirements
                    // depend on type of long poll
                    longPoll(socket, processor);
                } else if (state == SocketState.OPEN) {
                    // In keep-alive but between requests. OK to recycle
                    // processor. Continue to poll for the next request.
                    release(socket, processor, false, true);
                } else if (state == SocketState.SENDFILE) {
                    // Sendfile in progress. If it fails, the socket will be
                    // closed. If it works, the socket will be re-added to the
                    // poller
                    release(socket, processor, false, false);
                } else if (state == SocketState.UPGRADED) {
                    // Need to keep the connection associated with the processor
                    longPoll(socket, processor);
                } else {
                    // Connection closed. OK to recycle the processor.
                    if (!(processor instanceof UpgradeProcessor)) {
                        release(socket, processor, true, false);
                    }
                }
                return state;
            } catch(java.net.SocketException e) {
                // SocketExceptions are normal
                getLog().debug(sm.getString(
                        "abstractConnectionHandler.socketexception.debug"), e);
            } catch (java.io.IOException e) {
                // IOExceptions are normal
                getLog().debug(sm.getString(
                        "abstractConnectionHandler.ioexception.debug"), e);
            }
            // Future developers: if you discover any other
            // rare-but-nonfatal exceptions, catch them here, and log as
            // above.
            catch (Throwable e) {
                ExceptionUtils.handleThrowable(e);
                // any other exception or error is odd. Here we log it
                // with "ERROR" level, so it will show up even on
                // less-than-verbose logs.
                getLog().error(
                        sm.getString("abstractConnectionHandler.error"), e);
            }
            // Don't try to add upgrade processors back into the pool
            if (!(processor instanceof UpgradeProcessor)) {
                release(socket, processor, true, false);
            }
            return SocketState.CLOSED;
        }

This method is long, so here is a brief outline of its main logic:

  1. Look in the ConcurrentHashMap of long-lived connections for the processor that wraps this socket.
  2. If there is none, poll one from the reusable ConcurrentLinkedQueue; if that is empty too, call createProcessor to create one (a minimal sketch of this recycling idea follows this list).
  3. Call the subclass's initSsl method to initialize the SSL attributes; if nothing is configured, they are set to null.
  4. Depending on the processor's state, call different methods; async is checked first, but the default is the synchronous path, i.e. Http11Processor.process.
  5. When processing finishes, call different methods (longPoll, release, ...) with different arguments depending on the returned state; the default is `release(socket, processor, true, false)`, which puts the processor back into the recycledProcessors ConcurrentLinkedQueue.
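
The recycle-and-reuse idea behind steps 1, 2, and 5 boils down to a few lines. The sketch below only illustrates the pattern; the class and method names are made up, and the real recycledProcessors queue additionally enforces a size limit:

    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.function.Supplier;

    // Minimal sketch of the "recycled processors" idea in AbstractConnectionHandler:
    // poll a processor from a lock-free queue, create one only when the queue is empty,
    // and offer it back after the request completes.
    class ProcessorPool<T> {

        private final ConcurrentLinkedQueue<T> recycled = new ConcurrentLinkedQueue<T>();
        private final Supplier<T> factory;   // stands in for createProcessor()

        ProcessorPool(Supplier<T> factory) {
            this.factory = factory;
        }

        T acquire() {
            T processor = recycled.poll();                            // step 2: reuse a recycled processor
            return (processor != null) ? processor : factory.get();  // otherwise create a new one
        }

        void release(T processor) {
            recycled.offer(processor);                                // step 5: recycle it for the next request
        }
    }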

Our focus is the AbstractHttp11Processor.process method, which contains the main socket-processing logic.

4. Analyzing Http11Processor.process

This method actually lives in the parent class AbstractHttp11Processor and is extremely long. I don't know why the Tomcat masters never broke it up; code this long is hard to read, and my own rule of thumb is that a method should stay under 50 lines, the shorter the better. So I'll paste a simplified version; if you want the full source, leave a comment or clone it from my GitHub:

    @Override
    public SocketState process(SocketWrapper<S> socketWrapper) throws IOException {
 		// Setting up the I/O
        setSocketWrapper(socketWrapper);
        getInputBuffer().init(socketWrapper, endpoint);// initialize the input buffer
        getOutputBuffer().init(socketWrapper, endpoint);// initialize the output buffer

        prepareRequest();// prepare the request content

        adapter.service(request, response); // the real processing: CoyoteAdapter

        return SocketState.OPEN;
     }

The steps of this method:

  1. Set the socket wrapper.
  2. Initialize the input and output buffers.
  3. Continue processing by calling CoyoteAdapter's service method.
  4. Return a SocketState (OPEN by default) for the caller to act on.

Let's focus on CoyoteAdapter's service method.

5. Analyzing CoyoteAdapter.service

This method is also long; let's look at its logic:

    /**
     * Service method.
     */
    @Override
    public void service(org.apache.coyote.Request req,
                        org.apache.coyote.Response res)
        throws Exception {

        Request request = (Request) req.getNote(ADAPTER_NOTES); // the Catalina Request, which implements the Servlet API
        Response response = (Response) res.getNote(ADAPTER_NOTES);

        if (request == null) {

            // Create objects
            request = connector.createRequest();
            request.setCoyoteRequest(req);
            response = connector.createResponse();
            response.setCoyoteResponse(res);

            // Link objects
            request.setResponse(response); // link request and response to each other
            response.setRequest(request);

            // Set as notes
            req.setNote(ADAPTER_NOTES, request);
            res.setNote(ADAPTER_NOTES, response);

            // Set query string encoding
            req.getParameters().setQueryStringEncoding // set the query string encoding
                (connector.getURIEncoding());

        }

        if (connector.getXpoweredBy()) { // optionally advertise the container via X-Powered-By
            response.addHeader("X-Powered-By", POWERED_BY);
        }

        boolean comet = false;
        boolean async = false;

        try {

            // Parse and set Catalina and configuration specific
            // request parameters
            req.getRequestProcessor().setWorkerThreadName(Thread.currentThread().getName());
            boolean postParseSuccess = postParseRequest(req, request, res, response); // parse and validate the request (URI, mapping, session, ...)
            if (postParseSuccess) {
                //check valves if we support async
                request.setAsyncSupported(connector.getService().getContainer().getPipeline().isAsyncSupported());
                // Calling the container
                connector.getService().getContainer().getPipeline().getFirst().invoke(request, response); // a long chained call into the container pipeline

                if (request.isComet()) {
                    if (!response.isClosed() && !response.isError()) {
                        if (request.getAvailable() || (request.getContentLength() > 0 && (!request.isParametersParsed()))) {
                            // Invoke a read event right away if there are available bytes
                            if (event(req, res, SocketStatus.OPEN)) {
                                comet = true;
                                res.action(ActionCode.COMET_BEGIN, null);
                            }
                        } else {
                            comet = true;
                            res.action(ActionCode.COMET_BEGIN, null);
                        }
                    } else {
                        // Clear the filter chain, as otherwise it will not be reset elsewhere
                        // since this is a Comet request
                        request.setFilterChain(null);
                    }
                }

            }
            AsyncContextImpl asyncConImpl = (AsyncContextImpl)request.getAsyncContext();
            if (asyncConImpl != null) {
                async = true;
            } else if (!comet) {
                request.finishRequest();
                response.finishResponse();
                if (postParseSuccess &&
                        request.getMappingData().context != null) {
                    // Log only if processing was invoked.
                    // If postParseRequest() failed, it has already logged it.
                    // If context is null this was the start of a comet request
                    // that failed and has already been logged.
                    ((Context) request.getMappingData().context).logAccess(
                            request, response,
                            System.currentTimeMillis() - req.getStartTime(),
                            false);
                }
                req.action(ActionCode.POST_REQUEST , null);
            }

        } catch (IOException e) {
            // Ignore
        } finally {
            req.getRequestProcessor().setWorkerThreadName(null);
            // Recycle the wrapper request and response
            if (!comet && !async) {
                request.recycle();
                response.recycle();
            } else {
                // Clear converters so that the minimum amount of memory
                // is used by this processor
                request.clearEncoders();
                response.clearEncoders();
            }
        }

    }

Let's walk through the logic of this method:

  1. Create (or reuse) the Request and Response objects that implement the Servlet API.
  2. Link the Request and Response to each other.
  3. Call postParseRequest to parse and validate the request.
  4. Execute the most important step: connector.getService().getContainer().getPipeline().getFirst().invoke(request, response).
  5. Do some cleanup work.

6. Analyzing CoyoteAdapter.postParseRequest

Let's see how it parses the request content, i.e. the implementation of postParseRequest:

    protected boolean postParseRequest(org.apache.coyote.Request req,
                                       Request request,
                                       org.apache.coyote.Response res,
                                       Response response)
            throws Exception {

        // XXX the processor may have set a correct scheme and port prior to this point,
        // in ajp13 protocols dont make sense to get the port from the connector...
        // otherwise, use connector configuration
        if (! req.scheme().isNull()) {
            // use processor specified scheme to determine secure state
            request.setSecure(req.scheme().equals("https"));
        } else {
            // use connector scheme and secure configuration, (defaults to
            // "http" and false respectively)
            req.scheme().setString(connector.getScheme());
            request.setSecure(connector.getSecure());
        }

        // FIXME: the code below doesnt belongs to here,
        // this is only have sense
        // in Http11, not in ajp13..
        // At this point the Host header has been processed.
        // Override if the proxyPort/proxyHost are set
        String proxyName = connector.getProxyName();
        int proxyPort = connector.getProxyPort();
        if (proxyPort != 0) {
            req.setServerPort(proxyPort);
        }
        if (proxyName != null) {
            req.serverName().setString(proxyName);
        }

        // Copy the raw URI to the decodedURI
        MessageBytes decodedURI = req.decodedURI();
        decodedURI.duplicate(req.requestURI());

        // Parse the path parameters. This will:
        //   - strip out the path parameters
        //   - convert the decodedURI to bytes
        parsePathParameters(req, request);

        // URI decoding
        // %xx decoding of the URL
        try {
            req.getURLDecoder().convert(decodedURI, false);
        } catch (IOException ioe) {
            res.setStatus(400);
            res.setMessage("Invalid URI: " + ioe.getMessage());
            connector.getService().getContainer().logAccess(
                    request, response, 0, true);
            return false;
        }
        // Normalization
        if (!normalize(req.decodedURI())) {
            res.setStatus(400);
            res.setMessage("Invalid URI");
            connector.getService().getContainer().logAccess(
                    request, response, 0, true);
            return false;
        }
        // Character decoding
        convertURI(decodedURI, request);
        // Check that the URI is still normalized
        if (!checkNormalize(req.decodedURI())) {
            res.setStatus(400);
            res.setMessage("Invalid URI character encoding");
            connector.getService().getContainer().logAccess(
                    request, response, 0, true);
            return false;
        }

        // Set the remote principal
        String principal = req.getRemoteUser().toString();
        if (principal != null) {
            request.setUserPrincipal(new CoyotePrincipal(principal));
        }

        // Set the authorization type
        String authtype = req.getAuthType().toString();
        if (authtype != null) {
            request.setAuthType(authtype);
        }

        // Request mapping.
        MessageBytes serverName;
        if (connector.getUseIPVHosts()) {
            serverName = req.localName();
            if (serverName.isNull()) {
                // well, they did ask for it
                res.action(ActionCode.REQ_LOCAL_NAME_ATTRIBUTE, null);
            }
        } else {
            serverName = req.serverName();
        }
        if (request.isAsyncStarted()) {
            //TODO SERVLET3 - async
            //reset mapping data, should prolly be done elsewhere
            request.getMappingData().recycle();
        }

        boolean mapRequired = true;
        String version = null;

        while (mapRequired) {
            if (version != null) {
                // Once we have a version - that is it
                mapRequired = false;
            }
            // This will map the the latest version by default
            connector.getMapper().map(serverName, decodedURI, version,
                                      request.getMappingData());
            request.setContext((Context) request.getMappingData().context);
            request.setWrapper((Wrapper) request.getMappingData().wrapper);

            // Single contextVersion therefore no possibility of remap
            if (request.getMappingData().contexts == null) {
                mapRequired = false;
            }

            // If there is no context at this point, it is likely no ROOT context
            // has been deployed
            if (request.getContext() == null) {
                res.setStatus(404);
                res.setMessage("Not found");
                // No context, so use host
                Host host = request.getHost();
                // Make sure there is a host (might not be during shutdown)
                if (host != null) {
                    host.logAccess(request, response, 0, true);
                }
                return false;
            }

            // Now we have the context, we can parse the session ID from the URL
            // (if any). Need to do this before we redirect in case we need to
            // include the session id in the redirect
            String sessionID = null;
            if (request.getServletContext().getEffectiveSessionTrackingModes()
                    .contains(SessionTrackingMode.URL)) {

                // Get the session ID if there was one
                sessionID = request.getPathParameter(
                        SessionConfig.getSessionUriParamName(
                                request.getContext()));
                if (sessionID != null) {
                    request.setRequestedSessionId(sessionID);
                    request.setRequestedSessionURL(true);
                }
            }

            // Look for session ID in cookies and SSL session
            parseSessionCookiesId(req, request);
            parseSessionSslId(request);

            sessionID = request.getRequestedSessionId();

            if (mapRequired) {
                if (sessionID == null) {
                    // No session means no possibility of needing to remap
                    mapRequired = false;
                } else {
                    // Find the context associated with the session
                    Object[] objs = request.getMappingData().contexts;
                    for (int i = (objs.length); i > 0; i--) {
                        Context ctxt = (Context) objs[i - 1];
                        if (ctxt.getManager().findSession(sessionID) != null) {
                            // Was the correct context already mapped?
                            if (ctxt.equals(request.getMappingData().context)) {
                                mapRequired = false;
                            } else {
                                // Set version so second time through mapping the
                                // correct context is found
                                version = ctxt.getWebappVersion();
                                // Reset mapping
                                request.getMappingData().recycle();
                                break;
                            }
                        }
                    }
                    if (version == null) {
                        // No matching context found. No need to re-map
                        mapRequired = false;
                    }
                }
            }
            if (!mapRequired && request.getContext().getPaused()) {
                // Found a matching context but it is paused. Mapping data will
                // be wrong since some Wrappers may not be registered at this
                // point.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Should never happen
                }
                // Reset mapping
                request.getMappingData().recycle();
                mapRequired = true;
            }
        }

        // Possible redirect
        MessageBytes redirectPathMB = request.getMappingData().redirectPath;
        if (!redirectPathMB.isNull()) {
            String redirectPath = urlEncoder.encode(redirectPathMB.toString());
            String query = request.getQueryString();
            if (request.isRequestedSessionIdFromURL()) {
                // This is not optimal, but as this is not very common, it
                // shouldn't matter
                redirectPath = redirectPath + ";" +
                        SessionConfig.getSessionUriParamName(
                            request.getContext()) +
                    "=" + request.getRequestedSessionId();
            }
            if (query != null) {
                // This is not optimal, but as this is not very common, it
                // shouldn't matter
                redirectPath = redirectPath + "?" + query;
            }
            response.sendRedirect(redirectPath);
            request.getContext().logAccess(request, response, 0, true);
            return false;
        }

        // Filter trace method
        if (!connector.getAllowTrace()
                && req.method().equalsIgnoreCase("TRACE")) {
            Wrapper wrapper = request.getWrapper();
            String header = null;
            if (wrapper != null) {
                String[] methods = wrapper.getServletMethods();
                if (methods != null) {
                    for (int i=0; i<methods.length; i++) {
                        if ("TRACE".equals(methods[i])) {
                            continue;
                        }
                        if (header == null) {
                            header = methods[i];
                        } else {
                            header += ", " + methods[i];
                        }
                    }
                }
            }
            res.setStatus(405);
            res.addHeader("Allow", header);
            res.setMessage("TRACE method is not allowed");
            request.getContext().logAccess(request, response, 0, true);
            return false;
        }

        return true;
    }

The main steps of this method:

  1. Set the request scheme (http or https) and the secure flag.
  2. Apply the proxy name and proxy port, if they are configured.
  3. Parse and strip the path parameters from the URI.
  4. Decode and normalize the URI and convert its character encoding.
  5. Use the Mapper (reached through the Connector's service) to find the matching Context and Wrapper (Servlet) and set them on the Request; if no Context matches, return 404.
  6. Parse cookies and the SSL session, and set the requested session id on the Request.
  7. If the method is TRACE and TRACE is not allowed, return 405 and reject the request.

7. Pipelines and Valves: connector.getService().getContainer().getPipeline().getFirst().invoke(request, response)

With postParseRequest covered, let's look at the next step: connector.getService().getContainer().getPipeline().getFirst().invoke(request, response). This chained call is remarkably confident; it has no fear of an NPE at all, and it can afford that because all of these properties were already set during startup. The call also introduces two concepts, the pipeline and the valve: getPipeline returns the Pipeline, and getFirst returns its first Valve. So first: what is a pipeline, and what is a valve?

We know Tomcat is built from nested containers, from the outermost to the innermost: Server -> Service -> Engine -> Host -> Context -> Wrapper. When a request arrives, it is passed from the larger container to the smaller one like a relay race. If we were designing this ourselves, we might simply let each container hold a reference to its child, which would be enough to pass the request along. But what if we need to do extra work during the handoff, such as validation, logging, timing, or authorization, while keeping these concerns decoupled so that any one of them can be removed at will? What then? Experienced readers will already have guessed.

Use the filter pattern.

And how do we manage all those filters? With a pipeline: put every filter into the pipe and let the pipe manage them. Perfect!

That is exactly what Pipeline and Valve are in Tomcat: the Pipeline is the pipe just described, and the filters are the valves (Valve). Each Pipeline only holds a reference to its first valve and doesn't care about the rest, because the first valve points to the second, the second to the third, and so on, which makes valves easy to add and remove.

Now is a good time to bring out part of our sequence diagram:

From the diagram we can see that the request enters through SocketProcessor, is handed to Http11ConnectionHandler, and finally reaches CoyoteAdapter, where it first touches the container pipelines. CoyoteAdapter holds a Connector instance; as we learned during initialization, the Connector obtains its owning StandardService (one service can own multiple connectors), and the StandardService obtains its child container, the StandardEngine. At that point the pipeline comes into play, and the pipeline manages the valves. The Pipeline interface has a single standard implementation, StandardPipeline, and every container holds one (created when the container is initialized). The pipeline returns its first valve, or, if no extra valve was added, its basic valve (every container installs its own standard basic valve when it is created). The valve's invoke method runs its own logic and then calls invoke on the valve of the child container's pipeline, and so on recursively.
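To make the structure concrete, here is a tiny self-contained sketch of the idea. The types are simplified stand-ins, not the real org.apache.catalina Pipeline/Valve/ValveBase/StandardPipeline classes, and the valve ordering is simplified:

    // Tiny sketch of the Pipeline/Valve idea: the pipeline only remembers its first
    // valve, each valve points at the next one, and the basic (last) valve hands the
    // request on to the child container's pipeline.
    interface SimpleValve {
        void invoke(String request);
        void setNext(SimpleValve next);
        SimpleValve getNext();
    }

    abstract class SimpleValveBase implements SimpleValve {
        private SimpleValve next;
        public void setNext(SimpleValve next) { this.next = next; }
        public SimpleValve getNext() { return next; }
    }

    class LoggingValve extends SimpleValveBase {
        public void invoke(String request) {
            System.out.println("access log: " + request);   // this valve's own job...
            getNext().invoke(request);                       // ...then pass the request on
        }
    }

    class SimplePipeline {
        private SimpleValve first;
        private final SimpleValve basic;   // the last valve; in Tomcat it forwards to the child container

        SimplePipeline(SimpleValve basic) { this.basic = basic; }

        void addValve(SimpleValve valve) { // link the new valve into the chain (ordering simplified)
            valve.setNext(first != null ? first : basic);
            first = valve;
        }

        SimpleValve getFirst() {           // same contract as StandardPipeline.getFirst()
            return first != null ? first : basic;
        }
    }

A container's pipeline is then driven with getFirst().invoke(...), and each container's basic valve finishes by calling the first valve of its child container's pipeline, which is exactly the chained call we keep seeing in the Tomcat source.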

We've seen the diagram and covered the principle; now for the code, starting with the invoke method of the StandardEngineValve:

    /**
     * Select the appropriate child Host to process this request,
     * based on the requested server name.  If no matching Host can
     * be found, return an appropriate HTTP error.
     *
     */
    @Override
    public final void invoke(Request request, Response response)
        throws IOException, ServletException {

        // Select the Host to be used for this Request
        Host host = request.getHost();
        if (host == null) {
            response.sendError
                (HttpServletResponse.SC_BAD_REQUEST,
                 sm.getString("standardEngine.noHost", 
                              request.getServerName()));
            return;
        }
        if (request.isAsyncSupported()) {
            request.setAsyncSupported(host.getPipeline().isAsyncSupported());
        }

        // Ask this Host to process this request
        host.getPipeline().getFirst().invoke(request, response);

    }

This method is simple: it checks whether a Host container was selected for this request under the Engine; if not, it returns a 400 error, otherwise it continues with host.getPipeline().getFirst().invoke(request, response). The Host container fetches its own pipeline and then its first valve; let's look at that valve's invoke method.

8. Analyzing host.getPipeline().getFirst().invoke

This chained call does not return the basic valve here, because a first valve has been set on this pipeline:

    @Override
    public Valve getFirst() {
        if (first != null) {
            return first;
        }
        return basic;
    }

It returns an AccessLogValve instance; let's look at AccessLogValve's invoke method:

    @Override
    public void invoke(Request request, Response response) throws IOException,
            ServletException {
        getNext().invoke(request, response);
    }

It does nothing itself; it simply hands the request to the next valve via getNext, a method inherited from the parent class ValveBase. Who is the next valve? ErrorReportValve. Let's look at its invoke method:

 @Override
    public void invoke(Request request, Response response)
        throws IOException, ServletException {

        // Perform the request
        getNext().invoke(request, response);

        if (response.isCommitted()) {
            return;
        }

        Throwable throwable =
                (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);

        if (request.isAsyncStarted() && response.getStatus() < 400 &&
                throwable == null) {
            return;
        }

        if (throwable != null) {

            // The response is an error
            response.setError();

            // Reset the response (if possible)
            try {
                response.reset();
            } catch (IllegalStateException e) {
                // Ignore
            }

            response.sendError
                (HttpServletResponse.SC_INTERNAL_SERVER_ERROR);

        }

        response.setSuspended(false);

        try {
            report(request, response, throwable);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(tt);
        }

        if (request.isAsyncStarted()) {
            request.getAsyncContext().complete();
        }
    }

This method first invokes the next valve, and when that returns it uses the Request's attributes to set up error reporting if needed. So who is the next valve? It is the basic valve, StandardHostValve. How is its invoke method implemented?

 @Override
    public final void invoke(Request request, Response response)
        throws IOException, ServletException {

        // Select the Context to be used for this Request
        Context context = request.getContext();
        if (context == null) {
            response.sendError
                (HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
                 sm.getString("standardHost.noContext"));
            return;
        }

        // Bind the context CL to the current thread
        if( context.getLoader() != null ) {
            // Not started - it should check for availability first
            // This should eventually move to Engine, it's generic.
            if (Globals.IS_SECURITY_ENABLED) {
                PrivilegedAction<Void> pa = new PrivilegedSetTccl(
                        context.getLoader().getClassLoader());
                AccessController.doPrivileged(pa);                
            } else {
                Thread.currentThread().setContextClassLoader
                        (context.getLoader().getClassLoader());
            }
        }
        if (request.isAsyncSupported()) {
            request.setAsyncSupported(context.getPipeline().isAsyncSupported());
        }

        // Don't fire listeners during async processing
        // If a request init listener throws an exception, the request is
        // aborted
        boolean asyncAtStart = request.isAsync(); 
        // An async error page may dispatch to another resource. This flag helps
        // ensure an infinite error handling loop is not entered
        boolean errorAtStart = response.isError();
        if (asyncAtStart || context.fireRequestInitEvent(request)) {

            // Ask this Context to process this request
            try {
                context.getPipeline().getFirst().invoke(request, response);
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                if (errorAtStart) {
                    container.getLogger().error("Exception Processing " +
                            request.getRequestURI(), t);
                } else {
                    request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, t);
                    throwable(request, response, t);
                }
            }
    
            // If the request was async at the start and an error occurred then
            // the async error handling will kick-in and that will fire the
            // request destroyed event *after* the error handling has taken
            // place
            if (!(request.isAsync() || (asyncAtStart &&
                    request.getAttribute(
                            RequestDispatcher.ERROR_EXCEPTION) != null))) {
                // Protect against NPEs if context was destroyed during a
                // long running request.
                if (context.getState().isAvailable()) {
                    if (!errorAtStart) {
                        // Error page processing
                        response.setSuspended(false);
    
                        Throwable t = (Throwable) request.getAttribute(
                                RequestDispatcher.ERROR_EXCEPTION);
    
                        if (t != null) {
                            throwable(request, response, t);
                        } else {
                            status(request, response);
                        }
                    }
    
                    context.fireRequestDestroyEvent(request);
                }
            }
        }

        // Access a session (if present) to update last accessed time, based on a
        // strict interpretation of the specification
        if (ACCESS_SESSION) {
            request.getSession(false);
        }

        // Restore the context classloader
        if (Globals.IS_SECURITY_ENABLED) {
            PrivilegedAction<Void> pa = new PrivilegedSetTccl(
                    StandardHostValve.class.getClassLoader());
            AccessController.doPrivileged(pa);                
        } else {
            Thread.currentThread().setContextClassLoader
                    (StandardHostValve.class.getClassLoader());
        }
    }

It first checks whether the Request has a Context, which was actually set back in CoyoteAdapter.postParseRequest; if there is no Context, it returns 500. Then it's the same routine again: context.getPipeline().getFirst().invoke. This pipeline returns its basic valve, StandardContextValve, and once more we focus on its invoke method.

@Override
    public final void invoke(Request request, Response response)
        throws IOException, ServletException {

        // Disallow any direct access to resources under WEB-INF or META-INF
        MessageBytes requestPathMB = request.getRequestPathMB();
        if ((requestPathMB.startsWithIgnoreCase("/META-INF/", 0))
                || (requestPathMB.equalsIgnoreCase("/META-INF"))
                || (requestPathMB.startsWithIgnoreCase("/WEB-INF/", 0))
                || (requestPathMB.equalsIgnoreCase("/WEB-INF"))) {
            response.sendError(HttpServletResponse.SC_NOT_FOUND);
            return;
        }

        // Select the Wrapper to be used for this Request
        Wrapper wrapper = request.getWrapper();
        if (wrapper == null || wrapper.isUnavailable()) {
            response.sendError(HttpServletResponse.SC_NOT_FOUND);
            return;
        }

        // Acknowledge the request
        try {
            response.sendAcknowledgement();
        } catch (IOException ioe) {
            container.getLogger().error(sm.getString(
                    "standardContextValve.acknowledgeException"), ioe);
            request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, ioe);
            response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
            return;
        }
        
        if (request.isAsyncSupported()) {
            request.setAsyncSupported(wrapper.getPipeline().isAsyncSupported());
        }
        wrapper.getPipeline().getFirst().invoke(request, response);
    }

This method, too, is mostly checks (blocking direct access to WEB-INF/META-INF and making sure a Wrapper exists), and at the end it calls wrapper.getPipeline().getFirst().invoke. The valve it gets is again the basic valve, StandardWrapperValve. Let's look at its invoke method; this one is important.

9. Analyzing wrapper.getPipeline().getFirst().invoke

This method is extremely long:

    @Override
    public final void invoke(Request request, Response response)
        throws IOException, ServletException {

        // Initialize local variables we may need
        boolean unavailable = false;
        Throwable throwable = null;
        // This should be a Request attribute...
        long t1=System.currentTimeMillis();
        requestCount++;
        StandardWrapper wrapper = (StandardWrapper) getContainer();
        Servlet servlet = null;
        Context context = (Context) wrapper.getParent();
        
        // Check for the application being marked unavailable
        if (!context.getState().isAvailable()) {
            response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
                           sm.getString("standardContext.isUnavailable"));
            unavailable = true;
        }

        // Check for the servlet being marked unavailable
        if (!unavailable && wrapper.isUnavailable()) {
            container.getLogger().info(sm.getString("standardWrapper.isUnavailable",
                    wrapper.getName()));
            long available = wrapper.getAvailable();
            if ((available > 0L) && (available < Long.MAX_VALUE)) {
                response.setDateHeader("Retry-After", available);
                response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
                        sm.getString("standardWrapper.isUnavailable",
                                wrapper.getName()));
            } else if (available == Long.MAX_VALUE) {
                response.sendError(HttpServletResponse.SC_NOT_FOUND,
                        sm.getString("standardWrapper.notFound",
                                wrapper.getName()));
            }
            unavailable = true;
        }

        // Allocate a servlet instance to process this request
        try {
            if (!unavailable) {
                servlet = wrapper.allocate();
            }
        } catch (UnavailableException e) {
            container.getLogger().error(
                    sm.getString("standardWrapper.allocateException",
                            wrapper.getName()), e);
            long available = wrapper.getAvailable();
            if ((available > 0L) && (available < Long.MAX_VALUE)) {
                response.setDateHeader("Retry-After", available);
                response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
                           sm.getString("standardWrapper.isUnavailable",
                                        wrapper.getName()));
            } else if (available == Long.MAX_VALUE) {
                response.sendError(HttpServletResponse.SC_NOT_FOUND,
                           sm.getString("standardWrapper.notFound",
                                        wrapper.getName()));
            }
        } catch (ServletException e) {
            container.getLogger().error(sm.getString("standardWrapper.allocateException",
                             wrapper.getName()), StandardWrapper.getRootCause(e));
            throwable = e;
            exception(request, response, e);
        } catch (Throwable e) {
            ExceptionUtils.handleThrowable(e);
            container.getLogger().error(sm.getString("standardWrapper.allocateException",
                             wrapper.getName()), e);
            throwable = e;
            exception(request, response, e);
            servlet = null;
        }

        // Identify if the request is Comet related now that the servlet has been allocated
        boolean comet = false;
        if (servlet instanceof CometProcessor && request.getAttribute(
                Globals.COMET_SUPPORTED_ATTR) == Boolean.TRUE) {
            comet = true;
            request.setComet(true);
        }
        
        MessageBytes requestPathMB = request.getRequestPathMB();
        DispatcherType dispatcherType = DispatcherType.REQUEST;
        if (request.getDispatcherType()==DispatcherType.ASYNC) dispatcherType = DispatcherType.ASYNC; 
        request.setAttribute(Globals.DISPATCHER_TYPE_ATTR,dispatcherType);
        request.setAttribute(Globals.DISPATCHER_REQUEST_PATH_ATTR,
                requestPathMB);
        // Create the filter chain for this request
        ApplicationFilterFactory factory =
            ApplicationFilterFactory.getInstance();
        ApplicationFilterChain filterChain =
            factory.createFilterChain(request, wrapper, servlet);
        
        // Reset comet flag value after creating the filter chain
        request.setComet(false);

        // Call the filter chain for this request
        // NOTE: This also calls the servlet's service() method
        try {
            if ((servlet != null) && (filterChain != null)) {
                // Swallow output if needed
                if (context.getSwallowOutput()) {
                    try {
                        SystemLogHandler.startCapture();
                        if (request.isAsyncDispatching()) {
                            //TODO SERVLET3 - async
                            ((AsyncContextImpl)request.getAsyncContext()).doInternalDispatch(); 
                        } else if (comet) {
                            filterChain.doFilterEvent(request.getEvent());
                            request.setComet(true);
                        } else {
                            filterChain.doFilter(request.getRequest(), 
                                    response.getResponse());
                        }
                    } finally {
                        String log = SystemLogHandler.stopCapture();
                        if (log != null && log.length() > 0) {
                            context.getLogger().info(log);
                        }
                    }
                } else {
                    if (request.isAsyncDispatching()) {
                        //TODO SERVLET3 - async
                        ((AsyncContextImpl)request.getAsyncContext()).doInternalDispatch();
                    } else if (comet) {
                        request.setComet(true);
                        filterChain.doFilterEvent(request.getEvent());
                    } else {
                        filterChain.doFilter
                            (request.getRequest(), response.getResponse());
                    }
                }

            }
        } catch (ClientAbortException e) {
            throwable = e;
            exception(request, response, e);
        } catch (IOException e) {
            container.getLogger().error(sm.getString(
                    "standardWrapper.serviceException", wrapper.getName(),
                    context.getName()), e);
            throwable = e;
            exception(request, response, e);
        } catch (UnavailableException e) {
            container.getLogger().error(sm.getString(
                    "standardWrapper.serviceException", wrapper.getName(),
                    context.getName()), e);
            //            throwable = e;
            //            exception(request, response, e);
            wrapper.unavailable(e);
            long available = wrapper.getAvailable();
            if ((available > 0L) && (available < Long.MAX_VALUE)) {
                response.setDateHeader("Retry-After", available);
                response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
                           sm.getString("standardWrapper.isUnavailable",
                                        wrapper.getName()));
            } else if (available == Long.MAX_VALUE) {
                response.sendError(HttpServletResponse.SC_NOT_FOUND,
                            sm.getString("standardWrapper.notFound",
                                        wrapper.getName()));
            }
            // Do not save exception in 'throwable', because we
            // do not want to do exception(request, response, e) processing
        } catch (ServletException e) {
            Throwable rootCause = StandardWrapper.getRootCause(e);
            if (!(rootCause instanceof ClientAbortException)) {
                container.getLogger().error(sm.getString(
                        "standardWrapper.serviceExceptionRoot",
                        wrapper.getName(), context.getName(), e.getMessage()),
                        rootCause);
            }
            throwable = e;
            exception(request, response, e);
        } catch (Throwable e) {
            ExceptionUtils.handleThrowable(e);
            container.getLogger().error(sm.getString(
                    "standardWrapper.serviceException", wrapper.getName(),
                    context.getName()), e);
            throwable = e;
            exception(request, response, e);
        }

        // Release the filter chain (if any) for this request
        if (filterChain != null) {
            if (request.isComet()) {
                // If this is a Comet request, then the same chain will be used for the
                // processing of all subsequent events.
                filterChain.reuse();
            } else {
                filterChain.release();
            }
        }

        // Deallocate the allocated servlet instance
        try {
            if (servlet != null) {
                wrapper.deallocate(servlet);
            }
        } catch (Throwable e) {
            ExceptionUtils.handleThrowable(e);
            container.getLogger().error(sm.getString("standardWrapper.deallocateException",
                             wrapper.getName()), e);
            if (throwable == null) {
                throwable = e;
                exception(request, response, e);
            }
        }

        // If this servlet has been marked permanently unavailable,
        // unload it and release this instance
        try {
            if ((servlet != null) &&
                (wrapper.getAvailable() == Long.MAX_VALUE)) {
                wrapper.unload();
            }
        } catch (Throwable e) {
            ExceptionUtils.handleThrowable(e);
            container.getLogger().error(sm.getString("standardWrapper.unloadException",
                             wrapper.getName()), e);
            if (throwable == null) {
                throwable = e;
                exception(request, response, e);
            }
        }
        long t2=System.currentTimeMillis();

        long time=t2-t1;
        processingTime += time;
        if( time > maxTime) maxTime=time;
        if( time < minTime) minTime=time;

    }

Let's go over the important steps of this method:

  1. Get the StandardWrapper instance (which wraps the Servlet) and call its allocate method to obtain a Servlet instance (possibly from an instance stack).
  2. Check whether the servlet implements the CometProcessor interface; if so, set the request's comet flag to true (Comet is a "server push" technique built on HTTP long connections).
  3. Get the ApplicationFilterFactory singleton (note: the way this singleton is obtained is not thread safe) and call its createFilterChain method to build the ApplicationFilterChain for this request.
  4. Call the filter chain's doFilter method, which runs every filter in turn and finally calls the servlet's service method.

Let's analyze the allocate method.

10. StandardWrapper.allocate: Obtaining the Servlet Instance

The source:

   @Override
    public Servlet allocate() throws ServletException {

        // If we are currently unloading this servlet, throw an exception
        if (unloading)
            throw new ServletException
              (sm.getString("standardWrapper.unloading", getName()));

        boolean newInstance = false;
        
        // If not SingleThreadedModel, return the same instance every time
        if (!singleThreadModel) {

            // Load and initialize our instance if necessary
            if (instance == null) {
                synchronized (this) {
                    if (instance == null) {
                        try {
                            if (log.isDebugEnabled())
                                log.debug("Allocating non-STM instance");

                            instance = loadServlet();
                            if (!singleThreadModel) {
                                // For non-STM, increment here to prevent a race
                                // condition with unload. Bug 43683, test case
                                // #3
                                newInstance = true;
                                countAllocated.incrementAndGet();
                            }
                        } catch (ServletException e) {
                            throw e;
                        } catch (Throwable e) {
                            ExceptionUtils.handleThrowable(e);
                            throw new ServletException
                                (sm.getString("standardWrapper.allocate"), e);
                        }
                    }
                }
            }

            if (!instanceInitialized) {
                initServlet(instance);
            }

            if (singleThreadModel) {
                if (newInstance) {
                    // Have to do this outside of the sync above to prevent a
                    // possible deadlock
                    synchronized (instancePool) {
                        instancePool.push(instance);
                        nInstances++;
                    }
                }
            } else {
                if (log.isTraceEnabled())
                    log.trace("  Returning non-STM instance");
                // For new instances, count will have been incremented at the
                // time of creation
                if (!newInstance) {
                    countAllocated.incrementAndGet();
                }
                return (instance);
            }
        }

        synchronized (instancePool) {

            while (countAllocated.get() >= nInstances) {
                // Allocate a new instance if possible, or else wait
                if (nInstances < maxInstances) {
                    try {
                        instancePool.push(loadServlet());
                        nInstances++;
                    } catch (ServletException e) {
                        throw e;
                    } catch (Throwable e) {
                        ExceptionUtils.handleThrowable(e);
                        throw new ServletException
                            (sm.getString("standardWrapper.allocate"), e);
                    }
                } else {
                    try {
                        instancePool.wait();
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }
            }
            if (log.isTraceEnabled())
                log.trace("  Returning allocated STM instance");
            countAllocated.incrementAndGet();
            return instancePool.pop();

        }

    }

This method is long; here are its steps in brief. It checks whether this StandardWrapper already holds a Servlet instance; normally it does, because the instance is injected during initialization. If not, loadServlet is called to load the instance via reflection. (Note: if the servlet implements the SingleThreadModel interface, the StandardWrapper manages multiple servlet instances; by default there is just one instance, and multiple instances are kept in a pool of type Stack. Hasn't Stack been discouraged for ages?)
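The SingleThreadModel branch boils down to a small bounded pool: load new instances up to maxInstances, otherwise block until an instance is deallocated. Below is a self-contained sketch of that idea, using Deque instead of the legacy Stack; the names and structure are illustrative, not the actual StandardWrapper code:

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.function.Supplier;

    // Minimal sketch of the SingleThreadModel branch of allocate()/deallocate():
    // a bounded pool that loads new instances up to a limit and otherwise blocks
    // the calling request thread until an instance is returned.
    class BoundedServletPool<T> {

        private final Deque<T> pool = new ArrayDeque<T>();
        private final Supplier<T> loader;    // stands in for loadServlet()
        private final int maxInstances;
        private int created = 0;

        BoundedServletPool(int maxInstances, Supplier<T> loader) {
            this.maxInstances = maxInstances;
            this.loader = loader;
        }

        synchronized T allocate() throws InterruptedException {
            while (pool.isEmpty() && created >= maxInstances) {
                wait();                      // every instance is in use: block until deallocate()
            }
            if (pool.isEmpty()) {            // still room to grow: load another servlet instance
                created++;
                return loader.get();
            }
            return pool.pop();
        }

        synchronized void deallocate(T servlet) {
            pool.push(servlet);              // return the instance to the pool...
            notifyAll();                     // ...and wake up a waiting request thread
        }
    }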

11. Analyzing ApplicationFilterFactory (a Concurrency Problem in Tomcat 7)

Next we get the singleton factory that creates filter chains. Looking at the code, though, it clearly has a problem:

    public static ApplicationFilterFactory getInstance() {
        if (factory == null) {
            factory = new ApplicationFilterFactory();
        }
        return factory;
    }

If you've read my article 深入解析单例模式 on the singleton pattern, you'll know this style of lazy initialization is broken under concurrency and can create multiple instances. I downloaded the latest Tomcat source, and this bug has been fixed there: the new Tomcat removes getInstance entirely and makes createFilterChain a static method.
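For reference, one common thread-safe alternative is the initialization-on-demand holder idiom. The sketch below only illustrates that idiom (the class name is made up); it is not how Tomcat itself fixed the issue, which, as noted above, was simply to make createFilterChain static:

    // The unsynchronized lazy initialization above can construct several factories
    // when two request threads race through the null check at the same time.
    public class FilterFactoryHolderExample {

        private FilterFactoryHolderExample() { }

        private static class Holder {
            // The JVM guarantees this runs exactly once, when Holder is first used.
            static final FilterFactoryHolderExample INSTANCE = new FilterFactoryHolderExample();
        }

        public static FilterFactoryHolderExample getInstance() {
            return Holder.INSTANCE;
        }
    }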

Once the factory is obtained, it creates the filter chain instance: it takes the Wrapper's parent container, the StandardContext, and reads its filterMaps, which were built from web.xml when the container was initialized.

12. Executing the Filter Chain's doFilter Method

This method mainly delegates to ApplicationFilterChain.internalDoFilter(). So how is internalDoFilter implemented? Below is its main logic (abridged, mostly because I'm running out of space):

        // Call the next filter if there is one
        if (pos < n) {
            ApplicationFilterConfig filterConfig = filters[pos++];
            Filter filter = filterConfig.getFilter();
            filter.doFilter(request, response, this);
            return;
        }

        // We fell off the end of the chain -- call the servlet instance
        servlet.service(request, response);

As you can see, ApplicationFilterChain keeps two counters: the current position pos and the number of filters n. Each filter's doFilter eventually calls back into the chain, so the chain advances through all the filters one by one, and once they are exhausted it finally calls the servlet's service method. At this point a complete HTTP request has travelled from the socket all the way into the servlet we wrote.
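Stripped of the Tomcat specifics, the recursion looks like this tiny self-contained sketch; MiniFilter and MiniChain are made-up stand-ins for javax.servlet.Filter and FilterChain:

    // Each call to doFilter() either runs the next filter (which calls back into this
    // same chain) or, once all filters have run, invokes the servlet.
    interface MiniFilter {
        void doFilter(String request, MiniChain chain);
    }

    class MiniChain {
        private final MiniFilter[] filters;
        private final Runnable servlet;   // stands in for servlet.service(request, response)
        private int pos = 0;              // current position in the chain

        MiniChain(MiniFilter[] filters, Runnable servlet) {
            this.filters = filters;
            this.servlet = servlet;
        }

        void doFilter(String request) {
            if (pos < filters.length) {
                MiniFilter next = filters[pos++];
                next.doFilter(request, this);   // the filter calls chain.doFilter(...) to continue
            } else {
                servlet.run();                  // end of the chain: invoke the servlet
            }
        }
    }

Each filter decides whether to call chain.doFilter(...) again; if every filter does, the chain runs out of filters and invokes the servlet, which is exactly what happens when internalDoFilter reaches servlet.service(request, response).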

13. Summary

At last we have finished dissecting Tomcat's request process and understood in detail how Tomcat handles an HTTP request, completing our original plan. The next article will wrap up the first nine; stay tuned.