SpringCloudEureka源码分析之客户端(EurekaClient)

破局之路课程 2024-03-26 04:00:06

基础架构

Eureka 服务治理架构的三个核心要素:

1、服务注册中心:Eureka提供的服务端,提供服务注册与发现的功能,也就是Eureka Server。

2、服务提供者:提供服务的应用,例如订单服务、商品服务等。

3、服务消费者:消费者应用从服务注册中心获取服务列表,从而使消费者可以知道从何处去调用其所需要的服务。

注:很多时候 一个服务即使服务提供者也是服务消费者,例如订单需要调用商品服务查询商品信息,商品服务需要查看商品下单情况则需要调用订单服务。

Eureka架构图

服务提供者

服务提供者需要做的工作有:服务注册、服务同步、服务续约

服务注册:“服务提供者”在启动的时候会通过发送REST请求的方式将自己注册到Eureka Server上,同时带上服务的一些元数据信息。Eureka Server接收到Rest请求后,将元数据信息存储在一个双层结构Map中,其中第一层的key是服务名,第二层的key是具体服务的实例名。

eureka.client.register-with-eureka=true。该值默认是true,若设置为false则不会注册。

服务同步:如上面架构图所示,这里面两个服务分别注册到两个不同的服务注册中心上,他们的信息分别被两个注册中心维护。由于服务注册中心之间是相互注册的,当服务提供者注册请求到一个服务注册中心时,它将该请求转发给集群中相连的其他注册中心,从而实现服务之间的同步。

服务续约(renew):注册完服务之后,服务提供者会维护一个心跳,防止Eureka Server的“剔除服务”,将该服务从服务列表中剔除。

eureka.instance.lease-renewal-interval-in-seconds=30 ,服务提供者会维护一个心跳,防止Eureka Server的“剔除服务”,将该服务从服务列表中剔除。

eureka.instance.lease-renewal-interval-in-seconds=30 // 续约间隔时间,默认30s

eureka.instance.lease-expiration-duration-in-seconds=90// 服务时效时间,默认90s

服务消费者

服务消费者主要做的工作有:获取服务、服务调用、服务下线。

获取服务:当启动服务消费者的时候,它会发送一个REST请求给服务注册中心,来获取上面的注册服务列表。

服务调用:服务消费者在获取服务清单后,通过服务名可以获得具体提供服务的实例名和该实例的元数据信息。有了这些服务实例的详细信息后,客户端就可以根据自己的需要决定具体调用哪一个实例,在Ribbon中默认采用轮询的方式调用,从而实现负载均衡。

访问实例的选择上Eureka有两个概念:Region,Zone。在选择上回优先选择同一个Zone下的,除非一个Zone下的服务挂掉,才会调用其他Zone下的。推荐阅读:

服务下线:当服务实例进行增长的关闭操作时,会触发一个服务下线的REST请求给Eureka Server,告诉服务注册中心 我要下线。

服务注册中心

服务注册中心要做的工作有:失效剔除、自我保护。

失效剔除:Eureka Server在启动的时候会创建一个定时任务,每隔一段时间(默认60s)将当前清单中超时(默认90s)没有续约的服务剔除出去。

自我保护:eureka server 会去将心跳在15分钟内是否低于85%,如果出现eureka会将他们保护起来,让这些实例不会过期,这样客户端就很容易拿到不存在的实例,这就需要容错机制,客户端重试、断路器等机制。建议在开发环境关闭自我保护机制。

红色字体表示触发了自我保护机制

源码来了

对Eureka有了总体上的理解了,接下来看一下他的源码,源码还是相对比较简单的。

1.@EnableEurekaClient解析

客户端应用通过添加@EnableEurekaClient注解,再在配置文件中添加eureka相关配置即可实现服务的注册与发现,还是颇为神奇的,主要功能应该都几种在@EnableEurekaClient这个注解中,下面我们来剖析一下这个注解。

1)@EnableEurekaClient源码如下:

@EnableEurekaClient只适用于Eureka作为注册中心,@EnableDiscoveryClient 可以是其他注册中心。

@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documented@Inherited@EnableDiscoveryClient // 主要注解就是这个public @interface EnableEurekaClient { } // @EnableDiscoveryClient@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documented@Inherited@Import(EnableDiscoveryClientImportSelector.class)// 主要就是为了导入该类public @interface EnableDiscoveryClient { boolean autoRegister() default true;}

2)EnableDiscoveryClientImportSelector.java

3)super.selectImports(metadata)即在SpringFactoryImportSelector.selectImports(metadata)

public abstract SpringFactoryImportSelector<T> implements DeferredImportSelector, BeanClassLoaderAware, EnvironmentAware { ... @Override public String[] selectImports(AnnotationMetadata metadata) { // 1.默认enabled值为true if (!isEnabled()) { return new String[0]; } ... // 2.主要功能在这里 List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader .loadFactoryNames(this.annotationClass, this.beanClassLoader))); ... return factories.toArray(new String[factories.size()]); } // SpringFactoriesLoader.loadFactoryNames(this.annotationClass, this.beanClassLoader) public static List<String> loadFactoryNames(Class<?> factoryClass, ClassLoaderLoader) { // 1.factoryClassName值为org.springframework.cloud.client.discovery.EnableDiscoveryClient String factoryClassName = factoryClass.getName(); try { // 2.获取所有 META-INF/spring.factories文件 Enumeration<URL> urls = (classLoader != null ?Loader.getResources(FACTORIES_RESOURCE_LOCATION) : ClassLoader.getSystemResources(FACTORIES_RESOURCE_LOCATION)); List<String> result = new ArrayList<String>(); // 3.遍历所有spring.factories文件 while (urls.hasMoreElements()) { URL url = urls.nextElement(); Properties properties = PropertiesLoaderUtils.loadProperties(new UrlResource(url)); String factoryClassNames = properties.getProperty(factoryClassName); // 4.获取properties中key为EnableDiscoveryClient对应的value值列表 result.addAll(Arrays.asList(StringUtils.commaDelimitedListToStringArray(factoryClassNames))); } return result; } ... }

3)super.selectImports(metadata)即在SpringFactoryImportSelector.selectImports(metadata)

public abstract SpringFactoryImportSelector<T> implements DeferredImportSelector, BeanClassLoaderAware, EnvironmentAware { ... @Override public String[] selectImports(AnnotationMetadata metadata) { // 1.默认enabled值为true if (!isEnabled()) { return new String[0]; } AnnotationAttributes attributes = AnnotationAttributes.fromMap( metadata.getAnnotationAttributes(this.annotationClass.getName(), true)); Assert.notNull(attributes, "No " + getSimpleName() + " attributes found. Is " + metadata.getClassName() + " annotated with @" + getSimpleName() + "?"); // Find all possible auto configurationes, filtering duplicates List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader .loadFactoryNames(this.annotationClass, this.beanClassLoader))); if (factories.isEmpty() && !hasDefaultFactory()) { throw new IllegalStateException("Annotation @" + getSimpleName() + " found, but there are no implementations. Did you forget to include a starter?"); } if (factories.size() > 1) { // there should only ever be one DiscoveryClient, but there might be more than // one factory log.warn("More than one implementation " + "of @" + getSimpleName() + " (now relying on @Conditionals to pick one): " + factories); } return factories.toArray(new String[factories.size()]); } // SpringFactoriesLoader.loadFactoryNames(this.annotationClass, this.beanClassLoader) public static List<String> loadFactoryNames(Class<?> factoryClass, ClassLoaderLoader) { // 1.factoryClassName值为org.springframework.cloud.client.discovery.EnableDiscoveryClient String factoryClassName = factoryClass.getName(); try { // 2.获取所有 META-INF/spring.factories文件 Enumeration<URL> urls = (classLoader != null ?Loader.getResources(FACTORIES_RESOURCE_LOCATION) : ClassLoader.getSystemResources(FACTORIES_RESOURCE_LOCATION)); List<String> result = new ArrayList<String>(); // 3.遍历所有spring.factories文件 while (urls.hasMoreElements()) { URL url = urls.nextElement(); Properties properties = PropertiesLoaderUtils.loadProperties(new UrlResource(url)); String factoryClassNames = properties.getProperty(factoryClassName); // 4.获取properties中key为EnableDiscoveryClient对应的value值列表 result.addAll(Arrays.asList(StringUtils.commaDelimitedListToStringArray(factoryClassNames))); } return result; } ... }

注意:org.springframework.cloud.client.discovery.EnableDiscoveryClient对应的value值,可以在spring-cloud-netflix-eureka-client-1.3.1.RELEASE-sources.jar下META-INF/spring.factories文件中获取,具体值为

org.springframework.cloud.client.discovery.EnableDiscoveryClient=\

org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration

总结:所以上述注册到Spring中的类为两个:

org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration

org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration

同时我们还注意到eureka-client下META-INF/spring.factories文件中还有其他内容

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration

EnableAutoConfiguration对应的value值列表中的类会在SpringBoot项目启动的时候注册到Spring容器中,EurekaClient的关键功能就在EurekaClientConfigServerAutoConfiguration中,下面我们一起来看下这个类

2.EurekaClientConfigServerAutoConfiguration功能解析

@Configuration@EnableConfigurationProperties@ConditionalOnClass({ EurekaInstanceConfigBean.class, EurekaClient.class, ConfigServerProperties.class })public EurekaClientConfigServerAutoConfiguration { ... @PostConstruct public void init() { if (this.instance == null || this.server == null) { return; } String prefix = this.server.getPrefix(); if (StringUtils.hasText(prefix)) { this.instance.getMetadataMap().put("configPath", prefix); } }}

通过该类@ConditionalOnClass注解可知,EurekaClientConfigServerAutoConfiguration类的产生需要一些条件,需要EurekaInstanceConfigBean.class, EurekaClient.class,ConfigServerProperties.class这三个类先行产生。

我们重点关注下EurekaClient.class,源码如下:

@ImplementedBy(DiscoveryClient.class)public interface EurekaClient extends LookupService {}

主要是一个接口,并定义了默认实现类DiscoveryClient,该接口定义了Eureka客户端的主要功能,包括获取服务URL、注册当前服务等功能。

3.DiscoveryClient

实际没有绝对的服务消费者和服务提供者,每一个服务提供者也可以是一个服务消费者。

消费者和提供者的主要功能点有:服务注册、服务续约、服务下线、服务调用

下面根据功能点来对照各自的源码,以下方法可在com.netflix.discovery.DiscoveryClient中找到

1)服务注册(发送注册请求到注册中心)

boolean register() throws Throwable { ... EurekaHttpResponse<Void> httpResponse; try { // 主要的注册功能就是这句话 // 真正实现在AbstractJerseyEurekaHttpClient.register() httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } ... return httpResponse.getStatusCode() == 204; } // AbstractJerseyEurekaHttpClient.register() public EurekaHttpResponse<Void> register(InstanceInfo info) { String urlPath = "apps/" + info.getAppName(); ClientResponse response = null; try { // 1.构造一个HTTP请求 Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder(); addExtraHeaders(resourceBuilder); // 2.封装请求类型和返回类型,将当前服务的元信息封装为InstanceInfo, // 发送post请求到serviceUrl,serviceUrl即我们在配置文件中配置的defaultZone response = resourceBuilder .header("Accept-Encoding", "gzip") .type(MediaType.APPLICATION_JSON_TYPE) .accept(MediaType.APPLICATION_JSON) .post(ClientResponse.class, info); // 3.返回响应状态 return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(), response == null ? "N/A" : response.getStatus()); } if (response != null) { response.close(); } } }

2)服务续约(本质就是发送当前应用的心跳请求到注册中心)

boolean renew() { EurekaHttpResponse<InstanceInfo> httpResponse; try { // 1.本质就是发送心跳请求 // 2.真正实现为 AbstractJerseyEurekaHttpClient.sendHeartBeat() httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode()); // 2.如果请求失败,则调用注册服务请求 if (httpResponse.getStatusCode() == 404) { REREGISTER_COUNTER.increment(); logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName()); return register(); } return httpResponse.getStatusCode() == 200; } catch (Throwable e) { logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e); return false; } } // AbstractJerseyEurekaHttpClient.sendHeartBeat() public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) { String urlPath = "apps/" + appName + '/' + id; ClientResponse response = null; try { // 主要就是将当前实例的元信息(InstanceInfo)以及状态(UP)通过HTTP请求发送到serviceUrl WebResource webResource = jerseyClient.resource(serviceUrl) .path(urlPath) .queryParam("status", info.getStatus().toString()) .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString()); if (overriddenStatus != null) { webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name()); } Builder requestBuilder = webResource.getRequestBuilder(); addExtraHeaders(requestBuilder); response = requestBuilder.put(ClientResponse.class); EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response)); if (response.hasEntity()) { eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class)); } return eurekaResponseBuilder.build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus()); } if (response != null) { response.close(); } } }

3.服务调用(本质就是获取调用服务名所对应的服务提供者实例信息,包括IP、port等)

public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure) { return getInstancesByVipAddress(vipAddress, secure, instanceRegionChecker.getLocalRegion()); } //getInstancesByVipAddress() public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure, @Nullable String region) { if (vipAddress == null) { throw new IllegalArgumentException( "Supplied VIP Address cannot be null"); } Applications applications; // 1.判断服务提供方是否当前region,若是的话直接从localRegionApps中获取 if (instanceRegionChecker.isLocalRegion(region)) { applications = this.localRegionApps.get(); // 2.否则的话从远程region获取 } else { applications = remoteRegionVsApps.get(region); if (null == applications) { logger.debug("No applications are defined for region {}, so returning an empty instance list for vip " + "address {}.", region, vipAddress); return Collections.emptyList(); } } // 3.从applications中获取服务名称对应的实例名称列表 if (!secure) { return applications.getInstancesByVirtualHostName(vipAddress); } else { return applications.getInstancesBySecureVirtualHostName(vipAddress); } }

4)服务下线(本质就是发送取消注册的HTTP请求到注册中心)

void unregister() { // It can be null if shouldRegisterWithEureka == false if(eurekaTransport != null && eurekaTransport.registrationClient != null) { try { logger.info("Unregistering ..."); // 重点在这里 EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId()); logger.info(PREFIX + appPathIdentifier + " - deregister status: " + httpResponse.getStatusCode()); } catch (Exception e) { logger.error(PREFIX + appPathIdentifier + " - de-registration failed" + e.getMessage(), e); } } } // AbstractJerseyEurekaHttpClient.cancel() public EurekaHttpResponse<Void> cancel(String appName, String id) { String urlPath = "apps/" + appName + '/' + id; ClientResponse response = null; try { Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder(); addExtraHeaders(resourceBuilder); // 本质就是发送delete请求到注册中心 response = resourceBuilder.delete(ClientResponse.class); return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP DELETE {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus()); } if (response != null) { response.close(); } } }

总结:通过以上分析可知,服务的注册、下线等操作实际上就是通过发送HTTP请求到注册中心来实现的。

0 阅读:0

破局之路课程

简介:感谢大家的关注