美文网首页
StreamAllocation

StreamAllocation

作者: 嗯哼嗯哼嗯哼嗯哼 | 来源:发表于2019-11-25 11:19 被阅读0次

OkHttp

StreamAllocation

此类协调了三个类之间的关系:

Connection:连接到远程服务器的物理Socket连接,Connection建立起来可能会比较慢,并且能够取消已经建立的Connection

Stream:建立在Connection上的逻辑上的HTTP的请求对的数据流,每个连接都有自己的allocation limit,allocation limit 定义了Connection同时可以并发请求的数据流的个数,Http 1.1 连接一次只能分配一个Stream,Http 2 通常可以分配多个流(多路复用)

Calls:Stream的逻辑顺序,通常是初始请求以及重定向请求。并且希望单个请求的所有流都被保存在同一个Connection上,以实现更好的行为

在Connection和StreamAllocation之间的关系是,多个Stream可以共用一个Socket连接,每个tcp连接都是通过一个socket来完成的,socket对应一个host和port,如果有多个Stream(也就是多个Request)都是连接在一个host和port上,那么它们就可以共用同一个socket,这样做的好处就是可以减少tcp的三次握手时间。在OkHttp里面,记录一次连接的是RealConnection,这个负责连接,在这个类里面用socket来连接,用HandShake来处理握手。

在讲解这个类之前,先熟悉3个概念:请求,连接,流。我们要明白Http通信执行网络“请求”需要在“连接”上建立一个新的“流”,我们将StreamAllocation称之流的桥梁,它负责为一次“请求”寻找“连接”并建立“流”,从而完成远程通信。所以说StreamAllocation与“请求”,“连接”,“流”都有关。

从注释我们看到。Connection是建立在Socket之上的物理通信信道,而Stream则是代表逻辑的流,至于Call是对一次请求过程的封装。之前也说过一个Call可能会涉及多个流(比如重定向或者auth认证等情况)。所以我们想一下,如果StreamAllocation要想解决上述问题,需要两个步骤,一是寻找连接,二是获取流。所以StreamAllocation里面应该包含一个Stream(上文已经说到了,OKHttp里面的流是HttpCodec);还应该包含连接Connection。如果想找到合适的连接,还需要一个连接池ConnectionPool属性。所以应该有一个获取流的方法在StreamAllocation里面是newStream();找到合适的流的方法findConnection();还应该有完成请求任务的之后finish()的方法来关闭流对象,还有终止和取消等方法,以及释放资源的方法。

  public final Address address;//地址
  private RouteSelector.Selection routeSelection;
  private Route route;
  private final ConnectionPool connectionPool;//连接池
  public final Call call;//请求
  public final EventListener eventListener;
  private final Object callStackTrace;

  // State guarded by connectionPool.
  private final RouteSelector routeSelector;
  private int refusedStreamCount;
  private RealConnection connection;//连接
  private boolean reportedAcquired;
  private boolean released;
  private boolean canceled;
  private HttpCodec codec;

根据请求,基于连接Connection构建出一个流

  public HttpCodec newStream(
      OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    int connectTimeout = chain.connectTimeoutMillis();
    int readTimeout = chain.readTimeoutMillis();
    int writeTimeout = chain.writeTimeoutMillis();
    int pingIntervalMillis = client.pingIntervalMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      //寻找到一个可用的连接,下面会在这个连接上构造流
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
      //构造出HttpCodec,就是跟服务端的输入输出流
      HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
  }
  1. 寻找到一个可用的连接Connection
  2. 下面流的构造会在基于连接Connection上
  /**
   * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
   * until a healthy connection is found.
   */
  private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
      boolean doExtensiveHealthChecks) throws IOException {
    while (true) {
        //调用Connection,获取一个符合要求的RealConnection
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          pingIntervalMillis, connectionRetryEnabled);

      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }

      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      //确认连接Connection必须是健康的,如果是健康的则重用,否者重新查找RealConnection
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }

      return candidate;
    }
  }




  /**
   * Returns a connection to host a new stream. This prefers the existing connection if it exists,
   * then the pool, finally building a new connection.
   */
  private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
    boolean foundPooledConnection = false;
    RealConnection result = null;
    Route selectedRoute = null;
    Connection releasedConnection;
    Socket toClose;
    synchronized (connectionPool) {
      if (released) throw new IllegalStateException("released");
      if (codec != null) throw new IllegalStateException("codec != null");
      if (canceled) throw new IOException("Canceled");

      // Attempt to use an already-allocated connection. We need to be careful here because our
      // already-allocated connection may have been restricted from creating new streams.
      //尝试去用已经分配的连接
      releasedConnection = this.connection;
      toClose = releaseIfNoNewStreams();
      if (this.connection != null) {
        // We had an already-allocated connection and it's good.
        result = this.connection;
        releasedConnection = null;
      }
      if (!reportedAcquired) {
        // If the connection was never reported acquired, don't report it as released!
        releasedConnection = null;
      }

      if (result == null) {
        // Attempt to get a connection from the pool.
        //尝试从连接池里面获取可用的连接
        Internal.instance.get(connectionPool, address, this, null);
        if (connection != null) {
          foundPooledConnection = true;
          result = connection;
        } else {
          selectedRoute = route;
        }
      }
    }
    closeQuietly(toClose);

    if (releasedConnection != null) {
      eventListener.connectionReleased(call, releasedConnection);
    }
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
    }
    //如果已经从已经分配的或者从ConnectionPool里面找到了Connection,则返回 
    if (result != null) {
      // If we found an already-allocated or pooled connection, we're done.
      return result;
    }

    //如果上面没找到,那么切换路由,接着寻找连接
    // If we need a route selection, make one. This is a blocking operation.
    boolean newRouteSelection = false;
    if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
      newRouteSelection = true;
      routeSelection = routeSelector.next();
    }

    synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");

      if (newRouteSelection) {
        // Now that we have a set of IP addresses, make another attempt at getting a connection from
        // the pool. This could match due to connection coalescing.
        List<Route> routes = routeSelection.getAll();
        for (int i = 0, size = routes.size(); i < size; i++) {
          Route route = routes.get(i);
          Internal.instance.get(connectionPool, address, this, route);
          if (connection != null) {
            foundPooledConnection = true;
            result = connection;
            this.route = route;
            break;
          }
        }
      }

      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection.next();
        }

        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        route = selectedRoute;
        refusedStreamCount = 0;
        //构造出一个新的连接Connection
        result = new RealConnection(connectionPool, selectedRoute);
        //把当前StreamAllocation添加到连接Connection中的allocations列表中
        acquire(result, false);
      }
    }

    // If we found a pooled connection on the 2nd time around, we're done.
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
      return result;
    }

    // Do TCP + TLS handshakes. This is a blocking operation.
    //做TCP和TLS的连接,握手,阻塞操作
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
        connectionRetryEnabled, call, eventListener);
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      reportedAcquired = true;

      // Pool the connection.
      //把这个连接放入到ConnectionPool中
      Internal.instance.put(connectionPool, result);

      // If another multiplexed connection to the same address was created concurrently, then
      // release this connection and acquire that one.
      if (result.isMultiplexed()) {
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
      }
    }
    closeQuietly(socket);

    eventListener.connectionAcquired(call, result);
    return result;
  }

HttpCodec

HttpCodec是一个接口,用于与服务端做输入输出流的包装,内部是用Okio来做的,我们这里只看Http1Codec,表示只用于Http1协议。Http1Codec是一个Socket连接,用于发送 Http 1的消息,这个类严格遵守以下的生命周期:

  1. 写请求头
  2. 写请求体
  3. 读取响应头
  4. 读取响应体
//连接处于空闲状态,准备好写请求头了
private static final int STATE_IDLE = 0; // Idle connections are ready to write request headers.
private static final int STATE_OPEN_REQUEST_BODY = 1;//准备写请求体
private static final int STATE_WRITING_REQUEST_BODY = 2;//写入请求体
private static final int STATE_READ_RESPONSE_HEADERS = 3;//读取响应头
private static final int STATE_OPEN_RESPONSE_BODY = 4;//准备读取响应体
private static final int STATE_READING_RESPONSE_BODY = 5;//读取响应体
private static final int STATE_CLOSED = 6;//关闭
/** The client that configures this stream. May be null for HTTPS proxy tunnels. */
final OkHttpClient client;
/** The stream allocation that owns this stream. May be null for HTTPS proxy tunnels. */
final StreamAllocation streamAllocation;//请求对应的流

final BufferedSource source;//从服务器读取的响应流
final BufferedSink sink;//向服务器写入的请求流
int state = STATE_IDLE;//处于的生命周期
private long headerLimit = HEADER_LIMIT;
  1. 看的出来一个HttpCodec有7种状态,对应着4个动作

下面只分析一个写入请求头的方法,其他的跟这个类似,只是还是会将请求体和响应体细分为固定长度和非固定长度,分别用不同流来表示。

  /**
   * Prepares the HTTP headers and sends them to the server.
   *
   * <p>For streaming requests with a body, headers must be prepared <strong>before</strong> the
   * output stream has been written to. Otherwise the body would need to be buffered!
   *
   * <p>For non-streaming requests with a body, headers must be prepared <strong>after</strong> the
   * output stream has been written to and closed. This ensures that the {@code Content-Length}
   * header field receives the proper value.
   */
   //写入请求头
  @Override public void writeRequestHeaders(Request request) throws IOException {
    String requestLine = RequestLine.get(
        request, streamAllocation.connection().route().proxy().type());
    writeRequest(request.headers(), requestLine);
  }
  
    /** Returns bytes of a request header for sending on an HTTP transport. */
  public void writeRequest(Headers headers, String requestLine) throws IOException {
    if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
    //写入请求行和请求头
    sink.writeUtf8(requestLine).writeUtf8("\r\n");
    for (int i = 0, size = headers.size(); i < size; i++) {
      sink.writeUtf8(headers.name(i))
          .writeUtf8(": ")
          .writeUtf8(headers.value(i))
          .writeUtf8("\r\n");
    }
    sink.writeUtf8("\r\n");
    //状态改变为准备写入请求体
    state = STATE_OPEN_REQUEST_BODY;
  }
  
  
  
    /**
   * Returns the request status line, like "GET / HTTP/1.1". This is exposed to the application by
   * {@link HttpURLConnection#getHeaderFields}, so it needs to be set even if the transport is
   * HTTP/2.
   */
   //返回请求行,例如“GET / HTTP/1.1”
  public static String get(Request request, Proxy.Type proxyType) {
    StringBuilder result = new StringBuilder();
    result.append(request.method());
    result.append(' ');

    if (includeAuthorityInRequestLine(request, proxyType)) {
      result.append(request.url());
    } else {
      result.append(requestPath(request.url()));
    }

    result.append(" HTTP/1.1");
    return result.toString();
  }

看到上面的向服务器写入的数据,也可以看到OkHttp的框架属于哪一层?OkHttp是与UrlConnection同一层的网络框架,直接对Socket进行封装。其实OkHttp就是实现了Http协议,并且对接口做了比较好的封装的框架。

相关文章

网友评论

      本文标题:StreamAllocation

      本文链接:https://www.haomeiwen.com/subject/ufpdwctx.html