Tomcat 接收连接的accept流程

  1. 云栖社区>
  2. 博客列表>
  3. 正文

Tomcat 接收连接的accept流程

晴天哥 2018-11-29 10:10:38 浏览334 评论0

摘要: 开篇  这篇文章的主要目的是分析下Tomcat在处理连接请求的整个过程,参考了前人的文章并在文末指出,通过时序图能够较清楚的走通整个流程。 Tomcat处理流程 说明: Connector 启动以后会启动一组线程用于不同阶段的请求处理过程,Acceptor、Poller、worker 所在的线程组都维护在 NioEndpoint 中。

开篇

 这篇文章的主要目的是分析下Tomcat在处理连接请求的整个过程,参考了前人的文章并在文末指出,通过时序图能够较清楚的走通整个流程。


Tomcat处理流程

Tomcat处理流程
说明:

Connector 启动以后会启动一组线程用于不同阶段的请求处理过程,Acceptor、Poller、worker 所在的线程组都维护在 NioEndpoint 中。

    1. Acceptor线程组。用于接受新连接,并将新连接封装一下,选择一个 Poller 将新连接添加到 Poller 的事件队列中,Acceptor线程组是多个线程组成的线程组
    1. Poller 线程组。用于监听 Socket 事件,当 Socket 可读或可写等等时,将 Socket 封装一下添加到 worker 线程池的任务队列中,Poller线程组是多个线程组成的线程组
    1. worker 线程组。用于对请求进行处理,包括分析请求报文并创建 Request 对象,调用容器的 pipeline 进行处理,worker线程组是Executor创建的线程池
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {

    public void startInternal() throws Exception {
            // 创建worker线程组
            if ( getExecutor() == null ) {
                createExecutor();
            }

            // Poller线程组由一堆线程组成
            pollers = new Poller[getPollerThreadCount()];
            for (int i=0; i<pollers.length; i++) {
                pollers[i] = new Poller();
                Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
                pollerThread.setPriority(threadPriority);
                pollerThread.setDaemon(true);
                pollerThread.start();
            }

            startAcceptorThreads();
        }
    }
}

public abstract class AbstractEndpoint<S> {
    // Acceptor线程组由一堆线程组成
    protected final void startAcceptorThreads() {
        int count = getAcceptorThreadCount();
        acceptors = new Acceptor[count];

        for (int i = 0; i < count; i++) {
            acceptors[i] = createAcceptor();
            String threadName = getName() + "-Acceptor-" + i;
            acceptors[i].setThreadName(threadName);
            Thread t = new Thread(acceptors[i], threadName);
            t.setPriority(getAcceptorThreadPriority());
            t.setDaemon(getDaemon());
            t.start();
        }
    }

    // worker的线程组由executor创建线程池组成
    public void createExecutor() {
        internalExecutor = true;
        TaskQueue taskqueue = new TaskQueue();
        TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
        taskqueue.setParent( (ThreadPoolExecutor) executor);
    }
}


请求处理过程分解

Acceptor接收连接过程

Acceptor.jpg

说明:
Acceptor接受的新连接没有立即注册到selector当中,需要先封装成PollerEvent对象后保存至PollerEvent队列当中,Poller对象会消费PollerEvent队列,类似生产消费模型。

    1. Acceptor 在启动后会阻塞在 ServerSocketChannel.accept(); 方法处,当有新连接到达时,该方法返回一个 SocketChannel。
    1. setSocketOptions()方法将 Socket 封装到 NioChannel 中,并注册到 Poller。
    1. 我们一开始就启动了多个 Poller 线程,注册的时候采用轮询选择 Poller 。NioEndpoint 维护了一个 Poller 数组,当一个连接分配给 pollers[index] 时,下一个连接就会分配给 pollers[(index+1)%pollers.length]。
    1. addEvent() 方法会将 Socket 添加到该 Poller 的 PollerEvent 队列中。到此 Acceptor 的任务就完成了。
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {

    private volatile ServerSocketChannel serverSock = null;
    protected class Acceptor extends AbstractEndpoint.Acceptor {

        public void run() {

            while (running) {
                state = AcceptorState.RUNNING;
                try {
                    SocketChannel socket = null;
                    try {
                        // 监听socket负责接收新连接
                        socket = serverSock.accept();
                    } catch (IOException ioe) {
                    }

                    if (running && !paused) {
                        // 处理接受到的socket对象
                        if (!setSocketOptions(socket)) {
                            closeSocket(socket);
                        }
                    } 
                } catch (Throwable t) {
                }
            }
            state = AcceptorState.ENDED;
        }
    }


    protected boolean setSocketOptions(SocketChannel socket) {
        try {
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);
            
            channel = new NioChannel(socket, bufhandler);
            // 注册到Poller当中
            getPoller0().register(channel);
        } catch (Throwable t) {
        }

        return true;
    }

    public Poller getPoller0() {
        int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
        return pollers[idx];
    }


    public class Poller implements Runnable {
        public void register(final NioChannel socket) {
            socket.setPoller(this);
            NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
            r = new PollerEvent(socket,ka,OP_REGISTER);
            
            // 添加PollerEvent队列当中
            addEvent(r);
        }


        private void addEvent(PollerEvent event) {
            // 投入到PollerEvent队列当中
            events.offer(event);
            if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
        }
    }

}


Poller处理请求

Poller.jpg

说明:
Poller会消费PollerEvent队列(由Acceptor进行投递),并注册到Selector当中。当注册到Selector的socket数据可读的时候将socket封装成SocketProcessor对象,投递到Executor实现的线程池进行处理。

    1. selector.select(1000)。当 Poller 启动后因为 selector 中并没有已注册的 Channel,所以当执行到该方法时只能阻塞。所有的 Poller 共用一个 Selector,其实现类是 sun.nio.ch.SelectorImpl。
    1. events() 方法通过 addEvent() 方法添加到事件队列中的 Socket 注册到SelectorImpl。这里指的socket是accept过来的请求的socket。
    1. 当 Socket 可读时,Poller 才对其进行处理,createSocketProcessor() 方法将 Socket 封装到 SocketProcessor 中,SocketProcessor 实现了 Runnable 接口。worker 线程通过调用其 run() 方法来对 Socket 进行处理。
    1. execute(SocketProcessor) 方法将 SocketProcessor 提交到线程池,放入线程池的 workQueue 中。workQueue 是 BlockingQueue 的实例。
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {

    public static class PollerEvent implements Runnable {
        private NioChannel socket;
        private int interestOps;
        private NioSocketWrapper socketWrapper;

        public PollerEvent(NioChannel ch, NioSocketWrapper w, int intOps) {
            reset(ch, w, intOps);
        }

        public void run() {
            if (interestOps == OP_REGISTER) {
                try {
                    socket.getIOChannel().register(
                            socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
                } catch (Exception x) {

                }
            }
        }
    }


    public class Poller implements Runnable {

        public void run() {
            while (true) {
                // events()负责处理PollerEvent事件并注册到selector当中
                hasEvents = events();
                keyCount = selector.select(selectorTimeout);

                // 处理新接受的socket的读写事件
                Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();

                    processKey(sk, attachment);
                }
            }
        }

        // 处理读写事件
        protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
            if (sk.isReadable()) {
                if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                    closeSocket = true;
                }
             }
                            
            if (!closeSocket && sk.isWritable()) {
                if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                    closeSocket = true;
                }
            }
        }
    }
}


public abstract class AbstractEndpoint<S> {
    public boolean processSocket(SocketWrapperBase<S> socketWrapper,
            SocketEvent event, boolean dispatch) {

        try {
            sc = createSocketProcessor(socketWrapper, event);
            Executor executor = getExecutor();
            // 注册到Worker的线程池ThreadPoolExecutor。
            if (dispatch && executor != null) {
                executor.execute(sc);
            } 
        } catch (RejectedExecutionException ree) {
        } 

        return true;
    }
}


Worker处理具体请求

Worker.jpg

说明:

    1. 当新任务添加到 workQueue(ThreadPoolExecutor)后,workQueue.take()方法会返回一个 Runnable,通常是 SocketProcessor,然后 worker 线程调用 SocketProcessor的run() -> doRun()方法对 Socket 进行处理。
    1. createProcessor() 会创建一个Http11Processor, 它用来解析 Socket,将 Socket 中的内容封装到Request中。注意这个Request是临时使用的一个类,它的全类名是org.apache.coyote.Request。
    1. CoyoteAdapter的postParseRequest()方法封装一下 Request,并处理一下映射关系(从 URL 映射到相应的 Host、Context、Wrapper)。
    1. CoyoteAdapter将 Rquest 提交给 Container(StandardEngine) 处理之前,并将 org.apache.coyote.Request封装到 org.apache.catalina.connector.Request,传递给 Container处理的 Request 是 org.apache.catalina.connector.Request。
    1. connector.getService().getMapper().map(),用来在Mapper中查询 URL 的映射关系。映射关系会保留到 org.apache.catalina.connector.Request 中,Container处理阶段 request.getHost()是使用的就是这个阶段查询到的映射主机,以此类推 request.getContext()、request.getWrapper()都是。
    1. connector.getService().getContainer().getPipeline().getFirst().invoke()会将请求传递到 Container(StandardEngine)处理,至此进入了Engine->Host->Context->Wrapper的处理流程,当然了 Container处理也是在 Worker线程中执行的(也就是说Tomcat处理请求是通过ThreadPoolExecutor的线程池实现的),但是这是一个相对独立的模块,所以单独分出来一节。


Container单个请求处理流程

StandardEngineValve

说明:

    1. 每个容器(Engine、Host、Context、Wrapper)的 StandardPipeline 上都会有多个已注册的 Valve,我们只关注每个容器的 Basic Valve,其他 Valve 都是在 Basic Valve 前执行。
    1. request.getHost().getPipeline().getFirst().invoke() 先获取对应的 StandardHost,并执行其 pipeline。
    1. request.getContext().getPipeline().getFirst().invoke() 先获取对应的 StandardContext,并执行其 pipeline。
    1. request.getWrapper().getPipeline().getFirst().invoke() 先获取对应的 StandardWrapper,并执行其 pipeline。
    1. StandardWrapper的Basic Valve是StandardWrapperValve,通过allocate() 用来加载并初始化 Servlet,值的一提的是 Servlet 并不都是单例的,当 Servlet 实现了 SingleThreadModel 接口后,StandardWrapper 会维护一组 Servlet 实例,这是享元模式。当然了 SingleThreadModel在 Servlet 2.4 以后就弃用了。
    1. createFilterChain() 方法会从 StandardContext 中获取到所有的过滤器,然后将匹配 Request URL 的所有过滤器挑选出来添加到 filterChain 中。
      doFilter() 执行过滤链,当所有的过滤器都执行完毕后调用 Servlet 的 service() 方法。


参考文章

谈谈 Tomcat 请求处理流程


招聘信息

【招贤纳士】

欢迎热爱技术、热爱生活的你和我成为同事,和贝贝共同成长。

贝贝集团诚招算法、大数据、BI、Java、PHP、android、iOS、测试、运维、DBA等人才,有意可投递zhi.wang@beibei.com

贝贝集团创建于2011年,旗下拥有贝贝网、贝店、贝贷等平台,致力于成为全球领先的家庭消费平台。

贝贝创始团队来自阿里巴巴,先后获得IDG资本、高榕资本、今日资本、新天域资本、北极光等数亿美金的风险投资。

公司地址:杭州市江干区普盛巷9号东谷创业园(上下班有多趟班车)

【云栖快讯】阿里云栖开发者沙龙(Java技术专场)火热来袭!快来报名参与吧!  详情请点击

网友评论