社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
本篇重点介绍httpclient连接池的相关原理以及介绍,顺带的介绍httpclient发送请求时的简单介绍,并会带上一些源码分析。本篇博文是基于httpclient的4.5.2版本进行介绍的
。
在介绍架构原理前,先介绍几个类和接口,方便读者对httpclient的整体设计有个大概的概念。
HttpClient:一个接口,即http客户端的抽象,主要就是用它发送请求http请求。它的主要实现有CloseableHttpClient
,相信读者们比较熟悉。
HttpRequestBase:一个抽象类,是请求内容的抽象。包括了请求协议、uri、还有一些配置。我们常用的HttpGet和HttpPost都是它的子类。
HttpClientConnectionManager:一个接口,连接管理的抽象。一般要发送http请求前,需要和目标服务建立连接,然后再发送数据包。这个连接管理器可以对连接以池的方式进行管理。
HttpRoute:一个final类,用来表示目标服务器(ip+端口)。
一个HttpRequestBase在被httpclient执行后,会经过一个链路被一个个组件处理。这里使用了职责链的设计模式,一个组件处理完后,就会交给下一个组件处理。这样做的好处就是如果要移除一个组件或者添加一个新的组件来实现对请求的一些处理非常方便。这里要说一下,上图列的组件中的一些是根据配置决定是否加入到该执行链中。
我们一般通过CloseableHttpClient httpClient = HttpClients.custom().build();
获取到一个httpClient。这里返回的实际对象其实是InternalHttpClient
类,所以执行httpclient.execute(request)
时候最终会调用InternalHttpClient#doExecute()
。我们看下对应的源码
protected CloseableHttpResponse doExecute(
final HttpHost target,
final HttpRequest request,
final HttpContext context) throws IOException, ClientProtocolException {
Args.notNull(request, "HTTP request");
HttpExecutionAware execAware = null;
if (request instanceof HttpExecutionAware) {
execAware = (HttpExecutionAware) request;
}
try {
final HttpRequestWrapper wrapper = HttpRequestWrapper.wrap(request, target);
final HttpClientContext localcontext = HttpClientContext.adapt(
context != null ? context : new BasicHttpContext());
RequestConfig config = null;
if (request instanceof Configurable) {
config = ((Configurable) request).getConfig();
}
if (config == null) {
final HttpParams params = request.getParams();
if (params instanceof HttpParamsNames) {
if (!((HttpParamsNames) params).getNames().isEmpty()) {
config = HttpClientParamConfig.getRequestConfig(params);
}
} else {
config = HttpClientParamConfig.getRequestConfig(params);
}
}
if (config != null) {
localcontext.setRequestConfig(config);
}
setupContext(localcontext);
final HttpRoute route = determineRoute(target, wrapper, localcontext);
return this.execChain.execute(route, wrapper, localcontext, execAware);
} catch (final HttpException httpException) {
throw new ClientProtocolException(httpException);
}
}
通过代码可以看到,这里主要是做两件事
之后就交给处理链处理请求。
处理链的构造是在InternalHttpClient的构造中完成的,也就是HttpClients.custom().build()
方法中构造起来的。我们看下处理链的构造代码
public CloseableHttpClient build() {
...
ClientExecChain execChain = createMainExec(
requestExecCopy,
connManagerCopy,
reuseStrategyCopy,
keepAliveStrategyCopy,
new ImmutableHttpProcessor(new RequestTargetHost(), new RequestUserAgent(userAgentCopy)),
targetAuthStrategyCopy,
proxyAuthStrategyCopy,
userTokenHandlerCopy);
execChain = decorateMainExec(execChain);
...
execChain = new ProtocolExec(execChain, httpprocessorCopy);
execChain = decorateProtocolExec(execChain);
// Add request retry executor, if not disabled
if (!automaticRetriesDisabled) {
HttpRequestRetryHandler retryHandlerCopy = this.retryHandler;
if (retryHandlerCopy == null) {
retryHandlerCopy = DefaultHttpRequestRetryHandler.INSTANCE;
}
execChain = new RetryExec(execChain, retryHandlerCopy);
}
HttpRoutePlanner routePlannerCopy = this.routePlanner;
if (routePlannerCopy == null) {
SchemePortResolver schemePortResolverCopy = this.schemePortResolver;
if (schemePortResolverCopy == null) {
schemePortResolverCopy = DefaultSchemePortResolver.INSTANCE;
}
if (proxy != null) {
routePlannerCopy = new DefaultProxyRoutePlanner(proxy, schemePortResolverCopy);
} else if (systemProperties) {
routePlannerCopy = new SystemDefaultRoutePlanner(
schemePortResolverCopy, ProxySelector.getDefault());
} else {
routePlannerCopy = new DefaultRoutePlanner(schemePortResolverCopy);
}
}
// Add redirect executor, if not disabled
if (!redirectHandlingDisabled) {
RedirectStrategy redirectStrategyCopy = this.redirectStrategy;
if (redirectStrategyCopy == null) {
redirectStrategyCopy = DefaultRedirectStrategy.INSTANCE;
}
execChain = new RedirectExec(execChain, routePlannerCopy, redirectStrategyCopy);
}
// Optionally, add service unavailable retry executor
final ServiceUnavailableRetryStrategy serviceUnavailStrategyCopy = this.serviceUnavailStrategy;
if (serviceUnavailStrategyCopy != null) {
execChain = new ServiceUnavailableRetryExec(execChain, serviceUnavailStrategyCopy);
}
// Optionally, add connection back-off executor
if (this.backoffManager != null && this.connectionBackoffStrategy != null) {
execChain = new BackoffStrategyExec(execChain, this.connectionBackoffStrategy, this.backoffManager);
}
...
return new InternalHttpClient(
execChain,
connManagerCopy,
routePlannerCopy,
cookieSpecRegistryCopy,
authSchemeRegistryCopy,
defaultCookieStore,
defaultCredentialsProvider,
defaultRequestConfig != null ? defaultRequestConfig : RequestConfig.DEFAULT,
closeablesCopy);
}
由于这个方法太长,所以只保留了处理链的那部分代码。
下面介绍处理链中各个组件的一个大概功能:
上图中的HttpRequestExecutor只是MainClientExec中的组件,用于真正的发送http请求给目标服务器。
通过下面这段代码,我们可以给httpclient设置连接管理器
private static CloseableHttpClient createHttpClient() {
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
// 将最大连接数添加
cm.setMaxTotal(200);
// 将每一个路由基础的连接添加
cm.setDefaultMaxPerRoute(40);
HttpHost httpHost = new HttpHost("www.baidu.com", 80);
HttpRoute httpRoute = new HttpRoute(httpHost);
cm.setMaxPerRoute(httpRoute, 80);
HttpRequestRetryHandler httpRetryHandler = new DefaultHttpRequestRetryHandler(5, false);
//默认连接配置
RequestConfig defaultRequestConfig = RequestConfig.custom()
.setConnectionRequestTimeout(10000)
.setConnectTimeout(10000).setSocketTimeout(10000).build();
return HttpClients.custom()
.setDefaultRequestConfig(defaultRequestConfig)
.evictExpiredConnections()
.evictIdleConnections(10, TimeUnit.SECONDS)
.setConnectionManager(cm)
.setRetryHandler(httpRetryHandler).build();
}
在上面的例子中,我们初始化了一个基于连接池的连接管理器。这个连接池中最多持有200个连接,每个目标服务器最多持有40个连接。其中,我们专门设定了www.baidu.com:80
的目标服务器的可以持有的最大连接数是80个。
如果我们不给httpclient配置指定的连接管理器,在默认情况下,httpclient也会自动使用PoolingHttpClientConnectionManager作为连接管理器。但是PoolingHttpClientConnectionManager默认的maxConnPerRoute和maxConnTotal分别是是2和20。也就是对于每个服务器最多只会维护2个连接,看起来有点少。所以,在日常使用时我们尽量使用自己配置的连接管理器比较好。
连接池管理器会为每个httpRoute单独的维护一个连接池。管理着available、leased、pending这些对象。同时,CPool本身也会维护一个总的available、leased、pending对象,是那些routeToPool中available、leased、pending的总和。这里介绍一下上图的几个概念
可以看做是每个连接的抽象。它维护了一些信息:
ManagedHttpClientConnection:它是一个httpClient连接,真正建立连接后,其会bind绑定一个socket,用于传输HTTP报文
route:路由信息,就是对于目标服务器的信息
created:连接创建时间
updated:连接更新时间,释放连接回连接池时会更新
validityDeadline:用于初始化expire用,规则是if(timeToLive>0){validityDeadline=created+timeToLive}
,否则validityDeadline就取Long.MAX_VALUE。其中timeToLive是在构造连接时指定的连接存活时间,默认构造的timeToLive=-1。
expire:过期时间,人为规定的连接池可以保有连接的时间,除了初始化时等于validityDeadline,每次释放连接时也会更新,但是从newExpiry和validUnit取最小值。newExpiry的获取下面会讲。
available 表示可用的连接,他们是用LinkedList来存放的。
当连接被使用完后,会被放入链表的头部。同时,当需要取连接时,也是从链表的头部开始遍历,直到获取可用的连接为止。这样做的目的是为了尽快的获取到可用的连接,因为在链表头部的都是刚放入链表的连接,离过期时间肯定是最远的。如果从链表尾部获取的话,那么很可能会获取到失效的连接。
同时,删除链表的失效连接时从链表尾部开始遍历的。
leased存放正在被使用的连接。如果一个连接被创建或者从available 链表中取出,就会先放入leased集合中。同时,用完连接后,就从leased集合中移除掉掉。因为就add和remove操作,所以使用HashSet的数据结构,效率很高。
maxTotal的配置就是available链表和leased集合的总和限制。
当从池中获取连接时,如果available链表没有现成可用的连接,且当前路由或连接池已经达到了最大数量的限制,也不能创建连接了,此时不会阻塞整个连接池,而是将当前线程用于获取连接的Future放入pending链表的末尾,之后当前线程调用await(),释放持有的锁,并等待被唤醒。
当有连接被release()释放回连接池时,会从pending链表头获取future,并唤醒其线程继续获取连接,做到了先进先出。
每个RouteToPool都会管理一个池,也就是持有pending 、leased 、available 这些对象。
同时,CPool本身也会维护一个总的available、leased、pending对象,是那些routeToPool中链表和集合的总和。
连接分配的过程:
当分配到PoolEntry连接实体后,会调用establishRoute(),建立socket连接并与ManagedHttpClientConnection绑定。
连接是否被标记成可重用是根据连接是否可以保持。
客户端如果希望保持长连接,应该在发起请求时告诉服务器希望服务器保持长连接(http 1.0设置connection字段为keep-alive,http 1.1字段默认保持)。根据服务器的响应来确定是否保持长连接,判断原则如下:
连接保持时,会更新PoolEntry的expiry到期时间,计算逻辑为:
newExpiry = System.currentTimeMillis() + timeout;
。如果timeout值不存在或者为负数,则newExpiry = Long.MAX_VALUE
。expire=Math.min(oldExpire,newExpire)
httpclient会将我们的请求响应封装成CloseableHttpResponse对象,我们可以通过这个对象获取响应的内容。获取内容时,最终是通过底层的InputStream.read()进行读取的,httpclient中对于该InputStream的实现是org.apache.http.conn.EofSensorInputStream
,它会在read过程中不断的检查流是否读完,一旦检测到读完,就会自动根据该连接是否可重用来选择把连接返回给连接池或者关闭连接。
同时,我们也可以人为功能CloseableHttpResponse.close()方法来回收该连接或者关闭该连接。
每次从连接池中获取连接时,都会先检测该连接是否已经过期或者关闭,如果是的话,就分别从routeToPool、httpConnPool的available队列移除,然后继续获取下一个连接。
每个连接都有一个expire时间,这个过期时间是连接管理器用来管理连接的,并不是说过了这个时间tcp连接就不能用了。只是连接管理器不会再使用这个连接了。
可能连接被服务端单方面关闭。那么,httpclient是怎么判断连接被关闭的呢?
httpclient会通过socket输入流尝试读取数据,它将soTimeout设置为1ms,然后读取数据。如果返回的字节数小于0,则说明该连接关闭了。
/**BHttpConnectionBase#isStale()*/
public boolean isStale() {
if (!isOpen()) {
return true;
}
try {
final int bytesRead = fillInputBuffer(1);
return bytesRead < 0;
} catch (final SocketTimeoutException ex) {
return false;
} catch (final IOException ex) {
return true;
}
socket输入流读取数据的底层也是通过recv系统指令完成的,执行recv指令时,在阻塞的情况下,如果返回的值是-1,则表示连接被异常关闭。如果返回的是0,则表示连接被关闭了。我们这里讲soTimeout设置为1ms,所以最多只会阻塞1ms就返回,如果连接被服务端单方面关闭的话,这里就会返回-1,我们马上就知道连接被关闭了。
如果每次获取连接时都要去判断连接是否过期或者关闭,会造成一定的性能损耗。另外如果连接长时间没用,长期闲置在那也是一种资源浪费。所以httpclient提供了一个机制,也就是开启后台线程定时的清除过期和闲置过久的连接。注意,4.5.2版本默认是有这个机制的,以前的版本不太确定有没有,如果没有我们也可以自己写一个。PoolingHttpClientConnectionManager提供了两个方法,closeExpiredConnections
和closeIdleConnections
,分别用来清除过期的连接以及闲置过久的连接。
这个线程默认是不开启的,我们可以在构建httpclient的时候设置
return HttpClients.custom()
.setDefaultRequestConfig(defaultRequestConfig)
//开启后台线程清除过期的连接
.evictExpiredConnections()
//开启后台线程清除闲置30秒以上的连接
.evictIdleConnections(30, TimeUnit.SECONDS)
.setConnectionManager(cm)
.setRetryHandler(httpRetryHandler).build();
evictExpiredConnections和evictIdleConnections中只要有一个被调用了,就会开启那个后台线程。
if (!this.connManagerShared) {
if (closeablesCopy == null) {
closeablesCopy = new ArrayList<Closeable>(1);
}
final HttpClientConnectionManager cm = connManagerCopy;
//只要有一个为true,就开启清除线程
if (evictExpiredConnections || evictIdleConnections) {
final IdleConnectionEvictor connectionEvictor = new IdleConnectionEvictor(cm,
maxIdleTime > 0 ? maxIdleTime : 10, maxIdleTimeUnit != null ? maxIdleTimeUnit : TimeUnit.SECONDS);
closeablesCopy.add(new Closeable() {
@Override
public void close() throws IOException {
connectionEvictor.shutdown();
}
});
//启动线程
connectionEvictor.start();
}
closeablesCopy.add(new Closeable() {
@Override
public void close() throws IOException {
cm.shutdown();
}
});
}
线程开启后,会执行下面的方法
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
//休眠sleepTimeMs时间
Thread.sleep(sleepTimeMs);
//清除过期的连接
connectionManager.closeExpiredConnections();
if (maxIdleTimeMs > 0) {
//清除闲置过久的连接
connectionManager.closeIdleConnections(maxIdleTimeMs, TimeUnit.MILLISECONDS);
}
}
} catch (final Exception ex) {
exception = ex;
}
}
//关闭过期的连接
@Override
public void closeExpiredConnections() {
this.log.debug("Closing expired connections");
this.pool.closeExpired();
}
public void closeExpired() {
final long now = System.currentTimeMillis();
enumAvailable(new PoolEntryCallback<T, C>() {
@Override
public void process(final PoolEntry<T, C> entry) {
if (entry.isExpired(now)) {
entry.close();
}
}
});
}
//关闭闲置太久的连接
@Override
public void closeIdleConnections(final long idleTimeout, final TimeUnit tunit) {
if (this.log.isDebugEnabled()) {
this.log.debug("Closing connections idle longer than " + idleTimeout + " " + tunit);
}
this.pool.closeIdle(idleTimeout, tunit);
}
public void closeIdle(final long idletime, final TimeUnit tunit) {
Args.notNull(tunit, "Time unit");
long time = tunit.toMillis(idletime);
if (time < 0) {
time = 0;
}
final long deadline = System.currentTimeMillis() - time;
enumAvailable(new PoolEntryCallback<T, C>() {
@Override
public void process(final PoolEntry<T, C> entry) {
if (entry.getUpdated() <= deadline) {
entry.close();
}
}
});
}
另外,从上面的代码可以看出,这个线程其实只会关闭过期的连接以及闲置太久的连接,对于那些被服务端异常关闭的连接,是不会处理的。
httpclient的介绍大概就到这里。由于时间有限,并没有很深入的研究其源码实现,但是对其架构也有了一定的认识。再遇到相关的问题时也可以较快的分析出来,实在不行也可以跟踪源码查。
另外,本文如果哪里有说的不对的地方,欢迎指出,一起交流~
本文参考链接:
https://blog.csdn.net/umke888/article/details/54881946
https://blog.csdn.net/szwandcj/article/details/51291967
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!