博客 / 詳情

返回

5 倍性能提升,Apache Doris TopN 全局優化詳解|Deep Dive

在日常的數據分析和業務報表中,TopN 查詢幾乎無處不在:無論是尋找銷量最高的前十件商品,還是篩選訪問量最多的前幾條日誌,開發者和數據分析師都在頻繁處理“前 N 條數據”。然而,當表的列數達到百餘或更多時,一個看似簡單的 SELECT \* … ORDER BY … LIMIT N 查詢,背後可能隱藏着巨大的性能瓶頸。儘管我們只關心某一列的前 N 條結果,數據庫依然可能掃描整張表的所有列,從而導致 IO 讀放大(Read Amplification),拖慢查詢速度。在大數據場景下,這種低效不僅浪費存儲帶寬,還直接影響業務決策的實時性。

為了幫助用户快速獲取目標數據,Apache Doris針對 TopN 類型查詢進行了全局優化,可將此類查詢的性能提升約 5 倍;同時,優化範圍也從單表進一步拓展至數據湖場景與多表關聯查詢,顯著擴大了適用範圍

TopN 查詢優化思路

為直觀説明 TopN 查詢的性能瓶頸,我們不妨將其簡化為列式存儲文件的讀取場景,比如訪問 Apache Doris 內部 Segment 文件,或訪問數據湖中常見的 Parquet / ORC 文件

假設需要找“第二列”中,數值最大的那條記錄:SELECT * FROM table ORDER BY col2 LIMIT 1。由於查詢需要返回整行,傳統做法通常是先掃描表的所有列,排序後再定位到對應記錄。

而 Apache Doris 原生列式存儲的物理佈局能夠提供更優解:由於各列獨立存放,因此可先僅讀取第二列的數據,快速計算出最大值所在的行號;再利用文件元數據,直接按行號提取該行的完整記錄,無需掃描無關列。相比傳統方式,這種方法顯著減少 IO 讀放大並降低內存佔用

這一優化對於湖倉分析場景尤為關鍵,因其直接關乎成本及性能。 對於 Iceberg、Paimon 等開放湖格式,數據通常存放在 S3 等對象存儲中,其 IO 性能普遍低於本地磁盤,且常按訪問流量或請求次數計費。數據掃描次數的減少,意味着更低的延遲與更少的費用。特別是在數據量龐大、查詢頻繁的分析業務中,TopN 的優化不僅能大幅提升響應速度,更能帶來切實的成本節約,實現性能與經濟的雙重收益。

全局 TopN 優化實現

基於上述思路指引,Apache Doris 完成了對 TopN 的全局優化。對於單表的 TopN,利用單節點內的 Runtime Filter 對內部表查詢進行動態過濾,有效減少 IO 並提升執行性能。在前不久不發的 4.0 版本中,也進一步提升了 TopN 查詢性能,通過引入 MaterializeNode,實現了兩階段數據訪問機制,並將優化範圍從單表進一步拓展至數據湖場景與多表關聯查詢,顯著擴大了其適用範圍

接下來,我們將深入解析 TopN Runtime Filter、單表兩階段 TopN 以及多表關聯 TopN 的具體優化實現。

01 採用 Runtime Filter

Runtime Filter 是一種運行時數據裁剪技術。Doris 在執行 SQL 時動態生成過濾條件,並將這些條件下推到後續數據處理環節,利用運行時信息進行數據裁剪,從而降低 IO 開銷並提升性能。在兩表 Join 場景中,這一技術的典型應用是將 build 側的 key 集合通過 IN-list、Bloom Filter 等形式下推到 probe 側,儘早過濾掉無關數據,減少掃描和傳輸。

TopN Runtime Filter 同樣採用這一思路,在運行時維護排序列的值範圍,並生成 Runtime Filter 以裁剪後續掃描,從而提升單節點上的 TopN 查詢性能

01 採用 Runtime Filter.PNG

在單機測試中,基於 Runtime Filter 優化後的 TopN 查詢耗時從 3 秒降到 1 秒,性能提升約 3 倍

SELECT * FROM lineitem ORDER BY l_orderkey LIMIT 1000;

02 兩階段數據訪問機制

基於 Runtime Filter 的方法雖然能夠在運行時動態過濾數據,但仍需讀取所有列,無法徹底消除讀放大。為此,我們引入了兩階段數據訪問機制,進一步減少列的讀取與 IO 開銷。其執行流程示意圖如下:

02 兩階段數據訪問機制.png

以如下 SQL 為例:

SELECT * FROM table ORDER BY colA LIMIT 10;

第 1 階段:只讀取排序列

在該階段的 Scan 任務中,系統只讀取排序列colA,並增加一個輔助列 __DORIS_ROWID_COL__。相當於執行:

SELECT colA, __DORIS_ROWID_COL__ FROM table ORDER BY colA LIMIT 10;

該方法跳過了非排序列的讀取,僅掃描與排序相關的數據並記錄其位置信息。DORIS_ROWID_COL 用於唯一標識數據所在文件與行號,其具體編碼設計將在後續章節詳細説明。

第 2 階段:基於 RowID 的完整數據獲取

新增的 MaterializeNode 接收第一階段的結果後,會根據 __DORIS_ROWID_COL__ 向對應 Backend 發起基於行號(RowID)的數據拉取請求。藉助文件中記錄的位置信息,Doris 可以快速定位並讀取對應記錄;由於已完成 TopN 計算,第二階段通常只需讀取有限行(例如示例中的 10 行)。

得益於該階段可通過 RPC 跨節點執行,打破了單節點執行限制,兩階段訪問機制也自然擴展至多表關聯的 TopN 場景,例如:

SELECT * FROM 
lineitem JOIN  orders 
ON l_orderkey = o_orderkey
WHERE o_orderdate < DATE '1995-03-15' 
ORDER BY l_partkey LIMIT 100;

其執行規劃示意如下:

02 兩階段數據訪問機制-1.png

執行計劃中,MaterializeNode 在第二階段可以穿透 Join 節點,從掃描節點獲取最終數據。

優化前後性能表現

Apache Doris 對於 TopN 的優化已在多種場景上得到驗證。我們在 Doris 內表、Parquet 及 ORC 格式的 Hive 表上,基於 TPCH 100G 標準數據集中的 lineitem 表,分別構建了單表多表 TopN 查詢場景,系統對比了優化前後的性能表現。

  • 單表 TopN 查詢示例(選取不同排序列):

    • -- Q1 - Q3:
      select * from lineitem order by l_orderkey limit 1000;
      select * from lineitem order by l_partkey limit 1000;
  • 多表 TopN 查詢示例(不同的表數、JOIN 方式與 SELECT 列數):

    • -- Q4:
      SELECT * FROM lineitem JOIN orders ON l_orderkey = o_orderkey
      WHERE o_orderdate < DATE '1995-03-15' ORDER BY l_partkey LIMIT 100;
      
      -- Q5:
      SELECT * FROM customer, orders, lineitem
      WHERE c_mktsegment = 'BUILDING'
          AND c_custkey = o_custkey
          AND l_orderkey = o_orderkey
          AND o_orderdate < DATE '1995-03-15'
          AND l_shipdate > DATE '1995-03-15'
      ORDER BY o_orderdate LIMIT 10;
      
      -- Q6:
      SELECT lineitem.*
      FROM customer, orders, lineitem
      WHERE c_mktsegment = 'BUILDING'
          AND c_custkey = o_custkey
          AND l_orderkey = o_orderkey
          AND o_orderdate < DATE '1995-03-15'
          AND l_shipdate > DATE '1995-03-15'
      ORDER BY o_orderdate LIMIT 10;
      
      -- Q7:
      SELECT l_shipdate, l_orderkey, l_linenumber
      FROM customer, orders, lineitem
      WHERE c_mktsegment = 'BUILDING'
          AND c_custkey = o_custkey
          AND l_orderkey = o_orderkey
          AND o_orderdate < DATE '1995-03-15'
          AND l_shipdate > DATE '1995-03-15'
      ORDER BY o_orderdate LIMIT 10;
      
      -- Q8:
      SELECT * FROM supplier, lineitem l1, orders, nation
      WHERE s_suppkey = l1.l_suppkey
          AND o_orderkey = l1.l_orderkey
          AND o_orderstatus = 'F'
          AND l1.l_receiptdate > l1.l_commitdate
          AND EXISTS (SELECT * FROM lineitem l2 WHERE l2.l_orderkey = l1.l_orderkey AND l2.l_suppkey <> l1.l_suppkey)
          AND NOT EXISTS (SELECT * FROM lineitem l3
              WHERE l3.l_orderkey = l1.l_orderkey
                  AND l3.l_suppkey <> l1.l_suppkey
                  AND l3.l_receiptdate > l3.l_commitdate)
          AND s_nationkey = n_nationkey
          AND n_name = 'SAUDI ARABIA'
      ORDER BY s_name LIMIT 100;
      
      -- Q9:
      SELECT s_name, s_address, s_phone, s_acctbal, l_shipdate, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipinstruct, o_orderdate, o_totalprice, o_orderpriority, n_name
      FROM supplier, lineitem l1, orders, nation
      WHERE s_suppkey = l1.l_suppkey
          AND o_orderkey = l1.l_orderkey
          AND o_orderstatus = 'F'
          AND l1.l_receiptdate > l1.l_commitdate
          AND EXISTS (SELECT * FROM lineitem l2 WHERE l2.l_orderkey = l1.l_orderkey   AND l2.l_suppkey <> l1.l_suppkey)
          AND NOT EXISTS (SELECT * FROM lineitem l3
              WHERE l3.l_orderkey = l1.l_orderkey
                AND l3.l_suppkey <> l1.l_suppkey
                AND l3.l_receiptdate > l3.l_commitdate)
          AND s_nationkey = n_nationkey
          AND n_name = 'SAUDI ARABIA'
      ORDER BY s_name LIMIT 100;
      
      -- Q10:
      SELECT s_name, s_nationkey, l_orderkey, o_orderstatus, n_name
      FROM supplier, lineitem l1, orders, nation
      WHERE s_suppkey = l1.l_suppkey
          AND o_orderkey = l1.l_orderkey
          AND o_orderstatus = 'F'
          AND l1.l_receiptdate > l1.l_commitdate
          AND EXISTS (SELECT * FROM lineitem l2 WHERE l2.l_orderkey = l1.l_orderkey   AND l2.l_suppkey <> l1.l_suppkey)
          AND NOT EXISTS (SELECT * FROM lineitem l3
              WHERE l3.l_orderkey = l1.l_orderkey
                AND l3.l_suppkey <> l1.l_suppkey
                AND l3.l_receiptdate > l3.l_commitdate)
          AND s_nationkey = n_nationkey
          AND n_name = 'SAUDI ARABIA'

下表彙總了優化帶來的平均性能提升(查詢時間縮短的百分比區間):

優化前後的性能表現.png

數據表明,TopN 優化在多種數據格式與查詢模式下均能顯著提升性能。平均可降低查詢時間 30% 至 40%在部分多表關聯場景中,性能提升幅度最高可達 80%,效果尤為突出。這證明了兩階段訪問機制有效減少了不必要的 IO,在不同存儲格式和複雜查詢中均能帶來可觀的收益。

TopN 執行邏輯解析

前文簡要介紹了 TopN 的兩階段執行邏輯,在實際實現中,該流程面臨幾項核心挑戰:

  • Pipeline 執行線程的阻塞:第二階段數據拉取涉及網絡 IO,若在 Pipeline 執行線程中同步進行,會導致線程被阻塞,降低系統整體吞吐。
  • 多表查詢的支持:Join 算子涉及多張表的物化,需要準確識別對應需要物化的列。
  • 內外表格式的統一:Doris 內表與 Parquet、ORC 等開放格式在行號管理上機制不同,需設計統一的行標識抽象,以確保內外表邏輯一致。
  • 資源隔離管控:延遲物化階段的 IO 操作需納入 Workload Group 進行統一資源管控,避免干擾線上其他查詢,保證系統穩定性。

針對上述挑戰,Doris 通過混合任務調度器、全局行標識編碼 與 智能優化器規則 協同工作,系統性地解決了這些問題。以下我們將逐一展開其設計實現。

01 混合調度器

為解決 Pipeline 執行線程在網絡 IO 場景下易被阻塞的問題,我們重構了 Doris 的 Pipeline 執行框架,引入了混合任務調度器(HybridTaskScheduler),從調度層面分離阻塞與非阻塞任務,顯著降低了 IO 等待對執行效率的影響。其核心設計如下圖所示:

01 混合調度器.png

具體實現上,原有統一的 TaskScheduler 被拆分為兩類調度器,共同構成新的 HybridTaskScheduler

  • NonBlockingScheduler:專門調度非阻塞型任務(如純計算操作)。調度器線程數量跟 CPU 核數相等。能夠確保充分利用 CPU 資源。
  • BlockingScheduler:用於調度可能阻塞的任務,如涉及磁盤 IO、網絡 IO 等操作。該調度器線程數可動態調整,默認為 CPU 核數的兩倍,以更好地容納 IO 等待。

通過將任務按是否阻塞分類調度,系統有效避免了阻塞型任務對計算密集型任務的資源搶佔。例如,TopN 查詢第二階段中的 Materialization Node 會被自動提交至 BlockingScheduler 執行,從而大幅減少 IO 阻塞對全局 Pipeline 執行線程的佔用。

02 全局 ID 編碼與資源管控

上文提到的 __DORIS_ROWID_COL__用於在第二階段精確定位數據行,其編碼設計兼顧了效率、跨格式一致性與資源管控。編碼格式如下:

編碼格式: [version:uint8] + [backend_id:uint64] + [file_id:uint32] + [row_id:uint32]
  • version:標識編碼格式版本,用於後續擴展與兼容。
  • backend_id:BE 節點 ID。該字段實現了精準的 RPC 定向——第二階段請求可直接發送至對應節點,避免廣播開銷。同時,接收請求的節點會將數據讀取任務提交至該查詢所屬的 Workload Group,從而確保資源隔離與統一管控。
  • file_id:系統為查詢涉及到的每個文件生成唯一 ID,並在內存中維護 ID 到實際文件路徑的映射。通過唯一 ID 可以減少第二階段發送文件信息的請求大小,減少網絡資源開銷。

    • 對於內表,文件名編碼為:tabet_id-rowset_id-segment_id
    • 對於 Parquet/ORC,文件名編碼為:filename-rowgroup_id
  • row_id:用於標識數據在對應文件中的行號。同時,針對 OUTER JOIN 等可能會生成 NULL 值的場景,row_id 可以編碼為 NULL,從而在第二階段直接跳過請求,進一步提升效率。

03 全局延遲物化算法

為系統支持兩階段數據訪問,Doris 優化器引入了全新的全局延遲物化算法。該算法在編譯階段自動識別可延遲讀取的列,從而在保證語義正確的前提下,最大限度減少第一階段的數據掃描量。其執行流程可概括如下:

  • 列集合劃分:優化器將需要訪問的列分為關鍵列集 K 和 延遲列集 D。K 列是在第一階段需要讀取的列,D 列是需要在第二階段延遲讀取的列。
  • 自頂向下遍歷:算法自頂向下遍歷執行計劃數的每個算子,將需要參與計算的列(如條件過濾列,Join 列等)加入到 K 集合中,其餘列加入到 D 集合中。
  • 字段轉換:如遇到投影節點(Projection Operator)或集合操作節點(Set Operator)等產生字段變化的節點,則會將 K 中相應的字段轉換成下層節點的字段。
  • 結果推導:最終推導出 Scan 節點需讀取的 K 集合,以及上游各算子對應的 D 集合。

以如下執行計劃片段為例:

FILTER(x > 10) --> PROJECT(a+b as x) --> SCAN(T)
  • FILTER 節點依賴列 x,因此將 x 加入 K。
  • PROJECT 節點將 x 映射為底層表達式 a + b,因此從 K 中移除 x,並加入 ab
  • 最終傳遞至 SCAN 的 K 集合為 {a, b},即僅需在第一階段讀取列 ab

該算法在語法樹層面實現了列讀取的智能推遲,為高效的兩階段執行奠定了編譯基礎。

結束語

TopN 優化極大地強化了從海量數據中高效提取核心信息的能力,可廣泛應用於實時排行榜、熱點分析、銷量統計、告警排序等高價值業務場景

在方案設計過程中,我們也研究了業界其他系統的實現思路。以 DuckDB 為例,其在處理單表 TopN 時,會將其轉換為一個特殊的semi Join 操作:左節點去掃描整表,右節點在掃描排序列後取出其 TopN 行,並且會藉助 Runtime Filter 減少左表掃描數據量。該方案的優勢在於複用了成熟的 Join 框架,但在某些場景下——例如排序列不是主鍵,或面對 Parquet 等格式的 Row Group 時——過濾效率可能受到影響,適用性存在一定邊界。

未來我們計劃進一步進行深度開發,包括

  • 集合運算(UNION/EXCEPT/INTERSECT)等複雜算子的 TopN 支持。
  • 動態自適應物化閾值調整。

我們將持續追蹤數據查詢領域的前沿技術,並不斷探索其在真實業務場景中的落地實踐,致力於為用户提供持續領先的查詢性能體驗。

"秀幹終成棟,精鋼不作鈎", 在“極致性能”的探索路上,Apache Doris 永不止步。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.