SpringcloudRibbon客户端负载均衡详解(一)

破局之路课程 2024-03-25 00:25:45
前言

Spring Cloud Ribbon 是一个基于HTTP和TCP的客户端负载均衡工具,它基于Netflix Ribbon实现。通过Spring Cloud的封装,将面向服务的REST模板请求自动转换成客户端负载均衡的服务调用。Spring Cloud Ribbon 虽然只是一个工具类框架,它不像注册中心、配置中心、API网关那样需要独立部署,但是几乎存在于每个Spring Cloud构建的微服务和基础设施中。微服务之间的调用、API网关请求转发实际上都是通过Ribbon来实现的,Feign也是。因此理解Ribbon对于使用Spring Cloud至关重要。

从上图中可以看到LoadBalanced注解在spring cloud common包中,因此也很好理解Ribbon贯穿整个Spring cloud体系中。

客户端负载均衡

负载均衡对系统的高可用、网络压力的缓解和处理能力扩容的重要手段。

负载均分为硬件负载均衡(F5)、软件负载均衡(Nginx、Apache)。

负载均衡算法有:线性轮询、权重轮询、流量负载等。

通过Spring Cloud Ribbon的封装我们载微服务架构中使用客户端负载均衡调用只要两步:

1、服务提供者只需要启动多个服务实例并注册到一个注册中心或是多个相关联的服务注册中心

2、服务消费者直接通过调用被@LoadBalance 注解修饰过的RestTemplate来实现服务的接口调用。

负载均衡

RestTemplate详解

调用代码示例

1、配置@LoadBalanced @Bean@LoadBalancedRestTemplate restTemplate(){ return new RestTemplate(); }

package com.anzy.cloud.cloud01;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchAutoConfiguration;import org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration;import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration;import org.springframework.cloud.client.discovery.EnableDiscoveryClient;import org.springframework.cloud.client.loadbalancer.LoadBalanced;import org.springframework.cloud.openfeign.EnableFeignClients;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.ComponentScan;import org.springframework.transaction.annotation.EnableTransactionManagement;import org.springframework.web.client.RestTemplate;/** * ServerEurekaApplication * * @author anzy * @date 2018-05-01 */@SpringBootApplication(exclude = { ElasticsearchAutoConfiguration.class, RabbitAutoConfiguration.class, RedisAutoConfiguration.class, MongoAutoConfiguration.class, MongoDataAutoConfiguration.class})@EnableTransactionManagement // 事务//@EnableEurekaClient // @EnableEurekaClient只适用于Eureka作为注册中心,@EnableDiscoveryClient 可以是其他注册中心。@EnableDiscoveryClient@EnableFeignClients(basePackages = {"com.anzy.cloud"})@ComponentScan(basePackages = {"com.anzy.cloud"})public DemoCloud01Application { public static void main(String[] args) { SpringApplication.run(DemoCloud01Application.class, args); } @Bean @LoadBalanced RestTemplate restTemplate(){ return new RestTemplate(); }}

2、注入RestTemplate 调用getForEntity()方法

package com.anzy.cloud.cloud01.consumer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import org.springframework.web.client.RestTemplate;/** * @author anzy * @version 1.0 * @date 2020/1/18 20:51 **/@RestController@RequestMapping(value = "/consumer")public ConsumerController { @Autowired RestTemplate restTemplate; @GetMapping("/hello") public String hello(){ // 调用SERVICE-ADMIN 服务下的 /admin/test/mobile/v1/test 接口 return restTemplate.getForEntity("http://SERVICE-ADMIN/admin/test/mobile/v1/test",String.class).getBody().toString(); }}

RestTemplate请求方式主要分为:post、get、put、delete。如下图:

get请求

post请求

PUT请求方法

delete方法

源码分析:

首先看一下LoadBalanced注解源码

/** * Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient * 注解用于标记RestTemplate 来配置使用LoadBalancerClient * @author Spencer Gibb */@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })@Retention(RetentionPolicy.RUNTIME)@Documented@Inherited@Qualifierpublic @interface LoadBalanced {}

从LoadBalanced注解 中的注释可以看到 来配置使用LoadBalancerClient,然后搜索LoadBalancerClient,查看其源码:

//// Source code recreated from a .class file by IntelliJ IDEA// (powered by Fernflower decompiler)//package org.springframework.cloud.client.loadbalancer;import java.io.IOException;import java.net.URI;import org.springframework.cloud.client.ServiceInstance;public interface LoadBalancerClient extends ServiceInstanceChooser { // 使用从负载均衡中挑选的实例来执行请求任务 <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException; // 使用从负载均衡中挑选的实例来执行请求任务 <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException; // 为系统构建一个合适的host:port 形式的URI URI reconstructURI(ServiceInstance instance, URI original);}

LoadBalancerClient父类

public interface ServiceInstanceChooser { /** * Choose a ServiceInstance from the LoadBalancer for the specified service * 从负载均衡中选择一个服务实例 * @param serviceId the service id to look up the LoadBalancer * @return a ServiceInstance that matches the serviceId */ ServiceInstance choose(String serviceId);}

整理了Ribbon类之间的关系如图:

Ribbon UML图

首先来看一下LoadBalancerAutoConfiguration.java类,从名字上可以看到是一个配置类

package org.springframework.cloud.client.loadbalancer;import org.springframework.beans.factory.ObjectProvider;import org.springframework.beans.factory.SmartInitializingSingleton;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;import org.springframework.boot.context.properties.EnableConfigurationProperties;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.http.client.ClientHttpRequestInterceptor;import org.springframework.retry.backoff.BackOffPolicy;import org.springframework.retry.support.RetryTemplate;import org.springframework.web.client.RestTemplate;import java.util.ArrayList;import java.util.Collections;import java.util.List;/** * Auto configuration for Ribbon (client side load balancing). * Ribbon自动配置 * @author Spencer Gibb * @author Dave Syer * @author Will Tran * @author Gang Li */@Configuration// RestTemplate必须存在于当前工程的环境@ConditionalOnClass(RestTemplate.class)// Spring 的Bean 工程必须有LoadBalanceClient的实现Bean@ConditionalOnBean(LoadBalancerClient.class)@EnableConfigurationProperties(LoadBalancerRetryProperties.class)public LoadBalancerAutoConfiguration { @LoadBalanced @Autowired(required = false) private List<RestTemplate> restTemplates = Collections.emptyList(); @Bean public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated( final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) { return () -> restTemplateCustomizers.ifAvailable(customizers -> { for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) { for (RestTemplateCustomizer customizer : customizers) { customizer.customize(restTemplate); } } }); } @Autowired(required = false) private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList(); @Bean @ConditionalOnMissingBean public LoadBalancerRequestFactory loadBalancerRequestFactory( LoadBalancerClient loadBalancerClient) { return new LoadBalancerRequestFactory(loadBalancerClient, transformers); } @Configuration @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate") static LoadBalancerInterceptorConfig { // 创建一个LoadBalancerIntercept的Bean,用于实现对客户端发起请求时进行拦截以实现客户端负载均衡。 @Bean public LoadBalancerInterceptor ribbonInterceptor( LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) { return new LoadBalancerInterceptor(loadBalancerClient, requestFactory); } //2、创建一个RestTemplateCustomizer的Bean,用于给RestTemplate增加LoadBalancerIntercept拦截器。 @Bean @ConditionalOnMissingBean public RestTemplateCustomizer restTemplateCustomizer( final LoadBalancerInterceptor loadBalancerInterceptor) { return restTemplate -> { List<ClientHttpRequestInterceptor> list = new ArrayList<>( restTemplate.getInterceptors()); list.add(loadBalancerInterceptor); restTemplate.setInterceptors(list); }; } } @Configuration @ConditionalOnClass(RetryTemplate.class) public static RetryAutoConfiguration { @Bean @ConditionalOnMissingBean public LoadBalancedRetryFactory loadBalancedRetryFactory() { return new LoadBalancedRetryFactory() {}; } } @Configuration @ConditionalOnClass(RetryTemplate.class) public static RetryInterceptorAutoConfiguration { @Bean @ConditionalOnMissingBean public RetryLoadBalancerInterceptor ribbonInterceptor( LoadBalancerClient loadBalancerClient, LoadBalancerRetryProperties properties, LoadBalancerRequestFactory requestFactory, LoadBalancedRetryFactory loadBalancedRetryFactory) { return new RetryLoadBalancerInterceptor(loadBalancerClient, properties, requestFactory, loadBalancedRetryFactory); } @Bean @ConditionalOnMissingBean public RestTemplateCustomizer restTemplateCustomizer( final RetryLoadBalancerInterceptor loadBalancerInterceptor) { return restTemplate -> { List<ClientHttpRequestInterceptor> list = new ArrayList<>( restTemplate.getInterceptors()); list.add(loadBalancerInterceptor); restTemplate.setInterceptors(list); }; } }}

