博客 / 詳情

返回

線程模型分析:Consumer最後一個發送請求Handler——TransportClientHandler分析

發送請求

Consumer 的Handler處理鏈最後一個是TransportClientHandler,這個Handler主要是:

1.發起連接
2.請求之前執行HttpClientFilter的beforeSendRequest
3.塞入ServiceComb的微服務調用上下文,設置響應回調處理,發送請求

ServiceComb發送請求使用vertx,而vertx網絡發送接收構建在netty之上,因而能夠異步高併發。

java很大一個特點是向建設大樓的腳手架,一個套在一個上面,舉例,serviceComb網絡發送與接收是:最基礎是jdk,上面是netty,然後是vertx,然後是serviceComb。

1.發起連接

image.png

transport實際上是VertxRestTransport,最終到RestClientInvocation的invoke,這個invoke重點分析,

public void invoke(Invocation invocation, AsyncResponse asyncResp) throws Exception {
...
    Future<HttpClientRequest> requestFuture = createRequest(ipPort, path);
...
}


  Future<HttpClientRequest> createRequest(IpPort ipPort, String path) {
...
    return httpClientWithContext.getHttpClient().request(requestOptions);
...
  }


    public Future<HttpClientRequest> request(RequestOptions options) {
        ContextInternal ctx = this.vertx.getOrCreateContext();
        PromiseInternal<HttpClientRequest> promise = ctx.promise();
        this.doRequest(options, promise);
        return promise.future();
    }


