初探 Kubeflow: MLOps From Training to Deployment

Kubeflow 是一個架構在 Kubernetes 的 MLOps 平台,從開發到部署都擁有各種 solution,包含大家最愛用的 jupyter notebook(?)、支援各種模型訓練 framework 的 Trainer、超參數調整的 Katlib、以及整合各種推論 runtime 的 KServe,當然 Kubeflow 也積極整合各種 LLM 所需的推論架構。

ref: https://www.kubeflow.org/docs/started/images/ai-lifecycle-kubeflow.drawio.svg

在這個專案中,我自建了一個 kubernetes cluster,並且部署了一套 Kubeflow,並在其中嘗試模型開發、訓練與部署,最後也試著在 in-cluster, out-of-cluster 的環境發送模型的推論請求。

主要用到的 Kubeflow 服務包括:

  • Kubeflow Trainer:定義模型,並且在 cluster 裡面做 DDP 的模型訓練
  • Kubeflow KServe:部署模型,並且利用 Knative autoscaler 做 autosacling
  • Kubeflow Dashboard 與 Notebooks:開發與觀測,包括實驗、監控與協作環境

Github repo: kubeflow-mlops-starter

Step 0: Build Kind Cluster

Bash
kind create cluster --name=kubeflow  --config kubeflow-example-config.yaml

這邊直接用 kind 建立一個 local cluster,建好之後可以搭配 k9s 檢測 cluster 狀態。

Step 1: Deploy Kubeflow Platform

Kubeflow 的 components 稍微多了一點,我這邊是用 Kubeflow Manifests v1.0 的 branch 去做部署,有一些資源耗費較大或暫時用不到的部件,像是 Katib, Kubeflow Pipelines, Spark Operator,在這個專案裡就先沒有裝了。裝好之後,應該會看到 k9s 裡面有一堆 pod。

Step 2: Model Training

Kubeflow 的架構本身就是 model agnostic 的,所以 text, image, tabular data 自然都可以被處理,這邊我們嘗試利用一個廣告相關的資料訓練 CTR prediction model。這個資料集是 TaobaoAd_x1,主要就是利用 user, item features 去預測點擊機率 (binary classification),模型方面我們採用 Google 提出的 DCN v2,這樣子的模型在現代自然是一個很小的模型,但整個預測表現還是不錯的。

在訓練的部分,我們利用 Kubeflow Trainer 進行 torch model distributed training ( Distributed Data Parallel (DDP)),順帶一提,最近 Trainer 好像剛上 v2,某些 function 可能在新版會有不同的寫法。但總之,定義好 dataset 處理、模型、forward 之後就可以硬 train 一波了。

送出 Training job 之後,根據你定義的 worker 數量,k8 會啟動相對應的 pod,並且執行你定義的 training function,至於訓練的結果看是要送去 object storage,還是哪裡都是可以,我這邊就直接放到 kubeflow notebook 開的 PVC 裡面。模型訓練的好壞暫不是這個專案的重點,但 kubeflow 是有提供 tensorboard 的介面來視覺化模型的訓練過程,也方便實時監控。

Step 3: Define Custom Model

為了要讓 model 可以在 cluster 裡面部署,我們會需要準備 model inference 的 runtime,這邊參考了 KServe 的 Deploy Custom Python Serving Runtime with InferenceService。在這個專案裡,我們主要定義 preprocess handler 如何做 data preprocessing,其他細節就麻煩參考 GitHub Repo。

至於 image 的部分,我這邊用了 BuildPacks 去打包 image,雖然是蠻方便的,但我覺得用 docker 包可能會更容易用一點 XD

Bash
pack build --builder=heroku/builder:24 ${DOCKER_USER}/dcnv2:v1

Step 4:  Deploy the Service in Kubernetes

接下來,就可以利用剛包好的 image 搭配 KServe 來讓模型上線,實務上就是寫一個 yaml 來部署 InferenceService:

Bash
  apiVersion: serving.kserve.io/v1beta1
  kind: InferenceService
  metadata:
    name: dcnv2
    namespace: kubeflow-user-example-com
    annotations:
      sidecar.istio.io/inject: "false"
  spec:
    predictor:
      scaleTarget: 1
      scaleMetric: concurrency
      maxReplicas: 10
      containers:
        - name: kserve-container
          image: boboru/dcnv2:v1
          resources:
            requests:
              cpu: "100m"
              memory: "512Mi"
            limits:
              cpu: "1"
              memory: "1Gi"
          env:
            - name: PROTOCOL
              value: v2
            - name: MODEL_PATH
              value: /mnt/models/model_weights.pth
            - name: ENCODER_PATH
              value: /mnt/models/preprocess_metadata.pkl
            - name: DENSE_COLS
              value: price
            - name: SPARSE_COLS
              value: userid,cms_segid,cms_group_id,final_gender_code,age_level,pvalue_level,shopping_level,occupation,new_user_class_level,adgroup_id,cate_id,campaign_id,customer,brand,pid,btag
            - name: STORAGE_URI
              value: pvc://torch-workspace

這邊的 STORAGE_URI 是 InferenceService 提供的寫法,他會幫忙 mount torch-workspace 這一個 PVC 到 /mnt/models/ 這個位置,所以我們所訓練的模型權重、資料前處理所需的資料都可以這樣被提供,當然這也可以掛載到 S3, GCS 之類的地方。

而在 Autoscaling 的部分,可以定義在 predictor(如果有額外定義 transformer service 也是一樣的道理):

Bash
  predictor:
    scaleTarget: 1
    scaleMetric: concurrency
    maxReplicas: 10

這樣就可以直接依據 concurrency 的水位來決定要有幾個 replica,那這邊因為是用 serverless 的方式做部署,所以會用 Knative 的 autoscaling,如果是要用 KEDA 做 autoscaling 或者部署 LLM,可以轉用 raw deployment 的部署方式。

一旦部署完,k9 就可以看到那一個 InferenceService,而 Central Dashboard 上面的 Kserve endpoints 也可以看到:

k9s InferenceService CR
Kubeflow Central Dashboard – Kserve Endpoints

Step 5: Test Inference Service

Out-of-cluster

假設要在 cluster 外面發出推論請求,會需要帶 token 並且因為這中間有過一層 Ingress,所以 request header 會需要帶 Host:

Bash
curl -v \
-H "Host: $SERVICE_HOSTNAME" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $TOKEN" \
-d @$INPUT_PATH \
http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict 

這邊打的是 v1 protocol 的 endpoint,因為 body 比較好寫 XDD,如果是要走 v2 protocal 可以參考 test_infer.py,同時 KServe 也有提供 Python Inference SDK InferenceRESTClientInferenceGRPCClient,這樣就不用透過 request 來發送請求了。

In-cluster

如果是 in-cluster 的話,就不用特別帶 token 了,可以直接打 internal service:

Python
base_url = "http://dcn-v2.kubeflow-user-example-com.svc.cluster.local/v1/models/dcnv2:predict"

response = requests.post(base_url, headers=headers, json={"inputs": v1_request})

V1 Protocal

Python
{'predictions': [[0.05610664561390877]]}

V2 Protocal

Python
{
    "model_name": "dcnv2",
    "model_version": None,
    "id": "6f509eae-fc74-42a1-9ecb-3803ebd908ac",
    "parameters": None,
    "outputs": [
        {
            "name": "output-0",
            "shape": [1, 1],
            "datatype": "FP32",
            "parameters": None,
            "data": [0.05610664561390877],
        }
    ],
}

Step 6: Autoscaling

最後,可以做一點 load test 來看 autoscaling 會不會正常運作。這邊我們用 Hey 來發送 request(30 秒、30 個 concurrent workers):

Bash
hey -z 30s -c 30 -m POST -host ${SERVICE_HOSTNAME} \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $TOKEN" \
-D $INPUT_PATH \
http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict

接著就可以看到 Deployment 裡面,從 1 個 pod 長到 10 個 pods (maxReplicas):


Summary

我認為生產環境重要的還是 Stability & Scalability、Latency & Throughput,Kubeflow 在這邊就提供了一個很好的 end-to-end 平台,從開發到部署再到可觀測性都有一套 cloud-native 的 solution,尤其最近更是積極整合 LLM,讓人非常期待 Kubeflow 未來的發展,與和其他 ecosystem 的整合。

下一個想試的是 Ray 😎

Re-rank 算法: Maximal Marginal Relevance

最近在工作上實作了 Maximal Marginal Relevance (MMR) 的算法,主要用在推薦系統的 rerank,這個算法的結構跟想法都簡單,但效果算是非常不錯(至少在我的任務上),整體非常雋永,特別介紹給大家,尤其在 LLM 當道的時代,RAG 與 搜尋、廣告、推薦其實都是互通有無的。

先來看一個案例,我分別在 PChome 與 momo 上面搜尋「牛奶」,PChome 在前 10 個商品以內有兩組重複的,momo 則在前 10 個有一組重複的商品,當然商品排序需要考慮的因素眾多,但至少就「多樣性」來說,momo在這一方面或許就相對好一點,畢竟消費者可能沒那麼在乎今天到底是要買一箱?還是買兩箱?他們可能更在乎我有多少種類的東西可以選擇。

PChome – milk
momo – milk

像這樣子的排序問題,就直接地影響到消費者對於這個電商的看法。一般來說在使用者搜尋之後,系統會從各路檢索出相關的商品,經過一系列的排序再在頁面上面呈現給使用者,而在這一個過程當中,候選商品會逐漸減少,而 re-rank 通常發生在整個搜尋系統的下游,也就是我們現在手上已經有一批商品候選了,我們應該要如何排序這些商品?

MMR 就是就是一個非常經典且實用的算法,定義如下:

$$ \arg \max_{D_i \in \mathbb{R} \backslash \mathbb{S}} \left[ (1 – \lambda) \overbrace{\text{sim}(D_i, Q) }^\textrm{similarity} – \lambda \overbrace{\max_{D_j \in \mathbb{S}} \text{sim}(D_i, D_j) }^\textrm{diversity} \right]$$

  • $Q$: 表示一個 query
  • $D$: 表示一個文本
  • $\mathbb{R}$: 表示所有的文本的集合
  • $\mathbb{S}$: 表示已經選擇的文本集合
  • $\text{sim}(p, q)$: 用來計算 p, q 的相似分數
  • $\lambda \in [0, 1]$: 多樣性的權重

在這一個 argmax 中,我們會得到一個文本的 index $i$,使得在這一輪中,他與 query 有一定程度的相似,同時也與已經選擇的文本集合有一定程度的差異,這個 similariry 與 diversity 的 trade-off 主要是透過 $\lambda$ 來控制,要是我們執行 k 個 iteration,最後就會得到有 k 個文本的 re-rank 結果。

Python Implementation

完整流程與程式碼請參考 notebook: GitHub.

Dataset

我們利用一個公開資料集「CAS產品查詢」來模擬一下搜尋特定產品,並且利用 MMR 來排序的過程。我們將這個資料集裡面的產品名稱 Product_Name 當作搜尋的標的,這個資料集大概是長這個樣子:

為了實現依照相似度搜尋的功能,我們將 Product_Name 與 query 透過 embedding model 轉換成 1024 維度的向量,並且利用 consine similarity 進行語義搜尋。

  • Embedding: Qwen3-Embedding-0.6B (Dimension: 1024)
  • Package: sentence-transformers
  • Similarity: cosine similarity

Search and Re-rank

模擬使用者搜尋的過程如下:

  1. 使用者輸入一個搜尋的字詞 query
  2. queryProduct_Name 經過 embedding model 投影到一個 1024 維的向量
  3. 利用 cosine similarity 找出前 30 個跟 query 相似的商品
  4. 再利用 MMR 重新排序這 30 個商品,最後回傳 10 個商品結果

至於 MMR,有別於經典的方式,我們實作一個有 sliding window 的版本,在每一個 iteration 中,當考量 diversity 時,我們只考量前 m 個已經選擇的商品。


Python
def mmr(
    query_embedding: np.ndarray,
    document_embeddings: np.ndarray,
    diversity: float = 0.1,
    top_n: int = 10,
    window_size: int | None = None
) -> list[str]:
    """Maximal Marginal Relevance (with sliding window).

    Arguments:
        query_embedding: The query embedding
        document_embeddings: The embeddings of the selected documents
        diversity: The diversity of the selected embeddings. Values between 0 and 1.
        top_n: The top n items to return
        window_size: The size of the sliding window

    Returns:
            list[int]: The indices of the selected documents
    """
    from sklearn.metrics.pairwise import cosine_similarity

    # compute similarity(Q, D) and similarity(D, D)
    query_doc_similarity = cosine_similarity([query_embedding], document_embeddings)[0]
    pair_similarity = cosine_similarity(document_embeddings)

    if window_size is None:
        window_size = min(10, len(document_embeddings))

    # return doc_idx as the result and recode candidates_idx as current candidate set
    doc_idx = [np.argmax(query_doc_similarity)]
    candidates_idx = [i for i in range(len(document_embeddings)) if i != doc_idx[0]]
    for _ in range(top_n - 1):
        # in each iteration, select one documnet within candidates using MMR
        candidate_similarities = query_doc_similarity[candidates_idx]
        target_similarities = np.max(pair_similarity[candidates_idx][:, doc_idx[-window_size:]], axis=1)

        # calculate MMR
        mmr = (1 - diversity) * candidate_similarities - diversity * target_similarities
        mmr_idx = candidates_idx[np.argmax(mmr)]

        # Update doc_idx & candidates
        doc_idx.append(mmr_idx)
        candidates_idx.remove(mmr_idx)

    return doc_idx

Python
def search(
    query: str,
    using_mmr: bool = True,
    window_size: int | None = None,
    diversity: float = 0.1
) -> list[str]:
    query_embedding = model.encode(query, prompt_name="query")
    similarity_scores = model.similarity(query_embedding, document_embeddings)[0]
    
    indices = np.argsort(similarity_scores.tolist())[::-1]
    
    if not using_mmr:   
        return df.Product_Name[indices[:10]]
    
    doc_idx = mmr(
        query_embedding,
        document_embeddings[indices[:30]],
        top_n=10,
        window_size=window_size,
        diversity=diversity
    )
    return df.Product_Name[indices[doc_idx]]

詳細的 code 還請參照 GitHub

Example

  • Different Diversity
    搜尋「鮮乳」,比較不同 diversity 權重的結果
Python
pd.DataFrame({
    "Diversity=0.1": search("鮮乳", diversity=0.1).to_list(),
    "Diversity=0.9": search("鮮乳", diversity=0.9).to_list()
})

很明顯的可以看到當 diversity 低的時候,搜尋到的產品很有可能會重複。

  • Different window size
    比較同樣 diversity 時,啟用 sliding window 與否的結果
Python
pd.DataFrame({
    "window_size=None": search(
        "鮮乳", diversity=0.6, window_size=None
    ).to_list(),
    "window_size=4": search(
        "鮮乳", diversity=0.6, window_size=4
    ).to_list(),
})

window size = 4 時,第 0 與第 5 個產品都是「四方鮮乳全脂鮮乳」;第 4 與第 9 個產品都是「光泉鮮乳-成分無調整」,而這中間的間隔就是我們設定的 window size。當然,diversitywindow_size 都是需要被設計且調整的參數,實務上還是要多觀察才會比較知道要怎麼設定。

Conclusion

實在很喜歡這種小品算法,但算法本身的好壞,完全與商業指標相關,要是衡量的指標全是那種教科書上會出現的,那也真的不是一個很好的實踐。

搜廣推的算法領域實在有太多有趣、需要學習、想要學習的東西了~~

不可兼得的稀疏性與穩定性 (Sparsity and Stability)

針對一個機器學習的演算法,一般來說我們會希望他的泛化能力(generalization ability)是好的,也就是說倘若模型在訓練資料集的 loss 很低,則在測試資料集的 loss 應該也要很低才對,這樣的想法並不侷限於訓練與測試,offline 與 online、development 與 production、A 資料集與 B 資料集等應用場域都是互相呼應的,而穩定性(Stability)稀疏性(Sparsity)正是兩個用以描述演算法泛化能力的性質。這篇文章主要想透過 介紹這兩個性質並進一步探討兩者之間的關係。

閱讀全文〈不可兼得的稀疏性與穩定性 (Sparsity and Stability)〉