Dubbo点滴(5)之服务注册中心

  1. 云栖社区>
  2. 博客>
  3. 正文

Dubbo点滴(5)之服务注册中心

技术小阿哥 2017-11-27 20:41:00 浏览837
展开阅读全文

首先DUBBO本质上是一款RPC框架,不可缺少的2个角色:服务提供者和服务消费者。

已经剖析了服务端暴露端口过程。本文简单说下注册中心。

1.注册中心是什么玩意

这是官网提供的图例

wKioL1hd64ujeWFHAAHY0C6Ailg777.png

通过图例,可以看出消费者和提供者并不是直接通信的,中间有个第三者,就是注册中心。这种结构,可以实现消费者和提供者两者的依赖,和参数信息的集群化。所以这带来了几个问题。

  1. 服务注册

  2. 服务发现

  3. 服务订阅

  4. 服务通知

2. 服务暴露及服务注册过程

Dubbo点滴(4)之暴露服务解析》已经剖析了dubbo协议具体打开网络端口过程。本节内容会隐去这部分内容。因为一个完整的服务暴露,主要涉及2部分内容,1)打开端口等待消费者连接;2)将服务信息登记到注册中心,以告知消费者可以连接了。

wKiom1hd7nHhegDoAADMiy-mF8w911.png

有3点需要说明:

1)首先,根据条件判断会暴露一个injvm本地服务(step 6);

InjvmProtocol协议完成,主要供同一JVM种的消费者调用,提供RPC效率。

2) 为服务暴露一个dubbo服务(step 12),一般为DubboProtocol完成

3)step 12提供的的服务,注册到注册中心(step 13-step 23)。这一步是本文的剖析重点。

3.认识注册中心

110353770.jpg

该图是DUBBO的总体结构图。重点停留在Resistry层。比较重要的是几个组件

ZookeeperRegistry :负责与zookeeper进行交互

RegistryProtocol :从注册中心获取可用服务,或者将服务注册到zookeeper,然后提供服务或者提供调用代理。

RegistryDirectory :维护着所有可用的远程Invoker或者本地的Invoker。这个类实现了NotifyListner。

NotifyListener :负责RegistryDirectory和ZookeeperRegistry的通信。

FailbackRegistry:继承自Registry,实现了失败重试机制。

4. 注册中心数据模型

wKioL1hd9GbhoQpFAAB6Ku26nf4144.jpg

流程说明:

  • 服务提供者启动时

    • 向/dubbo/com.foo.BarService/providers目录下写入自己的URL地址。

  • 服务消费者启动时

    • 订阅/dubbo/com.foo.BarService/providers目录下的提供者URL地址。

    • 并向/dubbo/com.foo.BarService/consumers目录下写入自己的URL地址。

  • 监控中心启动时

    • 订阅/dubbo/com.foo.BarService目录下的所有提供者和消费者URL地址。

4.Registry 结构树

wKioL1hd9eThQ4lUAABw_9KRcyI681.png

ZookeeperRegistry是常见的注册中心实现方案,由ZookeeperRegistryFactory负责构造。

AbstractRegistry这个类主要存储的是已经注册的服务接口,已经订阅的服务接口和已经收到通知的接口的URL,不能直接调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public abstract class AbstractRegistry implements Registry {
 
    // 本地磁盘缓存,其中特殊的key值.registies记录注册中心列表,其它均为notified服务提供者列表
    private final Properties properties = new Properties();
    // 文件缓存定时写入
    private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1new NamedThreadFactory("DubboSaveRegistryCache"true));
    //是否是同步保存文件
    private final boolean syncSaveFile ;
    private final AtomicLong lastCacheChanged = new AtomicLong();
    private final Set<URL> registered = new ConcurrentHashSet<URL>();
    private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
    private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();
    ...
    }

FailbackRegistry 继承自AbstractRegistry ,实现了失败重试机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public abstract class FailbackRegistry extends AbstractRegistry {
// 定时任务执行器
private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1new NamedThreadFactory("DubboRegistryFailedRetryTimer"true));
 
// 失败重试定时器,定时检查是否有请求失败,如有,无限次重试
private final ScheduledFuture<?> retryFuture;
 
private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();
 
private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();
 
private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
 
private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
 
private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();
...
}


5.服务提供者启动的注册过程

服务提供者启动时 向/dubbo/com.foo.BarService/providers目录下写入自己的URL地址。接下来,找寻下代码执行路径。RegistryProtocol 作为注册中心的核心组件,作为代码的入口,还要明确一点,这是个注册登记过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
//入口
public class RegistryProtocol implements Protocol {
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
registry.register(registedProviderUrl);
}
public abstract class AbstractRegistry implements Registry {
    public void register(URL url) {
        if (url == null) {
            throw new IllegalArgumentException("register url == null");
        }
        if (logger.isInfoEnabled()){
            logger.info("Register: " + url);
        }
        registered.add(url);
    }
...
}
public abstract class FailbackRegistry extends AbstractRegistry {
public void register(URL url) {
    super.register(url);
    failedRegistered.remove(url);
    failedUnregistered.remove(url);
    try {
        // 向服务器端发送注册请求
        doRegister(url);
    catch (Exception e) {
    ...
    }
 
        // 将失败的注册请求记录到失败列表,定时重试
        failedRegistered.add(url);
    }
    protected abstract void doRegister(URL url);
     
    protected abstract void doUnregister(URL url);
     
    protected abstract void doSubscribe(URL url, NotifyListener listener);
     
    protected abstract void doUnsubscribe(URL url, NotifyListener listener);
...
}   
  
 public class ZookeeperRegistry extends FailbackRegistry {
     public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        //如果provider的url是“0.0.0.0”或者在参数中带anyHost=true则抛出异常注册地址不存在
        if (url.isAnyHost()) {
          throw new IllegalStateException("registry address == null");
       }
        //服务分组(默认“dubbo”)
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
        if (! group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        //服务分组根地址
        this.root = group;
        zkClient = zookeeperTransporter.connect(url);
        //添加状态监听器
        zkClient.addStateListener(new StateListener() {
            public void stateChanged(int state) {
               if (state == RECONNECTED) {
               try {
          recover();
       catch (Exception e) {
          logger.error(e.getMessage(), e);
       }
               }
            }
        });
    }
  
     protected void doRegister(URL url) {
            try {
                //连接注册中心注册,在zk上创建节点
               zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
            catch (Throwable e) {
                throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
 }

6. 注册中心关系类图

wKiom1hd--Pyk5anAADYmQZPcMI910.png

重要概念:

ZookeeperRegistry :负责与zookeeper进行交互

RegistryProtocol :从注册中心获取可用服务,或者将服务注册到zookeeper,然后提供服务或者提供调用代理。

RegistryDirectory :维护着所有可用的远程Invoker或者本地的Invoker。这个类实现了NotifyListner。

NotifyListener :负责RegistryDirectory和ZookeeperRegistry的通信。

FailbackRegistry:继承自Registry,实现了失败重试机制。




本文转自 randy_shandong 51CTO博客,原文链接:http://blog.51cto.com/dba10g/1885721,如需转载请自行联系原作者

网友评论

登录后评论
0/500
评论