在LoadBalancerAutoConfiguration类头上的注解可以看出,Ribbon的实现需要满足两个条件:

1、 @ConditionalOnClass(RestTemplate.class) // RestTemplate必须存在于当前工程的环境

2、 @ConditionalOnBean(LoadBalancerClient.class) // Spring 的Bean 工程必须有LoadBalanceClient的实现Bean

在自动化类LoadBalancerAutoConfiguration中主要做了三件事:

1、创建一个LoadBalancerIntercept的Bean,用于实现对客户端发起请求时进行拦截以实现客户端负载均衡。

2、创建一个RestTemplateCustomizer的Bean,用于给RestTemplate增加LoadBalancerIntercept拦截器。

3、维护一个被LoadBalance注解修饰的RestTemplate对象列表,并在这里进行初始化,通过调用RestTemplateCustomizer的实例来个需要客户端负载均衡的RestTemplate增加LoadBalancerIntercept拦截器。

从上面可以看出LoadBalancerIntercept比较重要,接下来看一下:

package org.springframework.cloud.client.loadbalancer;import java.io.IOException;import java.net.URI;import org.springframework.http.HttpRequest;import org.springframework.http.client.ClientHttpRequestExecution;import org.springframework.http.client.ClientHttpRequestInterceptor;import org.springframework.http.client.ClientHttpResponse;import org.springframework.util.Assert;/** * 当一个被LoadBalanced注解修饰的RestTemplate对象向外发送HTTP请求拦截 * @author Spencer Gibb * @author Dave Syer * @author Ryan Baxter * @author William Tran */public LoadBalancerInterceptor implements ClientHttpRequestInterceptor { private LoadBalancerClient loadBalancer; private LoadBalancerRequestFactory requestFactory; public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) { this.loadBalancer = loadBalancer; this.requestFactory = requestFactory; } public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) { // for backwards compatibility this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer)); } // 拦截核心方法 @Override public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException { // 获取URI final URI originalUri = request.getURI(); // 从URI获取HOST String serviceName = originalUri.getHost(); Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri); // 根据服务名选择实例发起实际请求 return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution)); }}public AsyncLoadBalancerInterceptor implements AsyncClientHttpRequestInterceptor { private LoadBalancerClient loadBalancer; public AsyncLoadBalancerInterceptor(LoadBalancerClient loadBalancer) { this.loadBalancer = loadBalancer; } public ListenableFuture<ClientHttpResponse> intercept(final HttpRequest request, final byte[] body, final AsyncClientHttpRequestExecution execution) throws IOException { URI originalUri = request.getURI(); String serviceName = originalUri.getHost(); return (ListenableFuture)this.loadBalancer.execute(serviceName, new LoadBalancerRequest<ListenableFuture<ClientHttpResponse>>() { public ListenableFuture<ClientHttpResponse> apply(final ServiceInstance instance) throws Exception { HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, AsyncLoadBalancerInterceptor.this.loadBalancer); return execution.executeAsync(serviceRequest, body); } }); }}

接下来看一下execute()方法具体如何执行的:

@Override public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException { //根据serviceId 拿到LoadBalancer然后再哪去Server ILoadBalancer loadBalancer = getLoadBalancer(serviceId); // 跟踪源码发现用的 loadBalancer.chooseServer("default"); 获取server,下面对ILoadBalancer 接口做详解 Server server = getServer(loadBalancer); if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } // 获取到server后包装成RibbonServer, RibbonServer包含server对象以及serverId RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); return execute(serviceId, ribbonServer, request); } @Override public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException { Server server = null; if(serviceInstance instanceof RibbonServer) { server = ((RibbonServer)serviceInstance).getServer(); } if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } RibbonLoadBalancerContext context = this.clientFactory .getLoadBalancerContext(serviceId); RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server); try { // 回调到AsyncLoadBalancerInterceptor请求中的apply()方法 T returnVal = request.apply(serviceInstance); statsRecorder.recordStats(returnVal); return returnVal; } // catch IOException and rethrow so RestTemplate behaves correctly catch (IOException ex) { statsRecorder.recordStats(ex); throw ex; } catch (Exception ex) { statsRecorder.recordStats(ex); ReflectionUtils.rethrowRuntimeException(ex); } return null; }

上面发现getServer是通过ILoadBalancer 接口中定义的chooseServer()方法获取的。

ILoadBalancer 接口

1、addServers:向负载均衡服务器中维护的实例列表增加服务实例。

2、chooseServer:通过某种策略,从负载均衡服务器中挑选一个具体的服务实例。

3、markServerDown:用来通知和标识负载均衡服务器中某个具体实例已经停止服务,不然负载均衡服务器在下一次获取服务实例清单前会认为服务实例均是正常的。

4、getReachableServers 获取单签正常服务实例列表。

5、getAllServers:获取所有已知的服务实例列表,包括正常服务和停止服务的实例。

注:接口中的Server对象存储了服务端点元数据,如:host、port。

整理ILoadBalancer 接口之间的关系如图:

ILoadBalancer

查看RibbonClientConfiguration配置类可以看到默认采用ZoneAwareLoadBalancer来实现负载均衡

@Bean @ConditionalOnMissingBean public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) { return (ILoadBalancer)(this.propertiesFactory.isSet(ILoadBalancer.class, this.name) ? (ILoadBalancer)this.propertiesFactory.get(ILoadBalancer.class, config, this.name) : new ZoneAwareLoadBalancer(config, rule, ping, serverList, serverListFilter, serverListUpdater)); }

之前提到过RibbonServer对象,下面看一下RibbonServer对象,他实际上实现了ServiceInstance

ServiceInstance接口

public static RibbonServer implements ServiceInstance { private final String serviceId; private final Server server; private final boolean secure; private Map<String, String> metadata; public RibbonServer(String serviceId, Server server) { this(serviceId, server, false, Collections.<String, String> emptyMap()); } public RibbonServer(String serviceId, Server server, boolean secure, Map<String, String> metadata) { this.serviceId = serviceId; this.server = server; this.secure = secure; this.metadata = metadata; } // 省略get,toString方法 ... }

从Ribbon对象代码中可以看到,它除了包含Server对象之外,还存储了服务名,是否使用HTTPS标识以及一个Map类型的元数据集合。

再看一下apply接收到了ServiceInstance具体实例下面如何做

public ListenableFuture<ClientHttpResponse> apply(final ServiceInstance instance) throws Exception { HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, AsyncLoadBalancerInterceptor.this.loadBalancer); return execution.executeAsync(serviceRequest, body); }

从apply的实现上,可以看到它具体执行时候,还传入了ServiceRequestWrapper并重写了getURI(),重写后的getURI()方法来重新构建一个URI来进行访问。

public ServiceRequestWrapper extends HttpRequestWrapper { private final ServiceInstance instance; private final LoadBalancerClient loadBalancer; public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance, LoadBalancerClient loadBalancer) { super(request); this.instance = instance; this.loadBalancer = loadBalancer; } @Override public URI getURI() { URI uri = this.loadBalancer.reconstructURI( this.instance, getRequest().getURI()); return uri; }}

跟踪executeAsync,可以看到里面调用到getURI()fangfa

Override public ListenableFuture<ClientHttpResponse> executeAsync(HttpRequest request, byte[] body) throws IOException { if (this.iterator.hasNext()) { AsyncClientHttpRequestInterceptor interceptor = this.iterator.next(); return interceptor.intercept(request, body, this); } else { URI uri = request.getURI(); HttpMethod method = request.getMethod(); HttpHeaders headers = request.getHeaders(); Assert.state(method != null, "No standard HTTP method"); AsyncClientHttpRequest delegate = requestFactory.createAsyncRequest(uri, method); delegate.getHeaders().putAll(headers); if (body.length > 0) { StreamUtils.copy(body, delegate.getBody()); } return delegate.executeAsync(); } } }

再看一下getURI()方法下的reconstructURI()方法:

@Override public URI reconstructURI(ServiceInstance instance, URI original) { Assert.notNull(instance, "instance can not be null"); String serviceId = instance.getServiceId(); // 构建上下文 RibbonLoadBalancerContext context = this.clientFactory .getLoadBalancerContext(serviceId); URI uri; Server server; if (instance instanceof RibbonServer) { RibbonServer ribbonServer = (RibbonServer) instance; server = ribbonServer.getServer(); uri = updateToSecureConnectionIfNeeded(original, ribbonServer); } else { server = new Server(instance.getScheme(), instance.getHost(), instance.getPort()); IClientConfig clientConfig = clientFactory.getClientConfig(serviceId); ServerIntrospector serverIntrospector = serverIntrospector(serviceId); uri = updateToSecureConnectionIfNeeded(original, clientConfig, serverIntrospector, server); } return context.reconstructURIWithServer(server, uri); }

在这里可以看到SpringClientFactory和RibbonLoadBalancerContext,下面简单介绍一下,帮助大家理解:

1、SpringClientFactory类是一个用来创建客户端负载均衡器的工厂类,该工厂类会为每一个不同名的Ribbon客户端端生成Spring上下文。

2、RibbonLoadBalancerContext类是LoadBalancerContext的子类,该类用于存储一些被负载均衡器使用的上下文内容和API操作(reconstructURIWithServer 就是其中之一)。

public URI reconstructURIWithServer(Server server, URI original) { String host = server.getHost(); int port = server.getPort(); String scheme = server.getScheme(); if (host.equals(original.getHost()) && port == original.getPort() && scheme == original.getScheme()) { return original; } if (scheme == null) { scheme = original.getScheme(); } if (scheme == null) { scheme = deriveSchemeAndPortFromPartialUri(original).first(); } try { StringBuilder sb = new StringBuilder(); sb.append(scheme).append("://"); if (!Strings.isNullOrEmpty(original.getRawUserInfo())) { sb.append(original.getRawUserInfo()).append("@"); } sb.append(host); if (port >= 0) { sb.append(":").append(port); } sb.append(original.getRawPath()); if (!Strings.isNullOrEmpty(original.getRawQuery())) { sb.append("?").append(original.getRawQuery()); } if (!Strings.isNullOrEmpty(original.getRawFragment())) { sb.append("#").append(original.getRawFragment()); } URI newURI = new URI(sb.toString()); return newURI; } catch (URISyntaxException e) { throw new RuntimeException(e); } }

从reconstructURIWithServer的实现上,我们可以看到它同reconstructURI类似,只是reconstructURI第一个保存具体服务实例的参数使用了Spring Cloud定义的ServiceInstance,而reconstructURIWithServer中使用了Netflix中定义的Server,所以在RibbonLoadBalancerClient实现reconstructURI的时候,做了一次转换,使用ServiceInstance的host和port信息构建了一个对象给reconstructURIWithServer使用。从reconstructURIWithServer实现逻辑可以看到,它从Server对象中获取host和port信息然后根据以服务名为host的URI对象original中获取其他请求信息,将两者内容进行拼接整合最终要访问的服务实例的具体地址。

总结:通过LoadBalancerInterceptor拦截器对RestTemplate的请求进行拦截,利用Spring Cloud的负载均衡器LoadBalancerClient将以逻辑服务名为host的URI转换成具体的服务实例地址过程。同时通过分析LoadBalancerClient的Ribbon实现RibbonLoadBalancerClient ,可以知道在使用Ribbon实现负载均衡器的时候,实际使用的还是Ribbon中定义的ILoadBalancer接口实现的,自动化配置会采用ZoneAwareLoadBalancer的实现客户端负载均衡。

资料参考:《Spring Cloud 微服务实战》

0 阅读:0

破局之路课程

简介:感谢大家的关注