java B2B2C Springcloud多租户电子商城系统-Eureka 服务与实例列表获取详解

yayay · · 79 次点击 · · 开始浏览    
关于服务与实例列表获取 EurekaClient端 我们从Ribbon说起:EurekaClient也存在缓存,应用服务实例列表信息在每个EurekaClient服务消费端都有缓存。一般的,Ribbon的LoadBalancer会读取这个缓存,来知道当前有哪些实例可以调用,从而进行负载均衡。这个loadbalancer同样也有缓存。 需要JAVA Spring Cloud大型企业分布式微服务云构建的B2B2C电子商务平台源码 一零三八七七四六二六 首先看这个LoadBalancer的缓存更新机制,相关类是PollingServerListUpdater: ``` final Runnable wrapperRunnable = new Runnable() { @Override public void run() { if (!isActive.get()) { if (scheduledFuture != null) { scheduledFuture.cancel(true); } return; } try { //从EurekaClient缓存中获取服务实例列表,保存在本地缓存 updateAction.doUpdate(); lastUpdated = System.currentTimeMillis(); } catch (Exception e) { logger.warn("Failed one update cycle", e); } } }; //定时调度 scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay( wrapperRunnable, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS ); ``` 这个updateAction.doUpdate();就是从EurekaClient缓存中获取服务实例列表,保存在BaseLoadBalancer的本地缓存: ``` protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>()); public void setServersList(List lsrv) { //写入allServerList的代码,这里略 } @Override public List<Server> getAllServers() { return Collections.unmodifiableList(allServerList); } ``` 这里的getAllServers会在每个负载均衡规则中被调用,例如RoundRobinRule: ``` public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { log.warn("no load balancer"); return null; } Server server = null; int count = 0; while (server == null && count++ < 10) { List<Server> reachableServers = lb.getReachableServers(); //获取服务实例列表,调用的就是刚刚提到的getAllServers List<Server> allServers = lb.getAllServers(); int upCount = reachableServers.size(); int serverCount = allServers.size(); if ((upCount == 0) || (serverCount == 0)) { log.warn("No up servers available from load balancer: " + lb); return null; } int nextServerIndex = incrementAndGetModulo(serverCount); server = allServers.get(nextServerIndex); if (server == null) { /* Transient. */ Thread.yield(); continue; } if (server.isAlive() && (server.isReadyToServe())) { return (server); } // Next. server = null; } if (count >= 10) { log.warn("No available alive servers after 10 tries from load balancer: " + lb); } return server; } ``` 这个缓存需要注意下,有时候我们只修改了EurekaClient缓存的更新时间,但是没有修改这个LoadBalancer的刷新本地缓存时间,就是ribbon.ServerListRefreshInterval,这个参数可以设置的很小,因为没有从网络读取,就是从一个本地缓存刷到另一个本地缓存 。 然后我们来看一下EurekaClient本身的缓存,直接看关键类DiscoveryClient的相关源码,我们这里只关心本地Region的,多Region配置我们先忽略: ``` //本地缓存,可以理解为是一个软链接 private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>(); private void initScheduledTasks() { //如果配置为需要拉取服务列表,则设置定时拉取任务,这个配置默认是需要拉取服务列表 if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } //其他定时任务初始化的代码,忽略 } //定时从EurekaServer拉取服务列表的任务 class CacheRefreshThread implements Runnable { public void run() { refreshRegistry(); } } void refreshRegistry() { try { //多Region配置处理代码,忽略 boolean success = fetchRegistry(remoteRegionsModified); if (success) { registrySize = localRegionApps.get().size(); lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis(); } //日志代码,忽略 } catch (Throwable e) { logger.error("Cannot fetch registry from server", e); } } //定时从EurekaServer拉取服务列表的核心方法 private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { Applications applications = getApplications(); //判断,如果是第一次拉取,或者app列表为空,就进行全量拉取,否则就会进行增量拉取 if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { getAndStoreFullRegistry(); } else { getAndUpdateDelta(applications); } applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); } catch (Throwable e) { logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e); return false; } finally { if (tracer != null) { tracer.stop(); } } //缓存更新完成,发送个event给观察者,目前没啥用 onCacheRefreshed(); // 检查下远端的服务实例列表里面包括自己,并且状态是否对,这里我们不关心 updateInstanceRemoteStatus(); // registry was fetched successfully, so return true return true; } //全量拉取代码 private void getAndStoreFullRegistry() throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications apps = null; //访问/eureka/apps接口,拉取所有服务实例信息 EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode()); if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); } } //增量拉取代码 private void getAndUpdateDelta(Applications applications) throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications delta = null; //访问/eureka/delta接口,拉取所有服务实例增量信息 EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { delta = httpResponse.getEntity(); } if (delta == null) { //如果delta为空,拉取增量失败,就全量拉取 logger.warn("The server does not allow the delta revision to be applied because it is not safe. " + "Hence got the full registry."); getAndStoreFullRegistry(); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { //这里设置原子锁的原因是怕某次调度网络请求时间过长,导致同一时间有多线程拉取到增量信息并发修改 //拉取增量成功,检查hashcode是否一样,不一样的话也会全量拉取 logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode()); String reconcileHashCode = ""; if (fetchRegistryUpdateLock.tryLock()) { try { updateDelta(delta); reconcileHashCode = getReconcileHashCode(applications); } finally { fetchRegistryUpdateLock.unlock(); } } else { logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta"); } // There is a diff in number of instances for some reason if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) { reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall } } else { logger.warn("Not updating application delta as another thread is updating it already"); logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode()); } } ``` 以上就是对于EurekaClient拉取服务实例信息的源代码分析,总结EurekaClient 重要缓存如下: EurekaClient第一次全量拉取,定时增量拉取应用服务实例信息,保存在缓存中。 EurekaClient增量拉取失败,或者增量拉取之后对比hashcode发现不一致,就会执行全量拉取,这样避免了网络某时段分片带来的问题。 同时对于服务调用,如果涉及到ribbon负载均衡,那么ribbon对于这个实例列表也有自己的缓存,这个缓存定时从EurekaClient的缓存更新 EurekaServer端 在EurekaServer端,所有的读取请求都是读的ReadOnlyMap(这个可以配置) 有定时任务会定时从ReadWriteMap同步到ReadOnlyMap这个时间配置是: ``` #eureka server刷新readCacheMap的时间,注意,client读取的是readCacheMap,这个时间决定了多久会把readWriteCacheMap的缓存更新到readCacheMap上 #默认30s eureka.server.responseCacheUpdateInvervalMs=3000 ``` 相关代码: ``` if (shouldUseReadOnlyResponseCache) { timer.schedule(getCacheUpdateTask(), new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) + responseCacheUpdateIntervalMs), responseCacheUpdateIntervalMs); } private TimerTask getCacheUpdateTask() { return new TimerTask() { @Override public void run() { logger.debug("Updating the client cache from response cache"); for (Key key : readOnlyCacheMap.keySet()) { if (logger.isDebugEnabled()) { Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()}; logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args); } try { CurrentRequestVersion.set(key.getVersion()); Value cacheValue = readWriteCacheMap.get(key); Value currentCacheValue = readOnlyCacheMap.get(key); if (cacheValue != currentCacheValue) { readOnlyCacheMap.put(key, cacheValue); } } catch (Throwable th) { logger.error("Error while updating the client cache from response cache", th); } } } }; } ``` ReadWriteMap是一个LoadingCache,将Registry中的服务实例信息封装成要返回的http响应(分别是经过gzip压缩和非压缩的),同时还有两个特殊key,ALL_APPS和ALL_APPS_DELTA ALL_APPS就是所有服务实例信息 ALL_APPS_DELTA就是之前讲注册说的RecentlyChangedQueue里面的实例列表封装的http响应信息 。 [java B2B2C源码电子商务平台](https://2147775633.iteye.com/blog/2434341)
79 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet