【手寫RPC框架】如何使用netty手寫一個RPC框架 結合新特性 虛擬線程
什麼是RPC框架
RPC(Remote Procedure Call)遠程過程調用,是一種通過網絡從遠程計算機程序上請求服務,而不需要了解底層網絡技術的協議。RPC框架是一種遠程調用的框架,它可以讓你像調用本地方法一樣調用遠程方法。
避免了開發人員自己去封裝網絡請求、連接管理、序列化、反序列化等操作,提高了開發效率。
Netty是什麼?為什麼使用Netty
Netty是一個基於NIO的客户、服務器端編程框架,使用Netty可以快速開發網絡應用,例如服務器和客户端。Netty是一個高性能、異步事件驅動的網絡應用框架,它簡化了網絡編程,提供了一種新的方式來處理網絡通信。
大白話粗略理解:因為Java的NIO的API使用起來比較複雜,Netty是對NIO的封裝,使用起來更加簡單。
所以這也是為什麼我們使用Netty來實現RPC框架的原因,netty也被很多框架證明了它的穩定性和性能。
Java虛擬線程
Java虛擬線程是一個輕量級的線程,它不需要操作系統的線程支持,可以在一個線程中運行多個虛擬線程。Java虛擬線程是一個用户態的線程,它不需要操作系統的線程支持,可以在一個線程中運行多個虛擬線程。
虛擬線程實際上是通過傳統的線程來管理多個虛擬線程,在Java的平台上去調度這些虛擬線程,從而實現了輕量級的線程稱為虛擬線程,想要了解更加細節的可以去看下我的另一篇文章:【虛擬線程】Java虛擬線程 VirtualThread 是什麼黑科技
虛擬線程的優勢:
- 輕量級:虛擬線程是輕量級的線程,可以在一個線程中運行多個虛擬線程。
- 高效:虛擬線程是用户態的線程,不需要操作系統的線程支持,可以在一個線程中運行多個虛擬線程,線程的切換不涉及內核態和用户態的切換,效率更高。
適合的場景:
- 高併發:虛擬線程適合高併發的場景,可以在一個線程中運行多個虛擬線程,減少線程的創建和銷燬,提高性能。
- IO密集型:虛擬線程適合IO密集型的場景,可以在一個線程中運行多個虛擬線程,減少線程的創建和銷燬,提高性能。
- 任務短暫:虛擬線程適合任務短暫的場景,可以在一個線程中運行多個虛擬線程,減少線程的創建和銷燬,提高性能。
寫一個RPC框架需要哪些步驟
既然我們要寫一個RPC框架,那麼我們需要明確一下我們需要做哪些事情。
我們是從A服務調用B服務,那麼就代表我們的服務A是客户端,服務B是服務端。但是我們的系統正常來説要調用別的服務,也會被別的服務調用,
所以我們的服務A也是服務端,服務B也是客户端。所以我們的系統要同時具備客户端和服務端的功能。
- 客户端的功能:發現服務、請求(負載均衡、發起連接、發送請求)、接收響應、關閉連接。
- 服務端的功能:註冊服務、接收請求(接收連接、接收請求)、發送響應、關閉連接。
其實根據上面可以發現,服務端和客户端所做的事情是對應的,是一個鏡像的關係。所以我們就是對應放在一起講。
注意注意注意⚠️:
- 示例中的代碼為了方便理解,我只摘取了主要邏輯,且做了簡略,具體的實現可以看我放在最後的項目源碼。
- 這裏我們只是簡單的實現一個RPC框架,所以我們只是實現了最基本的功能,實際的RPC框架還有很多功能,比如:熔斷、限流、監控等等,這些功能可以根據實陫的需求來實現擴展。
1 發現服務、註冊服務
註冊服務:服務端想告訴別人我提供了哪些服務(接口的方法),我的地址是什麼。
發現服務:客户端需要知道我調用的一些服務(接口的方法)有哪些地址(ip + 端口)可以調用。
服務發現和註冊的方式有很多種,比如:zookeeper、nacos、consul、etcd等等。本次我們以zookeeper為例。
註冊服務代碼示例:
private static CuratorFramework client;
// 這裏使用Curator框架來操作zookeeper
public ZookeeperRegistryCenter() {
final var zookeeper = PROPERTIES_THREAD_LOCAL.get().getRegistry().getZookeeper();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
final var builder = CuratorFrameworkFactory.builder()
.connectString(zookeeper.getAddress())
.namespace(zookeeper.getRootPath());
client = builder.build();
}
// 創建一個zk客户端
private static void create(String path, CreateMode mode) throws Exception {
client.create()
.creatingParentsIfNeeded()
.withMode(mode)
.forPath(path);
}
發現服務代碼示例:
// 發現服務,只要監聽註冊中心的變化
public void watch() {
// 觀察者模式,監聽註冊中心的變化
registryCenter.watch((change, providerInfo) -> {
switch (change) {
case Change.ADD -> addServiceAddress(providerInfo);
case Change.UPDATE -> updateServiceAddress(providerInfo);
case Change.REMOVE -> deleteServiceAddress(providerInfo);
}
});
}
private void addOrUpdateServiceAddress(String methodStr, Pair<String, Integer> address) {
// 這裏使用SERVICE_ADDRESS_MAP(ConcurrentHashMap)本地緩存服務地址,key是接口名+方法名,value是服務地址
SERVICE_ADDRESS_MAP.computeIfAbsent(methodStr, _ -> new CopyOnWriteArraySet<>())
.add(address);
}
2 請求、接收
請求代碼示例:
// 請求
public Object send(RpcRequestMessage msg, Method method, Set<Pair<String, Integer>> addressSet) throws LRPCTimeOutException {
// 負載均衡選擇服務地址
final var address = clazzToAddress(method, addressSet);
// 獲取連接池
final var channelPool = getChannelPool(address);
// 在連接池中執行請求
return channelManager.executeWithChannelPool(channelPool, channelExeFunction, msg);
}
2.1 負載均衡
負載均衡:客户端在發現了服務的地址之後,可能有多個服務的地址,這時候需要做負載均衡,選擇一個服務的地址來調用。
// 選擇服務地址,負載均衡
private Pair<String, Integer> clazzToAddress(Method method, Set<Pair<String, Integer>> addressSet) {
if (addressSet != null && !addressSet.isEmpty()) {
// 若指定了服務地址,則在指定的服務地址中選擇
return loadBalancer.selectServiceAddress(method, addressSet);
}
addressSet = serviceManager.getServiceAddress(method);
// 若未指定服務地址,則在註冊中心的服務地址中選擇
return loadBalancer.selectServiceAddress(method, addressSet);
}
2.2 發起連接、接收連接
因為我們的rpc的調用會比較頻繁,所以我們需要保持長連接,避免頻繁的創建連接和斷開,這裏我們使用連接池來管理連接。
發起連接:客户端在知道了服務的地址之後,需要和服務端建立連接,建立連接後,再發送請求。
接收連接:服務端需要接收客户端的連接,接收到連接後,再接收請求。
private FixedChannelPool getChannelPool(Pair<String, Integer> address) {
final var host = address.left;
final var port = address.right;
return serviceManager.getChannelPool(address,
// 創建連接池
_ -> LrpcChannelPoolFactory.createFixedChannelPool(host, port, lrpcProperties.getClient().getAddressMaxConnection()));
}
public FixedChannelPool getChannelPool(Pair<String, Integer> address, Function<String, FixedChannelPool> mappingFunction) {
final var host = address.left;
final var port = address.right;
return ADDRESS_POOL_MAP.computeIfAbsent(host + ":" + port, mappingFunction);
}
接收連接其實就是bossGroup的處理邏輯,這裏就不貼代碼了,可以看最後我貼的項目源碼。
2.3 發送請求、接收請求
發送請求:客户端在建立連接後,在調用服務的方法時,需要發送報文體,發送本地需要保存請求ID和Promise(用於接收調用結果,netty包裝一層的future)的映射關係,用來接收響應時,根據請求ID找到對應的請求。
接收請求:服務端在接收到客户端的連接後,需要接收到客户端的請求,解析請求,調用對應的方法。
我們本次使用自定義協議,所以需要約定好報文體的格式
報文體:16字節協議約定內容 + 請求體;
16字節協議約定內容:
(1):4個字節的長度來表示協議的魔數:就是一個固定的值,用來標識這是我們自定義的協議,這裏使用'L'、'R'、'P'、'C'。
(2):1個字節的版本號:標識這個協議的版本號,這裏因為是第一個版本,所以使用1。
(3):1個字節的序列化算法:標識這個協議使用的序列化算法,對應了序列化算法在枚舉中的數組下標,這裏使用的是0,表示使用JSON序列化。
(4):4個字節的請求ID:標識這個請求的ID,用來標識這個請求的唯一性,這裏使用UUID生成,可以在客户端和服務端都保存一個Map,用來保存請求ID和請求的映射關係。
(5):1個字節的消息類型:標識這個消息的類型,是請求還是響應,這裏使用1表示請求消息,2表示響應消息。
(6):4個字節的請求體的長度:使用Integer類型,表示請求體的長度,在接收請求時,根據這個長度來解析請求體。
(7):1個字節的補充位;無實際意義,只是為了對齊16字節。
請求體:序列化後轉成字節數組,內容有:接口名 + 方法名 + 返回參數類型 + 請求參數類型數組 + 請求參數值數組。
按剛剛上面約定好的協議格式解析,然後將請求體的內容反序列化,得到消息類型,使用LengthFieldBasedFrameDecoder解碼器,解決粘包和拆包問題,得到請求體的字節數組,然後反序列化,
得到消息後,獲取到接口名、方法名、返回參數類型、請求參數類型數組、請求參數值數組,使用動態代理調用對應的方法,得到返回值。
public <T> T getProxy(Class<T> clazz, Set<Pair<String, Integer>> serviceAddress) {
// 使用代理的方式,調用方法
final var proxyInstance = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, (proxy, method, args) -> {
RpcRequestMessage msg = buildRpcRequestMessage(clazz, method, args);
return consumerManager.send(msg, method, serviceAddress);
});
return clazz.cast(proxyInstance);
}
public Object executeWithChannelPool(ChannelPool channelPool,
BiFunction<Channel, RpcRequestMessage, Promise<Object>> function,
RpcRequestMessage msg) throws LRPCTimeOutException {
// 1. 從連接池中獲取連接,等待超市時間,未獲取連接則拋出異常
final Future<Channel> future = channelPool.acquire();
Channel channel = future.get();
final var promise = function.apply(channel, msg);
try {
return getResult(promise, msg.getMessageId());
} finally {
// 這裏的釋放需要放在拿到結果之後,否則會導臃連接被釋放
channelPool.release(channel);
}
}
private static BiFunction<Channel, RpcRequestMessage, Promise<Object>> channelExeFunction() {
// 發送請求,且處理寫失敗
return (channel, msg) -> {
final var promise = new DefaultPromise<>(channel.eventLoop());
RpcRespHandler.addPromise(msg.getMessageId(), promise);
// 發送請求,且處理寫失敗
final var channelFuture = channel.writeAndFlush(msg);
channelFuture.addListener(processAftermath(promise, msg));
return promise;
};
}
接收處理請求
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage msg) {
log.info("接收到消息 {}", JSON.toJSON(msg));
final var interfaceName = msg.getInterfaceName();
final var methodName = msg.getMethodName();
// 根據接口名獲取服務的本地實例
final var service = serviceManager.getService(interfaceName);
final var response = new RpcResponseMessage();
response.setMessageId(msg.getMessageId());
try {
// 使用反射調用方法
final Class<?> aClass = service.getClass();
final var method = aClass.getMethod(methodName, msg.getParameterTypes());
final var result = method.invoke(service, msg.getParameterValues());
response.setReturnValue(result);
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
log.error("e : ", e);
response.setExceptionValue(new Error(e.getCause().getMessage()));
}
// 以下屬於發送響應的邏輯
ctx.writeAndFlush(response).addListener(future -> {
if (future.isSuccess()) {
log.info("消息響應成功 {}", JSON.toJSON(msg));
return;
}
log.error("發送消息時有錯誤發生: ", future.cause());
});
}
3 發送響應、接收響應
得到第6步的返回值後,需要將返回值封裝成響應報文體,發送給客户端。
這裏發送響應的方式其實是和發送請求的方式是一樣的,只是消息類型不一樣,這裏是響應消息。
客户端接收到響應後,根據請求ID找到對應的請求,將響應的內容返回給調用方。
發送響應
// 在剛剛接收請求處理的channelRead0函數中,處理髮送響應的邏輯
ctx.writeAndFlush(response).addListener(future -> {
if (future.isSuccess()) {
log.info("消息響應成功 {}", JSON.toJSON(msg));
return;
}
log.error("發送消息時有錯誤發生: ", future.cause());
});
接收響應
private static Object getResult(Promise<Object> promise, Integer messageId) throws LRPCTimeOutException {
try {
// 超時等待
if (promise.await(5, TimeUnit.SECONDS)) {
if (promise.isSuccess()) {
return promise.getNow();
} else {
throw new RuntimeException(promise.cause());
}
} else {
throw new LRPCTimeOutException("請求超時");
}
} catch (InterruptedException e) {
throw new RuntimeException("操作被中斷", e);
} finally {
// 確保 promise 被移除
RpcRespHandler.removePromise(messageId);
}
}
4 關閉連接
關閉連接:客户端和服務端在完成請求和響應後,會把連接放回連接池,等待下一次的調用,等連接池關閉時,會關閉連接,服務端感應到連接關閉,會關閉連接。
怎麼將虛擬線程和Netty結合起來
分析
前面我們説過,虛擬線程適合高併發、IO密集型的場景,可以在一個線程中運行多個虛擬線程,減少線程的創建和銷燬,提高性能。
看一下netty的服務端網絡通信的架構簡圖:
在netty中,一個NioEventLoop中有一個Selector,一個Selector可以註冊多個Channel,一個Channel對應一個連接,一個線程可以處理多個連接,這就是netty的高性能的原因。
在每次循環中,Selector就會阻塞監聽Channel的事件,當有事件發生時,就會處理這個事件。
所以在這過程中,線程的數量,影響着Selector的數量,影響着Channel的數量,但是在傳統的線程中,線程的數量是有限的,所以這就限制了Selector的數量,影響着Channel的數量,影響着性能,
所以我們可以使用虛擬線程來解決這個問題,虛擬線程可以在一個線程中運行多個虛擬線程,且虛擬線程會在其中一個虛擬線程阻塞時,會切換到其他虛擬線程,且沒有系統級別的上下文切換,所以可以帶來更高的性能。
所以我們這裏主要是改變workerGroup的線程模型,使用虛擬線程來代替workerGroup裏的傳統的線程。
實現
根據netty的NioEventGroup的源碼,線程來自三個地方:
- 構造函數的入參的線程工廠;
- 構造參數的入參的executor;
- 父類io.netty.channel.MultithreadEventLoopGroup#newDefaultThreadFactory()方法返回的線程工廠;
這裏我們以重寫父類的newDefaultThreadFactory()方法為例,來實現虛擬線程。
private NioEventLoopGroup getWorker() {
final var workerMax = lrpcProperties.getServer().getWorkerMax();
// 創建workerGroup
return new NioEventLoopGroup(workerMax) {
// 直接在創建的時候重寫newDefaultThreadFactory()方法
@Override
protected ThreadFactory newDefaultThreadFactory() {
return new VirtualThreadFactory(NioEventLoopGroup.class, Thread.MAX_PRIORITY);
}
};
}
// 這裏是重寫的ThreadFactory
public class VirtualThreadFactory extends DefaultThreadFactory {
public VirtualThreadFactory(Class<?> poolType, int priority) {
super(poolType, priority);
}
@Override
protected Thread newThread(Runnable r, String name) {
// 這裏使用FastThreadLocalThread,是因為FastThreadLocalThread是netty提供的一個線程,裏面的方法有些功能,所以我們這裏直接繼承它,然後重寫start()方法
return new FastThreadLocalThread(threadGroup, r, name){
// 這裏的Thread.ofVirtual().unstarted(this)是創建一個虛擬線程
@Override
public void start() {
final var unstarted = Thread.ofVirtual().unstarted(this);
unstarted.setName(this.getName());
unstarted.start();
}
};
}
}
總結
本次我們實現了一個簡單的RPC框架,使用了netty作為底層通信框架,使用了zookeeper作為服務發現和註冊中心,使用了虛擬線程代替服務端的workerGroup的線程模型,擴展了可管控的Selector的數量,且在線程的切換上,沒有系統級別的上下文切換,提高了性能。
這裏只是一個簡單的實現,實際的RPC框架還有很多功能,比如:熔斷、限流、監控等等,這些功能可以根據實陫的需求來實現,而且在實際的實現過程中,還會遇到很多問題,比如:序列化和反序列化擴展、線程安全問題等等,都值得我們去深入研究。
這裏分享一下我的實現的代碼,麻煩老哥們幫忙點個star 😭 ,謝謝!有問題可以留言,我會在第一有空閒的時間回覆。
項目地址:JGZHAN/lrpc 戳這裏去點star