基于 Flink 構(gòu)建大規(guī)模實(shí)時(shí)風(fēng)控系統(tǒng)在阿里巴巴的落地
- 基于 Flink 構(gòu)建風(fēng)控系統(tǒng)
- 阿里風(fēng)控實(shí)戰(zhàn)
- 大規(guī)模風(fēng)控技術(shù)難點(diǎn)
目前 Flink 基本服務(wù)于集團(tuán)的所有 BU ,在雙十一峰值的計(jì)算能力達(dá)到 40 億條每秒,計(jì)算任務(wù)達(dá)到了 3 萬(wàn)多個(gè),總共使用 100 萬(wàn)+ Core ;幾乎涵蓋了集團(tuán)內(nèi)的所有具體業(yè)務(wù),比如:數(shù)據(jù)中臺(tái)、AI 中臺(tái)、風(fēng)控中臺(tái)、實(shí)時(shí)運(yùn)維、搜索推薦等。
01基于 Flink 構(gòu)建風(fēng)控系統(tǒng)
風(fēng)控是一個(gè)很大的話題,涉及到規(guī)則引擎、NoSQL DB、CEP 等等,本章主要講一些風(fēng)控的基本概念。在大數(shù)據(jù)側(cè),我們把風(fēng)控劃分成 3 × 2 的關(guān)系:
- 2 代表風(fēng)控要么是基于規(guī)則的,要么是基于算法或模型的;
- 3 代表包括三種風(fēng)控類型:事先風(fēng)控、事中風(fēng)控和事后風(fēng)控。
1.1 三種風(fēng)控業(yè)務(wù)
對(duì)于事中風(fēng)控和事后風(fēng)控來(lái)講,端上的感知是異步的,對(duì)于事先風(fēng)控來(lái)講,端上的感知是同步的。
對(duì)于事先風(fēng)控這里稍做一些解釋,事先風(fēng)控是把已經(jīng)訓(xùn)練好的模型或者把已經(jīng)計(jì)算好的數(shù)據(jù)存在 Redis 、MongoDB 等數(shù)據(jù)庫(kù)中;
- 一種方式是端上有類似 Sidden 、Groovy 、Drools 這樣的規(guī)則引擎直接去 Redis 、MongoDB 取數(shù)據(jù)來(lái)返回結(jié)果;
- 另外一種方式是基于 Kubeflow KFserving ,端上請(qǐng)求過(guò)來(lái)之后基于訓(xùn)練好的算法和模型返回結(jié)果。
整體來(lái)講這兩種方式的時(shí)延都在 200 毫秒左右,可以作為一個(gè)同步的 RPC 或 HTTP 請(qǐng)求。
對(duì)于 Flink 相關(guān)的大數(shù)據(jù)場(chǎng)景是一個(gè)異步的風(fēng)控請(qǐng)求,它的異步時(shí)效性非常低,通常是一秒或者兩秒。如果追求超低時(shí)延,則可以認(rèn)為它是一種事中的風(fēng)控,風(fēng)控決策過(guò)程可以由機(jī)器介入處理。
很常見(jiàn)的一種類型是用 Flink SQL 做指標(biāo)閾值的統(tǒng)計(jì)、用 Flink CEP 做行為序列規(guī)則分析,還有一種是用 Tensorflow on Flink ,在 Tensorflow 中進(jìn)行算法描述,然后用 Flink 來(lái)執(zhí)行 Tensorflow 規(guī)則的計(jì)算。
1.2 Flink 是規(guī)則風(fēng)控最佳選擇
目前 Flink 是阿里集團(tuán)內(nèi)的風(fēng)控最佳選擇,主要有三個(gè)原因:
- 事件驅(qū)動(dòng)
- 毫秒級(jí)的延遲
- 流批一體
1.3 規(guī)則風(fēng)控三要素
在規(guī)則風(fēng)控里面有三個(gè)要素,后面講的所有內(nèi)容都是圍繞這三者展開的:
- 事實(shí) Facts:是指風(fēng)控事件,可能來(lái)自業(yè)務(wù)方或者日志埋點(diǎn),是整個(gè)風(fēng)控系統(tǒng)的輸入;
- 規(guī)則 Rules:往往是由業(yè)務(wù)側(cè)來(lái)定義,即這個(gè)規(guī)則要滿足什么樣的業(yè)務(wù)目標(biāo);
- 閾值 Threshold:規(guī)則所對(duì)應(yīng)描述的嚴(yán)重程度。
1.4 Flink 規(guī)則表達(dá)增強(qiáng)
對(duì)于 Flink 來(lái)說(shuō),可以分成無(wú)狀態(tài)規(guī)則和有狀態(tài)規(guī)則兩類,其中有狀態(tài)規(guī)則是 Flink 風(fēng)控的核心:
- 無(wú)狀態(tài)規(guī)則:主要是做數(shù)據(jù)的 ETL,一種場(chǎng)景是當(dāng)某個(gè)事件的一個(gè)字值段大于 X 就觸發(fā)當(dāng)前的風(fēng)控行為;另一種場(chǎng)景是 Flink 任務(wù)的下游是一個(gè)基于模型或算法的風(fēng)控,在 Flink 側(cè)不需要做規(guī)則判斷,只是把數(shù)據(jù)向量化、歸一化,例如多流關(guān)聯(lián)、Case When 判斷等把數(shù)據(jù)變成 0/1 的向量,然后推送到下游的 TensorFlow 做預(yù)測(cè)。
- 有狀態(tài)規(guī)則:
- 統(tǒng)計(jì)型規(guī)則:基于統(tǒng)計(jì)分析的計(jì)算規(guī)則,比如 5 分鐘以內(nèi)訪問(wèn)次數(shù)大于 100 次,則認(rèn)為觸發(fā)了風(fēng)控;
- 序列型規(guī)則:事件序列中,某事件對(duì)前序后序事件有影響,比如點(diǎn)擊、加入購(gòu)物車、刪掉三個(gè)事件,這種連續(xù)的行為序列是一個(gè)特殊行為,可能認(rèn)為這個(gè)行為在惡意降低商家商品的評(píng)價(jià)分?jǐn)?shù),但這三個(gè)事件獨(dú)立來(lái)看并不是一個(gè)風(fēng)控事件;阿里云實(shí)時(shí)計(jì)算 Flink 完善了基于序列的規(guī)則能力,為云上和集團(tuán)內(nèi)的電商交易場(chǎng)景提供技術(shù)護(hù)航;
- 混合型規(guī)則:統(tǒng)計(jì)型和序列性兩者組合。
02阿里風(fēng)控實(shí)戰(zhàn)
本章主要介紹阿里在工程上是如何滿足上面提到的風(fēng)控三要素。
從整體的技術(shù)來(lái)看,目前分成感知、處置和洞察三個(gè)模塊:
- 感知:目的是感知所有的異常以及提前發(fā)現(xiàn)問(wèn)題,比如捕捉一些與常見(jiàn)數(shù)據(jù)分布不同的數(shù)據(jù)類型,并輸出這種異常的列表;又比如說(shuō)某年因?yàn)轵T行政策的調(diào)整頭盔銷售量升高,連帶著就會(huì)出現(xiàn)相關(guān)產(chǎn)品的點(diǎn)擊率、轉(zhuǎn)化率上升,這種情況需要及時(shí)被感知捕捉到,因?yàn)樗且粋€(gè)正常的行為而非作弊;
- 處置:即如何做規(guī)則的執(zhí)行,現(xiàn)在有小時(shí)、實(shí)時(shí)、離線三道防線,相比于之前單條策略的匹配,關(guān)聯(lián)和集成之后的準(zhǔn)確性會(huì)更高,比如就關(guān)聯(lián)最近一段時(shí)間內(nèi)某些用戶的持續(xù)行為來(lái)進(jìn)行綜合研判;
- 洞察:為了發(fā)現(xiàn)一些當(dāng)前沒(méi)有感知,同時(shí)也沒(méi)有辦法直接用規(guī)則描述的風(fēng)控行為,比如風(fēng)控需要對(duì)樣本進(jìn)行高度抽象來(lái)進(jìn)行表示,要先投影到合適的子空間,然后再結(jié)合時(shí)間維度在高維里面發(fā)現(xiàn)一些特征來(lái)做新異常的識(shí)別。
2.1 階段一:SQL 實(shí)時(shí)關(guān)聯(lián) & 實(shí)時(shí)統(tǒng)計(jì)
在這個(gè)階段有一個(gè)基于 SQL 評(píng)價(jià)風(fēng)控系統(tǒng),用簡(jiǎn)單的 SQL 做一些實(shí)時(shí)的關(guān)聯(lián)、統(tǒng)計(jì),比如用 SQL 進(jìn)行聚合操作 SUM(amount) > 50 ,其中規(guī)則就是 SUM(amount),規(guī)則對(duì)應(yīng)的閾值是 50;假設(shè)現(xiàn)在有 10、20、50、100 這 4 種規(guī)則同時(shí)在線上運(yùn)行,因?yàn)閱蜦link SQL作業(yè)只能執(zhí)行一種規(guī)則,那么就需要為這4個(gè)閾值分別申請(qǐng) 4 個(gè) Flink Job。優(yōu)點(diǎn)是開發(fā)邏輯簡(jiǎn)單,作業(yè)隔離性高,但缺點(diǎn)是極大浪費(fèi)計(jì)算資源。
2.2 階段二:Broadcast Stream
階段一的風(fēng)控規(guī)則主要問(wèn)題是規(guī)則和閾值不可變,在 Flink 社區(qū)目前會(huì)有一些解決方案,比如基于 BroadcastStream 來(lái)實(shí)現(xiàn),在下面的圖中 Transaction Source 負(fù)責(zé)事件的接入,Rule Source 則是一個(gè)BroadcastStream,當(dāng)有新的閾值時(shí)可以通過(guò) BroadcastStream 廣播到各個(gè)算子。
舉個(gè)例子,判斷在一分鐘以內(nèi)連續(xù)訪問(wèn)超過(guò) 10 次的風(fēng)控對(duì)象,但是在 618 或雙 11 可能要把它變成 20 或 30 次,才會(huì)被風(fēng)控系統(tǒng)下游的在線系統(tǒng)感知到。
如果在第一階段的話,只有兩種選擇:第一種是所有的作業(yè)全量在線上跑;第二種是在某一刻停止掉一個(gè)Flink作業(yè),新拉起一個(gè)基于新指標(biāo)的作業(yè)。
如果是基于 BroadcastStream 就可以實(shí)現(xiàn)規(guī)則指標(biāo)閾值的下發(fā),直接修改線上指標(biāo)閾值而不需要作業(yè)重啟。
2.3 階段三:Dynamic CEP
階段二的主要問(wèn)題是只能做到指標(biāo)閾值的更新,雖然它極大的方便了個(gè)業(yè)務(wù)系統(tǒng),但實(shí)際上很難滿足上層業(yè)務(wù)。訴求主要有兩個(gè):結(jié)合 CEP 以實(shí)現(xiàn)行為序列的感知;結(jié)合 CEP 后依然能做到動(dòng)態(tài)修改閾值甚至是規(guī)則本身。
階段三,阿里云 Flink 做了 CEP 相關(guān)的高度抽象,解耦了 CEP 規(guī)則和 CEP 執(zhí)行節(jié)點(diǎn),也就是說(shuō)規(guī)則可以存在 RDS、Hologres 等外部第三方存儲(chǔ)里,CEP 作業(yè)發(fā)布上去之后,就可以加載數(shù)據(jù)庫(kù)中的 CEP 規(guī)則來(lái)做到動(dòng)態(tài)替換,因此作業(yè)的表達(dá)能力會(huì)增強(qiáng)。
其次是作業(yè)的靈活性會(huì)增強(qiáng),比如想看到某一個(gè) APP 下面的一些行為并對(duì)這個(gè)行為的指標(biāo)閾值做更新,可以通過(guò)第三方存儲(chǔ)更新 CEP 規(guī)則而非 Flink 本身。
這樣做還有一個(gè)優(yōu)勢(shì)是可以把規(guī)則給暴露給上層業(yè)務(wù)方,來(lái)讓業(yè)務(wù)真真正正的撰寫風(fēng)控規(guī)則,我們成為一個(gè)真正的規(guī)則中臺(tái),這就是動(dòng)態(tài) CEP 能力所帶來(lái)的好處。在阿里云的服務(wù)中,動(dòng)態(tài) CEP 能力已經(jīng)被集成在最新版本中,阿里云全托管 Flink 服務(wù)極大的簡(jiǎn)化了風(fēng)控場(chǎng)景的開發(fā)周期。
2.4 階段四:Shared Computing
在階段三的基礎(chǔ)上再往前一步,阿里云實(shí)踐出 "共享計(jì)算" 的解決方案。這套共享計(jì)算的方案中,CEP 規(guī)則完全可以被建模平臺(tái)來(lái)描述,暴露給上層客戶或業(yè)務(wù)方一個(gè)非常友好的規(guī)則描述平臺(tái),可以通過(guò)類似拖拉拽或者其他的方式進(jìn)行耦合,然后在調(diào)度引擎上選擇事件接入源來(lái)運(yùn)行規(guī)則。比如現(xiàn)在兩個(gè)建模都是服務(wù)于淘寶 APP,完全可以落到同一個(gè) Fact 的 Flink CEP 作業(yè)上,這樣就可以把業(yè)務(wù)方、執(zhí)行層和引擎層完全解耦。當(dāng)前阿里云共享計(jì)算的解決方案已經(jīng)非常成熟,有豐富的客戶落地實(shí)踐。
2.5 階段五:業(yè)務(wù)開發(fā)和平臺(tái)建設(shè)分離
在引擎?zhèn)取⑵脚_(tái)側(cè)和業(yè)務(wù)側(cè)三方之間,階段四可以做到引擎?zhèn)群推脚_(tái)側(cè)之間的解耦,但是對(duì)業(yè)務(wù)側(cè)來(lái)講依然是高度綁定的。兩者的工作模式依然是甲方和乙方的協(xié)同關(guān)系,即 業(yè)務(wù)側(cè)掌握著業(yè)務(wù)規(guī)則,平臺(tái)側(cè)接受業(yè)務(wù)團(tuán)隊(duì)的風(fēng)控需求,從而進(jìn)行風(fēng)控規(guī)則的開發(fā)。但平臺(tái)團(tuán)隊(duì)通常人員優(yōu)先,而業(yè)務(wù)團(tuán)隊(duì)隨著業(yè)務(wù)發(fā)展會(huì)越來(lái)越壯大。
這個(gè)時(shí)候業(yè)務(wù)側(cè)本身可以抽象出來(lái)一些基本概念,沉淀出一些業(yè)務(wù)共性的規(guī)范,并組裝成一個(gè)比較友好的 DSL ,然后通過(guò)阿里云完全解耦的 Open API 實(shí)現(xiàn)作業(yè)的提交。
由于要同時(shí)支持集團(tuán)內(nèi)接近 100 個(gè) BU,沒(méi)有辦法為每一個(gè) BU 都做定制化的支持,只能把引擎的能力盡可能的開放出去,然后業(yè)務(wù)側(cè)通過(guò) DSL 的封裝提交到平臺(tái)上,真正做到了只暴露一個(gè)中臺(tái)給客戶。
03大規(guī)模風(fēng)控技術(shù)難點(diǎn)
本章主要介紹一些大規(guī)模風(fēng)控的技術(shù)難點(diǎn),以及阿里云在全托管 Flink 商業(yè)化產(chǎn)品中如何突破這些技術(shù)難點(diǎn)。
3.1 細(xì)粒度資源調(diào)整
在流計(jì)算系統(tǒng)中,數(shù)據(jù)源往往不是阻塞的節(jié)點(diǎn)。上游的數(shù)據(jù)讀取節(jié)點(diǎn)由于沒(méi)有計(jì)算邏輯不存在性能問(wèn)題,下游的數(shù)據(jù)處理節(jié)點(diǎn)才是整個(gè)任務(wù)的性能瓶頸。
由于 Flink 的作業(yè)是以 Slot 來(lái)做資源劃分的,默認(rèn) Source 節(jié)點(diǎn)和工作節(jié)點(diǎn)具有相同的并發(fā)度。在這種情況下我們希望可以單獨(dú)調(diào)整 Source 節(jié)點(diǎn)和 CEP 工作節(jié)點(diǎn)的并發(fā)度,比如在下圖中可以看到某個(gè)作業(yè)的 CEP 工作節(jié)點(diǎn)并發(fā)度可以達(dá)到 2000,而 Source 節(jié)點(diǎn)則只需要 2 個(gè)并行度,這樣可以極大的提升 CEP 節(jié)點(diǎn)的工作性能。
另外是對(duì) CEP 工作節(jié)點(diǎn)所在的 TM 內(nèi)存、CPU 資源的劃分,在開源 Flink 中 TM 整體同構(gòu)的,也就是說(shuō) Source 節(jié)點(diǎn)和工作節(jié)點(diǎn)是完全相同的規(guī)格。從節(jié)省資源的角度考慮,真實(shí)生產(chǎn)環(huán)境下 Source 節(jié)點(diǎn)并不需要 CEP 節(jié)點(diǎn)一樣多的內(nèi)存、CPU 資源, Source 節(jié)點(diǎn)只需要較小的 CPU 和內(nèi)存就已經(jīng)能夠滿足數(shù)據(jù)抓取。
阿里云全托管 Flink 可以實(shí)現(xiàn)讓 Source 節(jié)點(diǎn)和 CEP 節(jié)點(diǎn)運(yùn)行在異構(gòu)的 TM 上,即 CEP 工作節(jié)點(diǎn) TM 資源顯著大于 Source 節(jié)點(diǎn) TM 資源,CEP 工作執(zhí)行效率會(huì)變得更高??紤]細(xì)粒度資源調(diào)整帶來(lái)的優(yōu)化,云上全托管服務(wù)相比自建 IDC Flink 可節(jié)約 20% 成本。
3.2 流批一體 & 自適應(yīng) Batch Scheduler
流引擎和批引擎如果沒(méi)有采用相同一套執(zhí)行模式往往會(huì)遇到數(shù)據(jù)口徑不一致的情況,出現(xiàn)這種問(wèn)題的原因是流規(guī)則在批規(guī)則下很難真正的完全描述出來(lái);比如在 Flink 中有一個(gè)特殊的 UDF,但是在 Spark 引擎中卻并沒(méi)有對(duì)應(yīng)的 UDF。當(dāng)這種數(shù)據(jù)口徑不一致的時(shí)候,選擇哪一方面的數(shù)據(jù)口徑就成為了一個(gè)非常重要的問(wèn)題。
在 Flink 流批一體的基礎(chǔ)上,用流模式描述的 CEP 規(guī)則,完全可以在批模式下以相同的口徑再跑一次并得到一樣的結(jié)果,這樣就不需要再去開發(fā)批模式相關(guān)的 CEP 作業(yè)。
在此之上,阿里實(shí)現(xiàn)了自適應(yīng)的 Batch Scheduler。其實(shí) CEP 規(guī)則每天的效果產(chǎn)出并不一定是均衡的,比如說(shuō)今天的行為序列中并沒(méi)有任何異常行為,下游只有很少的數(shù)據(jù)輸入,此時(shí)會(huì)為批分析預(yù)留一個(gè)彈性的集群;當(dāng) CEP 的結(jié)果很少時(shí),下游的批分析只需要很小的資源,甚至每個(gè)批分析工作節(jié)點(diǎn)的并行度都不需要在一開始的時(shí)候就指定,工作節(jié)點(diǎn)可以根據(jù)上游數(shù)據(jù)的輸出以及任務(wù)負(fù)載來(lái)自動(dòng)調(diào)整批模式下的并行度,真正做到了彈性批分析,這是阿里云 Flink 流批一體 Batch Scheduler 的獨(dú)特優(yōu)勢(shì)。
3.3 合并讀取降低公共層壓力
這是在實(shí)踐中遇到的問(wèn)題,當(dāng)前的開發(fā)模式基本都是基于數(shù)據(jù)中臺(tái)的,比如實(shí)時(shí)數(shù)倉(cāng)。在實(shí)時(shí)數(shù)倉(cāng)的場(chǎng)景下,數(shù)據(jù)源可能不會(huì)很多,但是中間層 DWD 會(huì)變得很多,中間層可能會(huì)被演化成很多 DWS 層,甚至也會(huì)演變成很多數(shù)據(jù)集市給到各個(gè)部門來(lái)使用,這種情況下單表的讀取壓力會(huì)很大。
通常多個(gè)源表彼此關(guān)聯(lián)(打?qū)挘亩纬梢粋€(gè) DWD 層 ,從單個(gè)源表的視角看,它會(huì)被多個(gè) DWD 表依賴。DWD 層也會(huì)被多個(gè)不同業(yè)務(wù)域的作業(yè)消費(fèi)形成 DWS?;谶@種情況阿里實(shí)現(xiàn)了基于 Source 的合并,只需要讀一次 DWD 在 Flink 側(cè)會(huì)幫你加工成多張業(yè)務(wù)域的 DWS 表,可以非常大的減緩對(duì)公共層的執(zhí)行壓力。
3.4 KV 分離設(shè)計(jì)的狀態(tài)后端
CEP 節(jié)點(diǎn)在執(zhí)行的時(shí)候,會(huì)涉及到非常大規(guī)模的本地?cái)?shù)據(jù)讀取,尤其是在行為序列的計(jì)算模式下,因?yàn)樾枰彺媲懊嫠械臄?shù)據(jù)或者是一定時(shí)間內(nèi)的行為序列。
在這種情況下,比較大的一個(gè)問(wèn)題是對(duì)后端狀態(tài)存儲(chǔ)(比如:RocksDB)有非常大的性能開銷,進(jìn)而會(huì)影響 CEP 節(jié)點(diǎn)的性能。目前阿里實(shí)現(xiàn)了 KV 分離設(shè)計(jì)的狀態(tài)后端,阿里云 Flink 默認(rèn)使用 Gemini 作為狀態(tài)后段,CEP 場(chǎng)景下實(shí)測(cè)性能至少有 100% 的提升。
3.5 維度數(shù)據(jù)分區(qū)加載
風(fēng)控在很多情況下是要基于歷史行為來(lái)做分析的,歷史的行為數(shù)據(jù)一般都會(huì)存在 Hive 或 ODPS 表里,這個(gè)表的規(guī)??赡苁?TB 級(jí)別的。開源的 Flink 默認(rèn)需要在每一個(gè)維表節(jié)點(diǎn)上加載這個(gè)超級(jí)大的維度表,這種方式實(shí)際上是不現(xiàn)實(shí)的。阿里云實(shí)現(xiàn)了基于 Shuffle 來(lái)做內(nèi)存數(shù)據(jù)的分割,維表節(jié)點(diǎn)只會(huì)加載屬于當(dāng)前這個(gè) Shuffle 分區(qū)的數(shù)據(jù)。