private void doRequest(HttpMethod method, SocketAddress peerAddress, SocketAddress server, String host, int port, Boolean useSSL, String requestURI, MultiMap headers, long timeout, Boolean followRedirects, ProxyOptions proxyOptions, PromiseInternal<HttpClientRequest> requestPromise) {
...
        this.httpCM.getConnection(eventLoopContext, key, timeout, (ar1) -> {
...

                        requestPromise.tryComplete(req);
...
}
...
}

httpCM 建立連接成功後,把requestPromise設置Complete,到此連接建立完成。

2.請求之前執行HttpClientFilter的beforeSendRequest和發送

如下是發送請求邏輯


// 創建連接
    Future<HttpClientRequest> requestFuture = createRequest(ipPort, path);

   // 連接創建成功後執行compose裏的邏輯
    requestFuture.compose(clientRequest -> {
      this.clientRequest = clientRequest;

    
      RestClientRequestImpl restClientRequest =
          new RestClientRequestImpl(clientRequest, httpClientWithContext.context(), asyncResp, throwableHandler);
      invocation.getHandlerContext().put(RestConst.INVOCATION_HANDLER_REQUESTCLIENT, restClientRequest);

      Buffer requestBodyBuffer;
      try {
        requestBodyBuffer = restClientRequest.getBodyBuffer();
      } catch (Exception e) {
        return Future.failedFuture(e);
      }
      HttpServletRequestEx requestEx = new VertxClientRequestToHttpServletRequest(clientRequest, requestBodyBuffer);

// 發送前執行HttpClientFilter的beforeSendRequest邏輯
      for (HttpClientFilter filter : httpClientFilters) {
        if (filter.enabled()) {
          filter.beforeSendRequest(invocation, requestEx);
        }
      }

// 再次嵌套,把請求放到httpClientWithContext去執行
      // 從業務線程轉移到網絡線程中去發送
      httpClientWithContext.runOnContext(httpClient -> {
        clientRequest.setTimeout(operationMeta.getConfig().getMsRequestTimeout());

// 設置異步響應後的業務處理回調
        clientRequest.response().onComplete(asyncResult -> {
          if (asyncResult.failed()) {
            fail(asyncResult.cause());
            return;
          }
// 異步響應後,回調處理響應response
          handleResponse(asyncResult.result());
        });

// 發送前,設置微服務調用上下文,傳遞到下游被調用方
        processServiceCombHeaders(invocation, operationMeta);
// end是發送觸發器,並設置異步回調
        restClientRequest.end()
            .onComplete((t) -> invocation.getInvocationStageTrace().finishWriteToBuffer(System.nanoTime()));
      });
      return Future.succeededFuture();
    }).onFailure(failure -> {
//失敗處理
    });

走到這裏是main線程
image.png

建立連接的線程transport-eventllop
image.png

連接建立後,進入請求發送,還是transport-eventllop線程
image.png

異步響應傳遞仍是transport-eventloop線程,往下看馬上會傳回main線程
image.png

又轉回到main線程
image.png

3.線程模型分析

通過上面分析可知,整個過程,main線程->transport線程->main線程; transport線程只是負責建立連接和發送,以及接收響應後觸發響應處理,把處理傳遞到main線程。很神奇,這個過程是如何實現的。看代碼

從CseClientHttpRequest中的doInvoke開始,

  protected Response doInvoke(Invocation invocation) {
    return InvokerUtils.innerSyncInvoke(invocation);
  }

進入核心類

public static Response innerSyncInvoke(Invocation invocation) {
    try {

// 創建一個同步器,這個正是main線程,後面會將自己阻塞等待網絡線程的response,進行解碼
      SyncResponseExecutor respExecutor = new SyncResponseExecutor();
      invocation.setResponseExecutor(respExecutor);

// 開始觸發Handler鏈,最後一個Handler是TransportClientHandler
      invocation.next(respExecutor::setResponse);

      Response response = respExecutor.waitResponse(invocation);

      return response;
    } catch (Throwable e) {

    }
  }

接着看RestClientInvocation的網絡線程傳遞到main線程代碼

protected void processResponseBody(Buffer responseBuf) {
    invocation.getInvocationStageTrace().finishReceiveResponse();

// 網絡線程傳遞到main線程
    invocation.getResponseExecutor().execute(() -> {
      try {
        invocation.getInvocationStageTrace().startClientFiltersResponse();
        HttpServletResponseEx responseEx =
            new VertxClientResponseToHttpServletResponse(clientResponse, responseBuf);

// HttpClientFilter接收請求後的處理afterReceiveResponse

        for (HttpClientFilter filter : httpClientFilters) {
          if (filter.enabled()) {
            Response response = filter.afterReceiveResponse(invocation, responseEx);
            if (response != null) {
              complete(response);
              return;
            }
          }
        }
      } catch (Throwable e) {
        fail(e);
      }
    });



// 上述的excute是在SyncResponseExecutor中,把lautch countDown
  @Override
  public void execute(Runnable cmd) {
    this.cmd = cmd;

    // one network thread, many connections, then this notify will be performance bottlenecks
    // if save to a queue, and other thread(s) to invoke countDown, will get good performance
    // but if have multiple network thread, this "optimization" will reduce performance
    // now not change this.
    latch.countDown();
  }

// countDown後InvokerUtils的waitResponse就執行
      Response response = respExecutor.waitResponse(invocation);

// 就是執行SyncResponseExecutor的waitResponse,走到cmd 不為空,執行run
  public Response waitResponse(Invocation invocation) throws InvocationException {
    guardedWait(invocation);

    // cmd為null,是沒走execute,直接返回的場景
    if (cmd != null) {
      cmd.run();
    }

    return response;
  }


// waitResponse執行run實際上就是RestClientInvocation的complete函數
  protected void complete(Response response) {
    invocation.getInvocationStageTrace().finishClientFiltersResponse();
    asyncResp.complete(response);
  }


// 最終RestClientInvocation的complete通過asyncResp設置response,asyncResp其實就是InvokerUtils中innerSyncInvoke函數的如下代碼
      invocation.next(respExecutor::setResponse);

// setResponse也就是SyncResponseExecutor的setResponse九九歸一,繞了一圈,都是圍繞SyncResponseExecutor進行。實現main->網絡線程->main的過程。

總結:CseClientHttpRequest->Invocation(InvocationFactory.forConsumer)->InvokerUtils.innerSyncInvoke(invocation)->Handler處理鏈

4.httpClient創建

CseApplicationListener 初始化創建httpclient

image.png

consumer發送請求時,連接池已包含以創建的client
image.png

//HttpClients 部署client verticle
VertxUtils.blockDeploy(vertx, ClientVerticle.class, deployOptions);

//比較一下服務端部署VertxRestTransport
VertxUtils.blockDeploy(transportVertx, TransportConfig.getRestServerVerticle(), options);

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.