目录
- 前言
- 一、默认配置
- 二、负载均衡流程
- 三、负载均衡实现
前言
openFeign默认是通过 RoundRobinLoadBalancer 实现负载均衡。
一、默认配置
1、 LoadBalancerClientFactory;
负载均衡客户端工厂LoadBalancerClientFactory的创建是在LoadBalancerAutoConfiguration中
@ConditionalOnMissingBean
	@Bean
	public LoadBalancerClientFactory loadBalancerClientFactory() {
   
     
		//创建 LoadBalancerClientFactory 
		LoadBalancerClientFactory clientFactory = new LoadBalancerClientFactory();
		//使用LoadBalancerClientSpecification来扩展负载均衡
		clientFactory.setConfigurations(this.configurations.getIfAvailable(Collections::emptyList));
		return clientFactory;
	}
2、 LoadBalancerClient;
负载均衡客户端LoadBalancerClient 默认实现是BlockingLoadBalancerClient,创建是在 BlockingLoadBalancerClientAutoConfiguration中
	@Bean
	@ConditionalOnBean(LoadBalancerClientFactory.class)
	@ConditionalOnMissingBean
	public LoadBalancerClient blockingLoadBalancerClient(LoadBalancerClientFactory loadBalancerClientFactory,
			LoadBalancerProperties properties) {
   
     
		return new BlockingLoadBalancerClient(loadBalancerClientFactory, properties);
	}
3、 ServiceInstanceListSupplier;
服务列表提供者,默认创建是在ReactiveSupportConfiguration中
@Bean
		@ConditionalOnBean(ReactiveDiscoveryClient.class)
		@ConditionalOnMissingBean
		@ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations", havingValue = "default",
				matchIfMissing = true)
		public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
				ConfigurableApplicationContext context) {
   
     
			return ServiceInstanceListSupplier.builder().withDiscoveryClient().withCaching().build(context);
		}
