摘要:本文整理自阿里云開源大數據平臺技術專家畢巖(尋徑)在 Apache Con ASIA 的分享。本篇內容主要分為四個部分:
1.湖格式& Hudi & CDC
2.湖格式設計實現 CDC 的思考
3.Hudi CDC 實現
4.湖格式 Streaming 的優化
2021年中 Databricks 發布了一篇基于 Delta Lake 實現 CDC 場景的介紹文檔,2022年初我們在阿里云EMR 內部 Delta Lake 版本實現的 CDC 的能力,同期在 Apache Hudi 提案了 Hudi 基于 Spark 實現 CDC 的設計文檔和實現代碼。
結合這些經驗,今天以 Apache Hudi 為主,分享一下數據湖格式上實現 CDC 的一些思考和注意點,以及一些流式 streaming 通用的優化點,和 Hudi CDC 的后續規劃。
湖格式 & Hudi & CDC
該部分主要介紹下此次分享涉及到的一些概念,包括湖倉、數據湖格式、Apache Hudi,以及 Change Data Capture(CDC)的一些需要了解的東西。
湖倉 & 湖格式
相信大家對于數據倉庫,數據湖,進而到兩者結合的湖倉的演變有了一些了解,這里就不過多介紹了。湖倉(LakeHouse)有以下一些關鍵特性。

介紹幾個關鍵特性:
- ACID 事務:同一張表經常會同時有多個工作流來讀寫,事務保證了我們能夠讀、寫到正確的數據;
- Schema Enforcement 和數據管理:可以加上 Schema Evolution。Enforcement 在 Databicks 相關文章解釋上等同于 Valication,在寫入數據時,嚴格檢測 schema 并要求和目標表定義的一致。Schema Evolution 允許修改表的字段信息(如增刪字段,修改字段類型和描述等)。另外,湖倉還應提供健壯的湖表治理和審計的能力;
- 支持結構/非結構數據,支持多類API:湖倉架構能夠支持半/非結構化數據(如JSON,圖像,語音等)的存儲,以及提供除了基本SQL之外豐富的API來處理數據,應用在如機器學習等場景;
- 批流一體:數據湖提出之初,很重要的就是替代 Lambda 架構,批流一體能夠有效的簡化流式和離線兩條數據鏈路的開發和運維成本;
- 存算分離:成本是各個公司都需要關注的。如果存儲和計算都能按需伸縮,會更便于精細化控制成本。
我們發現大部分的湖倉關鍵特性是需要由底層存儲之上的數據組織方式,即數據湖格式來提供的,我認為這也是 DeltaLake、Apache Hudi,Apache Iceberg 近兩年興起的主要背景和原因吧。
Apache Hudi

Apache Hudi 是一個構建于自管理數據庫層之上的,使用增量數據流來構建數據湖的一個功能豐富的平臺。相對于其他湖格式,Hudi具備更細粒度的數據布局(FileGroup),支持多種索引提升 Upsert 性能,以及在開源版本上較為豐富的自動化湖表管理能力。
CDC
Change Data Capture:定義了一種場景,即識別并捕獲數據庫表中數據的變更,并交付給下游進一步處理。CDC是對針對行級數據記錄的。其中數據的變更信息,即 CDC 的數據結構,包括變更是什么樣的操作(有三類:insert,update,delete),變更發生的時間點,以及變更前后的數據。顯然對于insert操作該記錄的變更信息中是沒有舊值的,對于 delete 操作該記錄的變更信息中是沒有新值(當前值)的。

CDC 典型方法
CDC 不是數據湖格式特有的概念和場景,它存在已久。并且在傳統數據庫有些一些典型的方法:
- 時間戳/版本號:是在表上添加一個類似于 created_time 和 last_modified_time 這樣的字段,標識記錄的創建時間和最新修改時間,查詢時根據 modified_time 做過濾,得到變化的數據。這個方法有幾個明顯的缺點:
- 不能感知到delete的變化。
- 不能直接獲取得到update的舊值,因此這類方案僅適用于沒有delete操作,且不關注舊值的業務場景。
- 由于沒有快照或者版本的概念,不能準確的捕捉每次變更。
CDC 場景示例

