美文网首页eureka 注册中心
Eureka之服务发现LookupService

Eureka之服务发现LookupService

作者: Mr_1214 | 来源:发表于2018-08-28 11:36 被阅读286次

Eureka 注册中心,核心为提供服务注册,以及服务发现功能,下面我们重点讨论下Eureka核心功能之一的服务发现。
我们知道Eureka采用的C-S架构,客户端要通过请求服务端获取注册列表,提供给服务使用,服务端要保存客户端发来的注册信息保存下(Eureka采用的是内存存储),并提供给资源服务Resources使用,有关资源服务可见Eureka之REST API

LookupService体系图

LookupService.png
  1. InstanceRegistry为Server端对资源服务提供服务发现功能的接口,用来读取服务端本地缓存注册表信息
  2. EurekaClient 为Client端实现的服务发现接口,用来获取客户端本地缓存中的注册表信息
  3. RemoteRegionRegistry 为Server用来读取其它区域的服务注册表的接口,并将其它区域的服务注册表缓存到本地,供本地InstanceRegistry使用。

Eureka区域,分区

eureka提供了region和zone两个概念来进行分区,这两个概念均来自于亚马逊的AWS:

  1. region:可以简单理解为地理上的分区,比如亚洲地区,或者华北地区,再或者北京等等,没有具体大小的限制。根据项目具体的情况,可以自行合理划分region。RemoteRegionRegistry就是其它分区的服务注册信息。
  2. zone:可以简单理解为region内的具体机房,比如说region划分为北京,然后北京有两个机房,就可以在此region之下划分出zone1,zone2两个zone。

RemoteRegionRegistry 根据配置实例化:

  1. 配置KEY: eureka.remoteRegionUrlsWithName
  2. 格式: eureka.remoteRegionUrlsWithName=region1;http://region1host/eureka/v2,region2;http://region2host/eureka/v2
  3. 解析: regionName->remoteRegionURL 根据解析出来的MAP 创建相应的 RemoteRegionRegistry

备注: 一般中小行项目基本所有服务都部署在同一机房,不存在区域划分这些,即初始化时:RemoteRegionRegistry List为空,具体可参见AbstractInstanceRegistry的initRemoteRegionRegistry方法

LookupService类图以及实现

LookupServiceUml.png
服务发现LookupService接口
/**
 * 服务发现接口
 * Lookup service for finding active instances.
 *
 * @author Karthik Ranganathan, Greg Kim.
 * @param <T> for backward compatibility

 */
public interface LookupService<T> {

    /**
     * 根据集群ID获取集群列表
     * Returns the corresponding {@link Application} object which is basically a
     * container of all registered <code>appName</code> {@link InstanceInfo}s.
     *
     * @param appName
     * @return a {@link Application} or null if we couldn't locate any app of
     *         the requested appName
     */
    Application getApplication(String appName);

    /**
     * 获取服务注册列表
     * Returns the {@link Applications} object which is basically a container of
     * all currently registered {@link Application}s.
     *
     * @return {@link Applications}
     */
    Applications getApplications();

    /**
     * 根据实例ID获取实例信息
     */
    List<InstanceInfo> getInstancesById(String id);

    /**
     * 轮询获取集群中的下一个可用的注册服务,起到负载均衡作用,保证每个可用注册服务均衡的被使用,此方法仅提供个Eureka 客户端使用实现,服务端默认为空实现
     * Gets the next possible server to process the requests from the registry
     * information received from eureka.
     */
    InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure);

从接口可以看到提供了四个功能方法如下:

  1. getApplication:根据集群ID获取集群列表
  2. getApplications:获取服务注册列表
  3. getInstancesById:根据实例ID获取实例信息
  4. getNextServerFromEureka:轮询获取集群中的下一个可用的注册服务,起到负载均衡作用,保证每个可用注册服务均衡的被使用,此方法仅提供个Eureka 客户端使用实现,服务端默认为空实现
