扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
一、背景
创新互联专注骨干网络服务器租用十年,服务更有保障!服务器租用,服务器托管雅安 成都服务器租用,成都服务器托管,骨干网络带宽,享受低延迟,高速访问。灵活、实现低成本的共享或公网数据中心高速带宽的专属高性能服务器。
HTTP协议是无状态的协议,即每一次请求都是互相独立的。因此它的最初实现是,每一个http请求都会打开一个tcp socket连接,当交互完毕后会关闭这个连接。
HTTP协议是全双工的协议,所以建立连接与断开连接是要经过三次握手与四次挥手的。显然在这种设计中,每次发送Http请求都会消耗很多的额外资源,即连接的建立与销毁。
于是,HTTP协议的也进行了发展,通过持久连接的方法来进行socket连接复用。
从图中可以看到:
持久连接的实现有两种:HTTP/1.0+的keep-alive与HTTP/1.1的持久连接。
二、HTTP/1.0+的Keep-Alive
从1996年开始,很多HTTP/1.0浏览器与服务器都对协议进行了扩展,那就是“keep-alive”扩展协议。
注意,这个扩展协议是作为1.0的补充的“实验型持久连接”出现的。keep-alive已经不再使用了,最新的HTTP/1.1规范中也没有对它进行说明,只是很多应用延续了下来。
使用HTTP/1.0的客户端在首部中加上"Connection:Keep-Alive",请求服务端将一条连接保持在打开状态。服务端如果愿意将这条连接保持在打开状态,就会在响应中包含同样的首部。如果响应中没有包含"Connection:Keep-Alive"首部,则客户端会认为服务端不支持keep-alive,会在发送完响应报文之后关闭掉当前连接。
通过keep-alive补充协议,客户端与服务器之间完成了持久连接,然而仍然存在着一些问题:
三、HTTP/1.1的持久连接
HTTP/1.1采取持久连接的方式替代了Keep-Alive。
HTTP/1.1的连接默认情况下都是持久连接。如果要显式关闭,需要在报文中加上Connection:Close首部。即在HTTP/1.1中,所有的连接都进行了复用。
然而如同Keep-Alive一样,空闲的持久连接也可以随时被客户端与服务端关闭。不发送Connection:Close不意味着服务器承诺连接永远保持打开。
四、HttpClient如何生成持久连接
HttpClien中使用了连接池来管理持有连接,同一条TCP链路上,连接是可以复用的。HttpClient通过连接池的方式进行连接持久化。
其实“池”技术是一种通用的设计,其设计思想并不复杂:
所有的连接池都是这个思路,不过我们看HttpClient源码主要关注两点:
4.1 HttpClient连接池的实现
HttpClient关于持久连接的处理在下面的代码中可以集中体现,下面从MainClientExec摘取了和连接池相关的部分,去掉了其他部分:
public class MainClientExec implements ClientExecChain { @Override public CloseableHttpResponse execute( final HttpRoute route, final HttpRequestWrapper request, final HttpClientContext context, final HttpExecutionAware execAware) throws IOException, HttpException { //从连接管理器HttpClientConnectionManager中获取一个连接请求ConnectionRequest final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);final HttpClientConnection managedConn; final int timeout = config.getConnectionRequestTimeout(); //从连接请求ConnectionRequest中获取一个被管理的连接HttpClientConnection managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS); //将连接管理器HttpClientConnectionManager与被管理的连接HttpClientConnection交给一个ConnectionHolder持有 final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn); try { HttpResponse response; if (!managedConn.isOpen()) { //如果当前被管理的连接不是出于打开状态,需要重新建立连接 establishRoute(proxyAuthState, managedConn, route, request, context); } //通过连接HttpClientConnection发送请求 response = requestExecutor.execute(request, managedConn, context); //通过连接重用策略判断是否连接可重用 if (reuseStrategy.keepAlive(response, context)) { //获得连接有效期 final long duration = keepAliveStrategy.getKeepAliveDuration(response, context); //设置连接有效期 connHolder.setValidFor(duration, TimeUnit.MILLISECONDS); //将当前连接标记为可重用状态 connHolder.markReusable(); } else { connHolder.markNonReusable(); } } final HttpEntity entity = response.getEntity(); if (entity == null || !entity.isStreaming()) { //将当前连接释放到池中,供下次调用 connHolder.releaseConnection(); return new HttpResponseProxy(response, null); } else { return new HttpResponseProxy(response, connHolder); } }
这里看到了在Http请求过程中对连接的处理是和协议规范是一致的,这里要展开讲一下具体实现。
PoolingHttpClientConnectionManager是HttpClient默认的连接管理器,首先通过requestConnection()获得一个连接的请求,注意这里不是连接。
public ConnectionRequest requestConnection( final HttpRoute route, final Object state) {final Futurefuture = this.pool.lease(route, state, null); return new ConnectionRequest() { @Override public boolean cancel() { return future.cancel(true); } @Override public HttpClientConnection get( final long timeout, final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException { final HttpClientConnection conn = leaseConnection(future, timeout, tunit); if (conn.isOpen()) { final HttpHost host; if (route.getProxyHost() != null) { host = route.getProxyHost(); } else { host = route.getTargetHost(); } final SocketConfig socketConfig = resolveSocketConfig(host); conn.setSocketTimeout(socketConfig.getSoTimeout()); } return conn; } }; }
可以看到返回的ConnectionRequest对象实际上是一个持有了Future
从上面的代码我们应该关注的是:
Future
如何从连接池CPool中获得一个异步的连接,Future
HttpClientConnection conn = leaseConnection(future, timeout, tunit)
如何通过异步连接Future
4.2 Future
看一下CPool是如何释放一个Future
private E getPoolEntryBlocking( final T route, final Object state, final long timeout, final TimeUnit tunit, final Futurefuture) throws IOException, InterruptedException, TimeoutException { //首先对当前连接池加锁,当前锁是可重入锁ReentrantLockthis.lock.lock(); try { //获得一个当前HttpRoute对应的连接池,对于HttpClient的连接池而言,总池有个大小,每个route对应的连接也是个池,所以是“池中池” final RouteSpecificPool pool = getPool(route); E entry; for (;;) { Asserts.check(!this.isShutDown, "Connection pool shut down"); //死循环获得连接 for (;;) { //从route对应的池中拿连接,可能是null,也可能是有效连接 entry = pool.getFree(state); //如果拿到null,就退出循环 if (entry == null) { break; } //如果拿到过期连接或者已关闭连接,就释放资源,继续循环获取 if (entry.isExpired(System.currentTimeMillis())) { entry.close(); } if (entry.isClosed()) { this.available.remove(entry); pool.free(entry, false); } else { //如果拿到有效连接就退出循环 break; } } //拿到有效连接就退出 if (entry != null) { this.available.remove(entry); this.leased.add(entry); onReuse(entry); return entry; } //到这里证明没有拿到有效连接,需要自己生成一个 final int maxPerRoute = getMax(route); //每个route对应的连接最大数量是可配置的,如果超过了,就需要通过LRU清理掉一些连接 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute); if (excess > 0) { for (int i = 0; i < excess; i++) { final E lastUsed = pool.getLastUsed(); if (lastUsed == null) { break; } lastUsed.close(); this.available.remove(lastUsed); pool.remove(lastUsed); } } //当前route池中的连接数,没有达到上线 if (pool.getAllocatedCount() < maxPerRoute) { final int totalUsed = this.leased.size(); final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0); //判断连接池是否超过上线,如果超过了,需要通过LRU清理掉一些连接 if (freeCapacity > 0) { final int totalAvailable = this.available.size(); //如果空闲连接数已经大于剩余可用空间,则需要清理下空闲连接 if (totalAvailable > freeCapacity - 1) { if (!this.available.isEmpty()) { final E lastUsed = this.available.removeLast(); lastUsed.close(); final RouteSpecificPool otherpool = getPool(lastUsed.getRoute()); otherpool.remove(lastUsed); } } //根据route建立一个连接 final C conn = this.connFactory.create(route); //将这个连接放入route对应的“小池”中 entry = pool.add(conn); //将这个连接放入“大池”中 this.leased.add(entry); return entry; } } //到这里证明没有从获得route池中获得有效连接,并且想要自己建立连接时当前route连接池已经到达最大值,即已经有连接在使用,但是对当前线程不可用 boolean success = false; try { if (future.isCancelled()) { throw new InterruptedException("Operation interrupted"); } //将future放入route池中等待 pool.queue(future); //将future放入大连接池中等待 this.pending.add(future); //如果等待到了信号量的通知,success为true if (deadline != null) { success = this.condition.awaitUntil(deadline); } else { this.condition.await(); success = true; } if (future.isCancelled()) { throw new InterruptedException("Operation interrupted"); } } finally { //从等待队列中移除 pool.unqueue(future); this.pending.remove(future); } //如果没有等到信号量通知并且当前时间已经超时,则退出循环 if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) { break; } } //最终也没有等到信号量通知,没有拿到可用连接,则抛异常 throw new TimeoutException("Timeout waiting for connection"); } finally { //释放对大连接池的锁 this.lock.unlock(); } }
上面的代码逻辑有几个重要点:
到这里为止,程序已经拿到了一个可用的CPoolEntry实例,或者抛异常终止了程序。
4.3 HttpClientConnection
protected HttpClientConnection leaseConnection( final Futurefuture, final long timeout, final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException { final CPoolEntry entry; try { //从异步操作Future 中获得CPoolEntry entry = future.get(timeout, tunit); if (entry == null || future.isCancelled()) { throw new InterruptedException(); } Asserts.check(entry.getConnection() != null, "Pool entry with no connection"); if (this.log.isDebugEnabled()) { this.log.debug("Connection leased: " + format(entry) + formatStats(entry.getRoute())); } //获得一个CPoolEntry的代理对象,对其操作都是使用同一个底层的HttpClientConnection return CPoolProxy.newProxy(entry); } catch (final TimeoutException ex) { throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool"); } }
五、HttpClient如何复用持久连接?
在上一章中,我们看到了HttpClient通过连接池来获得连接,当需要使用连接的时候从池中获得。
对应着第三章的问题:
我们在第四章中看到了HttpClient是如何处理1、3的问题的,那么第2个问题是怎么处理的呢?
即HttpClient如何判断一个连接在使用完毕后是要关闭,还是要放入池中供他人复用?再看一下MainClientExec的代码
//发送Http连接 response = requestExecutor.execute(request, managedConn, context); //根据重用策略判断当前连接是否要复用 if (reuseStrategy.keepAlive(response, context)) { //需要复用的连接,获取连接超时时间,以response中的timeout为准 final long duration = keepAliveStrategy.getKeepAliveDuration(response, context); if (this.log.isDebugEnabled()) { final String s; //timeout的是毫秒数,如果没有设置则为-1,即没有超时时间 if (duration > 0) { s = "for " + duration + " " + TimeUnit.MILLISECONDS; } else { s = "indefinitely"; } this.log.debug("Connection can be kept alive " + s); } //设置超时时间,当请求结束时连接管理器会根据超时时间决定是关闭还是放回到池中 connHolder.setValidFor(duration, TimeUnit.MILLISECONDS); //将连接标记为可重用 connHolder.markReusable(); } else { //将连接标记为不可重用 connHolder.markNonReusable(); }
可以看到,当使用连接发生过请求之后,有连接重试策略来决定该连接是否要重用,如果要重用就会在结束后交给HttpClientConnectionManager放入池中。
那么连接复用策略的逻辑是怎么样的呢?
public class DefaultClientConnectionReuseStrategy extends DefaultConnectionReuseStrategy { public static final DefaultClientConnectionReuseStrategy INSTANCE = new DefaultClientConnectionReuseStrategy(); @Override public boolean keepAlive(final HttpResponse response, final HttpContext context) { //从上下文中拿到request final HttpRequest request = (HttpRequest) context.getAttribute(HttpCoreContext.HTTP_REQUEST); if (request != null) { //获得Connection的Header final Header[] connHeaders = request.getHeaders(HttpHeaders.CONNECTION); if (connHeaders.length != 0) { final TokenIterator ti = new BasicTokenIterator(new BasicHeaderIterator(connHeaders, null)); while (ti.hasNext()) { final String token = ti.nextToken(); //如果包含Connection:Close首部,则代表请求不打算保持连接,会忽略response的意愿,该头部这是HTTP/1.1的规范 if (HTTP.CONN_CLOSE.equalsIgnoreCase(token)) { return false; } } } } //使用父类的的复用策略 return super.keepAlive(response, context); } }
看一下父类的复用策略
if (canResponseHaveBody(request, response)) { final Header[] clhs = response.getHeaders(HTTP.CONTENT_LEN); //如果reponse的Content-Length没有正确设置,则不复用连接 //因为对于持久化连接,两次传输之间不需要重新建立连接,则需要根据Content-Length确认内容属于哪次请求,以正确处理“粘包”现象 //所以,没有正确设置Content-Length的response连接不能复用 if (clhs.length == 1) { final Header clh = clhs[0]; try { final int contentLen = Integer.parseInt(clh.getValue()); if (contentLen < 0) { return false; } } catch (final NumberFormatException ex) { return false; } } else { return false; } } if (headerIterator.hasNext()) { try { final TokenIterator ti = new BasicTokenIterator(headerIterator); boolean keepalive = false; while (ti.hasNext()) { final String token = ti.nextToken(); //如果response有Connection:Close首部,则明确表示要关闭,则不复用 if (HTTP.CONN_CLOSE.equalsIgnoreCase(token)) { return false; //如果response有Connection:Keep-Alive首部,则明确表示要持久化,则复用 } else if (HTTP.CONN_KEEP_ALIVE.equalsIgnoreCase(token)) { keepalive = true; } } if (keepalive) { return true; } } catch (final ParseException px) { return false; } } //如果response中没有相关的Connection首部说明,则高于HTTP/1.0版本的都复用连接 return !ver.lessEquals(HttpVersion.HTTP_1_0);
总结一下:
从代码中可以看到,其实现策略与我们第二、三章协议层的约束是一致的。
六、HttpClient如何清理过期连接
在HttpClient4.4版本之前,在从连接池中获取重用连接的时候会检查下是否过期,过期则清理。
之后的版本则不同,会有一个单独的线程来扫描连接池中的连接,发现有离最近一次使用超过设置的时间后,就会清理。默认的超时时间是2秒钟。
public CloseableHttpClient build() { //如果指定了要清理过期连接与空闲连接,才会启动清理线程,默认是不启动的 if (evictExpiredConnections || evictIdleConnections) { //创造一个连接池的清理线程 final IdleConnectionEvictor connectionEvictor = new IdleConnectionEvictor(cm, maxIdleTime > 0 ? maxIdleTime : 10, maxIdleTimeUnit != null ? maxIdleTimeUnit : TimeUnit.SECONDS, maxIdleTime, maxIdleTimeUnit); closeablesCopy.add(new Closeable() { @Override public void close() throws IOException { connectionEvictor.shutdown(); try { connectionEvictor.awaitTermination(1L, TimeUnit.SECONDS); } catch (final InterruptedException interrupted) { Thread.currentThread().interrupt(); } } }); //执行该清理线程 connectionEvictor.start(); }
可以看到在HttpClientBuilder进行build的时候,如果指定了开启清理功能,会创建一个连接池清理线程并运行它。
public IdleConnectionEvictor( final HttpClientConnectionManager connectionManager, final ThreadFactory threadFactory, final long sleepTime, final TimeUnit sleepTimeUnit, final long maxIdleTime, final TimeUnit maxIdleTimeUnit) { this.connectionManager = Args.notNull(connectionManager, "Connection manager"); this.threadFactory = threadFactory != null ? threadFactory : new DefaultThreadFactory(); this.sleepTimeMs = sleepTimeUnit != null ? sleepTimeUnit.toMillis(sleepTime) : sleepTime; this.maxIdleTimeMs = maxIdleTimeUnit != null ? maxIdleTimeUnit.toMillis(maxIdleTime) : maxIdleTime; this.thread = this.threadFactory.newThread(new Runnable() { @Override public void run() { try { //死循环,线程一直执行 while (!Thread.currentThread().isInterrupted()) { //休息若干秒后执行,默认10秒 Thread.sleep(sleepTimeMs); //清理过期连接 connectionManager.closeExpiredConnections(); //如果指定了最大空闲时间,则清理空闲连接 if (maxIdleTimeMs > 0) { connectionManager.closeIdleConnections(maxIdleTimeMs, TimeUnit.MILLISECONDS); } } } catch (final Exception ex) { exception = ex; } } }); }
总结一下:
七、本文总结
上面的研究是基于HttpClient源码的个人理解,如果有误,希望大家积极留言讨论。
好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对创新互联的支持。
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流