上圖定義了一個通用的數倉場景,user_city_tbl 和 user_name_tbl 維表做 Join 操作將打寬后的數據更新到 user_name_city_tbl中(類似于 ODS 到 DWD 的鏈路)。
user_name_city_tbl 按 city 聚合統計出城市的常駐人口后同步成 city_population_tbl(類似 DWD 到 DWS 的鏈路)。
在沒有實際 CDC 功能的場景下,傳統數倉的解決方案一般是基于 Kafka 里的流式數據生成 ODS、DWD、DWS 的各層數據,便于查詢實時的數據,同時小時/天粒度做離線的全表/分區的ETL修復數據用于歷史查詢使用。如上例離線方式,user_city_tbl 每次更新后,需要全表和 user_name_tbl 做 Join,然后 overwrite 到 user_name_city_tbl,后續也需要全表按 city 聚合 overwrite 到完整的 city_population 表中。很難兼得增量數據處理、實時性、低管理運維成本這幾個方面。
在具備 CDC 的場景下,只需要一個流式 workflow 即可。如 user_city 表變更后得到change data,僅將這部分數據和 user_name 表做 Join,就可以針對 user1 更新 city 信息,將新數據 user5 對應的記錄通過 upsert 語法(Merge Into)同步至 user_name_city_tbl 中。同時得到 user_name_city_tbl 的變更數據(user1的 city 從 bj 變更為 hz,新增 user5 記錄及對應的 name 和 city 的值),實現對 bj city 的人口數減一,對 hz 和 wh 兩個 city 字段加一。僅使用一條流式 workflow,能夠實時的將增量變更信息同步更新至下游。
舉例補充另外一個場景:銀行通過獲取當天凌晨和前一天凌晨之間的賬戶金額變更來檢測賬戶的健康程度。若有兩個賬戶,A全天賬戶沒任何操作,而賬戶B全天操作上百次,但最終金額也沒有變化。如果單純的比較前一天凌晨和當天的賬戶金額是否變化是沒辦法判斷賬戶的健康指標的。對于類似場景,CDC 的解決方案中需要具備追溯每一次的變更的能力。
綜合這些場景,梳理了 CDC 這個場景應該具備以下能力:
- 變更信息中應該包含所有的表字段,不能僅包括主鍵的;
- 對于 delete/update 操作,應該包括操作前的舊值;
- 能追溯每一次的變更。
CDC 輸出格式

最后,我們來看下 CDC 的輸出格式。
關于 CDC 的輸出格式,我認為只要包含了全局的變更信息,包括操作類型,操作時間或對應版本,以及前后值就足夠。這里列出了典型的 debezium 的格式,和上面場景示例中使用的格式,也是 deltalake 所采用的。debezium 是一個較為通用的格式,便于集成到已有系統中,而在部分場景 deltalake 自定義的格式對SQL查詢更友好。
湖格式設計實現 CDC 的思考
CDC 設計

前面介紹了在數據庫中典型的 CDC 方法,但數據湖格式較數據庫、傳統基于 Hive 的數倉還是有很大不同的。湖格式的特點:
- 支持多版本/多快照?;谶@個特性,至少可以使用表 Diff 的方法來獲取 CDC 數據。
- 每個版本準確的映射到一組有效數據文件,且單次操作會在文件元數據層面記錄數據文件的變更,即每次操作(如insert,merge into等)會將新增數據文件標記為有效的,歷史部分數據文件在當前版本被標記為無效。
- 沒有常駐服務,操作依賴現有的計算引擎 Spark 或者 Flink。沒有后臺處理進程,那類似于數據庫觸發器的方法,是沒辦法由后臺執行,如果需要只能在當前操作時間內完成,增加部分寫開銷。
在設計湖格式支持 CDC 需要考慮到:
- CDC 各場景能力的覆蓋,包括上述提到的較為復雜的。至于提供的信息是否被需要由下游場景決定,但湖格式本身要能將需要的 CDC 相關信息都交付給下游。
- 讀寫性能。類似于觸發器或者事務日志的方式,需要記錄額外的信息,要考慮到對寫入性能的影響;同樣對于表 Diff 這樣的方式,要考慮到查詢讀取性能。
- 約定輸出格式。