服务端实现AbstractInstanceRegistry
  1. 缓存服务注册属性

   /**
     * 服务注册表
     */
    private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
            = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
  1. 其它区域服务发现集合属性
    /**
     * 远程区域服务发现注册服务集合
     */
    protected Map<String, RemoteRegionRegistry> regionNameVSRemoteRegistry = new HashMap<String, RemoteRegionRegistry>();
  1. getApplications功能方法实现
    /**
     * 获取服务注册列表信息
     * Get all applications in this instance registry, falling back to other regions if allowed in the Eureka config.
     *
     * @return the list of all known applications
     *
     * @see com.netflix.discovery.shared.LookupService#getApplications()
     */
    public Applications getApplications() {
        //配置参数,是否禁用远程区域的服务注册列表
        boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();
        if (disableTransparentFallback) {
            //获取本地服务注册列表
            return getApplicationsFromLocalRegionOnly();
        } else {
            //获取服务注册列表在所有区域中,包含本地
            return getApplicationsFromAllRemoteRegions();  // Behavior of falling back to remote region can be disabled.
        }
    }
    
        /**
     * 获服务注册列表,在所有区域
     * Returns applications including instances from all remote regions. <br/>
     * Same as calling {@link #getApplicationsFromMultipleRegions(String[])} with a <code>null</code> argument.
     */
    public Applications getApplicationsFromAllRemoteRegions() {
        //根据区域信息获取服务注册列表,此处区域列表传入所有远程区域列表
        return getApplicationsFromMultipleRegions(allKnownRemoteRegions);
    }

    /**
     * 获取本地服务注册列表
     * Returns applications including instances from local region only. <br/>
     * Same as calling {@link #getApplicationsFromMultipleRegions(String[])} with an empty array.
     */
    @Override
    public Applications getApplicationsFromLocalRegionOnly() {
        //根据区域信息获取服务注册列表,此处区域列表传入空值
        return getApplicationsFromMultipleRegions(EMPTY_STR_ARRAY);
    }
    
        /**
     * 根据区域信息查询服务注册列表
     */
    public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {

        //是否根据远程区域查询服务注册列表标识
        boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;

        logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}",
                includeRemoteRegion, Arrays.toString(remoteRegions));

        //记录统计监控数据
        if (includeRemoteRegion) {
            GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
        } else {
            GET_ALL_CACHE_MISS.increment();
        }

        //构建服务注册列表实例信息
        Applications apps = new Applications();
        apps.setVersion(1L);
        //加入本地注册信息到服务注册列表实例中
        for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
            Application app = null;

            if (entry.getValue() != null) {
                for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
                    Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
                    if (app == null) {
                        app = new Application(lease.getHolder().getAppName());
                    }
                    app.addInstance(decorateInstanceInfo(lease));
                }
            }
            if (app != null) {
                apps.addApplication(app);
            }
        }
        //是否加入远程区域的注册信息
        if (includeRemoteRegion) {
            //遍历远程区域,添加远程区域的注册信息到服务注册列表实例
            for (String remoteRegion : remoteRegions) {
                RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
                if (null != remoteRegistry) {
                    Applications remoteApps = remoteRegistry.getApplications();
                    for (Application application : remoteApps.getRegisteredApplications()) {
                        if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                            logger.info("Application {}  fetched from the remote region {}",
                                    application.getName(), remoteRegion);

                            Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
                            if (appInstanceTillNow == null) {
                                appInstanceTillNow = new Application(application.getName());
                                apps.addApplication(appInstanceTillNow);
                            }
                            for (InstanceInfo instanceInfo : application.getInstances()) {
                                appInstanceTillNow.addInstance(instanceInfo);
                            }
                        } else {
                            logger.debug("Application {} not fetched from the remote region {} as there exists a "
                                            + "whitelist and this app is not in the whitelist.",
                                    application.getName(), remoteRegion);
                        }
                    }
                } else {
                    logger.warn("No remote registry available for the remote region {}", remoteRegion);
                }
            }
        }
        //设置实例的hashCode
        apps.setAppsHashCode(apps.getReconcileHashCode());
        return apps;
    }

getApplications功能主要由以下方法完成:

  1. getApplications : 此方法根据配置参数信息,判断是只从本地缓存中获取注册列表(getApplicationsFromLocalRegionOnly)还是从所以区域获取注册列表(getApplicationsFromAllRemoteRegions),参数信息:disableTransparentFallback
  2. getApplicationsFromLocalRegionOnly :获取本地缓存中的注册表信息,调用getApplicationsFromMultipleRegions传入空区域集合
  3. getApplicationsFromAllRemoteRegions : 从所有区域查找注册表信息,传入全部区域集合。
  4. getApplicationsFromMultipleRegions :根据传入区域信息查询注册表信息

