初探 Kubeflow: MLOps From Training to Deployment

Kubeflow 是一個架構在 Kubernetes 的 MLOps 平台,從開發到部署都擁有各種 solution,包含大家最愛用的 jupyter notebook(?)、支援各種模型訓練 framework 的 Trainer、超參數調整的 Katlib、以及整合各種推論 runtime 的 KServe,當然 Kubeflow 也積極整合各種 LLM 所需的推論架構。

ref: https://www.kubeflow.org/docs/started/images/ai-lifecycle-kubeflow.drawio.svg

在這個專案中,我自建了一個 kubernetes cluster,並且部署了一套 Kubeflow,並在其中嘗試模型開發、訓練與部署,最後也試著在 in-cluster, out-of-cluster 的環境發送模型的推論請求。

主要用到的 Kubeflow 服務包括:

  • Kubeflow Trainer:定義模型,並且在 cluster 裡面做 DDP 的模型訓練
  • Kubeflow KServe:部署模型,並且利用 Knative autoscaler 做 autosacling
  • Kubeflow Dashboard 與 Notebooks:開發與觀測,包括實驗、監控與協作環境

Github repo: kubeflow-mlops-starter

Step 0: Build Kind Cluster

Bash
kind create cluster --name=kubeflow  --config kubeflow-example-config.yaml

這邊直接用 kind 建立一個 local cluster,建好之後可以搭配 k9s 檢測 cluster 狀態。

Step 1: Deploy Kubeflow Platform

Kubeflow 的 components 稍微多了一點,我這邊是用 Kubeflow Manifests v1.0 的 branch 去做部署,有一些資源耗費較大或暫時用不到的部件,像是 Katib, Kubeflow Pipelines, Spark Operator,在這個專案裡就先沒有裝了。裝好之後,應該會看到 k9s 裡面有一堆 pod。

Step 2: Model Training

Kubeflow 的架構本身就是 model agnostic 的,所以 text, image, tabular data 自然都可以被處理,這邊我們嘗試利用一個廣告相關的資料訓練 CTR prediction model。這個資料集是 TaobaoAd_x1,主要就是利用 user, item features 去預測點擊機率 (binary classification),模型方面我們採用 Google 提出的 DCN v2,這樣子的模型在現代自然是一個很小的模型,但整個預測表現還是不錯的。

在訓練的部分,我們利用 Kubeflow Trainer 進行 torch model distributed training ( Distributed Data Parallel (DDP)),順帶一提,最近 Trainer 好像剛上 v2,某些 function 可能在新版會有不同的寫法。但總之,定義好 dataset 處理、模型、forward 之後就可以硬 train 一波了。

送出 Training job 之後,根據你定義的 worker 數量,k8 會啟動相對應的 pod,並且執行你定義的 training function,至於訓練的結果看是要送去 object storage,還是哪裡都是可以,我這邊就直接放到 kubeflow notebook 開的 PVC 裡面。模型訓練的好壞暫不是這個專案的重點,但 kubeflow 是有提供 tensorboard 的介面來視覺化模型的訓練過程,也方便實時監控。

Step 3: Define Custom Model

為了要讓 model 可以在 cluster 裡面部署,我們會需要準備 model inference 的 runtime,這邊參考了 KServe 的 Deploy Custom Python Serving Runtime with InferenceService。在這個專案裡,我們主要定義 preprocess handler 如何做 data preprocessing,其他細節就麻煩參考 GitHub Repo。

至於 image 的部分,我這邊用了 BuildPacks 去打包 image,雖然是蠻方便的,但我覺得用 docker 包可能會更容易用一點 XD

Bash
pack build --builder=heroku/builder:24 ${DOCKER_USER}/dcnv2:v1

Step 4:  Deploy the Service in Kubernetes

接下來,就可以利用剛包好的 image 搭配 KServe 來讓模型上線,實務上就是寫一個 yaml 來部署 InferenceService:

Bash
  apiVersion: serving.kserve.io/v1beta1
  kind: InferenceService
  metadata:
    name: dcnv2
    namespace: kubeflow-user-example-com
    annotations:
      sidecar.istio.io/inject: "false"
  spec:
    predictor:
      scaleTarget: 1
      scaleMetric: concurrency
      maxReplicas: 10
      containers:
        - name: kserve-container
          image: boboru/dcnv2:v1
          resources:
            requests:
              cpu: "100m"
              memory: "512Mi"
            limits:
              cpu: "1"
              memory: "1Gi"
          env:
            - name: PROTOCOL
              value: v2
            - name: MODEL_PATH
              value: /mnt/models/model_weights.pth
            - name: ENCODER_PATH
              value: /mnt/models/preprocess_metadata.pkl
            - name: DENSE_COLS
              value: price
            - name: SPARSE_COLS
              value: userid,cms_segid,cms_group_id,final_gender_code,age_level,pvalue_level,shopping_level,occupation,new_user_class_level,adgroup_id,cate_id,campaign_id,customer,brand,pid,btag
            - name: STORAGE_URI
              value: pvc://torch-workspace

這邊的 STORAGE_URI 是 InferenceService 提供的寫法,他會幫忙 mount torch-workspace 這一個 PVC 到 /mnt/models/ 這個位置,所以我們所訓練的模型權重、資料前處理所需的資料都可以這樣被提供,當然這也可以掛載到 S3, GCS 之類的地方。

而在 Autoscaling 的部分,可以定義在 predictor(如果有額外定義 transformer service 也是一樣的道理):

Bash
  predictor:
    scaleTarget: 1
    scaleMetric: concurrency
    maxReplicas: 10

這樣就可以直接依據 concurrency 的水位來決定要有幾個 replica,那這邊因為是用 serverless 的方式做部署,所以會用 Knative 的 autoscaling,如果是要用 KEDA 做 autoscaling 或者部署 LLM,可以轉用 raw deployment 的部署方式。

一旦部署完,k9 就可以看到那一個 InferenceService,而 Central Dashboard 上面的 Kserve endpoints 也可以看到:

k9s InferenceService CR
Kubeflow Central Dashboard – Kserve Endpoints

Step 5: Test Inference Service

Out-of-cluster

假設要在 cluster 外面發出推論請求,會需要帶 token 並且因為這中間有過一層 Ingress,所以 request header 會需要帶 Host:

Bash
curl -v \
-H "Host: $SERVICE_HOSTNAME" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $TOKEN" \
-d @$INPUT_PATH \
http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict 

這邊打的是 v1 protocol 的 endpoint,因為 body 比較好寫 XDD,如果是要走 v2 protocal 可以參考 test_infer.py,同時 KServe 也有提供 Python Inference SDK InferenceRESTClientInferenceGRPCClient,這樣就不用透過 request 來發送請求了。

In-cluster

如果是 in-cluster 的話,就不用特別帶 token 了,可以直接打 internal service:

Python
base_url = "http://dcn-v2.kubeflow-user-example-com.svc.cluster.local/v1/models/dcnv2:predict"

response = requests.post(base_url, headers=headers, json={"inputs": v1_request})

V1 Protocal

Python
{'predictions': [[0.05610664561390877]]}

V2 Protocal

Python
{
    "model_name": "dcnv2",
    "model_version": None,
    "id": "6f509eae-fc74-42a1-9ecb-3803ebd908ac",
    "parameters": None,
    "outputs": [
        {
            "name": "output-0",
            "shape": [1, 1],
            "datatype": "FP32",
            "parameters": None,
            "data": [0.05610664561390877],
        }
    ],
}

Step 6: Autoscaling

最後,可以做一點 load test 來看 autoscaling 會不會正常運作。這邊我們用 Hey 來發送 request(30 秒、30 個 concurrent workers):

Bash
hey -z 30s -c 30 -m POST -host ${SERVICE_HOSTNAME} \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $TOKEN" \
-D $INPUT_PATH \
http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict

接著就可以看到 Deployment 裡面,從 1 個 pod 長到 10 個 pods (maxReplicas):


Summary

我認為生產環境重要的還是 Stability & Scalability、Latency & Throughput,Kubeflow 在這邊就提供了一個很好的 end-to-end 平台,從開發到部署再到可觀測性都有一套 cloud-native 的 solution,尤其最近更是積極整合 LLM,讓人非常期待 Kubeflow 未來的發展,與和其他 ecosystem 的整合。

下一個想試的是 Ray 😎

Re-rank 算法: Maximal Marginal Relevance

最近在工作上實作了 Maximal Marginal Relevance (MMR) 的算法,主要用在推薦系統的 rerank,這個算法的結構跟想法都簡單,但效果算是非常不錯(至少在我的任務上),整體非常雋永,特別介紹給大家,尤其在 LLM 當道的時代,RAG 與 搜尋、廣告、推薦其實都是互通有無的。

先來看一個案例,我分別在 PChome 與 momo 上面搜尋「牛奶」,PChome 在前 10 個商品以內有兩組重複的,momo 則在前 10 個有一組重複的商品,當然商品排序需要考慮的因素眾多,但至少就「多樣性」來說,momo在這一方面或許就相對好一點,畢竟消費者可能沒那麼在乎今天到底是要買一箱?還是買兩箱?他們可能更在乎我有多少種類的東西可以選擇。

PChome – milk
momo – milk

像這樣子的排序問題,就直接地影響到消費者對於這個電商的看法。一般來說在使用者搜尋之後,系統會從各路檢索出相關的商品,經過一系列的排序再在頁面上面呈現給使用者,而在這一個過程當中,候選商品會逐漸減少,而 re-rank 通常發生在整個搜尋系統的下游,也就是我們現在手上已經有一批商品候選了,我們應該要如何排序這些商品?

MMR 就是就是一個非常經典且實用的算法,定義如下:

$$ \arg \max_{D_i \in \mathbb{R} \backslash \mathbb{S}} \left[ (1 – \lambda) \overbrace{\text{sim}(D_i, Q) }^\textrm{similarity} – \lambda \overbrace{\max_{D_j \in \mathbb{S}} \text{sim}(D_i, D_j) }^\textrm{diversity} \right]$$

  • $Q$: 表示一個 query
  • $D$: 表示一個文本
  • $\mathbb{R}$: 表示所有的文本的集合
  • $\mathbb{S}$: 表示已經選擇的文本集合
  • $\text{sim}(p, q)$: 用來計算 p, q 的相似分數
  • $\lambda \in [0, 1]$: 多樣性的權重

在這一個 argmax 中,我們會得到一個文本的 index $i$,使得在這一輪中,他與 query 有一定程度的相似,同時也與已經選擇的文本集合有一定程度的差異,這個 similariry 與 diversity 的 trade-off 主要是透過 $\lambda$ 來控制,要是我們執行 k 個 iteration,最後就會得到有 k 個文本的 re-rank 結果。

Python Implementation

完整流程與程式碼請參考 notebook: GitHub.

Dataset

我們利用一個公開資料集「CAS產品查詢」來模擬一下搜尋特定產品,並且利用 MMR 來排序的過程。我們將這個資料集裡面的產品名稱 Product_Name 當作搜尋的標的,這個資料集大概是長這個樣子:

為了實現依照相似度搜尋的功能,我們將 Product_Name 與 query 透過 embedding model 轉換成 1024 維度的向量,並且利用 consine similarity 進行語義搜尋。

  • Embedding: Qwen3-Embedding-0.6B (Dimension: 1024)
  • Package: sentence-transformers
  • Similarity: cosine similarity

Search and Re-rank

模擬使用者搜尋的過程如下:

  1. 使用者輸入一個搜尋的字詞 query
  2. queryProduct_Name 經過 embedding model 投影到一個 1024 維的向量
  3. 利用 cosine similarity 找出前 30 個跟 query 相似的商品
  4. 再利用 MMR 重新排序這 30 個商品,最後回傳 10 個商品結果

至於 MMR,有別於經典的方式,我們實作一個有 sliding window 的版本,在每一個 iteration 中,當考量 diversity 時,我們只考量前 m 個已經選擇的商品。


Python
def mmr(
    query_embedding: np.ndarray,
    document_embeddings: np.ndarray,
    diversity: float = 0.1,
    top_n: int = 10,
    window_size: int | None = None
) -> list[str]:
    """Maximal Marginal Relevance (with sliding window).

    Arguments:
        query_embedding: The query embedding
        document_embeddings: The embeddings of the selected documents
        diversity: The diversity of the selected embeddings. Values between 0 and 1.
        top_n: The top n items to return
        window_size: The size of the sliding window

    Returns:
            list[int]: The indices of the selected documents
    """
    from sklearn.metrics.pairwise import cosine_similarity

    # compute similarity(Q, D) and similarity(D, D)
    query_doc_similarity = cosine_similarity([query_embedding], document_embeddings)[0]
    pair_similarity = cosine_similarity(document_embeddings)

    if window_size is None:
        window_size = min(10, len(document_embeddings))

    # return doc_idx as the result and recode candidates_idx as current candidate set
    doc_idx = [np.argmax(query_doc_similarity)]
    candidates_idx = [i for i in range(len(document_embeddings)) if i != doc_idx[0]]
    for _ in range(top_n - 1):
        # in each iteration, select one documnet within candidates using MMR
        candidate_similarities = query_doc_similarity[candidates_idx]
        target_similarities = np.max(pair_similarity[candidates_idx][:, doc_idx[-window_size:]], axis=1)

        # calculate MMR
        mmr = (1 - diversity) * candidate_similarities - diversity * target_similarities
        mmr_idx = candidates_idx[np.argmax(mmr)]

        # Update doc_idx & candidates
        doc_idx.append(mmr_idx)
        candidates_idx.remove(mmr_idx)

    return doc_idx

Python
def search(
    query: str,
    using_mmr: bool = True,
    window_size: int | None = None,
    diversity: float = 0.1
) -> list[str]:
    query_embedding = model.encode(query, prompt_name="query")
    similarity_scores = model.similarity(query_embedding, document_embeddings)[0]
    
    indices = np.argsort(similarity_scores.tolist())[::-1]
    
    if not using_mmr:   
        return df.Product_Name[indices[:10]]
    
    doc_idx = mmr(
        query_embedding,
        document_embeddings[indices[:30]],
        top_n=10,
        window_size=window_size,
        diversity=diversity
    )
    return df.Product_Name[indices[doc_idx]]

詳細的 code 還請參照 GitHub

Example

  • Different Diversity
    搜尋「鮮乳」,比較不同 diversity 權重的結果
Python
pd.DataFrame({
    "Diversity=0.1": search("鮮乳", diversity=0.1).to_list(),
    "Diversity=0.9": search("鮮乳", diversity=0.9).to_list()
})

很明顯的可以看到當 diversity 低的時候,搜尋到的產品很有可能會重複。

  • Different window size
    比較同樣 diversity 時,啟用 sliding window 與否的結果
Python
pd.DataFrame({
    "window_size=None": search(
        "鮮乳", diversity=0.6, window_size=None
    ).to_list(),
    "window_size=4": search(
        "鮮乳", diversity=0.6, window_size=4
    ).to_list(),
})

window size = 4 時,第 0 與第 5 個產品都是「四方鮮乳全脂鮮乳」;第 4 與第 9 個產品都是「光泉鮮乳-成分無調整」,而這中間的間隔就是我們設定的 window size。當然,diversitywindow_size 都是需要被設計且調整的參數,實務上還是要多觀察才會比較知道要怎麼設定。

Conclusion

實在很喜歡這種小品算法,但算法本身的好壞,完全與商業指標相關,要是衡量的指標全是那種教科書上會出現的,那也真的不是一個很好的實踐。

搜廣推的算法領域實在有太多有趣、需要學習、想要學習的東西了~~

A/B Test 的指標選擇:Cumulative vs. Windowed

A/B test 的指摽幾乎是要做實驗前首要決定的事情,比如說當我們想要衡量網站改版前後,使用者的點擊率是否會提高?使用者的平均停留時間是否會改變?但這樣的指標定義確實是挺空泛的,比如說使用者有沒有包含那些未註冊的用戶?我們該如何衡量點擊率,是累積的點擊率?還是次日、七日的點擊率?在時間的維度上,Spotify 最近就發表了一篇我覺得挺有意思的文章,他們主要將指標分成兩類:

  • Cumulative metric: 每個樣本被測量到的時間區間是不一致的,這個區間也就是從樣本接受暴露開始($e$)到當下測量的時間($t$)。
  • Windowed metric: 每個樣本固定在接受暴露 ($e$), 並過了 $\nu$ 時間之後被測量 ($e + \nu$),所以要是測量的時間 $t < e + \nu$,此時這個指標是未被定義的。
閱讀全文〈A/B Test 的指標選擇:Cumulative vs. Windowed〉

根據 Cluster 調整標準誤的時機與誤區

之前寫完〈Doordash 如何透過 Switchback Experiment 處理 Network Effect〉之後,順藤摸瓜看到 ,這篇的作者陣容十分堅強,內容自然也是不在話下。

在 Doordash 的 Switchback experiment 裡面,他們依照 cluster 的特性調整標準誤(CRSE),主因就是實驗設計會使樣本之間存在關聯性。當然,這樣的調整並不僅限於實驗資料,在一般的觀測性資料裡面,也有一些人認為在 cluster 裡面有一些未被觀察到的成分是有相關性的,所以他們會基於 cluster 調整標準誤。

閱讀全文〈根據 Cluster 調整標準誤的時機與誤區〉

Doordash 如何透過 Switchback Experiment 處理 Network Effect

Doordash 是一家總部位在舊金山的外送平台,如同在台灣的 Uber Eats 或 Foodpanda,Doordash 作為一個平台負責媒合消費者與店家,並透過外送師(Dasher)協助完成每筆訂單。假設今天 Doordash 想要在某個地區發放折扣並檢視其成效,於是他們隨機地將一個地區裡面的消費者分為實驗組 (Treatment Group) 與對照組 (Control Group),可想而知實驗組的消費者可能會更傾向去訂購,於是那一個地區的外送師會為了處理實驗組的訂單而疲於奔命。最後當我們檢視實驗效果的時候,其實很難說實驗組與對照組下訂的數量差異,到底是來自於優惠券的發放?還是因為外送師被實驗組大量佔據,導致根本就沒有 Supply 可以去處理對照組的 Demand?這個時候透過 A/B testing 估計的處置效果就會有 bias。

閱讀全文〈Doordash 如何透過 Switchback Experiment 處理 Network Effect〉

Netflix 如何透過短期 A/B Testing 結果估計長期成效

實驗一直是 Big Tech 用以推動產品迭代的重要步驟,好的實驗可以驗證產品的改變是否對使用者來說有正面的效益,公司也才會有信心將這個功能正式上線,而對於一個網路產品來說最常用的手法即是 A/B testing。然而,在講求快速迭代的產品開發過程當中,A/B testing 的實驗週期通常不會太長,也就很難透過 A/B testing 衡量長期效益。舉例來說,Netflix 想要加強推廣旗下的遊戲產品,於是試圖在 App 上新增廣告欄位,透過實驗發現遊戲的下載數確實因為廣告而有所提升,但我們卻很難回答:長期來看這個功能是否使得使用者更喜歡 Netflix?更願意留在產品內並提供營收?

閱讀全文〈Netflix 如何透過短期 A/B Testing 結果估計長期成效〉

視覺化 Apple Health Data (Streamlit + Altair)

Online Demo: https://boboru-apple-health.streamlit.app/

意外發現手機裡面的 Apple Health 儲存了大量歷年來的資料,於是想說來做個專案轉換一下心情,順便練習一下新的玩具。想法是要做一些 Apple 沒有做到的事情,比如說使用者其實不太能自由地選擇要分析的時間區段(但使用上很直覺,也沒什麼不好);Apple 基本上都是選擇「平均數」當作敘述統計量,但想想許多資料分佈肯定是長尾,而平均數對於極端值很敏感,所以這個時候或許可以考慮其他敘述統計的方式。

閱讀全文〈視覺化 Apple Health Data (Streamlit + Altair)〉

如果你在前方抬頭,而我亦抬頭

在 1968 年的某個冬日下午,一條繁忙的紐約街道上,一群人正抬頭望向身旁的建築物,這群莫名其妙的人吸引了路人的目光,有些人因此佇足,也有些人也跟著抬起了頭,望向遠方。 而他們的一舉一動正默默地被記錄下來⋯⋯

今年的搞笑諾貝爾獎(Ig Nobel Prize)於心理學領域,頒給了這篇 1969 年發表的研究, 他們試圖找出不同群眾大小所發揮的影響力是否相異,於是如同前段所述,研究者找了一個會有行人經過的場域,並且派出數量不等的群眾(1 ~ 15 人),站在街上仰望天空 60 秒,研究者並接著統計在這 60 秒期間,有多少路人會停下來,甚至是加入這個群體一起抬頭。

閱讀全文〈如果你在前方抬頭,而我亦抬頭〉

不可兼得的稀疏性與穩定性 (Sparsity and Stability)

針對一個機器學習的演算法,一般來說我們會希望他的泛化能力(generalization ability)是好的,也就是說倘若模型在訓練資料集的 loss 很低,則在測試資料集的 loss 應該也要很低才對,這樣的想法並不侷限於訓練與測試,offline 與 online、development 與 production、A 資料集與 B 資料集等應用場域都是互相呼應的,而穩定性(Stability)稀疏性(Sparsity)正是兩個用以描述演算法泛化能力的性質。這篇文章主要想透過 介紹這兩個性質並進一步探討兩者之間的關係。

閱讀全文〈不可兼得的稀疏性與穩定性 (Sparsity and Stability)〉