傳統數據庫的 CDC 方法中,時間戳或者版本號的方法在場景支持上的缺陷;湖格式沒有自己常駐服務,不存在事務日志,因此我們先來看下表 Diff 的實現方式。其原理就是基于湖格式本身的 time-travel 查詢歷史版本的能力,和后續的版本逐一做 Diff 得到 CDC 數據。優點是不會增加寫入開銷,缺點是查詢性能差。如果我們想要進一步優化查詢性能,可能的思路就是降低 Diff 或 Join 的粒度,從最差做全表的 Diff,到僅做當前 commit 涉及到的分區級 Diff,再進一步按桶等等。DeltaLake 可以通過當前 commit 內新增文件和被標記無效的文件之間來做。Hudi 本身具備分區級更小的 FileGroup 文件組的概念,也可以減少 Diff 的數據量。
再看一下 DeltaLake 在 CDC 場景的實現方案 CDF,Change Data Feed。其核心在于數據寫入時直接持久化 CDC 數據。類似于數據庫 trigger 的方式,直接保存 CDC 需要的所有信息,查詢時直接加載這部分數據。優點是查詢性能最好,缺點是增加了寫開銷。如果繼續在這個方案下做一些優化:
- 根據不同的操作,避免每次commit都持久化。比如數據首次寫入表,那么所有的數據本身一定都是insert操作類型的。這種就不需要再額外雙寫一份CDC數據。
- 如果表有主鍵,那可以僅持久化主鍵,然后將前后兩個版本含主鍵的數據加載成兩個map結構,分別以點查的方式獲取舊值和當前值。若前值為空,該變更為insert操作,若新值為空,該操作為delete操作,否則即為update操作;這樣減少持久化的數據量,也就減少了寫時開銷;當然也可以同時持久化記錄的操作類型,來更準確的獲取舊值和當前值。
我們結合以下兩個場景來考慮這兩個方案:
- 上游數據完成一次 commit,下游何時消費是不確定的,很可能一次性消費幾個 commit 的 CDC,采用表 Diff 的方式,需要 Diff 每兩個相鄰版本,性能隨 commit 的數量成倍下降;
- 實際的 streaming 場景,每 batch 更新全表數據量占比很小,我接觸的場景占比小于0.0003。當然不同場景占比是不同的,但顯然較低的占比,使得寫入時增加的開銷是可以接受的。
基于以上考慮,Databricks 和阿里云EMR 在 DeltaLake 的實現都采用了 CDF 的方案,同時阿里云EMR 團隊貢獻到 Apache Hudi 社區的也是基于此實現。
CDC 實現

確定了以 Change Data Feed 作為設計方案后,就需要考慮是具體實現上的注意事項了。
首先是寫入。
- 針對不同湖格式各類寫操作,明確涉及到的文件元數據變化。CDF 方案下可優化的第一點是根據不同的操作,判斷是否需要持久化 CDC 數據。也就是一部分操作的變更信息直接讀取 CDC 文件,而另一部分操作的表更信息就從普通的數據文件中提取轉換。但是不同的湖格式對不同的操作有自己的定義,因此要具體湖格式具體分析。比如 DeltaLake 的 Insert Into/Insert Overwrite、Update,Delete,Merge 等操作就是我們正常認知的。而 hudi 由于引入了主鍵 recordKey 和比較建 preCombineField 的概念,即使是簡單的 Insert Into 的 SQL 語法可能對應實際邏輯是 Update 操作。
- 針對 CDC 場景涉及到的寫操作,明確需要拓展 CDF 能力的場景。湖格式內置很多湖表管理操作,如 DeltaLake 的 vacuum 和 Hudi 的 Clean 用于清理歷史數據, DeltaLake 的 optimize 和 Hudi 的 clustering 用于合并小文件和做 zorder,Hudi 的 compaction 針對 mor 表合并增量數據文件等,這些操作都不會涉及到實際表的數據文件,僅僅是清理文件或者對數據重分布而已,是不需要關注和處理的,在查詢時遇到這些操作,可直接忽略。而其他 DML 操作是直接修改表的數據,需要感知并處理的。這里舉例說明下:
- 對于 Insert Into 操作,DeltaLake 不會讀取任何其他已有文件,僅新增數據文件。因此 DeltaLake 實現 CDF 方案執行 Insert Into 不需額外再持久化,而僅需查詢時加載到這批文件,將數據轉換成約定的輸出格式。但在 Hudi 內由于有數據 combine 的邏輯或者將數據寫入現存的小文件的優化,會讀取已有文件再重寫,因此就需要持久化 CDC 數據。
- 對于 drop partition 操作,DeltaLake(社區版本不支持 drop partition,阿里云EMR 版本支持)和 Hudi 都直接將該分區的所有數據文件都標記為刪除。這樣也不需要持久化任何信息,查詢時找到這些文件,加載并將每條記錄標記為delete的操作類型,添加上其他如時間戳信息即可。
- 對于 update 操作,最底層的操作一定是加載滿足 where 條件記錄的數據文件,更新滿足 where 條件的那部分數據,然后連同未修改的數據直接一起寫入到新文件。這樣的情況下,就需要拓展 CDF 能力。
最后在具體實現上,我們還要注意以下幾點:
- 持久化的 cdc 數據需要保存到文件系統中,可能會改變原本的表的文件布局,這樣的改變可能會對其他操作語義,如湖管理功能造成影響,必要時需要聯動調整;
- 額外的 cdc 寫入操作依然要保證 ACID 的語義。

