Dubbo 动态规则配置分析

  1. 云栖社区>
  2. Java技术进阶>
  3. 博客>
  4. 正文

Dubbo 动态规则配置分析

晴天哥 2019-12-29 18:01:15 浏览744
展开阅读全文

开篇

  • 覆盖规则是Dubbo设计的在无需重启应用的情况下,动态调整RPC调用行为的一种能力。
  • 在Dubbo2.6及更早版本中,所有的服务治理规则都只针对服务粒度(service),如果要把某条规则作用到应用粒度上,需要为应用下的所有服务(service)配合相同的规则,变更,删除的时候也需要对应的操作。
  • 2.7.0版本开始,支持从服务和应用两个粒度来调整动态配置。
  • 这篇文章基于Dubbo-2.6.x描述动态配置的生效过程,下发配置的过程后续单独分析


配置规则

向注册中心写入动态配置覆盖规则 。该功能通常由监控中心或治理中心的页面完成。

RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://10.20.153.10:2181"));
registry.register(URL.valueOf("override://0.0.0.0/com.foo.BarService?category=configurators&dynamic=false&application=foo&timeout=1000"));
  • 参考官网的文档配置规则
    )
  • override:// 表示数据采用覆盖方式,支持 override 和 absent,可扩展,必填。
    0.0.0.0 表示对所有 IP 地址生效,如果只想覆盖某个 IP 的数据,请填入具体 IP,必填。
  • com.foo.BarService 表示只对指定服务生效,必填。
  • category=configurators 表示该数据为动态配置类型,必填。
  • dynamic=false 表示该数据为持久数据,当注册方退出时,数据依然保存在注册中心,必填。
  • enabled=true 覆盖规则是否生效,可不填,缺省生效。
  • application=foo 表示只对指定应用生效,可不填,表示对所有应用生效。
  • timeout=1000 表示将满足以上条件的 timeout 参数的值覆盖为 1000。如果想覆盖其它参数,直接加在 override 的 URL 参数上。


流程分析

public class ZookeeperRegistry extends FailbackRegistry {

    // 变量url的值
    // consumer://172.17.32.176/org.apache.dubbo.demo.DemoService?
    // application=dubbo-demo-api-consumer&category=providers,configurators,routers&dubbo=2.0.2
    // &interface=org.apache.dubbo.demo.DemoService&lazy=false&methods=sayHello
    // &pid=58968&side=consumer&sticky=false&timestamp=1571824631224
    public void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            if (ANY_VALUE.equals(url.getServiceInterface())) {
                // 暂时不关注这部分逻辑
            } else {
                List<URL> urls = new ArrayList<>();
                // 处理providers、configurators、routers等路径
                // /dubbo/org.apache.dubbo.demo.DemoService/providers
                // /dubbo/org.apache.dubbo.demo.DemoService/configurators
                // /dubbo/org.apache.dubbo.demo.DemoService/routers
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        // 创建zk节点变化回调监听器
                        listeners.putIfAbsent(listener, 
                        (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(
                                url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
                        zkListener = listeners.get(listener);
                    }
                    // 创建path对应的节点
                    zkClient.create(path, false);
                    // 添加path下的children的监听
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    // 处理path下的children
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }

                // 通知回调notify动作
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
        }
    }
}
  • 1.consumer在refer过程中订阅providers/configurators/routers等节点children变更事件,一旦子节点有变化就会触发consumer侧的监听事件,重新生成服务引用。

public synchronized void notify(List<URL> urls) {
    List<URL> invokerUrls = new ArrayList<URL>();
    List<URL> routerUrls = new ArrayList<URL>();
    List<URL> configuratorUrls = new ArrayList<URL>();
    for (URL url : urls) {
        String protocol = url.getProtocol();
        String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
        if (Constants.ROUTERS_CATEGORY.equals(category)
                || Constants.ROUTE_PROTOCOL.equals(protocol)) {
            routerUrls.add(url);
        } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
            // 配置变更会触发configuratorUrls的变更
            configuratorUrls.add(url);
        } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
            invokerUrls.add(url);
        } else {
            logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
        }
    }
    // 设置动态配置中心对象configurators
    if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
        this.configurators = toConfigurators(configuratorUrls);
    }

    // 省略非核心代码

    // providers
    refreshInvoker(invokerUrls);
}
  • 2.获取配置变更的configuratorUrls,生成configuratorUrls对应的configurators对象,该对象会用在动态改变consumer侧的URL链接。
  • 3.进入refreshInvoker()方法内部进行invoker的重建过程。


private void refreshInvoker(List<URL> invokerUrls) {
    if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
            && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        this.forbidden = true; // Forbid to access
        this.methodInvokerMap = null; // Set the method invoker map to null
        destroyAllInvokers(); // Close all invokers
    } else {
        this.forbidden = false; // Allow to access
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
        if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
            invokerUrls.addAll(this.cachedInvokerUrls);
        } else {
            this.cachedInvokerUrls = new HashSet<URL>();
            this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
        }
        if (invokerUrls.isEmpty()) {
            return;
        }

        // 转换provider对应的URL为对应的invoker对象
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
        Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
        // state change
        // If the calculation is wrong, it is not processed.
        if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
            logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
            return;
        }
        this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
        this.urlInvokerMap = newUrlInvokerMap;
        try {
            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
        } catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }
    }
}
    1. refreshInvoker()方法内部的toInvokers()方法执行invokder的重建过程。


private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
    Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
    if (urls == null || urls.isEmpty()) {
        return newUrlInvokerMap;
    }
    Set<String> keys = new HashSet<String>();
    String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
    for (URL providerUrl : urls) {
        // 省略非核心代码

        URL url = mergeUrl(providerUrl);

        String key = url.toFullString(); // The parameter urls are sorted
        if (keys.contains(key)) { // Repeated url
            continue;
        }
        keys.add(key);
        
        // 重新生成invoker对象
        Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
        Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
        if (invoker == null) { // Not in the cache, refer again
            try {
                boolean enabled = true;
                if (url.hasParameter(Constants.DISABLED_KEY)) {
                    enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                } else {
                    enabled = url.getParameter(Constants.ENABLED_KEY, true);
                }
                if (enabled) {
                    invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
                }
            } catch (Throwable t) {
                logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
            }
            if (invoker != null) { // Put new invoker in cache
                newUrlInvokerMap.put(key, invoker);
            }
        } else {
            newUrlInvokerMap.put(key, invoker);
        }
    }
    keys.clear();
    return newUrlInvokerMap;
}
    1. toInvokers()方法内部的mergeUrl()方法合并consumer、provider、configurator的参数信息,根据合并后的url的重建invoker对象。


// URL参数匹配顺序 override > Consumer > Provider
private URL mergeUrl(URL providerUrl) {
    // Merge the consumer side parameters
    providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap); 

    // 合并动态配置导provider的URL当中,合并动态配置
    List<Configurator> localConfigurators = this.configurators; // local reference
    if (localConfigurators != null && !localConfigurators.isEmpty()) {
        for (Configurator configurator : localConfigurators) {
            providerUrl = configurator.configure(providerUrl);
        }
    }

    providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false)); // Do not check whether the connection is successful or not, always create Invoker!

    // The combination of directoryUrl and override is at the end of notify, which can't be handled here
    this.overrideDirectoryUrl = this.overrideDirectoryUrl.addParametersIfAbsent(providerUrl.getParameters()); // Merge the provider side parameters

    // 省略非核心代码
    return providerUrl;
}
    1. URL参数的合并顺序按照 override > Consumer > Provider进行合并,通过mergeUrl()方法将动态配置的规则生效在URL当中。
    1. configurator.configure(providerUrl)中的configurator对象就是动态配置生成的对象,作用于provider的url之上。


public URL configure(URL url) {
    if (configuratorUrl == null || configuratorUrl.getHost() == null
            || url == null || url.getHost() == null) {
        return url;
    }
    // If override url has port, means it is a provider address. We want to control a specific provider with this override url, it may take effect on the specific provider instance or on consumers holding this provider instance.
    if (configuratorUrl.getPort() != 0) {
        if (url.getPort() == configuratorUrl.getPort()) {
            return configureIfMatch(url.getHost(), url);
        }
    } else {// override url don't have a port, means the ip override url specify is a consumer address or 0.0.0.0
        // 1.If it is a consumer ip address, the intention is to control a specific consumer instance, it must takes effect at the consumer side, any provider received this override url should ignore;
        // 2.If the ip is 0.0.0.0, this override url can be used on consumer, and also can be used on provider
        if (url.getParameter(Constants.SIDE_KEY, Constants.PROVIDER).equals(Constants.CONSUMER)) {
            return configureIfMatch(NetUtils.getLocalHost(), url);// NetUtils.getLocalHost is the ip address consumer registered to registry.
        } else if (url.getParameter(Constants.SIDE_KEY, Constants.CONSUMER).equals(Constants.PROVIDER)) {
            return configureIfMatch(Constants.ANYHOST_VALUE, url);// take effect on all providers, so address must be 0.0.0.0, otherwise it won't flow to this if branch
        }
    }
    return url;
}
private URL configureIfMatch(String host, URL url) {
    if (Constants.ANYHOST_VALUE.equals(configuratorUrl.getHost()) || host.equals(configuratorUrl.getHost())) {
        String configApplication = configuratorUrl.getParameter(Constants.APPLICATION_KEY,
                configuratorUrl.getUsername());
        String currentApplication = url.getParameter(Constants.APPLICATION_KEY, url.getUsername());
        if (configApplication == null || Constants.ANY_VALUE.equals(configApplication)
                || configApplication.equals(currentApplication)) {
            Set<String> conditionKeys = new HashSet<String>();
            conditionKeys.add(Constants.CATEGORY_KEY);
            conditionKeys.add(Constants.CHECK_KEY);
            conditionKeys.add(Constants.DYNAMIC_KEY);
            conditionKeys.add(Constants.ENABLED_KEY);
            for (Map.Entry<String, String> entry : configuratorUrl.getParameters().entrySet()) {
                String key = entry.getKey();
                String value = entry.getValue();
                if (key.startsWith("~") || Constants.APPLICATION_KEY.equals(key) || Constants.SIDE_KEY.equals(key)) {
                    conditionKeys.add(key);
                    if (value != null && !Constants.ANY_VALUE.equals(value)
                            && !value.equals(url.getParameter(key.startsWith("~") ? key.substring(1) : key))) {
                        return url;
                    }
                }
            }
            return doConfigure(url, configuratorUrl.removeParameters(conditionKeys));
        }
    }
    return url;
}
public class OverrideConfigurator extends AbstractConfigurator {

    public OverrideConfigurator(URL url) {
        super(url);
    }

    @Override
    public URL doConfigure(URL currentUrl, URL configUrl) {
        return currentUrl.addParameters(configUrl.getParameters());
    }
}


public URL addParameters(Map<String, String> parameters) {
    if (parameters == null || parameters.size() == 0) {
        return this;
    }

    boolean hasAndEqual = true;
    for (Map.Entry<String, String> entry : parameters.entrySet()) {
        String value = getParameters().get(entry.getKey());
        if (value == null) {
            if (entry.getValue() != null) {
                hasAndEqual = false;
                break;
            }
        } else {
            if (!value.equals(entry.getValue())) {
                hasAndEqual = false;
                break;
            }
        }
    }
    // return immediately if there's no change
    if (hasAndEqual) return this;

    Map<String, String> map = new HashMap<String, String>(getParameters());
    map.putAll(parameters);
    return new URL(protocol, username, password, host, port, path, map);
}
    1. OverrideConfigurator的整个configure流程,执行一些前置的配置检查,覆盖override的参数后返回新的URL对象。


参考文章

源码分析Dubbo override实现原理
Dubbo配置规则

网友评论

登录后评论
0/500
评论
晴天哥
+ 关注
所属团队号: Java技术进阶