4、 ReactorLoadBalancer;
负载均衡接口,默认实现类创建是在LoadBalancerClientConfiguration中,使用的是 RoundRobinLoadBalancer
@Bean
	@ConditionalOnMissingBean
	public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
			LoadBalancerClientFactory loadBalancerClientFactory) {
   
     
		String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
		//传入ClientFactoryObjectProvider,包含远程服务名称等
		return new RoundRobinLoadBalancer(
				loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
	}
5、 FeignBlockingLoadBalancerClient;
它是feign 客户端的默认实现,创建是在DefaultFeignLoadBalancerConfiguration中,传入了LoadBalancerClientFactory、LoadBalancerClient等参数
@Bean
	@ConditionalOnMissingBean
	@Conditional(OnRetryNotEnabledCondition.class)
	public Client feignClient(LoadBalancerClient loadBalancerClient, LoadBalancerProperties properties,
			LoadBalancerClientFactory loadBalancerClientFactory) {
   
     
		return new FeignBlockingLoadBalancerClient(new Client.Default(null, null), loadBalancerClient, properties,
				loadBalancerClientFactory);
	}
二、负载均衡流程
在FeignBlockingLoadBalancerClient中,执行远程服务之前,会先从服务列表中根据负载均衡算法选择合适的服务,下面针对服务的选择过程进行分析。
在FeignBlockingLoadBalancerClient 类的 execute( ) 方法中
ServiceInstance instance = loadBalancerClient.choose(serviceId, lbRequest);
这里的是loadBalancerClient 是 BlockingLoadBalancerClient
1、 choose();
public <T> ServiceInstance choose(String serviceId, Request<T> request) {
   
     
		//获取默认的 RoundRobinLoadBalancer
		ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
		if (loadBalancer == null) {
   
     
			return null;
		}
		//负载均衡
		Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
		if (loadBalancerResponse == null) {
   
     
			return null;
		}
		//返回负载均衡选择的服务
		return loadBalancerResponse.getServer();
	}
2、 负载均衡;
loadBalancer.choose(request)
public Mono<Response<ServiceInstance>> choose(Request request) {
   
     
		//获取服务列表提供者 CachingServiceInstanceListSupplier
		ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
				.getIfAvailable(NoopServiceInstanceListSupplier::new);
		//封装获取服务逻辑
		return supplier.get(request).next()
				.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
	}
三、负载均衡实现
1、 CachingServiceInstanceListSupplier的创建;
带缓存的服务列表提供者,它实现了 ServiceInstanceListSupplier 接口,上面已经提到过,下面看一下它的创建过程:
ServiceInstanceListSupplier.builder().withDiscoveryClient().withCaching().build(context);
(1)builder( )
创建builder 对象
	static ServiceInstanceListSupplierBuilder builder() {
   
     
		return new ServiceInstanceListSupplierBuilder();
	}
(2)withDiscoveryClient( )
定义了baseCreator 对象,用于创建DiscoveryClientServiceInstanceListSupplier对象CompositeDiscoveryClient
public ServiceInstanceListSupplierBuilder withDiscoveryClient() {
   
     
		if (baseCreator != null && LOG.isWarnEnabled()) {
   
     
			LOG.warn("Overriding a previously set baseCreator with a ReactiveDiscoveryClient baseCreator.");
		}
		this.baseCreator = context -> {
   
     
			//获取到容器中的ReactiveDiscoveryClient ,这里是 CompositeDiscoveryClient,它是服务发现的核心类,下面还会提到
			ReactiveDiscoveryClient discoveryClient = context.getBean(ReactiveDiscoveryClient.class);
			return new DiscoveryClientServiceInstanceListSupplier(discoveryClient, context.getEnvironment());
		};
		return this;
	}
(3)withCaching( )
定义了cachingCreator ,用于创建带缓存功能的 CachingServiceInstanceListSupplier
public ServiceInstanceListSupplierBuilder withCaching() {
   
     
		if (cachingCreator != null && LOG.isWarnEnabled()) {
   
     
			LOG.warn(
					"Overriding a previously set cachingCreator with a CachingServiceInstanceListSupplier-based cachingCreator.");
		}
		this.cachingCreator = (context, delegate) -> {
   
     
			ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context
					.getBeanProvider(LoadBalancerCacheManager.class);
			if (cacheManagerProvider.getIfAvailable() != null) {
   
     
				return new CachingServiceInstanceListSupplier(delegate, cacheManagerProvider.getIfAvailable());
			}
			if (LOG.isWarnEnabled()) {
   
     
				LOG.warn("LoadBalancerCacheManager not available, returning delegate without caching.");
			}
			return delegate;
		};
		return this;
	}
(4)build(context)
创建出CachingServiceInstanceListSupplier
public ServiceInstanceListSupplier build(ConfigurableApplicationContext context) {
   
     
		Assert.notNull(baseCreator, "A baseCreator must not be null");
		
		//获取 DiscoveryClientServiceInstanceListSupplier
		ServiceInstanceListSupplier supplier = baseCreator.apply(context);
		
		//使用 creators 进行包装
		for (DelegateCreator creator : creators) {
   
     
			supplier = creator.apply(context, supplier);
		}
		
		//获取 CachingServiceInstanceListSupplier
		if (this.cachingCreator != null) {
   
     
			supplier = this.cachingCreator.apply(context, supplier);
		}
		return supplier;
	}
2、 DiscoveryClientServiceInstanceListSupplier;
(1)构造方法
public DiscoveryClientServiceInstanceListSupplier(DiscoveryClient delegate, Environment environment) {
   
     
		this.serviceId = environment.getProperty(PROPERTY_NAME);
		resolveTimeout(environment);
		//获取服务列表,delegate.getInstances(serviceId),其中delegate 是 CompositeDiscoveryClient
		this.serviceInstances = Flux.defer(() -> Flux.just(delegate.getInstances(serviceId)))
				.subscribeOn(Schedulers.boundedElastic()).timeout(timeout, Flux.defer(() -> {
   
     
					logTimeout();
					return Flux.just(new ArrayList<>());
				})).onErrorResume(error -> {
   
     
					logException(error);
					return Flux.just(new ArrayList<>());
				});
	}
(2)CompositeDiscoveryClient
在CompositeDiscoveryClientAutoConfiguration 中 ,创建 compositeDiscoveryClient对象,传入的参数是根据容器中的discoveryClients,容器中的DiscoveryClient 有NacosDiscoveryClient、SimpleDiscoveryClient
@Configuration(proxyBeanMethods = false)
@AutoConfigureBefore(SimpleDiscoveryClientAutoConfiguration.class)
public class CompositeDiscoveryClientAutoConfiguration {
   
     
	@Bean
	@Primary
	public CompositeDiscoveryClient compositeDiscoveryClient(List<DiscoveryClient> discoveryClients) {
   
     
		return new CompositeDiscoveryClient(discoveryClients);
	}
}
(3)DiscoveryClient
创建NacosDiscoveryClient,里面包含了 NacosServiceDiscovery
public class NacosDiscoveryClientConfiguration {
   
     
	@Bean
	public DiscoveryClient nacosDiscoveryClient(
			NacosServiceDiscovery nacosServiceDiscovery) {
   
     
		return new NacosDiscoveryClient(nacosServiceDiscovery);
	}
}
(4)NacosServiceDiscovery
在NacosDiscoveryAutoConfiguration 中 创建了 NacosServiceDiscovery ,传入的参数有NacosDiscoveryProperties 和 NacosServiceManager
@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryAutoConfiguration {
   
     
	@Bean
	@ConditionalOnMissingBean
	public NacosDiscoveryProperties nacosProperties() {
   
     
		return new NacosDiscoveryProperties();
	}
	@Bean
	@ConditionalOnMissingBean
	public NacosServiceDiscovery nacosServiceDiscovery(
			NacosDiscoveryProperties discoveryProperties,
			NacosServiceManager nacosServiceManager) {
   
     
		return new NacosServiceDiscovery(discoveryProperties, nacosServiceManager);
	}
}
流程好像封装的有些复杂,现在理一下类之间的关系,从前到后依次是包含关系:
a)服务发现关键类之间的关系:
CompositeDiscoveryClient-》NacosDiscoveryClient-》NacosServiceDiscovery-》NacosServiceManager -》NacosNamingService -》hostReactor-》serviceInfoMap
其中最里面的 serviceInfoMap 包含了远程服务信息,也就是将 CompositeDiscoveryClient 与服务列表联系了起来。
b)服务列表提供者关键类之间的关系:
CachingServiceInstanceListSupplier -》DiscoveryClientServiceInstanceListSupplier -》CompositeDiscoveryClient
c)这样 ServiceInstanceListSupplier 与 CompositeDiscoveryClient 就关联了起来,而 CompositeDiscoveryClient 与 serviceInfoMap 又关联了起来。
3、 获取服务列表;
从delegate.getInstances(serviceId) 开始分析,也就是 CompositeDiscoveryClient 类中的getInstances( ) 方法
	CompositeDiscoveryClient.java
	@Override
	public List<ServiceInstance> getInstances(String serviceId) {
   
     
		if (this.discoveryClients != null) {
   
     
			for (DiscoveryClient discoveryClient : this.discoveryClients) {
   
     
				//调用 NacosDiscoveryClient 的 getInstances()
				List<ServiceInstance> instances = discoveryClient.getInstances(serviceId);
				if (instances != null && !instances.isEmpty()) {
   
     
					return instances;
				}
			}
		}
		return Collections.emptyList();
	}
@Override
	public List<ServiceInstance> getInstances(String serviceId) {
   
     
		try {
   
     
			//调用 NacosServiceDiscovery 的 getInstances()
			return serviceDiscovery.getInstances(serviceId);
		}
		catch (Exception e) {
   
     
			throw new RuntimeException(
					"Can not get hosts from nacos server. serviceId: " + serviceId, e);
		}
	}
public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
   
     
		String group = discoveryProperties.getGroup();
		//获取服务列表
		List<Instance> instances = namingService().selectInstances(serviceId, group,
				true);
		return hostToServiceInstanceList(instances, serviceId);
	}
进入服务发现的流程,最终获取到对应 serviceId 的服务列表信息
4、 负载均衡实现;
获取到服务列表以后,回到 RoundRobinLoadBalancer 类中 processInstanceResponse 方法
private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
			List<ServiceInstance> serviceInstances) {
   
     
		//从服务列表中选择一个服务
		Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
		if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
   
     
			((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
		}
		return serviceInstanceResponse;
	}
具体的负载均衡算法,这里使用的是轮询
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
   
     
		if (instances.isEmpty()) {
   
     
			if (log.isWarnEnabled()) {
   
     
				log.warn("No servers available for service: " + serviceId);
			}
			return new EmptyResponse();
		}
		// TODO: enforce order?
		//位置加1
		int pos = Math.abs(this.position.incrementAndGet());
		
		//获取对应位置的服务
		ServiceInstance instance = instances.get(pos % instances.size());
		return new DefaultResponse(instance);
	}