基于 CDF 方案的實現了寫操作后,查詢變更數據時就會遇到這三類文件:
- 持久化的cdc數據文件。在正常情況下,CDC 數據文件記錄了完全的 cdc 數據信息,包括變化數據的操作類型,舊值和新值,可以直接加載讀取返回。
- 全為新數據的文件。如 Insert Into 操作引起的,查詢時對每條來自這樣文件的數據添加上值為insert的操作類型字段,和其他信息。
- 全為被刪除的文件。如 drop partition 操作引起的,查詢時對每條來自這樣文件的數據添加上值為delete的操作類型字段,和其他信息。
- 上圖右側為抽象的一個數據結構 CDCFileSplit,主要的字段是 cdcFileType 和 filePath:
- filePath 為 CDC 查詢時涉及到的數據文件,可能為 CDC 數據文件或者普通的數據文件。
- cdcFileType 標識 filePath 是哪類文件,也因此決定了如何從該文件中抽取解析 cdc 數據的具體邏輯。
- 在討論 CDF 方案可優化點時提到,對于有主鍵概念的表可以僅持久化主鍵(和操作類型)來減少寫時開銷。那么就需要另外兩個包含了該主鍵對應的舊值和當前值的數據文件 preImageFilePath 和 postImageFilePath,來平衡對寫時開銷較為在意的場景。
Hudi CDC 實現
結合 Apache Hudi 我們來看下具體實現。
Hudi CDC Write 實現

由于引入了主鍵和比較鍵的概念,Hudi 抽象了自己的寫操作類型,如上圖左側所示。其中有普通 Insert/Upsert/Insert Overwrite 等常規寫操作,也有 Cluster/Cmpact 等這樣的湖管理操作。同上面提到寫時優化的一些思考,我們僅需要關注標紅的普通寫操作,而可以直接忽略湖表管理操作。
同樣由于有了自己的寫操作語義,Hudi 抽象了兩類寫處理方式:其中HoodieWriteHandle(上圖中有筆誤)的子類 HoodieMergeHandler 是數據執行 upsert 的核心邏輯,將新數據和同一個主鍵的老數據(如果存在)合并,最終寫入數據文件。而 HoodieWriteHandle 的其他子類處理的是非合并場景下的寫入操作,如 bulk_insert 等。因此 HoodieMergeHandle 也就是我們之前提到的在必要的場景下要拓展 CDF 能力的地方。

DeltaLake 和 Hudi 的 CDC 查詢流程基本一致,但由于數據布局的不同,在實現細節上也有不同。以 Hudi 為例來看一下湖格式上完整的 cdc 查詢流程:
- 根據請求指定的 start,end 區間,獲取關聯到的 commit 信息,然后根據 commit 中的寫操作過濾掉湖表管理這些不影響數據的 commit;
- 根據每個 commit 的寫操作,或是讀取 cdc 數據文件,或是加載當前版本的數據文件,或是加載前一個版本的數據文件,得到一個類似于前面提到的 CDCFileSplit 的對象列表(對應到 Hudi 代碼中的 HoodieCDCFileSplit)。
- 按 CDCFileSplit 中的 cdcFileType 定義的加載策略,從文件中提取、解析成 cdc 數據,直接 union 返回。
Hudi CDC 使用示例

Hudi 端開啟 CDC 的方式也是很簡單的。建表 SQL 時或者 Spark Dataframe 寫入時開啟 hoodie.table.cdc.enabled 參數即可。這樣數據寫入時會自動持久化必要的 CDC 數據。查詢時指定 cdc 的查詢類型,及 start 和 end 的區間。
補充說明:
- 按當前實現查詢時需要將 query.type 的配置調整為:
hoodie.datasource.query.type = incremental hoodie.datasource.query.incremental.format = cdc
- 同時支持 CDC 持久化選擇持久化的類型,參數為:
hoodie.table.cdc.supplemental.logging.name
默認值為 data_before_after,會持久化所有 CDC 數據,查詢時不再需要加載其他數據文件,利用提高 CDC 查詢效率。其他可選值為 op_key_only 和 data_before,僅持久化操作類型和主鍵,或者額外多持久化舊值,這樣寫入時減少了一點 overhead,但查詢時需要加載其他數據文件來獲取確實的舊值當前值。
Hudi CDC 后續規劃
目前 Hudi 已經完整支持了 Flink 和 Spark 兩個引擎的 CDC 讀寫功能。
后續將會繼續拓展 Spark SQL 語法,便于查詢 CDC 數據;同時支持類似 DeltaLake 的扁平化的 CDC 輸出格式,給 Hudi 用戶另外一種選擇集成到自己的數倉場景中。

目前 Hudi 已經完整支持了 Flink 和 Spark 兩個引擎的 CDC 讀寫功能。
后續將會繼續拓展 Spark SQL 語法,便于查詢 CDC 數據;同時支持類似 DeltaLake 的扁平化的 CDC 輸出格式,給 Hudi 用戶另外一種選擇集成到自己的數倉場景中。
湖格式Streaming 的優化

CDC 大多還是用于 streaming 場景,構建增量實時數倉,遇到的問題也是 streaming 通用的。這里列舉兩類
- Apache Hudi 或者阿里云EMR 版本的 DeltaLake 等湖格式都具備表的自管理能力,如定期清理歷史數據文件的 vacuum(DeltaLake)和 clean(Hudi)操作,和合并小文件做 Zorder 優化的 optimize(DeltaLake)和 clustering(Hudi)操作。但這些如果放到 streaming 任務中某次 batch提交后操作,會占用當前 batch 的執行時間,影響寫入性能,甚至有些場景直接導致任務失敗,影響正常的寫入流程。
- Streaming 任務實現復雜:基于 Spark Streaming 開發流式任務目前需要通過 dataframe 的 API 來實現,沒有像離線場景的 SQL 語法那樣簡單。
針對這兩類問題,介紹以下阿里云EMR 團隊的解決方案。

應對第一類問題,阿里云EMR 在 Data Lake Formation 數據湖構建產品上支持了自動化湖表管理。
在實際生產使用中,我們可以在流式 workflow 中關閉 DeltaLake 或者 Hudi 的定期執行表管理的功能,推送 commit 信息到 DLF 服務端。DLF 會結合我們定義的策略,結合表的實時指標,來判斷和采取相應的湖表管理操作。
不同于定期執行的無腦式執行,DLF 可以精細化的感知湖格式表的狀態,比如實時分析歷史過期數據的占比,根據對應策略中的閾值判斷是否提交 clean 或 vacuum 任務來清理,再比如自動感知以時間分區的表的分區狀態,自動對剛完成寫入的分區執行小文件合并類的任務。另外,DLF 執行的湖管理任務是單獨啟動,不在當前的流式任務中,不影響正常的寫入和性能。

應對第二類 Streaming 任務實現復雜問題,阿里云EMR 拓展了 Spark 的 SQL 語法,實現了 StreamingSQL。
如上述示例,左側分別創建了 Hudi 目標表,和一張 Kafka 的源表。右側是拓展的 StreamingSQL 語法,CREATE SCAN 創建一個關聯到 Kafka 源表的流式視圖;CREATE STREAM 語法創建流,消費 Kafka 數據并按用戶給定的 SQL(示例中位 Merge Into 語法)完成寫入操作。這樣,我們可以極大的簡化任務的開發和運維成本。
原文鏈接
本文為阿里云原創內容,未經允許不得轉載。
鍋爐之家客服熱線:











