Dubbo-多线程通信原理

简介: 根据dubbo的源代码,讲述跨服务器场景下,多线程如何通信


本文代码摘录的时候,将一些与本流程无关的内容去掉了,如有需要请看源码。

如果大家对Dubbo RPC原理原理感兴趣,可以看我之前写过的另外一篇博客《 Dubbo RPC》。

 

一、 思考与目标

1. 思考

并发情况下,dubbo的RPC模型如下图所示:

b560b47f97fde73084638823a66e49f43506bc2c

如图所示,Consumer端可能同时有多个线程调用Provider的服务,此时Provider会启动多个线程来分别处理这些并发调用,处理完以后将数据返回。在这种多线程环境中,Dubbo是如何做到Consumer Thread A不会拿到其他线程的请求结果?

其实这是一个非常普遍的问题,例如我们的服务器与MQ服务器、数据库、缓存等等第三方服务器通信时,都需要确保并发环境下数据不会错乱,并且要尽可能降低服务器的性能损耗。

 

 

2. 目标

l   探究Dubbo RPC多线程通信原理。

l   写一个简单的示例,实现多线程数据交换

 

二、   Dubbo多线程通信原理

1. 流程图


8ad7d4ad5a6ec8a2ae2ff741a490eb351043df7c

如上图所示,消费端多线程并行消费服务的场景,主要流程如下

  • 获取DubboInvoker对象;
  • 将请求体信息封装在一个Request对象中,Request中会包括一个自增的id;然后将Request存到一个ConcurrentHashMap中(key=id,value= DefaultFuture)
  • 将request数据写入Channel,
  • Consumer Thread执行Defaultfuture#get()方法等待返回结果;
  • 服务提供方创建多线程处理用户请求,并将放回结果封装在Response中(包括Request#id)
  • 将response写入Channel
  • 消费方从Channel中收到数据以后,解析出id,从Map中解析出DefaultFuture
  • 唤醒Consumer Thread,返回结果;
  • DefaultFuture也会启动一个定时程序,检查在timeout内,结果是否返回,如果未返回,将DefaultFuture从map中移除,并抛出超时异常。

 

 

2. 多线程下的通信

DubboInvoker#doInvoke方法中,在ExchangeClient#request(inv, timeout)调用时,返回一个DefaultFuture对象,接着会调用DefaultFuture.get()方法(等待返回结果)。

对于consumer端而言,服务器会为每一个请求创建一个线程,因为rpc操作是一个慢动作,为了节省资源,当线程发送rpc请求后,需要让当前线程释放资源、进入等待队列,当获取到返回结果以后,再唤醒这个线程。

RPC请求的过程为:每一个RPC请求都有一个唯一的id,RPC请求的时候,会将此id也发送给provider;provider处理完请求后会将此id和返回结果一同返回给consumer;consumer收到返回信息以后解析出id,然后从FUTURES中找到相对应的DefaultFuture,并通过DefaultFuture.done#signal()唤醒之前等待线程。

下面根据源码详细讨论一下多线程情况下rpc请求的细节,即dubbo多线程模型的实现。

1) DefaultFuture#field

这里列出了与多线程相关的几个重要的属性

 private final Lock                            lock = new ReentrantLock();
    private final Condition                       done = lock.newCondition();
    private static final Map<Long, DefaultFuture> FUTURES   = new ConcurrentHashMap<Long, DefaultFuture>();


 

2)   DefaultFuture#构造函数

创建好DefaultFuture对象以后,将DefaultFuture存入了FUTURES中。其实每一次请求,多会生成一个唯一的id,即对于每个服务器而言,id唯一。

public DefaultFuture(Channel channel, Request request, int timeout){
        this.channel = channel;
        this.request = request;
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // put into waiting map.
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }

 

3) DefaultFuture#get

主要逻辑是:获取锁,调用await方法,此时当前线程进入等待队列,此线程会有两种结果过:要么超时,要么被唤醒;如果被唤醒,则返回rpc的结果。

  public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (! isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (! isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (! isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

 

4) DefaultFuture#received

收到返回结果时,调用此方法。首先从FUTURES中根据id获取DefaultFuture,如果不存在,打印一条日志;如果存在则通过signal释放一个唤醒信号,将线程从等待队列中唤醒。

    public static void received(Channel channel, Response response) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at ")。
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }
 
    private void doReceived(Response res) {
        lock.lock();
        try {
            response = res;
            if (done != null) {
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }



5) DefaultFuture#received

以下代码是用来从FUTURES清理rpc请求超时的DefaultFuture

   private static class RemotingInvocationTimeoutScan implements Runnable {
        public void run() {
            while (true) {
                try {
                    for (DefaultFuture future : FUTURES.values()) {
                        if (future == null || future.isDone()) {
                            continue;
                        }
                        if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
                            // create exception response.
                            Response timeoutResponse = new Response(future.getId());
                            // set timeout status.
                            timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
                            timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
                            // handle response.
                            DefaultFuture.received(future.getChannel(), timeoutResponse);
                        }
                    }
                    Thread.sleep(30);
                } catch (Throwable e) {
                    logger.error("Exception when scan the timeout invocation of remoting.", e);
                }
            }
        }
    }
    static {
        Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer");
        th.setDaemon(true);
        th.start();
    }

 

6) HeaderExchangeHandler#handleRequest

创建Response对象时,带上请求id

   Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        if (req.isBroken()) {
            Object data = req.getData();
 
            String msg;
            if (data == null) msg = null;
            else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
            else msg = data.toString();
            res.setErrorMessage("Fail to decode request due to: " + msg);
            res.setStatus(Response.BAD_REQUEST);
 
            return res;
        }
        // find handler by message class.
        Object msg = req.getData();
        try {
            // handle data.
            Object result = handler.reply(channel, msg);
            res.setStatus(Response.OK);
            res.setResult(result);
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
        }
        return res;
    }


三、  示例

根据dubbo的实现方案,我写了一个简单的示例,以方便理解,见https://gitee.com/wuzhengfei/great-truthConcurrentRWTester代码

 

 

 

 

相关文章
|
10天前
|
存储 Java 数据库连接
java多线程之线程通信
java多线程之线程通信
|
13天前
|
Java 调度
Java并发编程:深入理解线程池的原理与实践
【4月更文挑战第6天】本文将深入探讨Java并发编程中的重要概念——线程池。我们将从线程池的基本原理入手,逐步解析其工作过程,以及如何在实际开发中合理使用线程池以提高程序性能。同时,我们还将关注线程池的一些高级特性,如自定义线程工厂、拒绝策略等,以帮助读者更好地掌握线程池的使用技巧。
|
1月前
|
Python
如何在Python中实现线程之间的同步和通信?
【2月更文挑战第17天】【2月更文挑战第51篇】如何在Python中实现线程之间的同步和通信?
|
1月前
|
算法 安全 调度
多线程如何工作,工作原理是什么
多线程如何工作,工作原理是什么
|
21天前
|
Java fastjson 数据安全/隐私保护
【Dubbo3技术专题】「云原生微服务开发实战」 一同探索和分析研究RPC服务的底层原理和实现
【Dubbo3技术专题】「云原生微服务开发实战」 一同探索和分析研究RPC服务的底层原理和实现
38 0
|
30天前
|
负载均衡 Java 数据处理
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用(三)
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用
51 2
|
30天前
|
存储 监控 Java
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用(二)
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用
37 1
|
30天前
|
负载均衡 安全 Java
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用(一)
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用
57 2
|
1月前
|
消息中间件 并行计算 网络协议
探秘高效Linux C/C++项目架构:让进程、线程和通信方式助力你的代码飞跃
探秘高效Linux C/C++项目架构:让进程、线程和通信方式助力你的代码飞跃
33 0
|
1月前
|
消息中间件 前端开发 NoSQL
面试官:说说线程池的工作原理?
面试官:说说线程池的工作原理?
30 0