众所周知springcloud ribbon 是用来做负载均衡的。那么它是怎么做到使restTemplate拥有负载均衡呢?带着这个疑问咱们来源码分析。
本文会详细介绍阅读源码的方式。如果从多实现中找到具体实现。(非idea断点方式)
demo 配置文件
server:
port: 8081
spring:
application:
name: spring-cloud-order-service
# 没有依赖eureka注册中心 使用配置文件配置server
spring-cloud-user-service:
ribbon:
listOfServers: localhost:8082,localhost:8083
远程调用demo
@Bean
@LoadBalanced //负载均衡注解
public RestTemplate restTemplate(RestTemplateBuilderrestTemplateBuilder) {
return restTemplateBuilder.build();
}
@Autowired
private RestTemplate restTemplate;
@RequestMapping("/test")
public Object test(){
return restTemplate.getForObject("http://spring-cloud-ser-service/getUserInfo",String.class);
}
上述代码可以看到在注册restTemplate时加上了@LoadBalanced,那么LoadBalanced的作用是什么呢?
/**
* Annotation to mark a RestTemplate or WebClient bean to be configured to use a
* LoadBalancerClient.
* @author Spencer Gibb
*/
@Target({
ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
我们可以看到LoadBalanced注解其实是继承了spring注解的Qualifier。
Qualifier相当于一个标记,它可以将所有被Qualifier标记的bean统一管理。有兴趣的同学可以深入研究一下。这里不做具体阐述
LoadBalanced这个注解在哪里用到的呢?让我们来看loadBalanced的自动装配类LoadBalancerAutoConfiguration
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
@Autowired(required = false)
private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();
// 这里留个伏笔
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
return () -> restTemplateCustomizers.ifAvailable(customizers -> {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate);
}
}
});
}
// 省略部分代码 ...
可以看到被LoadBalanced所标记的依赖注入是个列表。这个列表里所注入的是所有被@LoadBalanced标记的RestTemplate。
让我们思考一下,如果想让我们自己实现restTemplate拥有负载均衡,那么有什么方案呢?例如拦截器?是的。ribbon也是这么做的。让我们来分析初始化ribbonInterceptor。
还是在LoadBalancerAutoConfiguration我们发现有这样一个装配
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingClass("org.springframework.retry.suport.RetryTemplate")
static class LoadBalancerInterceptorConfig {
// 初始化LoadBalancerInterceptor
@Bean
public LoadBalancerInterceptor ribbonInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return newLoadBalancerInterceptor(loadBalancerClient,requestFactory);
}
// 将所有restTemplate设置loadBalancerInterceptor
// 这块是RestTemplateCustomizer的匿名内部类
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptorloadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = newArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
这是LoadBalancerAutoConfiguration的内部类。
看到这里我们在反过来看刚刚留下伏笔的Bean
// ObjectProvider<List<RestTemplateCustomizer>> ObjectProvider是延迟加载 为了上述代码加载后这里再加载
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
return () -> restTemplateCustomizers.ifAvailable(customizers -> {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate);
}
}
});
}
restTemplateCustomizers.ifAvailable() 这个方法作用是如果找到这个RestTemplateCustomizer bean那么就执行其中的方法(lambda)
customizer.customize(restTemplate);这句话其实是调用的restTemplateCustomizer(final LoadBalancerInterceptorloadBalancerInterceptor)这个方法的匿名内部类。这里比较绕。
到这里自动装配基本就告一段落了。那么ribbonInterceptor配置完在哪里去用到呢?
@Autowired
private RestTemplate restTemplate;
@RequestMapping("/test")
public Object test(){
return restTemplate.getForObject("http://spring-cloud-ser-service/getUserInfo",String.class);
}
让我们从RestTemplate.getForObject()方法入手
RestTemplate.getForObject() 会调用doExecute
protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
@Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
Assert.notNull(url, "URI is required");
Assert.notNull(method, "HttpMethod is required");
ClientHttpResponse response = null;
try {
// 创建request
ClientHttpRequest request = createRequest(url, method);
if (requestCallback != null) {
requestCallback.doWithRequest(request);
}
// 执行获取响应结果
response = request.execute();
handleResponse(url, method, response);
return (responseExtractor != null ? responseExtractor.extractData(response) : null);
}
catch (IOException ex) {
String resource = url.toString();
String query = url.getRawQuery();
resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);
throw new ResourceAccessException("I/O error on " + method.name() +
" request for \"" + resource + "\": " + ex.getMessage(), ex);
}
finally {
if (response != null) {
response.close();
}
}
}
这个方法中有两个主线方法
了解设计模式的同学应该马上就能做出选择,没错是AbstractClientHttpRequest抽象类
@Override
public final ClientHttpResponse execute() throws IOException {
assertNotExecuted();
// 这里又是多个实现类
ClientHttpResponse result = executeInternal(this.headers);
this.executed = true;
return result;
}
// 模板方法
protected abstract ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput)
throws IOException;
上述代码中的executeInternal(this.headers);这块又是多个实现 如下:
那么具体用的是哪个呢?这回咱们在反过来看ClientHttpRequest request = createRequest(url, method);具体返回的是哪个实现类 如下:
org.springframework.http.client.support.HttpAccessor#createRequest
protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
ClientHttpRequest request = getRequestFactory().createRequest(url, method);
initialize(request);
if (logger.isDebugEnabled()) {
logger.debug("HTTP " + method.name() + " " + url);
}
return request;
}
// 省略部分代码 ...
getRequestFactory()调用的其实是HttpAccessor子类的InterceptingHttpAccessor中的getRequestFactory()
org.springframework.http.client.support.InterceptingHttpAccessor#getRequestFactory
private final List<ClientHttpRequestInterceptor> interceptors = new ArrayList<>();
// 呼应LoadBalancerAutoConfiguration中的restTemplateCustomizer()方法
public void setInterceptors(List<ClientHttpRequestInterceptor> interceptors) {
Assert.noNullElements(interceptors, "'interceptors' must not contain null elements");
// Take getInterceptors() List as-is when passed in here
if (this.interceptors != interceptors) {
this.interceptors.clear();
this.interceptors.addAll(interceptors);
AnnotationAwareOrderComparator.sort(this.interceptors);
}
}
public ClientHttpRequestFactory getRequestFactory() {
List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
if (!CollectionUtils.isEmpty(interceptors)) {
// 因自动装配这里肯定不会空
ClientHttpRequestFactory factory = this.interceptingRequestFactory;
if (factory == null) {
// 实际创建的request对象(ClientHttpRequest子类)
// 将自动装配的拦截器放list入此类中后面的讲解会用到
factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
this.interceptingRequestFactory = factory;
}
return factory;
}
else {
return super.getRequestFactory();
}
}
// 省略部分代码
以上代码会发现看到了拦截器的影子。getInterceptors();现在我们回到createRequest()方法中。为了加深印象。下面的代码块是上一步骤
org.springframework.http.client.support.HttpAccessor#createRequest
protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
// createRequest调用了AbstractClientHttpRequestFactoryWrapper.createRequest(url, method);
ClientHttpRequest request = getRequestFactory().createRequest(url, method);
initialize(request);
if (logger.isDebugEnabled()) {
logger.debug("HTTP " + method.name() + " " + url);
}
return request;
}
// 省略部分代码 ...
注: 以上代码的createRequest方法又出现了多个实现类
同理先进AbstractClientHttpRequestFactoryWrapper.createRequest(url, method);
@Override
public final ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod) throws IOException {
// 模板方法InterceptingClientHttpRequestFactory.createRequest(uri, httpMethod, this.requestFactory);
return createRequest(uri, httpMethod, this.requestFactory);
}
上述代码其实就是调用了InterceptingClientHttpRequestFactory.createRequest(uri, httpMethod, this.requestFactory);
看到这里是不是感觉很绕?是的spring的源码确实很绕。但是看多了,就会找到技巧了。
好了让我们回到AbstractClientHttpRequest.execute();
@Override
public final ClientHttpResponse execute() throws IOException {
assertNotExecuted();
ClientHttpResponse result = executeInternal(this.headers);
this.executed = true;
return result;
}
上述代码executeInternal又出现了多个实现
同理我们还是先进入AbstractBufferingClientHttpRequest内部
protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
byte[] bytes = this.bufferedOutput.toByteArray();
if (headers.getContentLength() < 0) {
headers.setContentLength(bytes.length);
}
ClientHttpResponse result = executeInternal(headers, bytes);
this.bufferedOutput = new ByteArrayOutputStream(0);
return result;
}
// 模板方法
protected abstract ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput)
throws IOException;
executeInternal() 方法会有以下实现类
InterceptingClientHttpRequest这个类有没有很熟悉?没错就是它。它就是createRequest返回的。所以我们进到InterceptingClientHttpRequest.executeInternal(headers, bytes);方法中
@Override
protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
InterceptingRequestExecution requestExecution = new InterceptingRequestExecution();
return requestExecution.execute(this, bufferedOutput);
}
// InterceptingClientHttpRequest的内部类
private class InterceptingRequestExecution implements ClientHttpRequestExecution {
private final Iterator<ClientHttpRequestInterceptor> iterator;
public InterceptingRequestExecution() {
this.iterator = interceptors.iterator();
}
// 重点
@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
// 将createRequest方法中放入的ClientHttpRequestInterceptor遍历
// 不为空执行拦截器
if (this.iterator.hasNext()) {
ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
return nextInterceptor.intercept(request, body, this);
}
// 默认走此方法
else {
HttpMethod method = request.getMethod();
Assert.state(method != null, "No standard HTTP method");
ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
if (body.length > 0) {
if (delegate instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
}
else {
StreamUtils.copy(body, delegate.getBody());
}
}
return delegate.execute();
}
}
}
上述重点代码我们会看到遍历后会执行nextInterceptor.intercept(request, body, this);实现类如下
会调用LoadBalancerInterceptor.intercept(request, body, this);为什么呢?
因为LoadBalancerAutoConfiguration自动装配装配的就是LoadBalancerInterceptor。大家可以回顾一下。
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null,
"Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName,
this.requestFactory.createRequest(request, body, execution));
}
以上代码重点在this.loadBalancer.execute方法我们直接看
org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient#execute(java.lang.String, org.springframework.cloud.client.loadbalancer.LoadBalancerRequest, java.lang.Object)
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
throws IOException {
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
// 负载均衡(重点)
Server server = getServer(loadBalancer, hint);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server,
isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
return execute(serviceId, ribbonServer, request);
}
上述代码getServer就是用负载均衡规则来获取服务地址的。
我们直接来看com.netflix.loadbalancer.BaseLoadBalancer#chooseServer
private final static IRule DEFAULT_RULE = new RoundRobinRule();
protected IRule rule = DEFAULT_RULE;
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
// 根据不同规则获取
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
上述代码可以看出来默认使用RoundRobinRule轮训规则进行负载
那么它是怎么知道所以注册的机器呢?
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware {
private static Logger logger = LoggerFactory
.getLogger(BaseLoadBalancer.class);
private final static IRule DEFAULT_RULE = new RoundRobinRule();
private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
private static final String DEFAULT_NAME = "default";
private static final String PREFIX = "LoadBalancer_";
protected IRule rule = DEFAULT_RULE;
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
protected IPing ping = null;
// 监听
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections
.synchronizedList(new ArrayList<Server>());
// 监听
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections
.synchronizedList(new ArrayList<Server>());
// 部分代码 ...
上述代码中的两个监听证明了有多少机器是UpServerList和全部机器allServerList
至于如何监听本章节不做阐述。留在注册中心eureka中做说明。
LoadBalancerRequest.intercept()
public ListenableFuture<ClientHttpResponse> intercept(final HttpRequest request,
final byte[] body, final AsyncClientHttpRequestExecution execution)
throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
return this.loadBalancer.execute(serviceName,
new LoadBalancerRequest<ListenableFuture<ClientHttpResponse>>() {
@Override
public ListenableFuture<ClientHttpResponse> apply(
final ServiceInstance instance) throws Exception {
HttpRequest serviceRequest = new ServiceRequestWrapper(request,
instance, AsyncLoadBalancerInterceptor.this.loadBalancer);
return execution.executeAsync(serviceRequest, body);
}
});
}
通过以上讲解我们发现其实ribbon就是用的拦截器对restTemplate做了支撑。如果有拦截器。那么就会执行拦截器中的逻辑。如果没有那么就会走正常逻辑(无负载等)下面我为大家整理一下主线流程
文章浏览阅读3.8k次,点赞9次,收藏28次。直接上一个工作中碰到的问题,另外一个系统开启多线程调用我这边的接口,然后我这边会开启多线程批量查询第三方接口并且返回给调用方。使用的是两三年前别人遗留下来的方法,放到线上后发现确实是可以正常取到结果,但是一旦调用,CPU占用就直接100%(部署环境是win server服务器)。因此查看了下相关的老代码并使用JProfiler查看发现是在某个while循环的时候有问题。具体项目代码就不贴了,类似于下面这段代码。while(flag) {//your code;}这里的flag._main函数使用while(1)循环cpu占用99
文章浏览阅读347次。idea shift f6 快捷键无效_idea shift +f6快捷键不生效
文章浏览阅读135次。Ecmacript 中没有DOM 和 BOM核心模块Node为JavaScript提供了很多服务器级别,这些API绝大多数都被包装到了一个具名和核心模块中了,例如文件操作的 fs 核心模块 ,http服务构建的http 模块 path 路径操作模块 os 操作系统信息模块// 用来获取机器信息的var os = require('os')// 用来操作路径的var path = require('path')// 获取当前机器的 CPU 信息console.log(os.cpus._node模块中有很多核心模块,以下不属于核心模块,使用时需下载的是
文章浏览阅读10w+次,点赞435次,收藏3.4k次。SPSS 22 下载安装过程7.6 方差分析与回归分析的SPSS实现7.6.1 SPSS软件概述1 SPSS版本与安装2 SPSS界面3 SPSS特点4 SPSS数据7.6.2 SPSS与方差分析1 单因素方差分析2 双因素方差分析7.6.3 SPSS与回归分析SPSS回归分析过程牙膏价格问题的回归分析_化工数学模型数据回归软件
文章浏览阅读7.5k次。如何利用hutool工具包实现邮件发送功能呢?1、首先引入hutool依赖<dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.7.19</version></dependency>2、编写邮件发送工具类package com.pc.c..._hutool发送邮件
文章浏览阅读867次,点赞2次,收藏2次。docker安装elasticsearch,elasticsearch-head,kibana,ik分词器安装方式基本有两种,一种是pull的方式,一种是Dockerfile的方式,由于pull的方式pull下来后还需配置许多东西且不便于复用,个人比较喜欢使用Dockerfile的方式所有docker支持的镜像基本都在https://hub.docker.com/docker的官网上能找到合..._docker安装kibana连接elasticsearch并且elasticsearch有密码
文章浏览阅读1.3w次,点赞57次,收藏92次。整理 | 郑丽媛出品 | CSDN(ID:CSDNnews)近年来,随着机器学习的兴起,有一门编程语言逐渐变得火热——Python。得益于其针对机器学习提供了大量开源框架和第三方模块,内置..._beeware
文章浏览阅读7.9k次。//// ViewController.swift// Day_10_Timer//// Created by dongqiangfei on 2018/10/15.// Copyright 2018年 飞飞. All rights reserved.//import UIKitclass ViewController: UIViewController { ..._swift timer 暂停
文章浏览阅读986次,点赞2次,收藏2次。1.硬性等待让当前线程暂停执行,应用场景:代码执行速度太快了,但是UI元素没有立马加载出来,造成两者不同步,这时候就可以让代码等待一下,再去执行找元素的动作线程休眠,强制等待 Thread.sleep(long mills)package com.example.demo;import org.junit.jupiter.api.Test;import org.openqa.selenium.By;import org.openqa.selenium.firefox.Firefox.._元素三大等待
文章浏览阅读3k次,点赞4次,收藏14次。Java软件工程师职位分析_java岗位分析
文章浏览阅读2k次。Java:Unreachable code的解决方法_java unreachable code
文章浏览阅读1w次。1、html中设置标签data-*的值 标题 11111 222222、点击获取当前标签的data-url的值$('dd').on('click', function() { var urlVal = $(this).data('ur_如何根据data-*属性获取对应的标签对象