發送請求
Consumer 的Handler處理鏈最後一個是TransportClientHandler,這個Handler主要是:
1.發起連接
2.請求之前執行HttpClientFilter的beforeSendRequest
3.塞入ServiceComb的微服務調用上下文,設置響應回調處理,發送請求
ServiceComb發送請求使用vertx,而vertx網絡發送接收構建在netty之上,因而能夠異步高併發。
java很大一個特點是向建設大樓的腳手架,一個套在一個上面,舉例,serviceComb網絡發送與接收是:最基礎是jdk,上面是netty,然後是vertx,然後是serviceComb。
1.發起連接
transport實際上是VertxRestTransport,最終到RestClientInvocation的invoke,這個invoke重點分析,
public void invoke(Invocation invocation, AsyncResponse asyncResp) throws Exception {
...
Future<HttpClientRequest> requestFuture = createRequest(ipPort, path);
...
}
Future<HttpClientRequest> createRequest(IpPort ipPort, String path) {
...
return httpClientWithContext.getHttpClient().request(requestOptions);
...
}
public Future<HttpClientRequest> request(RequestOptions options) {
ContextInternal ctx = this.vertx.getOrCreateContext();
PromiseInternal<HttpClientRequest> promise = ctx.promise();
this.doRequest(options, promise);
return promise.future();
}
private void doRequest(HttpMethod method, SocketAddress peerAddress, SocketAddress server, String host, int port, Boolean useSSL, String requestURI, MultiMap headers, long timeout, Boolean followRedirects, ProxyOptions proxyOptions, PromiseInternal<HttpClientRequest> requestPromise) {
...
this.httpCM.getConnection(eventLoopContext, key, timeout, (ar1) -> {
...
requestPromise.tryComplete(req);
...
}
...
}
httpCM 建立連接成功後,把requestPromise設置Complete,到此連接建立完成。
2.請求之前執行HttpClientFilter的beforeSendRequest和發送
如下是發送請求邏輯
// 創建連接
Future<HttpClientRequest> requestFuture = createRequest(ipPort, path);
// 連接創建成功後執行compose裏的邏輯
requestFuture.compose(clientRequest -> {
this.clientRequest = clientRequest;
RestClientRequestImpl restClientRequest =
new RestClientRequestImpl(clientRequest, httpClientWithContext.context(), asyncResp, throwableHandler);
invocation.getHandlerContext().put(RestConst.INVOCATION_HANDLER_REQUESTCLIENT, restClientRequest);
Buffer requestBodyBuffer;
try {
requestBodyBuffer = restClientRequest.getBodyBuffer();
} catch (Exception e) {
return Future.failedFuture(e);
}
HttpServletRequestEx requestEx = new VertxClientRequestToHttpServletRequest(clientRequest, requestBodyBuffer);
// 發送前執行HttpClientFilter的beforeSendRequest邏輯
for (HttpClientFilter filter : httpClientFilters) {
if (filter.enabled()) {
filter.beforeSendRequest(invocation, requestEx);
}
}
// 再次嵌套,把請求放到httpClientWithContext去執行
// 從業務線程轉移到網絡線程中去發送
httpClientWithContext.runOnContext(httpClient -> {
clientRequest.setTimeout(operationMeta.getConfig().getMsRequestTimeout());
// 設置異步響應後的業務處理回調
clientRequest.response().onComplete(asyncResult -> {
if (asyncResult.failed()) {
fail(asyncResult.cause());
return;
}
// 異步響應後,回調處理響應response
handleResponse(asyncResult.result());
});
// 發送前,設置微服務調用上下文,傳遞到下游被調用方
processServiceCombHeaders(invocation, operationMeta);
// end是發送觸發器,並設置異步回調
restClientRequest.end()
.onComplete((t) -> invocation.getInvocationStageTrace().finishWriteToBuffer(System.nanoTime()));
});
return Future.succeededFuture();
}).onFailure(failure -> {
//失敗處理
});
走到這裏是main線程
建立連接的線程transport-eventllop
連接建立後,進入請求發送,還是transport-eventllop線程
異步響應傳遞仍是transport-eventloop線程,往下看馬上會傳回main線程
又轉回到main線程
3.線程模型分析
通過上面分析可知,整個過程,main線程->transport線程->main線程; transport線程只是負責建立連接和發送,以及接收響應後觸發響應處理,把處理傳遞到main線程。很神奇,這個過程是如何實現的。看代碼
從CseClientHttpRequest中的doInvoke開始,
protected Response doInvoke(Invocation invocation) {
return InvokerUtils.innerSyncInvoke(invocation);
}
進入核心類
public static Response innerSyncInvoke(Invocation invocation) {
try {
// 創建一個同步器,這個正是main線程,後面會將自己阻塞等待網絡線程的response,進行解碼
SyncResponseExecutor respExecutor = new SyncResponseExecutor();
invocation.setResponseExecutor(respExecutor);
// 開始觸發Handler鏈,最後一個Handler是TransportClientHandler
invocation.next(respExecutor::setResponse);
Response response = respExecutor.waitResponse(invocation);
return response;
} catch (Throwable e) {
}
}
接着看RestClientInvocation的網絡線程傳遞到main線程代碼
protected void processResponseBody(Buffer responseBuf) {
invocation.getInvocationStageTrace().finishReceiveResponse();
// 網絡線程傳遞到main線程
invocation.getResponseExecutor().execute(() -> {
try {
invocation.getInvocationStageTrace().startClientFiltersResponse();
HttpServletResponseEx responseEx =
new VertxClientResponseToHttpServletResponse(clientResponse, responseBuf);
// HttpClientFilter接收請求後的處理afterReceiveResponse
for (HttpClientFilter filter : httpClientFilters) {
if (filter.enabled()) {
Response response = filter.afterReceiveResponse(invocation, responseEx);
if (response != null) {
complete(response);
return;
}
}
}
} catch (Throwable e) {
fail(e);
}
});
// 上述的excute是在SyncResponseExecutor中,把lautch countDown
@Override
public void execute(Runnable cmd) {
this.cmd = cmd;
// one network thread, many connections, then this notify will be performance bottlenecks
// if save to a queue, and other thread(s) to invoke countDown, will get good performance
// but if have multiple network thread, this "optimization" will reduce performance
// now not change this.
latch.countDown();
}
// countDown後InvokerUtils的waitResponse就執行
Response response = respExecutor.waitResponse(invocation);
// 就是執行SyncResponseExecutor的waitResponse,走到cmd 不為空,執行run
public Response waitResponse(Invocation invocation) throws InvocationException {
guardedWait(invocation);
// cmd為null,是沒走execute,直接返回的場景
if (cmd != null) {
cmd.run();
}
return response;
}
// waitResponse執行run實際上就是RestClientInvocation的complete函數
protected void complete(Response response) {
invocation.getInvocationStageTrace().finishClientFiltersResponse();
asyncResp.complete(response);
}
// 最終RestClientInvocation的complete通過asyncResp設置response,asyncResp其實就是InvokerUtils中innerSyncInvoke函數的如下代碼
invocation.next(respExecutor::setResponse);
// setResponse也就是SyncResponseExecutor的setResponse九九歸一,繞了一圈,都是圍繞SyncResponseExecutor進行。實現main->網絡線程->main的過程。
總結:CseClientHttpRequest->Invocation(InvocationFactory.forConsumer)->InvokerUtils.innerSyncInvoke(invocation)->Handler處理鏈
4.httpClient創建
CseApplicationListener 初始化創建httpclient
consumer發送請求時,連接池已包含以創建的client
//HttpClients 部署client verticle
VertxUtils.blockDeploy(vertx, ClientVerticle.class, deployOptions);
//比較一下服務端部署VertxRestTransport
VertxUtils.blockDeploy(transportVertx, TransportConfig.getRestServerVerticle(), options);