動態

詳情 返回 返回

【手寫 RPC】手寫一個RPC框架 使用Netty + java虛擬線程 - 動態 詳情

【手寫RPC框架】如何使用netty手寫一個RPC框架 結合新特性 虛擬線程

什麼是RPC框架

RPC(Remote Procedure Call)遠程過程調用,是一種通過網絡從遠程計算機程序上請求服務,而不需要了解底層網絡技術的協議。RPC框架是一種遠程調用的框架,它可以讓你像調用本地方法一樣調用遠程方法。

避免了開發人員自己去封裝網絡請求、連接管理、序列化、反序列化等操作,提高了開發效率。

Netty是什麼?為什麼使用Netty

Netty是一個基於NIO的客户、服務器端編程框架,使用Netty可以快速開發網絡應用,例如服務器和客户端。Netty是一個高性能、異步事件驅動的網絡應用框架,它簡化了網絡編程,提供了一種新的方式來處理網絡通信。

大白話粗略理解:因為Java的NIO的API使用起來比較複雜,Netty是對NIO的封裝,使用起來更加簡單。

所以這也是為什麼我們使用Netty來實現RPC框架的原因,netty也被很多框架證明了它的穩定性和性能。

Java虛擬線程

Java虛擬線程是一個輕量級的線程,它不需要操作系統的線程支持,可以在一個線程中運行多個虛擬線程。Java虛擬線程是一個用户態的線程,它不需要操作系統的線程支持,可以在一個線程中運行多個虛擬線程。

虛擬線程實際上是通過傳統的線程來管理多個虛擬線程,在Java的平台上去調度這些虛擬線程,從而實現了輕量級的線程稱為虛擬線程,想要了解更加細節的可以去看下我的另一篇文章:【虛擬線程】Java虛擬線程 VirtualThread 是什麼黑科技

虛擬線程的優勢:

  1. 輕量級:虛擬線程是輕量級的線程,可以在一個線程中運行多個虛擬線程。
  2. 高效:虛擬線程是用户態的線程,不需要操作系統的線程支持,可以在一個線程中運行多個虛擬線程,線程的切換不涉及內核態和用户態的切換,效率更高。

適合的場景:

  1. 高併發:虛擬線程適合高併發的場景,可以在一個線程中運行多個虛擬線程,減少線程的創建和銷燬,提高性能。
  2. IO密集型:虛擬線程適合IO密集型的場景,可以在一個線程中運行多個虛擬線程,減少線程的創建和銷燬,提高性能。
  3. 任務短暫:虛擬線程適合任務短暫的場景,可以在一個線程中運行多個虛擬線程,減少線程的創建和銷燬,提高性能。

寫一個RPC框架需要哪些步驟

既然我們要寫一個RPC框架,那麼我們需要明確一下我們需要做哪些事情。
我們是從A服務調用B服務,那麼就代表我們的服務A是客户端,服務B是服務端。但是我們的系統正常來説要調用別的服務,也會被別的服務調用,
所以我們的服務A也是服務端,服務B也是客户端。所以我們的系統要同時具備客户端和服務端的功能。

  • 客户端的功能:發現服務、請求(負載均衡、發起連接、發送請求)、接收響應、關閉連接。
  • 服務端的功能:註冊服務、接收請求(接收連接、接收請求)、發送響應、關閉連接。

其實根據上面可以發現,服務端和客户端所做的事情是對應的,是一個鏡像的關係。所以我們就是對應放在一起講。

注意注意注意⚠️:

  1. 示例中的代碼為了方便理解,我只摘取了主要邏輯,且做了簡略,具體的實現可以看我放在最後的項目源碼。
  2. 這裏我們只是簡單的實現一個RPC框架,所以我們只是實現了最基本的功能,實際的RPC框架還有很多功能,比如:熔斷、限流、監控等等,這些功能可以根據實陫的需求來實現擴展。

1 發現服務、註冊服務

註冊服務:服務端想告訴別人我提供了哪些服務(接口的方法),我的地址是什麼。
發現服務:客户端需要知道我調用的一些服務(接口的方法)有哪些地址(ip + 端口)可以調用。

服務發現和註冊的方式有很多種,比如:zookeeper、nacos、consul、etcd等等。本次我們以zookeeper為例。

註冊服務代碼示例:

    private static CuratorFramework client;
    
    // 這裏使用Curator框架來操作zookeeper
    public ZookeeperRegistryCenter() {
       final var zookeeper = PROPERTIES_THREAD_LOCAL.get().getRegistry().getZookeeper();
    
       RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
       final var builder = CuratorFrameworkFactory.builder()
               .connectString(zookeeper.getAddress())
               .namespace(zookeeper.getRootPath());
       client = builder.build();
    }
    // 創建一個zk客户端
    private static void create(String path, CreateMode mode) throws Exception {
        client.create()
                .creatingParentsIfNeeded()
                .withMode(mode)
                .forPath(path);
    }

發現服務代碼示例:

    // 發現服務,只要監聽註冊中心的變化
    public void watch() {
    
        // 觀察者模式,監聽註冊中心的變化
        registryCenter.watch((change, providerInfo) -> {
            switch (change) {
                case Change.ADD -> addServiceAddress(providerInfo);
                case Change.UPDATE -> updateServiceAddress(providerInfo);
                case Change.REMOVE -> deleteServiceAddress(providerInfo);
            }
        });
    }

    private void addOrUpdateServiceAddress(String methodStr, Pair<String, Integer> address) {
        // 這裏使用SERVICE_ADDRESS_MAP(ConcurrentHashMap)本地緩存服務地址,key是接口名+方法名,value是服務地址
        SERVICE_ADDRESS_MAP.computeIfAbsent(methodStr, _ -> new CopyOnWriteArraySet<>())
                .add(address);
    }

2 請求、接收

請求代碼示例:

    // 請求
    public Object send(RpcRequestMessage msg, Method method, Set<Pair<String, Integer>> addressSet) throws LRPCTimeOutException {
        // 負載均衡選擇服務地址
        final var address = clazzToAddress(method, addressSet);
        // 獲取連接池
        final var channelPool = getChannelPool(address);
        // 在連接池中執行請求
        return channelManager.executeWithChannelPool(channelPool, channelExeFunction, msg);
    }

2.1 負載均衡

負載均衡:客户端在發現了服務的地址之後,可能有多個服務的地址,這時候需要做負載均衡,選擇一個服務的地址來調用。

    // 選擇服務地址,負載均衡
    private Pair<String, Integer> clazzToAddress(Method method, Set<Pair<String, Integer>> addressSet) {
        if (addressSet != null && !addressSet.isEmpty()) {
            // 若指定了服務地址,則在指定的服務地址中選擇
            return loadBalancer.selectServiceAddress(method, addressSet);
        }
        addressSet = serviceManager.getServiceAddress(method);
        // 若未指定服務地址,則在註冊中心的服務地址中選擇
        return loadBalancer.selectServiceAddress(method, addressSet);
    }

2.2 發起連接、接收連接

因為我們的rpc的調用會比較頻繁,所以我們需要保持長連接,避免頻繁的創建連接和斷開,這裏我們使用連接池來管理連接。

發起連接:客户端在知道了服務的地址之後,需要和服務端建立連接,建立連接後,再發送請求。

接收連接:服務端需要接收客户端的連接,接收到連接後,再接收請求。

    private FixedChannelPool getChannelPool(Pair<String, Integer> address) {
        final var host = address.left;
        final var port = address.right;
        return serviceManager.getChannelPool(address,
                // 創建連接池
                _ -> LrpcChannelPoolFactory.createFixedChannelPool(host, port, lrpcProperties.getClient().getAddressMaxConnection()));
    }
    public FixedChannelPool getChannelPool(Pair<String, Integer> address, Function<String, FixedChannelPool> mappingFunction) {
        final var host = address.left;
        final var port = address.right;
        return ADDRESS_POOL_MAP.computeIfAbsent(host + ":" + port, mappingFunction);
    }

接收連接其實就是bossGroup的處理邏輯,這裏就不貼代碼了,可以看最後我貼的項目源碼。

2.3 發送請求、接收請求

發送請求:客户端在建立連接後,在調用服務的方法時,需要發送報文體,發送本地需要保存請求ID和Promise(用於接收調用結果,netty包裝一層的future)的映射關係,用來接收響應時,根據請求ID找到對應的請求。

接收請求:服務端在接收到客户端的連接後,需要接收到客户端的請求,解析請求,調用對應的方法。

我們本次使用自定義協議,所以需要約定好報文體的格式

報文體:16字節協議約定內容 + 請求體;

16字節協議約定內容:

  (1):4個字節的長度來表示協議的魔數:就是一個固定的值,用來標識這是我們自定義的協議,這裏使用'L'、'R'、'P'、'C'。

  (2):1個字節的版本號:標識這個協議的版本號,這裏因為是第一個版本,所以使用1。

  (3):1個字節的序列化算法:標識這個協議使用的序列化算法,對應了序列化算法在枚舉中的數組下標,這裏使用的是0,表示使用JSON序列化。

  (4):4個字節的請求ID:標識這個請求的ID,用來標識這個請求的唯一性,這裏使用UUID生成,可以在客户端和服務端都保存一個Map,用來保存請求ID和請求的映射關係。

  (5):1個字節的消息類型:標識這個消息的類型,是請求還是響應,這裏使用1表示請求消息,2表示響應消息。

  (6):4個字節的請求體的長度:使用Integer類型,表示請求體的長度,在接收請求時,根據這個長度來解析請求體。

  (7):1個字節的補充位;無實際意義,只是為了對齊16字節。

   請求體:序列化後轉成字節數組,內容有:接口名 + 方法名 + 返回參數類型 + 請求參數類型數組 + 請求參數值數組。

按剛剛上面約定好的協議格式解析,然後將請求體的內容反序列化,得到消息類型,使用LengthFieldBasedFrameDecoder解碼器,解決粘包和拆包問題,得到請求體的字節數組,然後反序列化,

得到消息後,獲取到接口名、方法名、返回參數類型、請求參數類型數組、請求參數值數組,使用動態代理調用對應的方法,得到返回值。


    public <T> T getProxy(Class<T> clazz, Set<Pair<String, Integer>> serviceAddress) {
        // 使用代理的方式,調用方法
        final var proxyInstance = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, (proxy, method, args) -> {
            RpcRequestMessage msg = buildRpcRequestMessage(clazz, method, args);
            return consumerManager.send(msg, method, serviceAddress);
        });
        return clazz.cast(proxyInstance);
    }
    
    public Object executeWithChannelPool(ChannelPool channelPool,
                                                  BiFunction<Channel, RpcRequestMessage, Promise<Object>> function,
                                                  RpcRequestMessage msg) throws LRPCTimeOutException {
        // 1. 從連接池中獲取連接,等待超市時間,未獲取連接則拋出異常
        final Future<Channel> future = channelPool.acquire();
        Channel channel = future.get();
        final var promise = function.apply(channel, msg);
        try {
            return getResult(promise, msg.getMessageId());
        } finally {
            // 這裏的釋放需要放在拿到結果之後,否則會導臃連接被釋放
            channelPool.release(channel);
        }
    }
    
    private static BiFunction<Channel, RpcRequestMessage, Promise<Object>> channelExeFunction() {
        // 發送請求,且處理寫失敗
        return (channel, msg) -> {
            final var promise = new DefaultPromise<>(channel.eventLoop());
            RpcRespHandler.addPromise(msg.getMessageId(), promise);
            // 發送請求,且處理寫失敗
            final var channelFuture = channel.writeAndFlush(msg);
            channelFuture.addListener(processAftermath(promise, msg));
            return promise;
        };
    }

接收處理請求


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage msg) {

        log.info("接收到消息 {}", JSON.toJSON(msg));
        final var interfaceName = msg.getInterfaceName();
        final var methodName = msg.getMethodName();
        
        // 根據接口名獲取服務的本地實例
        final var service = serviceManager.getService(interfaceName);

        final var response = new RpcResponseMessage();
        response.setMessageId(msg.getMessageId());
        try {
            // 使用反射調用方法
            final Class<?> aClass = service.getClass();
            final var method = aClass.getMethod(methodName, msg.getParameterTypes());
            final var result = method.invoke(service, msg.getParameterValues());
            response.setReturnValue(result);
        } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
            log.error("e : ", e);
            response.setExceptionValue(new Error(e.getCause().getMessage()));
        }
        
        // 以下屬於發送響應的邏輯
        ctx.writeAndFlush(response).addListener(future -> {
            if (future.isSuccess()) {
                log.info("消息響應成功 {}", JSON.toJSON(msg));
                return;
            }
            log.error("發送消息時有錯誤發生: ", future.cause());
        });
    }

3 發送響應、接收響應

得到第6步的返回值後,需要將返回值封裝成響應報文體,發送給客户端。

這裏發送響應的方式其實是和發送請求的方式是一樣的,只是消息類型不一樣,這裏是響應消息。
客户端接收到響應後,根據請求ID找到對應的請求,將響應的內容返回給調用方。

發送響應


        // 在剛剛接收請求處理的channelRead0函數中,處理髮送響應的邏輯
        ctx.writeAndFlush(response).addListener(future -> {
            if (future.isSuccess()) {
                log.info("消息響應成功 {}", JSON.toJSON(msg));
                return;
            }
            log.error("發送消息時有錯誤發生: ", future.cause());
        });

接收響應

    private static Object getResult(Promise<Object> promise, Integer messageId) throws LRPCTimeOutException {
        try {
            // 超時等待
            if (promise.await(5, TimeUnit.SECONDS)) {
                if (promise.isSuccess()) {
                    return promise.getNow();
                } else {
                    throw new RuntimeException(promise.cause());
                }
            } else {
                throw new LRPCTimeOutException("請求超時");
            }
        } catch (InterruptedException e) {
            throw new RuntimeException("操作被中斷", e);
        } finally {
            // 確保 promise 被移除
            RpcRespHandler.removePromise(messageId);
        }
    }

4 關閉連接

關閉連接:客户端和服務端在完成請求和響應後,會把連接放回連接池,等待下一次的調用,等連接池關閉時,會關閉連接,服務端感應到連接關閉,會關閉連接。

怎麼將虛擬線程和Netty結合起來

分析

前面我們説過,虛擬線程適合高併發、IO密集型的場景,可以在一個線程中運行多個虛擬線程,減少線程的創建和銷燬,提高性能。

看一下netty的服務端網絡通信的架構簡圖:

img.png

在netty中,一個NioEventLoop中有一個Selector,一個Selector可以註冊多個Channel,一個Channel對應一個連接,一個線程可以處理多個連接,這就是netty的高性能的原因。
在每次循環中,Selector就會阻塞監聽Channel的事件,當有事件發生時,就會處理這個事件。
所以在這過程中,線程的數量,影響着Selector的數量,影響着Channel的數量,但是在傳統的線程中,線程的數量是有限的,所以這就限制了Selector的數量,影響着Channel的數量,影響着性能,
所以我們可以使用虛擬線程來解決這個問題,虛擬線程可以在一個線程中運行多個虛擬線程,且虛擬線程會在其中一個虛擬線程阻塞時,會切換到其他虛擬線程,且沒有系統級別的上下文切換,所以可以帶來更高的性能。
所以我們這裏主要是改變workerGroup的線程模型,使用虛擬線程來代替workerGroup裏的傳統的線程。

實現

根據netty的NioEventGroup的源碼,線程來自三個地方:

  1. 構造函數的入參的線程工廠;
  2. 構造參數的入參的executor;
  3. 父類io.netty.channel.MultithreadEventLoopGroup#newDefaultThreadFactory()方法返回的線程工廠;

這裏我們以重寫父類的newDefaultThreadFactory()方法為例,來實現虛擬線程。

    private NioEventLoopGroup getWorker() {
        final var workerMax = lrpcProperties.getServer().getWorkerMax();
        // 創建workerGroup
        return new NioEventLoopGroup(workerMax) {
            // 直接在創建的時候重寫newDefaultThreadFactory()方法
            @Override
            protected ThreadFactory newDefaultThreadFactory() {
                return new VirtualThreadFactory(NioEventLoopGroup.class, Thread.MAX_PRIORITY);
            }
        };
    }

// 這裏是重寫的ThreadFactory
public class VirtualThreadFactory extends DefaultThreadFactory {
   public VirtualThreadFactory(Class<?> poolType, int priority) {
      super(poolType, priority);
   }

   @Override
   protected Thread newThread(Runnable r, String name) {
      // 這裏使用FastThreadLocalThread,是因為FastThreadLocalThread是netty提供的一個線程,裏面的方法有些功能,所以我們這裏直接繼承它,然後重寫start()方法
      return new FastThreadLocalThread(threadGroup, r, name){
         // 這裏的Thread.ofVirtual().unstarted(this)是創建一個虛擬線程
         @Override
         public void start() {
            final var unstarted = Thread.ofVirtual().unstarted(this);
            unstarted.setName(this.getName());
            unstarted.start();
         }
      };
   }
}

總結

本次我們實現了一個簡單的RPC框架,使用了netty作為底層通信框架,使用了zookeeper作為服務發現和註冊中心,使用了虛擬線程代替服務端的workerGroup的線程模型,擴展了可管控的Selector的數量,且在線程的切換上,沒有系統級別的上下文切換,提高了性能。

這裏只是一個簡單的實現,實際的RPC框架還有很多功能,比如:熔斷、限流、監控等等,這些功能可以根據實陫的需求來實現,而且在實際的實現過程中,還會遇到很多問題,比如:序列化和反序列化擴展、線程安全問題等等,都值得我們去深入研究。
這裏分享一下我的實現的代碼,麻煩老哥們幫忙點個star 😭 ,謝謝!有問題可以留言,我會在第一有空閒的時間回覆。
項目地址:JGZHAN/lrpc 戳這裏去點star
img_1.png

user avatar u_16297326 頭像 u_16502039 頭像 debuginn 頭像 xuxueli 頭像 tech 頭像 u_11365552 頭像 lvlaotou 頭像 lu_lu 頭像 aipaobudezuoyeben 頭像 boxuegu 頭像 chaochenyinshi 頭像 wuliaodechaye 頭像
點贊 46 用戶, 點贊了這篇動態!
點贊

Add a new 評論

Some HTML is okay.