Spring Cloud Service Governance and Load Balancing Internals

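Summary: a source-level look at how Eureka implements service governance, how @LoadBalanced works, and which load-balancing strategies are available.

As usual, this article focuses on how things work internally, not on how to use them.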

 How Eureka Works

1  Service Governance

Overall, Eureka's service governance model looks like this:

[Figure: Eureka service governance model]

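Eureka's service governance works as follows:

 - Start the registry. On startup, the registry schedules a timed task that detects and evicts expired service instances (implemented by EvictionTask).

 - Start the provider cluster. On startup, each provider registers itself with the registry (implemented by InstanceInfoReplicator, which DiscoveryClient starts) and schedules a timed task that sends heartbeats to the registry (implemented by DiscoveryClient's HeartbeatThread).

 - Start the consumer cluster. On startup, each consumer fetches the full service list from the registry (DiscoveryClient#getAndStoreFullRegistry()) and schedules a timed task that fetches incremental updates from the registry (DiscoveryClient#getAndUpdateDelta()).

 - When the registry runs as a cluster, each received registration is replicated across the Eureka Servers. This is a high-availability design: if some Eureka Servers fail, the cluster as a whole keeps working.

 - A provider sends heartbeats to tell the registry that it is still alive and should not be evicted.

 - After a consumer obtains the provider information, it caches it locally. When it needs a provider, it reads the cached list and calls the provider through load balancing. Because of this local cache, consumers can keep calling services even when the registry cluster is unavailable.

This is only a brief overview of the Eureka flow; the following sections analyze it at the source level.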

 

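2  Service Partitioning

Eureka partitions services by region and zone. A region can be thought of as a geographic partition, such as Asia or North China, with no fixed size. A zone can be thought of as a specific data center inside a region. For example, if the region is Shanghai and Shanghai has two data centers, the region can be divided into two zones, zone1 and zone2.

Regions and zones make a highly available multi-active deployment possible, as shown below:

[Figure: multi-active deployment across zone1 and zone2]

Under normal conditions, Consumer1 in zone1 consumes the service offered by Provider1 in the same zone, which keeps the performance overhead lowest.

When Provider1 in zone1 becomes unavailable, the consumer can switch to Provider2 in zone2, as shown below:

[Figure: Consumer1 failing over to Provider2 in zone2]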
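The zone preference described above can be sketched in a few lines. This is only an illustration under stated assumptions: `ZonePreference` and `Provider` are hypothetical names, not Eureka or Ribbon classes, and the real zone-aware logic is considerably more elaborate.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical types for illustration; not part of Eureka or Ribbon.
final class ZonePreference {
    static final class Provider {
        final String name;
        final String zone;
        final boolean up;

        Provider(String name, String zone, boolean up) {
            this.name = name;
            this.zone = zone;
            this.up = up;
        }
    }

    /** Returns live providers in the caller's zone, or live providers elsewhere if there are none. */
    static List<Provider> candidates(List<Provider> all, String myZone) {
        List<Provider> sameZone = new ArrayList<>();
        List<Provider> otherZones = new ArrayList<>();
        for (Provider p : all) {
            if (!p.up) {
                continue; // skip providers that are not available
            }
            if (p.zone.equals(myZone)) {
                sameZone.add(p);
            } else {
                otherZones.add(p);
            }
        }
        return sameZone.isEmpty() ? otherZones : sameZone;
    }
}
```

With Provider1 (zone1) down and Provider2 (zone2) up, a consumer in zone1 gets Provider2 as its only candidate.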

 

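3  Comparison with Zookeeper

1)   Eureka guarantees AP

In CAP terms, Eureka guarantees AP. All Eureka nodes are peers, so losing a few nodes does not affect the remaining ones, which keep serving registrations and queries. As long as one Eureka Server is still up, the registry stays available (availability), although the data it returns may not be the latest (no strong consistency).

If the number of heartbeats received in the last minute falls below the expected threshold (85% of the expected renewals by default; the threshold is recalculated periodically, every 15 minutes by default), Eureka assumes a network failure between the clients and the registry and enters self-preservation mode. In this mode:

 - Eureka no longer removes services from the registry even though they should expire for lack of heartbeats

 - Eureka still accepts registrations and queries for new services, but does not replicate them to other nodes (so the current node stays available)

  - Once the network stabilizes, the new registrations on the current instance are replicated to the other nodes

2)   Zookeeper guarantees CP

In CAP terms, Zookeeper guarantees CP. While a Zookeeper cluster is electing a leader, services can neither be registered with it nor looked up. When more than half of the machines in the cluster are unavailable, no leader can be elected and the cluster is effectively down. From this perspective, Zookeeper is not the best choice for a service registry.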

 

 

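 Service Registration

1  Registration on the Provider Side

1)   Source analysis

Service registration consists of two steps: registration and renewal. Registration publishes the provider's service information to the Eureka Server. Renewal follows registration: the provider keeps sending heartbeats to tell the Eureka Server that it is still alive, so that the server's eviction task does not remove the instance from the service list.

For DiscoveryClient, the main logic is: when a DiscoveryClient is created, it registers the current service with the Eureka Server and starts a scheduled task that keeps renewing the lease.

a)    Registration

The DiscoveryClient constructor calls initScheduledTasks(), which is the method that sets up registration. Sample code: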

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, DiscoveryClientOptionalArgs args,
               Provider<BackupRegistry> backupRegistryProvider) {

    // ... code omitted
    initScheduledTasks();
    // ... code omitted
}

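initScheduledTasks holds the core registration and renewal logic: it schedules the heartbeat task that keeps renewing the lease and starts an InstanceInfoReplicator that registers the service.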

private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh", scheduler, cacheRefreshExecutor,
                       registryFetchIntervalSeconds, TimeUnit.SECONDS,
                        expBackOffBound, new CacheRefreshThread()
                ),
               registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

        // Heartbeat timer
        scheduler.schedule(
                new TimedSupervisorTask(
                "heartbeat", scheduler, heartbeatExecutor, 
                renewalIntervalInSecs, TimeUnit.SECONDS, 
                expBackOffBound, new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // InstanceInfo replicator
        instanceInfoReplicator = new InstanceInfoReplicator(
                this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(),2); // burstSize

        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                       InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }
               instanceInfoReplicator.onDemandUpdate();
            }
        };

        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
           applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }

       instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

InstanceInfoReplicator#start triggers the registration. The registration source is:

boolean register() throws Throwable {
    logger.info(PREFIX + appPathIdentifier + ": registering service...");
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == 204;
}

Below is a captured registration payload (the instanceInfo); it contains everything needed to locate the service provider.

[Figure: captured instanceInfo registration payload]

 

b)    Renewal

Two properties control the renewal interval and the lease expiration time:

eureka.instance.lease-renewal-interval-in-seconds=30

eureka.instance.lease-expiration-duration-in-seconds=90
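How the two settings interact can be sketched with a simplified lease. `LeaseSketch` is a hypothetical class written for this article; Eureka's real Lease class tracks more state and its exact expiry arithmetic may differ.

```java
// Hypothetical simplified lease; Eureka's real Lease class carries more state.
final class LeaseSketch {
    private final long durationMs;
    private volatile long lastRenewalMs;

    LeaseSketch(long durationSeconds, long nowMs) {
        this.durationMs = durationSeconds * 1000L;
        this.lastRenewalMs = nowMs;
    }

    /** Called for every heartbeat the registry receives. */
    void renew(long nowMs) {
        lastRenewalMs = nowMs;
    }

    /** A lease is expired once no heartbeat has arrived for longer than its duration. */
    boolean isExpired(long nowMs) {
        return nowMs > lastRenewalMs + durationMs;
    }
}
```

With a 30-second renewal interval and a 90-second expiration, a provider can miss two consecutive heartbeats and still keep its lease in this model.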

 

DiscoveryClient#initScheduledTasks starts a scheduled task that keeps renewing the lease.

private void initScheduledTasks() {
    // ... code omitted
    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        // Heartbeat timer
        scheduler.schedule(
                new TimedSupervisorTask(
                       "heartbeat",
                        scheduler,
                       heartbeatExecutor,
                       renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);
        // ... code omitted
    }
}

The scheduled renewal task eventually calls DiscoveryClient#renew, which sends the heartbeat to the Eureka Server.

boolean renew() {
   EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == 404) {
           REREGISTER_COUNTER.increment();
            logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
            return register();
        }
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
        return false;
    }
}
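The notable branch in renew() is the 404 case: if the server no longer knows the instance, the client registers again. A minimal sketch of that control flow, using a hypothetical in-memory stand-in (`HeartbeatSketch`) for the registry's REST endpoints:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory stand-in for the registry's REST endpoints.
final class HeartbeatSketch {
    private final Set<String> registered = new HashSet<>();

    /** Server side: 200 if the instance has a lease, 404 otherwise. */
    int sendHeartbeat(String instanceId) {
        return registered.contains(instanceId) ? 200 : 404;
    }

    /** Server side: stores the instance and answers 204 (no content). */
    int register(String instanceId) {
        registered.add(instanceId);
        return 204;
    }

    /** Client side: fall back to registration when the server has no lease for us. */
    boolean renew(String instanceId) {
        int status = sendHeartbeat(instanceId);
        if (status == 404) {
            return register(instanceId) == 204;
        }
        return status == 200;
    }
}
```

This is why a provider reappears in the registry after having been evicted, for example following a long network partition.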

 

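2  Service Management in the Registry

The registry accepts registration requests in ApplicationResource#addInstance, which completes the registration by calling PeerAwareInstanceRegistry#register. PeerAwareInstanceRegistryImpl#register first records the service locally and then replicates the information to the other registry nodes. Source: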

public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    super.register(info, leaseDuration, isReplication);
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

Note: the entry point for renewal is InstanceResource#renewLease.

 

 

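1)   Recording the service

The recording logic is fairly involved. In short, it puts the received registration into a ConcurrentHashMap and then updates the instance's timestamps, status, and related fields, which the registry relies on to track which services are alive. Source: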

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock();
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
       REGISTER.increment(isReplication);
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                if ("true".equals(serverConfig.getExperimental("registry.registration.ignoreIfDirtyTimestampIsOlder"))) {
                   logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                } else {
                   registrant.setLastDirtyTimestamp(existingLastDirtyTimestamp);
                }
            }
        } else {
            // The lease does not exist and hence it is a new registration
            synchronized (lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    // A new instance is registering, so raise the expected renewal count
                    // (each instance heartbeats every 30 seconds by default, i.e. 2 per minute)
                   this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                   this.numberOfRenewsPerMinThreshold =
                            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
           lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        gMap.put(registrant.getId(), lease);
        synchronized (recentRegisteredQueue) {
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                   registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                            + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
               overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
           registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
       registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
       registrant.setActionType(ActionType.ADDED);
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
       registrant.setLastUpdatedTimestamp();
       invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}
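Stripped of bookkeeping, the registry is a two-level concurrent map: application name to instance id to lease. The sketch below shows that shape with a hypothetical `RegistrySketch` class; it stores a plain status string instead of a Lease<InstanceInfo> and uses computeIfAbsent in place of the putIfAbsent dance in the original.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical simplified registry: appName -> (instanceId -> status).
final class RegistrySketch {
    private final Map<String, Map<String, String>> registry = new ConcurrentHashMap<>();

    /** Returns true when the instance is new, false when an existing entry was replaced. */
    boolean register(String appName, String instanceId, String status) {
        // Create the per-application map atomically on first use.
        Map<String, String> instances =
                registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>());
        return instances.put(instanceId, status) == null;
    }

    int instanceCount(String appName) {
        Map<String, String> instances = registry.get(appName);
        return instances == null ? 0 : instances.size();
    }
}
```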

 

 

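2)   Replicating service information to other registry nodes

Replicating service information to the other registry nodes provides high availability. A service registers with only one Eureka Server, yet its instances become visible across the whole Eureka Server cluster, so the cluster stays usable when some registry machines are down.

The core of the replication logic is: first decide whether replication is needed, then obtain all Eureka Server nodes, then copy the information to each of them in turn.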

private void replicateToPeers(Action action, String appName, String id,
          InstanceInfo info /* optional */,
          InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }

        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
           replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

replicateInstanceActionsToPeers performs a different operation depending on the action. During registration, it calls node.register to register the instance with the other registry nodes.

private void replicateInstanceActionsToPeers(Action action, String appName,
        String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
               node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
               node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}
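The isReplication flag is what keeps peers from echoing registrations back and forth. A minimal sketch of that guard, assuming a hypothetical `PeerNodeSketch` class whose peers are called directly instead of over HTTP:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical registry node; real peers talk over HTTP rather than direct calls.
final class PeerNodeSketch {
    final String name;
    final Set<String> instances = new HashSet<>();
    final List<PeerNodeSketch> peers = new ArrayList<>();
    int registerCalls = 0;

    PeerNodeSketch(String name) {
        this.name = name;
    }

    void register(String instanceId, boolean isReplication) {
        registerCalls++;
        instances.add(instanceId);
        if (isReplication) {
            return; // already a replica: forwarding it again would loop forever
        }
        for (PeerNodeSketch peer : peers) {
            if (peer != this) { // never replicate to yourself
                peer.register(instanceId, true);
            }
        }
    }
}
```

With two nodes that list each other as peers, one client registration results in exactly one register call on each node.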

 

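3)   Provider shutdown

When a provider shuts down, ApplicationResource#addInstance is still invoked. The difference is that the InstanceInfo parameter carries status=UP at registration and status=DOWN at shutdown. Because the status differs, the lease's timestamps are not updated on shutdown.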

 

4)   Evicting expired services

When Eureka Server starts, it creates a timer that periodically checks for expired services and evicts them. AbstractInstanceRegistry#postInit schedules this eviction task. Source:

protected void postInit() {
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
   evictionTimer.schedule(evictionTaskRef.get(),
           serverConfig.getEvictionIntervalTimerInMs(),
           serverConfig.getEvictionIntervalTimerInMs());
}

 

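AbstractInstanceRegistry#evict contains the core eviction logic:

 - First check whether lease expiration is currently enabled.

 - If it is, take the service entries out of the registry (a ConcurrentHashMap) one by one and check whether each lease has expired.

 - Put every expired lease into the expiredLeases collection.

 - Compute the number to evict with Math.min(expiredLeases.size(), evictionLimit), then evict that many leases in random order.

Source: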

public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");

    if (!isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }

    // We collect first all expired items, to evict them in random order. For large eviction sets,
    // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
    // the impact should be evenly distributed across all applications.
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                   expiredLeases.add(lease);
                }
            }
        }
    }

    // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
    // triggering self-preservation. Without that we would wipe out full registry.
    int registrySize = (int) getLocalRegistrySize();
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;

    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            // Pick a random item (Knuth shuffle algorithm)
            int next = i + random.nextInt(expiredLeases.size() - i);
           Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);

            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            internalCancel(appName, id, false);
        }
    }
}
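The eviction cap and the random selection can be isolated into a small function. The sketch below uses a hypothetical `EvictionSketch` class that works on instance ids instead of leases; the arithmetic follows the evict() source above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

final class EvictionSketch {
    /** Picks which expired entries to evict in this round, capped by the eviction limit. */
    static List<String> pickForEviction(List<String> expired, int registrySize,
                                        double renewalPercentThreshold, Random random) {
        int keep = (int) (registrySize * renewalPercentThreshold);
        int evictionLimit = registrySize - keep;
        int toEvict = Math.min(expired.size(), evictionLimit);

        List<String> pool = new ArrayList<>(expired); // leave the caller's list untouched
        List<String> chosen = new ArrayList<>();
        for (int i = 0; i < toEvict; i++) {
            // Partial shuffle: swap a random remaining item into slot i.
            int next = i + random.nextInt(pool.size() - i);
            Collections.swap(pool, i, next);
            chosen.add(pool.get(i));
        }
        return chosen;
    }
}
```

With 100 registered instances and a threshold of 0.85, at most 15 leases are evicted per round even if 40 have expired, which gives self-preservation a chance to kick in before whole applications disappear.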

 

PeerAwareInstanceRegistryImpl#isLeaseExpirationEnabled decides whether expired leases may be evicted. Source:

public boolean isLeaseExpirationEnabled() {
    if (!isSelfPreservationModeEnabled()) {
        // The self preservation mode is disabled, hence allowing the instances to expire.
        return true;
    }
    return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}

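The eureka.server.enable-self-preservation property controls the registry's self-preservation mechanism and defaults to true; isSelfPreservationModeEnabled() reads this property. With it enabled, Eureka stops evicting providers once the heartbeats received in the last minute are no longer above the threshold (85% of the expected count by default). With it disabled, the registry always evicts instances whose leases have expired.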
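A worked example of the threshold arithmetic may help. The sketch assumes the default 30-second heartbeat (two renewals per instance per minute) and a renewal percent threshold of 0.85; `SelfPreservationSketch` is a hypothetical helper, not an Eureka class.

```java
final class SelfPreservationSketch {
    /** Expected heartbeats per minute: each instance renews every 30 seconds, i.e. twice a minute. */
    static int expectedRenewsPerMin(int instanceCount) {
        return instanceCount * 2;
    }

    static int threshold(int instanceCount, double renewalPercentThreshold) {
        return (int) (expectedRenewsPerMin(instanceCount) * renewalPercentThreshold);
    }

    /** Mirrors the boolean expression in isLeaseExpirationEnabled(). */
    static boolean leaseExpirationEnabled(boolean selfPreservationOn, int threshold,
                                          int renewsLastMin) {
        if (!selfPreservationOn) {
            return true; // self-preservation disabled: always allow eviction
        }
        return threshold > 0 && renewsLastMin > threshold;
    }
}
```

For 10 instances the registry expects 20 renewals per minute and the threshold is 17. Receiving 18 renewals keeps eviction on; receiving 17 or fewer suspends it.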

 

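 Service Discovery

1  Fetching Services on the Consumer Side

When a consumer starts, it sends a REST request to the registry to fetch the list of registered services. For performance, Eureka Server maintains a read-only copy of the service list to return to consumers, and the consumer also caches the list, refreshing it every 30 seconds by default. Two properties are relevant:

eureka.client.fetch-registry=true   (whether to fetch the registered services)

eureka.client.registry-fetch-interval-seconds=30   (how often to fetch the registered services, in seconds)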

1)   Fetching at startup

When DiscoveryClient starts, the following code triggers the registry fetch; if the fetch fails, it falls back to fetchRegistryFromBackup.

if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
    fetchRegistryFromBackup();
}

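fetchRegistry implements the fetch-and-cache logic: it obtains the provider information through getAndStoreFullRegistry() or getAndUpdateDelta(), refreshes the cache, and finally updates the instance's status flag.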

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // If the delta is disabled or if it is the first time, get all
        // applications
        Applications applications = getApplications();

        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            getAndStoreFullRegistry();
        } else {
           getAndUpdateDelta(applications);
        }
       applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();

    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();

    // registry was fetched successfully, so return true
    return true;
}
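The difference between the two fetch paths can be sketched as follows. `DeltaSketch` and `Change` are hypothetical types; the action names mirror the ADDED, MODIFIED, and DELETED action types that Eureka attaches to instances, but the real delta carries full InstanceInfo objects and is reconciled against a hash code.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical delta model; Eureka's real delta carries full InstanceInfo objects.
final class DeltaSketch {
    enum Action { ADDED, MODIFIED, DELETED }

    static final class Change {
        final Action action;
        final String instanceId;
        final String status;

        Change(Action action, String instanceId, String status) {
            this.action = action;
            this.instanceId = instanceId;
            this.status = status;
        }
    }

    final Map<String, String> cache = new ConcurrentHashMap<>();

    /** Full fetch: throw away the local view and replace it with the snapshot. */
    void storeFull(Map<String, String> snapshot) {
        cache.clear();
        cache.putAll(snapshot);
    }

    /** Delta fetch: replay only the recent changes onto the local view. */
    void applyDelta(List<Change> changes) {
        for (Change c : changes) {
            if (c.action == Action.DELETED) {
                cache.remove(c.instanceId);
            } else {
                cache.put(c.instanceId, c.status);
            }
        }
    }
}
```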

 

2)   Scheduled refresh

When DiscoveryClient starts, it also starts a scheduled task that periodically fetches service information.

private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh", scheduler, cacheRefreshExecutor,
                       registryFetchIntervalSeconds, TimeUnit.SECONDS,
                        expBackOffBound, new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    // ... code omitted
}

 

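2  Handling Fetch Requests in the Registry

1)   Full fetch

When a consumer first starts, it fetches the full registry. On the server side, the full list is served by ApplicationsResource#getContainers; the method below, from ApplicationResource, is the resource locator that hands requests for a single instance over to InstanceResource.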

public InstanceResource getInstanceInfo(@PathParam("id") String id) {
    return new InstanceResource(this, id, serverConfig, registry);
}

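As the screenshot below shows, when service information is requested, the registry returns the cached service list directly.

[Figure: screenshot of the registry returning the cached service list]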

 

2)   Incremental fetch

The incremental update logic is shown below; it handles the consumer's getAndUpdateDelta() request.

@Path("delta")
@GET
public Response getContainerDifferential(
        @PathParam("version") String version,
        @HeaderParam(HEADER_ACCEPT) String acceptHeader,
       @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
       @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
        @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {

    boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();

    // If the delta flag is disabled in discovery or if the lease expiration
    // has been disabled, redirect clients to get all instances
    if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) {
        return Response.status(Status.FORBIDDEN).build();
    }

    String[] regions = null;
    if (!isRemoteRegionRequested) {
       EurekaMonitors.GET_ALL_DELTA.increment();
    } else {
        regions = regionsStr.toLowerCase().split(",");
        Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
       EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment();
    }

    CurrentRequestVersion.set(Version.toEnum(version));
    KeyType keyType = Key.KeyType.JSON;
    String returnMediaType = MediaType.APPLICATION_JSON;
    if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
        keyType = Key.KeyType.XML;
        returnMediaType = MediaType.APPLICATION_XML;
    }

    Key cacheKey = new Key(Key.EntityType.Application,
           ResponseCacheImpl.ALL_APPS_DELTA,
            keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
    );

    if (acceptEncoding != null
            && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
        return Response.ok(responseCache.getGZIP(cacheKey))
               .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                .header(HEADER_CONTENT_TYPE, returnMediaType)
                .build();
    } else {
        return Response.ok(responseCache.get(cacheKey))
                .build();
    }
}

 

 Service Consumption

1  Load Balancing

When a RestTemplate is annotated with @LoadBalanced, LoadBalancerAutoConfiguration adds a LoadBalancerInterceptor to it when it is created. Source:

@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
      final LoadBalancerInterceptor loadBalancerInterceptor) {
   return new RestTemplateCustomizer() {
      @Override
      public void customize(RestTemplate restTemplate) {
        List<ClientHttpRequestInterceptor> list = new ArrayList<>(
              restTemplate.getInterceptors());
        list.add(loadBalancerInterceptor);
        restTemplate.setInterceptors(list);
      }
   };
}

 

LoadBalancerInterceptor#intercept intercepts each request. Its core logic is to call the provider's service through the load balancer, as shown below:

@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
      final ClientHttpRequestExecution execution) throws IOException {
   final URI originalUri = request.getURI();
   String serviceName = originalUri.getHost();
   return this.loadBalancer.execute(serviceName,
         new LoadBalancerRequest<ClientHttpResponse>() {

            @Override
            public ClientHttpResponse apply(final ServiceInstance instance)
                  throws Exception {
               HttpRequest serviceRequest = new ServiceRequestWrapper(request,
                     instance);
               return execution.execute(serviceRequest, body);
            }
         });
}
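The interceptor treats the URI host as a logical service name, and the wrapped request later swaps it for the chosen instance's address. That rewrite can be illustrated with the JDK's URI class alone. `UriRewriteSketch` is a hypothetical helper, not the Spring Cloud implementation, and the service name and address in the usage note are made up.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class UriRewriteSketch {
    /** Replaces the logical service name in the URI with a concrete host and port. */
    static URI toInstanceUri(URI original, String host, int port) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("cannot rewrite " + original, e);
        }
    }
}
```

For example, http://order-service/orders/42?full=true becomes http://10.0.0.5:8080/orders/42?full=true once the load balancer has picked the instance at 10.0.0.5:8080.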

 

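A request made through RestTemplate passes through the interceptor and is ultimately executed by LoadBalancerClient#execute. The main logic of RibbonLoadBalancerClient#execute is:

 - First select a LoadBalancer.

 - Then use the LoadBalancer's rule to choose a provider server. By default the LoadBalancer is a ZoneAwareLoadBalancer.

 - Wrap the request context, issue the HTTP request, and return the result.

The code is shown below: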

@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
   ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
   Server server = getServer(loadBalancer);
   if (server == null) {
      throw new IllegalStateException("No instances available for " + serviceId);
   }
   RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
         serviceId), serverIntrospector(serviceId).getMetadata(server));

   RibbonLoadBalancerContext context = this.clientFactory
        .getLoadBalancerContext(serviceId);
   RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

   try {
      T returnVal = request.apply(ribbonServer);
     statsRecorder.recordStats(returnVal);
      return returnVal;
   }
   // catch IOException and rethrow so RestTemplate behaves correctly
   catch (IOException ex) {
      statsRecorder.recordStats(ex);
      throw ex;
   }
   catch (Exception ex) {
      statsRecorder.recordStats(ex);
      ReflectionUtils.rethrowRuntimeException(ex);
   }
   return null;
}

 

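2  Routing Rules

The load-balancing behavior is largely determined by the routing rule.

1)   RandomRule

Picks a service instance at random, using Random to select a server.

2)   RoundRobinRule

Cycles through the server instances in order. It is the default rule in plain Ribbon; note that Spring Cloud's Ribbon auto-configuration defaults to ZoneAvoidanceRule instead. The implementation keeps a counter that is incremented on every selection, and the counter modulo the total number of instances determines which instance is chosen.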
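The counter-and-modulo idea looks roughly like this. The sketch is not Ribbon's code; `RoundRobinSketch` is a hypothetical class that uses a compare-and-set loop so that concurrent callers never receive the same slot from a single increment.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    /** Returns the index of the next server; safe to call from many threads. */
    int nextIndex(int serverCount) {
        for (;;) {
            int current = counter.get();
            int next = (current + 1) % serverCount;
            // Retry if another thread advanced the counter in the meantime.
            if (counter.compareAndSet(current, next)) {
                return next;
            }
        }
    }
}
```

With three servers, successive calls return 1, 2, 0, 1, and so on.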

 

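3)   RetryRule

Round robin plus retry. It first tries to obtain an instance by round robin; if that instance is unavailable, it tries again. Retrying is time-limited: if no instance has been obtained by the deadline (500 ms by default), it returns null.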
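A minimal sketch of the retry-until-deadline behavior, assuming a caller-supplied health check. `RetrySketch` is hypothetical; Ribbon's RetryRule delegates to a sub-rule and uses an interrupt timer rather than this simple loop.

```java
import java.util.List;
import java.util.function.Predicate;

final class RetrySketch {
    /**
     * Round-robins over the servers until one passes the health check or the deadline passes.
     * Returns null when nothing usable was found in time.
     */
    static String choose(List<String> servers, Predicate<String> isAlive, long maxRetryMillis) {
        if (servers.isEmpty()) {
            return null;
        }
        long deadline = System.currentTimeMillis() + maxRetryMillis;
        int i = 0;
        do {
            String candidate = servers.get(i);
            if (isAlive.test(candidate)) {
                return candidate;
            }
            i = (i + 1) % servers.size();
            Thread.yield(); // give other threads a chance before retrying
        } while (System.currentTimeMillis() < deadline);
        return null;
    }
}
```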

 

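4)   WeightedResponseTimeRule

Computes a weight for each instance from its runtime behavior and then selects instances according to those weights, which leads to better instance choices.

WeightedResponseTimeRule contains a scheduled task named DynamicServerWeightTask that recomputes the weight of every instance every 30 seconds by default. The rule is simple: the shorter an instance's average response time, the larger its weight, and the more likely it is to be chosen.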
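One way to turn "shorter response time means larger weight" into numbers is to give each server a share equal to the sum of all averages minus its own average, and then pick by a random point on the cumulative scale. The sketch below follows that scheme as I understand Ribbon's approach; `WeightSketch` and its method names are hypothetical.

```java
final class WeightSketch {
    /** Cumulative weights: a server's own share is (sum of all averages - its average). */
    static double[] cumulativeWeights(double[] avgResponseTimes) {
        double total = 0;
        for (double t : avgResponseTimes) {
            total += t;
        }
        double[] cumulative = new double[avgResponseTimes.length];
        double running = 0;
        for (int i = 0; i < avgResponseTimes.length; i++) {
            running += total - avgResponseTimes[i];
            cumulative[i] = running;
        }
        return cumulative;
    }

    /** Picks the first server whose cumulative weight reaches the given point. */
    static int pick(double[] cumulative, double point) {
        for (int i = 0; i < cumulative.length; i++) {
            if (cumulative[i] >= point) {
                return i;
            }
        }
        return cumulative.length - 1;
    }
}
```

For average response times of 10, 40, and 50 ms the individual shares are 90, 60, and 50, so the cumulative weights are 90, 150, and 200. A caller would draw the point as random.nextDouble() times the last cumulative weight, which makes the fastest server the most likely choice.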

 

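5)   ClientConfigEnabledRoundRobinRule

Behaves the same as RoundRobinRule.

 

6)   BestAvailableRule

Uses the instance statistics kept in loadBalancerStats to filter out failed instances, then picks the instance with the fewest concurrent requests. If loadBalancerStats is null, it falls back to round robin.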
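The selection step reduces to "skip unhealthy servers, keep the one with the fewest in-flight requests". A sketch with a hypothetical `ServerStats` holder standing in for Ribbon's per-server statistics:

```java
import java.util.List;

final class BestAvailableSketch {
    // Hypothetical per-server statistics; Ribbon keeps richer data in its own stats classes.
    static final class ServerStats {
        final String name;
        final int activeRequests;
        final boolean circuitTripped;

        ServerStats(String name, int activeRequests, boolean circuitTripped) {
            this.name = name;
            this.activeRequests = activeRequests;
            this.circuitTripped = circuitTripped;
        }
    }

    /** Returns the healthy server with the fewest in-flight requests, or null if none. */
    static String choose(List<ServerStats> servers) {
        ServerStats best = null;
        for (ServerStats s : servers) {
            if (s.circuitTripped) {
                continue; // filtered out as failed
            }
            if (best == null || s.activeRequests < best.activeRequests) {
                best = s;
            }
        }
        return best == null ? null : best.name;
    }
}
```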

 

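7)   PredicateBasedRule

Uses an internal filter (predicate) to narrow the instance list, then applies round robin to what remains.

 

8)   ZoneAvoidanceRule

ZoneAvoidanceRule filters with a CompositePredicate that combines ZoneAvoidancePredicate as the primary filter with AvailabilityPredicate as the secondary filter, then applies round robin to the result.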
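The filter-then-round-robin pattern shared by these two rules can be sketched with java.util.function.Predicate. This is a simplification under stated assumptions: `PredicateRuleSketch` is hypothetical, servers are plain strings, and Ribbon's CompositePredicate has additional fallback behavior that is not modeled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class PredicateRuleSketch {
    private int counter = 0;

    /** Filters with primary AND secondary, then round-robins over what is left. */
    synchronized String choose(List<String> servers, Predicate<String> primary,
                               Predicate<String> secondary) {
        Predicate<String> composite = primary.and(secondary);
        List<String> eligible = new ArrayList<>();
        for (String s : servers) {
            if (composite.test(s)) {
                eligible.add(s);
            }
        }
        if (eligible.isEmpty()) {
            return null;
        }
        String chosen = eligible.get(counter % eligible.size());
        counter = (counter + 1) % Integer.MAX_VALUE; // stay non-negative
        return chosen;
    }
}
```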

 

 

 