getApplicationsFromMultipleRegions完成如下功能:

  1. 构建空注册表实例:Applications
  2. 添加本地缓存的注册信息到Applications
  3. 判断是否查找其它区域
  4. 遍历其它区域RemoteRegionRegistry获取Applications
  5. 添加其它区域中注册的信息到Applications(剔除重复的)
  6. 设置apps的hashCode 用于增量更新
  1. getApplications功能方法实现
    /**
     * 根据服务集群ID,查询服务集群列表
     *
     * @param appName the application name of the application
     * @return the application
     *
     * @see com.netflix.discovery.shared.LookupService#getApplication(java.lang.String)
     */
    @Override
    public Application getApplication(String appName) {
        //获取配置信息
        //配置参数,是否禁用远程区域的服务注册列表
        boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();
        return this.getApplication(appName, !disableTransparentFallback);
    }


    /**
     * 根据服务集群ID,查询服务集群列表
     * Get application information.
     * @return the application
     */
    @Override
    public Application getApplication(String appName, boolean includeRemoteRegion) {
        Application app = null;

        //获取本地注册列表中的服务集群信息
        Map<String, Lease<InstanceInfo>> leaseMap = registry.get(appName);

        if (leaseMap != null && leaseMap.size() > 0) {
            //构建集群实例Application
            for (Entry<String, Lease<InstanceInfo>> entry : leaseMap.entrySet()) {
                if (app == null) {
                    app = new Application(appName);
                }
                app.addInstance(decorateInstanceInfo(entry.getValue()));
            }
        } else if (includeRemoteRegion) {
            //包含远程区域时,通过远程区域注册服务查询集群信息
            for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {
                Application application = remoteRegistry.getApplication(appName);
                if (application != null) {
                    return application;
                }
            }
        }
        //返回集群实例
        return app;
    }

getApplication功能主要由以下方法完成:
getApplication : 根据 集群名称以及参数是否禁用其它区域来获取集群列表Application

  1. 通过本地缓存注册信息获取组装集群实例Application
  2. 当本地缓存中未查到集群信息且可用远程区域发现服务查询时,遍历远程区域发现服务查询集群信息组装
    集群实例Application
  3. 返回集群实例Application
  1. getInstancesById功能方法实现

与4功能类似

  1. getNextServerFromEureka功能方法实现

空实现

客户端实现DiscoveryClient
    /*
     * (non-Javadoc)
     * @see com.netflix.discovery.shared.LookupService#getApplication(java.lang.String)
     */
    @Override
    public Application getApplication(String appName) {
        return getApplications().getRegisteredApplications(appName);
    }

    /*
     * (non-Javadoc)
     *
     * @see com.netflix.discovery.shared.LookupService#getApplications()
     */
    @Override
    public Applications getApplications() {
        return localRegionApps.get();
    }
    
        /*
     * (non-Javadoc)
     * @see com.netflix.discovery.shared.LookupService#getInstancesById(java.lang.String)
     */
    @Override
    public List<InstanceInfo> getInstancesById(String id) {
        List<InstanceInfo> instancesList = new ArrayList<InstanceInfo>();
        for (Application app : this.getApplications()
                .getRegisteredApplications()) {
            InstanceInfo instanceInfo = app.getByInstanceId(id);
            if (instanceInfo != null) {
                instancesList.add(instanceInfo);
            }
        }
        return instancesList;
    }
    

在客户端的实现方法中可以清晰的看到是通过localRegionApps.get()获取服务注册列表实例,此实例是在本地缓存存储且为原子性的。下面我们主要查看localRegionApps是怎么加载以及更新

  • 构造函数首次加载localRegionApps

        //通过fetchRegistry方法首次从远程Server端获取注册列表缓存到客户端本地,默认选择增量拉取
        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }

在构造函数中调用fetchRegistry 首次初始化从服务端拉取服务注册实例并缓存到客户端内存中

  • 初始化定时任务,定时拉取服务端注册信息刷新本地缓存
    /**
     * Initializes all scheduled tasks.
     */
    private void initScheduledTasks() {
        //判断客户端远程拉取注册表禁用配置,true,标识客户端运行定时拉取远程注册表配置
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            //定时任务定时执行客户端远程拉取注册表来更新客户端本地缓存
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
    }
    
        /**
     * 远程拉取Server端注册表信息更新客户端本地缓存,定时任务,客户端定时执行此任务
     * The task that fetches the registry information at specified intervals.
     *
     */
    class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
    }
    
  1. 构造函数初始化定时任务,调用initScheduledTasks
  2. 定时任务,初始化刷新服务注册表实例定时任务
  3. CacheRefreshThread线程定时执行,调用refreshRegistry()方法从server端拉取注册信息并刷新本地缓存
  • fetchRegistry功能
/**
     * 客户端拉取Eureka Server服务注册列表,本地缓存 (forceFullRegistryFetch 是否全量拉取标识)
     */
    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

        try {
            // If the delta is disabled or if it is the first time, get all
            // applications
            //本地缓存中获取注册列表信息
            Applications applications = getApplications();

            //判断全量拉取还是增量拉取
            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                //全量拉取注册表信息
                getAndStoreFullRegistry();
            } else {
                //增量拉取注册表信息
                getAndUpdateDelta(applications);
            }
            //设置hashCode,注册列表判断变化是根据hashCode
            applications.setAppsHashCode(applications.getReconcileHashCode());
            //日志打印统计
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }

        //通知缓存刷新事件
        // Notify about cache refresh before updating the instance remote status
        onCacheRefreshed();

        //更新实例的远程状态
        // Update remote status based on refreshed data held in the cache
        updateInstanceRemoteStatus();

        // registry was fetched successfully, so return true
        return true;
    }
    
       /**
     * 全量拉取Eureka Server端的注册列表信息
     */
    private void getAndStoreFullRegistry() throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        logger.info("Getting all instance registry info from the eureka server");

        Applications apps = null;
        //通过远程通讯组件请求Server端获取全量注册列表
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        logger.info("The response status is {}", httpResponse.getStatusCode());

        if (apps == null) {
            logger.error("The application is null for some reason. Not storing this information");
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            //获取的注册列表缓存到本地
            localRegionApps.set(this.filterAndShuffle(apps));
            logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
        } else {
            logger.warn("Not updating applications as another thread is updating it already");
        }
    }
    
        /**
     * 增量拉取Eureka Server端的注册列表信息
     */
    private void getAndUpdateDelta(Applications applications) throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        Applications delta = null;
        //通过远程通讯组件请求Server端获取增量注册列表
        EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            delta = httpResponse.getEntity();
        }

        if (delta == null) {
            logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                    + "Hence got the full registry.");
            //增量获取为空时,全量拉取
            getAndStoreFullRegistry();
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
            String reconcileHashCode = "";
            //拉取注册表更新锁
            if (fetchRegistryUpdateLock.tryLock()) {
                try {
                    //更新增量信息到本地缓存中的注册表中
                    updateDelta(delta);
                    //获取更新后的HashCode
                    reconcileHashCode = getReconcileHashCode(applications);
                } finally {
                    fetchRegistryUpdateLock.unlock();
                }
            } else {
                logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
            }
            //判断本地的hashCode是否与远程hashCode一致,不一致时,需要同步一致
            // There is a diff in number of instances for some reason
            if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
            }
        } else {
            logger.warn("Not updating application delta as another thread is updating it already");
            logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
        }
    }
  1. fetchRegistry : 根据传入参数以及配置信息判断是否全量拉取还是增量拉取,全量拉取调用getAndStoreFullRegistry方法 ;增量拉取调用getAndUpdateDelta方法
  2. getAndStoreFullRegistry:全量从Server拉取服务注册信息,更新本地缓存。此方法通过Eureka提供的远程通讯模块调用Server端暴露的Resources信息获取服务注册列表实例apps
  3. getAndUpdateDelta :增量从Server端拉取服务的增量注册信息,更新本地缓存实例。
远程区域服务发现实现RemoteRegionRegistry

与客户端实现方式基本一致,不在描述

客户端,服务端,远程区域服务端交互

LookupService1.png

相关文章

网友评论

    本文标题:Eureka之服务发现LookupService

    本文链接:https://www.haomeiwen.com/subject/xsyfwftx.html