diff --git a/01-airflow功能介紹.md b/01-airflow功能介紹.md
new file mode 100644
index 0000000..e76b766
--- /dev/null
+++ b/01-airflow功能介紹.md
@@ -0,0 +1,138 @@
+# Apache Airflow 介紹
+
+---
+
+## 1. 什麼是 Apache Airflow?
+
+- ### **Apache Airflow 是「用程式碼管理所有資料流程(Configuration as Code)的平台」**
+- 適用於 ETL、資料整合、報表產生、機器學習前後處理等流程。
+- 使用者透過 **Python 程式碼** 來 **編寫**、**排程** 和**監控** 數據工作流程。
+
+### 核心組成
+Airflow 的運作邏輯是由以下三個核心概念組成的:
+
+1. **流程怎麼走 : DAG (有向無環圖)**:
+ * 這是任務流程的「藍圖」。所有的任務順序、依賴關係(先做 A,再做 B)都在這裡定義。
+2. **工作要怎麼做 : Operator (操作員)**:
+ * 定義任務「實際做什麼」的模板。
+ * **Providers**:Airflow 內建 AWS, GCP, Snowflake 等數千種現成的連接器,這是傳統工具較難追上的生態優勢。
+3. **實際跑出來的結果 : Task (任務)**:
+ * 當 DAG 被執行時,Operator 就會被實例化成為一個 Task,並擁有自己的狀態(排程中、執行中、成功、失敗)。
+
+```python
+from airflow import DAG
+from airflow.decorators import task
+from datetime import datetime
+import json
+from pathlib import Path
+
+with DAG(
+ dag_id="taskflow_realistic_etl_path_xcom",
+ start_date=datetime(2024, 1, 1),
+ # schedule="0 2 * * *",
+ catchup=False,
+ tags=["example", "taskflow", "xcom"],
+) as dag:
+
+ @task
+ def extract() -> str:
+ # 實務上:抓 API/DB 後落地成檔案(回傳「路徑」最推薦)
+ out = Path("/tmp/airflow_demo_extract.json")
+ out.write_text(json.dumps({"rows": 3, "data": [1, 2, 3]}))
+ return str(out) # 這個字串會自動進 XCom
+
+ @task
+ def transform(path: str) -> str:
+ p = Path(path)
+ payload = json.loads(p.read_text())
+
+ # 假裝做轉換:把每個值 * 10
+ payload["data"] = [x * 10 for x in payload["data"]]
+
+ out = Path("/tmp/airflow_demo_transform.json")
+ out.write_text(json.dumps(payload))
+ return str(out)
+
+ @task
+ def load(path: str) -> None:
+ payload = json.loads(Path(path).read_text())
+ # 實務上:寫入 Doris / Postgres / S3 / Kafka...
+ print(f"LOAD rows={payload['rows']} data={payload['data']}")
+
+ load(transform(extract()))
+
+ # e = extract()
+ # t = transform(e)
+ # l = load(t)
+
+ # e >> t >> l
+```
+
+---
+
+## 2. Airflow 平台核心能力
+
+
+### A. 可擴展的任務執行架構
+* **功能**:Airflow 採用「排程控制」與「任務執行」分離的分散式架構設計,任務可以分散執行於多個 Worker 節點,並依需求彈性擴充。
+* 實際價值:
+ - 資料量波動大、尖峰不固定 的工作負載
+ - 可與容器平台(如 Kubernetes)整合,避免資源長期閒置
+ - 不同類型的任務(ETL、API、ML)可以並行處理
+
+### B. 流程即程式碼的版本管理能力
+* **功能**:Airflow 將所有流程定義為程式碼,天然可納入 Git 等版本控制系統進行管理。
+* 實際價值:
+ - 每一次流程變更都有明確紀錄,方便回溯與審核
+ - 可安全地重跑歷史資料(Backfill),確保使用的是「當時的流程邏輯」
+ - 流程修改可以走標準的開發流程(Review / 測試 / 部署)
+
+### C. 多元觸發機制:時間 + 事件
+* **功能**:除了傳統的時間排程(Cron),Airflow 也支援以「資料狀態」或「事件」作為流程啟動條件。
+* 實際價值:
+ - 上游資料完成後,下游流程可立即啟動
+ - 減少不必要的空跑(資料還沒好就先排程)
+ - 更適合串接資料湖、串流處理或跨系統流程
+
+### D. 清楚可視化的流程監控與錯誤處理
+* **功能**:Airflow 提供 DAG 視覺化介面,清楚呈現任務依賴、執行狀態與錯誤位置。
+* 實際價值:
+ - 問題發生時,可以快速定位「卡在哪一個步驟」
+ - 支援重試、跳過、補跑等操作,降低人工介入成本
+ - 可與告警系統整合,提升維運可視性
+
+---
+
+## 3. 常見的數據流程管理痛點
+
+| 傳統痛點 | Airflow 解決方案 |
+| :--- |:---------------------------------------------------------|
+| **GUI 黑箱作業**
流程邏輯藏在圖形介面與設定檔深處,版控困難,難以 Code Review。 | **Configuration as Code**
流程即代碼。邏輯透明、可版控、可測試、可多人協作開發。 |
+| **依賴地獄 (Dependency Hell)**
任務 A 失敗,任務 B 卻繼續跑;或跨系統依賴難以管理。 | **DAG 狀態管理**
嚴格的依賴控制,支援複雜的邏輯判斷 (Branching) 與跨 DAG 觸發。 |
+| **授權費昂貴**
商用軟體以 Agent 或 Job 數量計費,擴充成本極高。 | **Open Source**
開源免費,無 Agent 數量限制,適合大規模雲端動態擴展。 |
+| **人才斷層**
年輕工程師不想學專有的商用工具指令。 | **Python 標準**
使用通用的 Python 語言,人才庫龐大且易於招募。 |
+
+---
+
+## 4. Airflow vs. Control-M
+
+| 比較維度 | **Apache Airflow 3.0** | **BMC Control-M** |
+| :--- | :--- | :--- |
+| **核心哲學** | **Code-First (代碼優先)**
適合開發者 (Dev) 與資料工程師。 | **Config-First (設定優先)**
適合維運人員 (Ops),以 GUI 拖拉為主。 |
+| **版控與協作** | 🏆 **Git Flow 整合**
天生支援 Branch、PR、Code Review 流程。 | **弱**
依賴匯出 XML/JSON 進行版控,難以多人同時開發。 |
+| **雲端與生態** | 🏆 **雲原生 (Cloud Native)**
AWS/GCP/Azure 整合極深,容器化支援佳。 | **傳統強項**
強項在 Mainframe (大型主機) 與地端 Legacy 系統。 |
+| **成本結構** | **硬體與人力成本**
軟體免費,但需投入工程人力維護。 | **高昂授權費**
按 Job 數或處理器核心計費,擴充昂貴。 |
+| **客製化能力** | **無限**
任何 Python 能寫的邏輯都能跑。 | **受限**
需依賴原廠提供的 Plugin 或自行開發複雜腳本。 |
+
+兩者在實務上常並存,依工作負載性質選擇合適工具。
+
+---
+
+## 5. 總結
+
+Airflow 能夠實現:
+
+1. **自動化**:將手動的腳本轉化為自動執行的流程。
+2. **標準化**:用統一的 Python 語法定義所有資料處理邏輯。
+3. **視覺化**:讓複雜的資料依賴關係變得清晰可見。
+4. **可靠性**:自動重試 (Retry)、超時控制 (Timeout) 與 警報通知 (Alerting)。
diff --git a/k8s-airflow-HA基礎架構說明.md b/02-k8s-airflow-HA基礎架構說明.md
similarity index 100%
rename from k8s-airflow-HA基礎架構說明.md
rename to 02-k8s-airflow-HA基礎架構說明.md
diff --git a/infra/airflow/airflow-on-k8s-deploy-evaluate.md b/03-airflow-on-k8s-deploy-evaluate.md
similarity index 100%
rename from infra/airflow/airflow-on-k8s-deploy-evaluate.md
rename to 03-airflow-on-k8s-deploy-evaluate.md
diff --git a/QUICK_COMMANDS.md b/QUICK_COMMANDS.md
new file mode 100644
index 0000000..b29ab67
--- /dev/null
+++ b/QUICK_COMMANDS.md
@@ -0,0 +1,511 @@
+# K8s Airflow HA - 快速命令參考
+
+簡潔實用的版本查詢、系統診斷與故障排查命令集。
+
+**更新日期**: 2026-01-30
+
+---
+
+## 📋 目錄
+
+1. [快速查詢](#快速查詢-一句話查詢)
+2. [版本查詢](#版本查詢)
+3. [基本診斷](#基本診斷)
+4. [Airflow 診斷](#airflow-診斷)
+5. [PostgreSQL + Patroni + Etcd 診斷](#postgresql--patroni--etcd-診斷)
+6. [RabbitMQ 診斷](#rabbitmq-診斷-若使用-celeryexecutor)
+7. [Kubernetes 密鑰與配置](#kubernetes-密鑰與配置檢查)
+8. [NFS 與存儲](#nfs-與存儲診斷)
+9. [網路連通性](#網路連通性)
+10. [效能監控](#效能監控)
+11. [故障排查](#故障排查)
+12. [快速診斷腳本](#快速診斷腳本)
+13. [速查表](#速查表)
+
+---
+
+## 📋 快速查詢 (一句話查詢)
+
+```bash
+# Kubernetes 版本
+kubectl version --short
+
+# Airflow 版本
+kubectl exec -n airflow -it $(kubectl get pods -n airflow -l component=webserver -o jsonpath='{.items[0].metadata.name}') -- airflow version
+
+# 節點狀態
+kubectl get nodes -o wide
+
+# Airflow Pods
+kubectl get pods -n airflow -o wide
+
+# 資源使用
+kubectl top nodes && kubectl top pods -n airflow
+```
+
+---
+
+## 📦 版本查詢
+
+### Kubernetes 版本
+```bash
+kubectl version --short
+kubectl version --output yaml
+kubectl get nodes -o jsonpath='{range .items[*]}{.metadata.name} {.status.nodeInfo.kubeletVersion}{"\n"}{end}'
+ssh -i ~/.ssh/id_rsa root@10.10.0.85 'kubeadm version -o short && kubelet --version'
+```
+
+### Airflow 版本
+```bash
+kubectl exec -n airflow -it $(kubectl get pods -n airflow -l component=webserver -o jsonpath='{.items[0].metadata.name}') -- airflow version
+source .venv/bin/activate && airflow version
+python -c "import airflow; print(f'Airflow: {airflow.__version__}')"
+```
+
+### Helm & 容器映像
+```bash
+helm version
+helm list -n airflow
+helm status airflow -n airflow
+kubectl get pods -n airflow -o jsonpath='{.items[*].spec.containers[*].image}' | tr ' ' '\n' | sort -u
+```
+
+---
+
+## 🔍 基本診斷
+
+### 節點與 Pod
+```bash
+kubectl get nodes -o wide
+kubectl get pods -n airflow -o wide
+kubectl describe nodes
+kubectl get events -A --sort-by='.lastTimestamp' | tail -30
+```
+
+### Pod 狀態檢查
+```bash
+kubectl get pods -n airflow -o jsonpath='{range .items[*]}{.metadata.name}{"\t"}{.status.phase}{"\n"}{end}'
+kubectl get pods -n airflow -o jsonpath='{range .items[*]}{.metadata.name}{"\t"}{.status.containerStatuses[0].restartCount}{"\n"}{end}'
+kubectl describe pod -n airflow
+```
+
+---
+
+## 🔧 Airflow 診斷
+
+### Pod 檢查
+```bash
+kubectl get pods -n airflow -o wide
+kubectl get pods -n airflow -l component=webserver
+kubectl get pods -n airflow -l component=scheduler
+kubectl get pods -n airflow -l component=worker
+```
+
+### 日誌查看
+```bash
+kubectl logs -f deployment/airflow-scheduler -n airflow
+kubectl logs -f deployment/airflow-webserver -n airflow
+kubectl logs -n airflow --tail=100
+kubectl logs deployment/airflow-scheduler -n airflow | grep -i error
+```
+
+### DAG 檢查
+```bash
+kubectl exec -it -n airflow -- airflow dags list
+kubectl exec -it -n airflow -- airflow dags info
+kubectl exec -it -n airflow -- ls -la /opt/airflow/dags
+```
+
+### 配置檢查
+```bash
+helm get values airflow -n airflow
+kubectl get configmap -n airflow
+kubectl get secrets -n airflow
+kubectl exec -it -n airflow -- env | grep AIRFLOW
+```
+
+---
+
+## 🗄️ PostgreSQL + Patroni + Etcd 診斷
+
+### PostgreSQL 版本與狀態
+
+```bash
+# 直接連線檢查版本
+ssh -i ~/.ssh/id_rsa root@10.10.0.85
+psql --version
+psql -U postgres -c "SELECT version();"
+
+# 檢查 PostgreSQL 服務狀態
+sudo systemctl status postgresql
+sudo systemctl status patroni
+
+# 查看 PostgreSQL 日誌
+sudo journalctl -u postgresql -n 50
+sudo journalctl -u patroni -n 50
+```
+
+### Patroni 狀態與檢查
+
+```bash
+# 查看 Patroni 叢集狀態
+sudo patronictl -c /etc/patroni.yml list
+
+# 查看 Patroni 配置
+sudo cat /etc/patroni.yml
+
+# 檢查 Patroni 服務
+sudo systemctl status patroni
+sudo systemctl start patroni
+sudo systemctl restart patroni
+
+# Patroni API 狀態
+curl http://10.10.0.85:8008/health
+curl http://10.10.0.85:8008/leader
+curl http://10.10.0.85:8008/cluster
+
+# 查看 Patroni 日誌
+sudo journalctl -u patroni -f
+```
+
+### Etcd 叢集診斷
+
+```bash
+# 檢查 Etcd 版本
+etcd --version
+etcdctl version
+
+# 查看 Etcd 成員
+sudo ETCDCTL_API=3 etcdctl --endpoints=http://127.0.0.1:12379 member list
+
+# 查看 Etcd 健康狀態
+sudo ETCDCTL_API=3 etcdctl --endpoints=http://127.0.0.1:12379 endpoint health
+sudo ETCDCTL_API=3 etcdctl --endpoints=http://127.0.0.1:12379 endpoint status
+
+# 查看 Etcd 中的 Patroni 鍵
+sudo ETCDCTL_API=3 etcdctl --endpoints=http://127.0.0.1:12379 get "" --prefix | grep patroni
+
+# 查看 Etcd 日誌
+sudo journalctl -u etcd-patroni -f
+
+# 檢查 Etcd 服務
+sudo systemctl status etcd-patroni
+sudo systemctl restart etcd-patroni
+
+# Etcd 性能測試
+sudo ETCDCTL_API=3 etcdctl --endpoints=http://127.0.0.1:12379 check perf
+```
+
+### 數據庫操作
+
+```bash
+# 連線到主資料庫
+psql -h 10.10.0.85 -U postgres -d postgres
+
+# 查看複製狀態
+psql -U postgres -c "SELECT client_addr, state, write_lag FROM pg_stat_replication;"
+
+# 查看 WAL 接收器狀態
+psql -U postgres -c "SELECT * FROM pg_stat_wal_receiver;"
+
+# 查看資料庫列表
+psql -U postgres -c "\l"
+
+# 查看資料表
+psql -U postgres -d airflow_db -c "\dt"
+
+# 查看複製使用者
+psql -U postgres -c "SELECT usename, usesuper, usereplication FROM pg_user WHERE usereplication = true;"
+
+# 查看連接
+psql -U postgres -c "SELECT datname, usename, state FROM pg_stat_activity WHERE state IS NOT NULL GROUP BY datname, usename, state;"
+```
+
+### 自動容錯轉移測試
+
+```bash
+# 模擬主節點故障進行容錯轉移
+# 1. 查看當前主節點
+sudo patronictl -c /etc/patroni.yml list
+
+# 2. 停止主節點 Patroni 服務
+sudo systemctl stop patroni
+
+# 3. 觀察叢集自動選舉新主節點
+sleep 5
+sudo patronictl -c /etc/patroni.yml list
+
+# 4. 重新啟動原主節點
+sudo systemctl start patroni
+```
+
+### Patroni 節點重新初始化
+
+```bash
+# 列出所有節點
+sudo patronictl -c /etc/patroni.yml list
+
+# 對特定節點進行重新初始化 (會從主節點同步資料)
+sudo patronictl -c /etc/patroni.yml reinit pgcluster node2
+
+# 清除節點數據重新初始化
+sudo rm -rf /var/lib/postgresql/18/main
+sudo patronictl -c /etc/patroni.yml reinit pgcluster node2
+```
+
+### PostgreSQL 連線測試
+
+```bash
+# 從 Kubernetes Pod 連線到 PostgreSQL
+kubectl exec -it -n airflow -- bash
+psql -h 10.10.0.85 -p 5432 -U postgres -d airflow_db -c "SELECT version();"
+
+# 測試複製連接
+psql -h 10.10.0.87 -U replicator -c "SELECT 1;" 2>&1
+
+# 檢查 Airflow 資料庫連線
+psql -h 10.10.0.85 -p 5432 -U airflow_user -d airflow_db -c "SELECT COUNT(*) FROM dag;"
+```
+
+---
+
+## 🔌 RabbitMQ 診斷 (若使用 CeleryExecutor)
+
+### RabbitMQ 狀態檢查
+
+```bash
+# 查看 RabbitMQ Pod
+kubectl get pods -n airflow -l app=rabbitmq
+
+# 查看 RabbitMQ 狀態
+kubectl exec -it -n airflow -- rabbitmqctl status
+
+# 查看隊列
+kubectl exec -it -n airflow -- rabbitmqctl list_queues
+
+# 查看使用者
+kubectl exec -it -n airflow -- rabbitmqctl list_users
+
+# 查看權限
+kubectl exec -it -n airflow -- rabbitmqctl list_permissions
+
+# RabbitMQ 日誌
+kubectl logs -f -n airflow
+```
+
+### RabbitMQ 管理界面
+
+```bash
+# Port-forward to RabbitMQ management UI
+kubectl port-forward svc/airflow-rabbitmq 15672:15672 -n airflow
+
+# 在瀏覽器中打開: http://localhost:15672
+# 預設使用者: user
+# 預設密碼: bitnami (或檢查 Helm values)
+```
+
+---
+
+## 🔐 Kubernetes 密鑰與配置檢查
+
+### ConfigMap 與 Secrets
+
+```bash
+# 查看所有 ConfigMap
+kubectl get configmap -n airflow
+
+# 查看特定 ConfigMap
+kubectl describe configmap -n airflow
+
+# 查看所有 Secrets
+kubectl get secrets -n airflow
+
+# 查看特定 Secret
+kubectl describe secret -n airflow
+
+# 解碼 Secret 內容
+kubectl get secret -n airflow -o jsonpath='{.data.password}' | base64 -d
+
+# 檢查 Airflow 數據庫連線 Secret
+kubectl get secret airflow-postgresql -n airflow -o yaml
+```
+
+---
+
+## 📝 NFS 與存儲診斷
+
+### NFS 存儲檢查
+
+```bash
+# 查看 PVC 狀態
+kubectl get pvc -n airflow
+
+# 查看 PV 狀態
+kubectl get pv
+
+# 查看 PVC 詳情
+kubectl describe pvc -n airflow
+
+# 檢查存儲類別
+kubectl get storageclass
+
+# 測試 NFS 掛載
+kubectl exec -it -n airflow -- df -h
+kubectl exec -it -n airflow -- ls -la /opt/airflow/dags
+kubectl exec -it -n airflow -- ls -la /opt/airflow/logs
+```
+
+### 主機 NFS 操作
+
+```bash
+# 檢查 NFS 掛載點
+mount | grep nfs
+
+# 檢查 NFS 服務
+showmount -e
+
+# 手動掛載測試
+sudo mount -t nfs :/path /mnt/test
+
+# 檢查 NFS 日誌
+sudo journalctl -u nfs-server
+```
+
+---
+
+
+
+### Pod 與 Service 測試
+```bash
+kubectl get pod -n airflow -o jsonpath='{.status.podIP}'
+kubectl run -it --rm debug --image=busybox:1.28 --restart=Never -- ping
+kubectl run -it --rm debug --image=busybox:1.28 --restart=Never -- nc -zv :
+kubectl run -it --rm debug --image=busybox:1.28 --restart=Never -- nslookup kubernetes.default
+
+kubectl get svc -n airflow
+kubectl describe svc -n airflow
+kubectl run -it --rm debug --image=busybox:1.28 --restart=Never -- nc -zv airflow-webserver.airflow 8080
+```
+
+### Node 網路檢查
+```bash
+ssh -i ~/.ssh/id_rsa root@10.10.0.85
+ip addr show
+ip route show
+ip -d link show | grep flannel
+ping 8.8.8.8
+```
+
+---
+
+## 📊 效能監控
+
+### 資源使用
+```bash
+kubectl top nodes
+kubectl top pods -n airflow
+kubectl top pods -A --sort-by=memory
+kubectl top pods -A --sort-by=cpu
+
+# 資源總和
+kubectl top pods -n airflow --no-headers | awk '{cpu+=$2; mem+=$3} END {print "CPU: " cpu "m\nMem: " mem "Mi"}'
+```
+
+### 節點容量
+```bash
+kubectl get nodes -o jsonpath='{range .items[*]}{.metadata.name}{"\t"}{.status.allocatable.cpu}{"\t"}{.status.allocatable.memory}{"\n"}{end}'
+kubectl get nodes -o jsonpath='{range .items[*]}{.metadata.name}{"\t"}{.status.capacity.cpu}{"\t"}{.status.capacity.memory}{"\n"}{end}'
+```
+
+---
+
+## ⚠️ 故障排查
+
+### 常見診斷
+```bash
+kubectl describe pod -n airflow
+kubectl logs -n airflow
+kubectl logs -n airflow --previous
+kubectl get events -n airflow --sort-by='.lastTimestamp'
+```
+
+### 重啟與清理
+```bash
+kubectl rollout restart deployment/airflow-scheduler -n airflow
+kubectl rollout restart deployment/airflow-webserver -n airflow
+kubectl delete pod --field-selector status.phase=Failed -n airflow
+kubectl rollout history deployment/airflow-scheduler -n airflow
+kubectl rollout undo deployment/airflow-scheduler -n airflow
+```
+
+### 磁碟與日誌
+```bash
+df -h
+du -sh /var/lib/docker/*
+du -sh /var/lib/containerd/*
+docker system prune -a
+sudo journalctl --vacuum=500M
+```
+
+---
+
+## 🤖 快速診斷腳本
+
+### 健康檢查
+```bash
+#!/bin/bash
+echo "=== K8s 版本 ==="
+kubectl version --short
+
+echo "=== 節點狀態 ==="
+kubectl get nodes -o wide
+
+echo "=== Airflow Pods ==="
+kubectl get pods -n airflow
+
+echo "=== 叢集事件 ==="
+kubectl get events -A --sort-by='.lastTimestamp' | tail -10
+
+echo "=== 資源使用 ==="
+kubectl top nodes 2>/dev/null || echo "Metrics Server 未部署"
+```
+
+### 故障排查
+```bash
+#!/bin/bash
+POD_NAME=$1
+NAMESPACE=${2:-airflow}
+
+echo "=== Pod 狀態 ==="
+kubectl describe pod $POD_NAME -n $NAMESPACE
+
+echo "=== 最近日誌 ==="
+kubectl logs $POD_NAME -n $NAMESPACE --tail=50
+
+echo "=== 環境變數 ==="
+kubectl exec $POD_NAME -n $NAMESPACE -- env | head -20
+
+echo "=== Events ==="
+kubectl get events -n $NAMESPACE | grep $POD_NAME
+```
+
+---
+
+## 📚 速查表
+
+| 用途 | 命令 |
+|:-----|:-----|
+| K8s 版本 | `kubectl version --short` |
+| Airflow 版本 | `kubectl exec -n airflow ... airflow version` |
+| 所有節點 | `kubectl get nodes -o wide` |
+| 所有 Pod | `kubectl get pods -n airflow` |
+| Pod 日誌 | `kubectl logs -n airflow` |
+| 進入 Pod | `kubectl exec -it -n airflow -- bash` |
+| 資源使用 | `kubectl top nodes && kubectl top pods -n airflow` |
+| 重啟 Scheduler | `kubectl rollout restart deployment/airflow-scheduler -n airflow` |
+| Helm 配置 | `helm get values airflow -n airflow` |
+| 叢集事件 | `kubectl get events -A --sort-by='.lastTimestamp'` |
+
+---
+
+**Last Updated**: 2026-01-30
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..849ac39
--- /dev/null
+++ b/README.md
@@ -0,0 +1,60 @@
+# k8s Airflow HA 部署指南
+
+本專案提供了一套基礎的高可用 (HA) Apache Airflow on k8s 部署方案。以下文件按建議閱讀與建置順序排列,請依序參考執行。
+
+## 1. 架構與概念 (Architecture & Concepts)
+
+建立對整體架構的認識:
+
+1. **[Airflow 功能介紹](01-airflow功能介紹.md)**
+ * 了解 Airflow 的核心功能、元件 (DAG, Operator, Task) 與應用場景。
+2. **[K8s Airflow HA 基礎架構說明](02-k8s-airflow-HA基礎架構說明.md)**
+ * 詳述本專案採用的多層級 HA 架構 (Ingress, K8s Control Plane, Airflow Components, Database)。
+3. **[Airflow on K8s 部署評估](03-airflow-on-k8s-deploy-evaluate.md)**
+ * 探討不同的部署策略,以及為何選擇將 Metadata DB 部署於 Bare Metal 而非 K8s 內部的考量。
+
+---
+
+## 2. 基礎設施建置 (Infrastructure Setup)
+
+在部署 Airflow 之前,必須先準備好底層的運算與儲存資源:
+
+1. **高可用負載平衡 (Load Balancer)**
+ * **[Keepalived & HAProxy 安裝指南](infra/keepalived_haproxy/04-keepalived&haproxy-install-guide.md)**: **建議最先建置**。提供 VIP (`10.10.0.83`) 與負載平衡,統一管理 K8s API, DB, Web UI, Doris, RabbitMQ 的流量入口。
+
+2. **Kubernetes 叢集建置**
+ * **[Kubernetes 安裝指南](infra/kubernetes/05-k8s-install-guide.md)**: 基於上述 VIP 建置高可用 Control Plane。
+
+
+2. **儲存系統 (Storage)**
+ * **[NFS 安裝指南](infra/airflow/nfs/06-nfs-install-guide.md)**: 建立 NFS Server 並配置 K8s NFS CSI Driver,供 Airflow DAGs/Logs 與 RabbitMQ 使用。
+
+---
+
+## 3. 外部服務建置 (External Services)
+
+Airflow 的核心元件依賴於外部資料庫與訊息佇列:
+
+1. **Metadata Database**
+ * **[PostgreSQL + Patroni + Etcd 安裝指南](infra/postgres/07-postgresql&patroni&etcd-install-guide.md)**: 建置高可用的 PostgreSQL 叢集作為 Airflow Metadata DB。
+
+2. **Message Broker**
+ * **[RabbitMQ 安裝指南](infra/airflow/rabbitmq/08-rabbitmq-install-guide.md)**: 在 K8s 上部署 RabbitMQ Cluster,作為 CeleryExecutor 的訊息中間件。
+
+3. **Container Registry**
+ * **[Container Registry 安裝指南](infra/registry/09-registry-install-guide.md)**: 在 `10.10.0.85:50000` 建置私有 Registry,供存放 Airflow 客製化 Image。
+ * (若需做HA,請參考) **[Harbor Registry 安裝指南](infra/registry/harbor-install-guide.md)**: 部署高可用 Harbor Registry (Helm),整合外部 PostgreSQL 與 NFS 儲存。
+
+---
+
+## 4. Airflow 部署 (Airflow Deployment)
+
+當上述依賴服務皆準備就緒後,即可進行 Airflow 的安裝:
+
+* **[Airflow on K8s HA 安裝指南](infra/airflow/10-airflow-install-guide.md)**
+ * **核心文件**。整合了上述所有資源,說明如何:
+ * 建立 Airflow 專用的 Namespace 與 Secrets。
+ * 建立並綁定固定的 PV/PVC (DAGs/Logs)。
+ * 使用 Helm Chart (配置 CeleryExecutor) 部署 Airflow。
+ * 驗證服務運作與 Web UI 存取。
+
diff --git a/TEST_ENV_HOSTS.md b/TEST_ENV_HOSTS.md
new file mode 100644
index 0000000..90f0dc2
--- /dev/null
+++ b/TEST_ENV_HOSTS.md
@@ -0,0 +1,360 @@
+# K8s Airflow HA 測試環境 - 主機清單
+
+
+**更新日期**: 2026-01-30
+
+---
+
+## 📋 目錄
+
+1. [測試環境概覽](#測試環境概覽)
+2. [測試主機清單](#測試主機清單)
+ - [虛擬 IP (VIP) - Keepalived](#虛擬-ip-vip---keepalived)
+ - [Master 節點](#master-節點-control-plane)
+ - [Worker 節點](#worker-節點)
+4. [服務與埠號](#服務與埠號)
+5. [連線方式](#連線方式)
+ - [VIP 連線 (推薦)](#vip-連線-推薦)
+6. [常用命令](#常用命令)
+7. [故障排查](#故障排查)
+
+---
+
+## 📊 測試環境概覽
+
+| 項目 | 詳情 |
+|:----------------------|:--------------------------|
+| **部署位置** | 單一實體 Server |
+| **虛擬機數量** | 7 台 (3 Master + 4 Worker) |
+| **OS** | Ubuntu 24.04 LTS |
+| **Container Runtime** | containerd |
+| **K8s 版本** | v1.30.14 |
+| **Airflow 版本** | 3.0.2 |
+| **部署方式** | Helm + kubeadm |
+| **Executor** | CeleryExecutor |
+
+---
+
+## 🖥️ 測試主機清單
+
+### 虛擬 IP (VIP) - Keepalived
+
+| 項目 | 詳情 |
+|:-----|:---|
+| **VIP 地址** | `10.10.0.83` |
+| **實現方式** | Keepalived + HAProxy |
+| **狀態** | ✅ 已部署 |
+| **優先級分配** | f01(100) > f02(90) > f03(80) |
+
+**VIP 服務轉發配置** (HAProxy):
+
+| 服務 | VIP 埠 | 後端目標 | 用途 |
+|:-----|:------|:--------|:-----|
+| **Kubernetes API** | 6444 | 10.10.0.85/87/89:6443 | K8s Control Plane 統一入口 |
+| **PostgreSQL (主寫)** | 5000 | 10.10.0.85/87/89:5432 | 資料庫讀寫連線 (主節點優先) |
+| **PostgreSQL (備讀)** | 5001 | 10.10.0.85/87/89:5432 | 資料庫唯讀連線 (副本負載均衡) |
+| **Airflow WebUI** | 8080 | 10.10.0.85/87/89:30080 | Airflow 網頁介面 |
+| **RabbitMQ 管理** | 15672 | 10.10.0.85/87/89:31672 | RabbitMQ 管理介面 |
+| **Doris MySQL** | 9031 | 10.10.0.85/87/89:9030 | Doris 前端節點存取 |
+| **監控統計** | 8404 | HAProxy | HAProxy 統計頁面 |
+
+---
+
+### Master 節點 (Control Plane)
+
+| Host | IP | 角色 | CPU | 記憶體 | 磁碟 | Keepalived |
+|:--------------|:-------------|:-----|:----|:------|:-----|:----------|
+| **doris-f01** | `10.10.0.85` | Master #1 (初始) | 8 核 | 15GB | 118GB | Master (100) |
+| **doris-f02** | `10.10.0.87` | Master #2 (副本) | 8 核 | 15GB | 118GB | Backup (90) |
+| **doris-f03** | `10.10.0.89` | Master #3 (副本) | 8 核 | 15GB | 118GB | Backup (80) |
+
+### Worker 節點
+
+| Host | IP | 角色 | CPU | 記憶體 | 磁碟 |
+|:--------------|:-------------|:-----|:----|:-----|:-----|
+| **doris-b01** | `10.10.0.93` | Worker #1 | 4 核 | 15GB | 118GB |
+| **doris-b02** | `10.10.0.94` | Worker #2 | 4 核 | 15GB | 118GB |
+| **doris-b03** | `10.10.0.95` | Worker #3 | 4 核 | 15GB | 118GB |
+| **doris-b04** | `10.10.0.96` | Worker #4 | 4 核 | 15GB | 118GB |
+
+---
+
+## 🔌 服務與埠號
+
+### Kubernetes 服務埠
+
+| 服務 | 埠 | 節點 | 用途 |
+|:-----|:--|:-----|:-----|
+| **API Server** | 6443 | Master | kubectl 連線、API 呼叫 |
+| **Kubelet** | 10250 | 所有節點 | Node 狀態、Pod 管理 |
+| **Etcd** | 2379, 2380 | Master | K8s 內部狀態儲存 |
+| **Scheduler** | 10259 | Master | Pod 排程(內部用) |
+| **Controller Manager** | 10257 | Master | 控制器(內部用) |
+
+### Airflow 應用埠
+
+| 服務 | 埠 | 節點 | 用途 |
+|:-----|:--|:-----|:-----|
+| **Webserver** | 30080 | NodePort | Airflow UI 存取 |
+| **Flower (可選)** | 30555 | NodePort | Celery 監控 UI |
+
+### PostgreSQL
+
+| 服務 | 埠 | 用途 |
+|:-----|:--|:-----|
+| **PostgreSQL** | 5432 | 資料庫連線 |
+
+### RabbitMQ (若使用 CeleryExecutor)
+
+| 服務 | 埠 | 用途 |
+|:-----|:--|:-----|
+| **AMQP** | 5672 | 訊息佇列 |
+| **Management UI** | 15672 | 管理介面 |
+
+---
+
+## 🔐 連線方式
+
+### VIP 連線 (推薦)
+
+```bash
+# ========== Kubernetes API Server (埠 6444) ==========
+# 使用 VIP 連線到 Kubernetes API Server
+kubectl --server=https://10.10.0.83:6444 get nodes --insecure-skip-tls-verify
+
+# 驗證 VIP 連通性
+curl -k https://10.10.0.83:6444/version
+
+# ========== PostgreSQL (主寫 - 埠 5000) ==========
+# 連線到主資料庫進行讀寫
+psql -h 10.10.0.83 -p 5000 -U postgres -d airflow_db
+
+# ========== PostgreSQL (備讀 - 埠 5001) ==========
+# 連線到副本資料庫進行唯讀查詢(負載均衡)
+psql -h 10.10.0.83 -p 5001 -U postgres -d airflow_db
+
+# ========== Airflow WebUI (埠 8080) ==========
+# 在瀏覽器中訪問 Airflow
+# http://10.10.0.83:8080
+
+# ========== RabbitMQ 管理介面 (埠 15672) ==========
+# 訪問 RabbitMQ 管理控制臺
+# http://10.10.0.83:15672
+# 帳號/密碼: airflow/airflow
+
+# ========== Doris MySQL (埠 9031) ==========
+# 連線到 Doris 前端節點
+mysql -h 10.10.0.83 -P 9031 -u root -p
+
+# ========== HAProxy 監控統計 (埠 8404) ==========
+# 查看 HAProxy 統計頁面
+ 在瀏覽器中訪問: http://10.10.0.83:8404/stats
+```
+
+#### Airflow WebUI 存取
+
+**使用 VIP (推薦) - HAProxy 埠 8080**
+```bash
+# 在瀏覽器中訪問 Airflow (透過 VIP)
+http://10.10.0.83:8080
+```
+
+#### PostgreSQL 連線
+
+```bash
+# ========== 主寫資料庫 (埠 5000) ==========
+# 連線到主資料庫進行讀寫
+psql -h 10.10.0.83 -p 5000 -U postgres -d airflow_db
+
+# ========== 備讀資料庫 (埠 5001) ==========
+# 連線到副本資料庫進行唯讀查詢 (負載均衡)
+psql -h 10.10.0.83 -p 5001 -U postgres -d airflow_db
+```
+
+---
+
+## 🛠️ 常用命令
+
+### Kubernetes 基本命令
+
+```bash
+# 查看所有節點
+kubectl get nodes -o wide
+
+# 查看所有 Pod
+kubectl get pods -A
+
+# 查看 Airflow namespace 的 Pod
+kubectl get pods -n airflow
+
+# 查看 Pod 詳細資訊
+kubectl describe pod -n airflow
+
+# 查看 Pod 日誌
+kubectl logs -n airflow
+
+# 進入 Pod
+kubectl exec -it -n airflow -- bash
+
+# 查看所有 Service
+kubectl get svc -A
+
+# 查看節點資源使用
+kubectl top nodes
+kubectl top pods -n airflow
+```
+
+### Airflow 特定命令
+
+```bash
+# 進入 Airflow scheduler
+kubectl exec -it -n airflow -- airflow scheduler list-dag
+
+# 進入 Airflow webserver Pod
+kubectl exec -it -n airflow -- bash
+
+# 查看 Airflow 日誌
+kubectl logs -f deployment/airflow-scheduler -n airflow
+kubectl logs -f deployment/airflow-webserver -n airflow
+
+# 查看 Airflow DAG
+kubectl exec -it -n airflow -- airflow dags list
+```
+
+### 診斷命令
+
+```bash
+# 查看 K8s 事件
+kubectl get events -A
+
+# 查看節點詳細狀態
+kubectl describe nodes
+
+# 檢查 DNS
+kubectl run -it --rm debug --image=busybox:1.28 --restart=Never -- nslookup kubernetes.default
+
+# 測試連通性
+kubectl run -it --rm netshoot --image=nicolaka/netshoot --restart=Never -- bash
+```
+
+---
+
+## 🔧 故障排查
+
+### 節點無法加入集群
+
+**症狀**: `kubeadm join` 執行失敗
+
+```bash
+# 檢查 kubelet 狀態
+systemctl status kubelet
+journalctl -xe -u kubelet
+
+# 重置 kubeadm (若需要重新開始)
+sudo kubeadm reset
+sudo iptables -F
+sudo iptables -t nat -F
+sudo iptables -t mangle -F
+sudo ipvsadm --clear
+```
+
+### Pod 無法啟動
+
+```bash
+# 查看 Pod 狀態
+kubectl describe pod -n airflow
+
+# 查看 Pod 日誌
+kubectl logs -n airflow
+
+# 常見原因:
+# - 記憶體不足
+# - 磁碟空間不足
+# - 映像拉取失敗
+# - 資源請求超過節點配置
+```
+
+### 網路連通性問題
+
+```bash
+# 測試 Pod 間連通性
+kubectl run -it --rm debug --image=busybox:1.28 --restart=Never -- ping
+
+# 檢查 Service 連通性
+kubectl run -it --rm debug --image=busybox:1.28 --restart=Never -- nc -zv airflow-webserver.airflow 8080
+
+# 查看 CNI 外掛狀態
+kubectl get pods -n calico-system
+```
+
+### 磁碟空間不足
+
+```bash
+# 檢查磁碟使用
+df -h
+du -sh /var/lib/docker/*
+du -sh /var/lib/containerd/*
+
+# 清理未使用的 Pod 和容器
+kubectl delete pod --field-selector status.phase=Failed -A
+kubectl delete pod --field-selector status.phase=Succeeded -A
+
+# 清理本地容器日誌 (Host Machine)
+sudo journalctl --vacuum=500M
+
+# 清理舊的 containerd 數據
+sudo ctr images prune
+```
+
+### 記憶體不足
+
+```bash
+# 檢查記憶體使用
+free -h
+kubectl top nodes
+kubectl top pods -A --sort-by=memory
+
+# 檢查 Pod 限制配置
+kubectl get pod -o yaml | grep -A 5 resources
+
+# 若 Master 記憶體不足,減少不必要的 Pod
+# 關閉監控、Addon 等非必要服務
+```
+
+---
+
+## 📝 建議與注意事項
+
+### 初期部署建議
+
+1. **從最小配置開始**: 先用 1 個 Master + 1 個 Worker 驗證功能
+2. **逐步擴展**: 驗證通過後再添加更多節點
+3. **資源監控**: 持續監控 CPU、記憶體、磁碟使用
+4. **日誌收集**: 定期檢查日誌,及時發現問題
+
+### 常見坑點
+
+- ❌ 忘記關閉 swap → K8s 無法啟動
+- ❌ 主機名衝突 → 節點加入失敗
+- ❌ 網路配置錯誤 → Pod 無法通訊
+- ❌ 磁碟空間不足 → 容器無法啟動
+- ❌ 記憶體不足 → Pod Eviction
+
+### 後續優化
+
+- [ ] 添加 VIP / Keepalived 進行 HA 測試
+- [ ] 部署 PostgreSQL 進行完整功能測試
+- [ ] 部署 RabbitMQ 測試 CeleryExecutor
+- [ ] 設定持久化儲存 (NFS 或 hostPath)
+- [ ] 配置監控 (Prometheus + Grafana)
+- [ ] 設定 DAG 自動部署
+
+---
+
+## 📚 相關文檔
+
+- [Kubernetes 官方文檔](https://kubernetes.io/docs/)
+- [Airflow Helm Chart](https://airflow.apache.org/docs/helm-chart/)
+- [Calico 官方文檔](https://docs.tigera.io/calico/)
+- [本專案安裝指南](infra/kubernetes/05-k8s-install-guide.md)
+
diff --git a/infra/airflow/10-airflow-install-guide.md b/infra/airflow/10-airflow-install-guide.md
new file mode 100644
index 0000000..2136c74
--- /dev/null
+++ b/infra/airflow/10-airflow-install-guide.md
@@ -0,0 +1,571 @@
+# Airflow on k8s HA Installation Guide
+
+本文件說明如何在 k8s cluster 中部署高可用的 Airflow。
+
+## 0. 前置需求
+
+- **Kubernetes**: 已部署且運作正常。
+- **PostgreSQL**: 已部署且運作正常。
+- **RabbitMQ**: 已部署且運作正常。
+
+---
+
+## 1. 基礎配置
+
+### 1.0 準備 NFS 儲存目錄
+
+在部署 Airflow 之前,必須先在 NFS Server (10.10.0.85) 上建立所需的目錄結構並設定正確權限。
+
+**建立 Airflow 目錄結構:**
+
+```bash
+# 建立 dags 和 logs 目錄
+sudo mkdir -p /srv/nfs/airflow/{dags,logs}
+
+# 設定權限 (50000:0 是 Airflow 容器內的預設 UID:GID)
+sudo chown -R 50000:0 /srv/nfs/airflow
+sudo chmod -R 775 /srv/nfs/airflow
+
+# 驗證目錄權限
+ls -ld /srv/nfs/airflow/{dags,logs}
+# 預期輸出:
+# drwxrwxr-x 2 50000 root 4096 Feb 3 19:30 /srv/nfs/airflow/dags
+# drwxrwxr-x 2 50000 root 4096 Feb 3 19:30 /srv/nfs/airflow/logs
+```
+
+**重新載入 NFS 匯出配置:**
+
+```bash
+# 確認 /etc/exports 包含以下配置:
+# /srv/nfs/airflow 10.10.0.0/16(rw,sync,no_subtree_check,no_root_squash)
+
+# 重新載入 NFS 匯出
+sudo exportfs -ra
+
+# 驗證匯出狀態
+sudo exportfs -v | grep airflow
+# 預期輸出:
+# /srv/nfs/airflow 10.10.0.0/16(rw,wdelay,no_root_squash,no_subtree_check,...)
+```
+
+> **重要提醒**:
+> - NFS 匯出配置 (`/etc/exports`) 只定義掛載權限,不會自動建立目錄
+> - 必須手動建立目錄並設定正確的 UID/GID (50000:0)
+> - 確保目錄權限為 775,讓 Airflow 容器可以寫入
+
+---
+
+### 1.1 Storage Class 設定
+
+**1. 安裝 NFS CSI Driver:**
+
+```bash
+helm repo add csi-driver-nfs https://raw.githubusercontent.com/kubernetes-csi/csi-driver-nfs/master/charts
+helm repo update
+helm upgrade --install csi-driver-nfs csi-driver-nfs/csi-driver-nfs -n kube-system
+```
+
+**2. 建立 StorageClass:**
+
+
+```bash
+sudo vi nfs-airflow-storage-class.yml
+```
+
+```yaml
+apiVersion: storage.k8s.io/v1
+kind: StorageClass
+metadata:
+ name: nfs-airflow
+provisioner: nfs.csi.k8s.io
+parameters:
+ server: 10.10.0.85
+ share: /srv/nfs/airflow
+reclaimPolicy: Retain
+volumeBindingMode: Immediate
+```
+
+執行套用:
+
+```bash
+kubectl apply -f nfs-airflow-storage-class.yml
+```
+
+### 1.2 建立 Airflow 固定的 PV/PVC
+
+建立 `airflow-dags-storage.yml`:
+
+```bash
+sudo vi airflow-dags-storage.yml
+```
+
+```yaml
+apiVersion: v1
+kind: PersistentVolume
+metadata:
+ name: airflow-dags-pv
+spec:
+ capacity:
+ storage: 10Gi
+ volumeMode: Filesystem
+ accessModes:
+ - ReadWriteMany
+ persistentVolumeReclaimPolicy: Retain
+ nfs:
+ path: /srv/nfs/airflow/dags
+ server: 10.10.0.85
+---
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+ name: airflow-dags-pvc
+ namespace: airflow
+spec:
+ storageClassName: nfs-airflow
+ accessModes:
+ - ReadWriteMany
+ resources:
+ requests:
+ storage: 10Gi
+```
+
+建立 `airflow-logs-storage.yml`:
+
+```bash
+sudo vi airflow-logs-storage.yml
+```
+
+```yaml
+apiVersion: v1
+kind: PersistentVolume
+metadata:
+ name: airflow-logs-pv
+spec:
+ capacity:
+ storage: 10Gi
+ volumeMode: Filesystem
+ accessModes:
+ - ReadWriteMany
+ persistentVolumeReclaimPolicy: Retain
+ nfs:
+ path: /srv/nfs/airflow/logs
+ server: 10.10.0.85
+---
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+ name: airflow-logs-pvc
+ namespace: airflow
+spec:
+ storageClassName: nfs-airflow
+ accessModes:
+ - ReadWriteMany
+ resources:
+ requests:
+ storage: 10Gi
+```
+
+執行套用:
+
+```bash
+kubectl create namespace airflow
+kubectl apply -f nfs-dags.yml
+kubectl apply -f airflow-logs-storage.yml
+```
+
+驗證 PV/PVC 狀態:
+
+```bash
+kubectl get pv | grep airflow
+kubectl get pvc -n airflow
+```
+
+### 1.3 PostgreSQL 設定
+
+**1. 建立資料庫與使用者:**
+
+連線至資料庫
+
+```bash
+psql -h 10.10.0.83 -p 5000 -U postgres
+```
+
+執行以下 SQL 指令:
+
+```sql
+-- 1. 建立使用者 airflow
+CREATE USER airflow WITH PASSWORD 'airflow';
+
+-- 2. 建立資料庫 airflow_db 並指定擁有者為 airflow
+CREATE DATABASE airflow_db OWNER airflow;
+
+-- 3. (選用) 授予權限
+GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow;
+GRANT ALL PRIVILEGES ON SCHEMA public TO airflow;
+GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO airflow;
+GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO airflow;
+
+-- 授予未來建立的物件權限
+ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL PRIVILEGES ON TABLES TO airflow;
+ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL PRIVILEGES ON SEQUENCES TO airflow;
+```
+
+**2. 驗證連線:**
+
+確認可以使用新帳號連線:
+`psql -h 10.10.0.83 -p 5000 -U airflow -d airflow_db`
+
+### 1.4 Container Registry 設定
+
+**設定 Image Pull Secret (若需要):**
+
+若 Registry 需要驗證,請建立 Secret 並在 `values.yml` 中參照。
+
+```bash
+kubectl create secret docker-registry airflow-registry-secret \
+ --docker-server=10.10.0.85:50000 \
+ --docker-username=admin \
+ --docker-password= \
+ --namespace airflow
+```
+
+### 1.5 RabbitMQ 設定
+
+**建立 Airflow 專用帳號(若未建立):**
+
+建立 `airflow-rabbitmq-user.yaml` 並套用:
+
+```bash
+sudo vi airflow-rabbitmq-user.yaml
+```
+
+```yaml
+apiVersion: v1
+kind: Secret
+metadata:
+ name: airflow-rabbitmq-credentials
+ namespace: airflow
+type: Opaque
+stringData:
+ username: airflow
+ password: airflow
+---
+apiVersion: rabbitmq.com/v1beta1
+kind: User
+metadata:
+ name: airflow
+ namespace: airflow
+spec:
+ rabbitmqClusterReference:
+ name: airflow-rabbitmq-cluster
+ tags:
+ - management
+ credentials:
+ secretName: airflow-rabbitmq-credentials
+---
+apiVersion: rabbitmq.com/v1beta1
+kind: Permission
+metadata:
+ name: airflow-permission
+ namespace: airflow
+spec:
+ rabbitmqClusterReference:
+ name: airflow-rabbitmq-cluster
+ user: airflow
+ vhost: /
+ permissions:
+ configure: ".*"
+ write: ".*"
+ read: ".*"
+```
+
+執行套用:
+
+```bash
+kubectl apply -f airflow-rabbitmq-user.yaml
+```
+
+---
+
+## 2. 環境設定
+
+### 2.1 建立 Namespace
+
+為 Airflow 建立獨立的命名空間 (Namespace):
+
+```bash
+kubectl create namespace airflow # 如果尚未建立
+```
+
+### 2.2 設定節點標籤 (Node Labels)
+
+根據架構設計,Airflow 的控制元件 (Scheduler, Webserver) 將運行於 Control Plane 節點,而 Worker 運行於 Worker 節點。
+
+**Control Plane 節點 (doris-f01 ~ f03):**
+```bash
+# 確保這些節點有此標籤
+kubectl label node doris-f01 node-role.kubernetes.io/control-plane="" --overwrite
+kubectl label node doris-f02 node-role.kubernetes.io/control-plane="" --overwrite
+kubectl label node doris-f03 node-role.kubernetes.io/control-plane="" --overwrite
+```
+
+**Worker 節點 (doris-b01 ~ b04):**
+```bash
+# 確保這些節點有此標籤
+kubectl label node doris-b01 role=worker --overwrite
+kubectl label node doris-b02 role=worker --overwrite
+kubectl label node doris-b03 role=worker --overwrite
+kubectl label node doris-b04 role=worker --overwrite
+```
+
+### 2.3 建立 Kubernetes Secrets(若需要)
+
+為了安全起見,手動建立包含敏感資訊的 Secret,而不是直接寫在 Helm Values 中。
+
+**產生 Fernet Key:**
+```python
+python3 -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
+# 輸出範例: rv638BORYwOheHEXB6JoROvDgR3r9vdrOHnYcQfl0gs=
+```
+
+**建立 Secret:**
+
+```bash
+kubectl create secret generic airflow-secrets \
+ --namespace airflow \
+ --from-literal=airflow-fernet-key='rv638BORYwOheHEXB6JoROvDgR3r9vdrOHnYcQfl0gs=' \
+ --from-literal=airflow-webserver-secret='this-must-be-a-long-random-string-fixed-for-ha' \
+ --from-literal=metadata-connection='postgresql://airflow:airflow@10.10.0.83:5000/airflow_db?sslmode=disable' \
+ --from-literal=result-backend-connection='postgresql://airflow:airflow@10.10.0.83:5000/airflow_db?sslmode=disable' \
+ --from-literal=broker-url='amqp://airflow:airflow@rabbitmq-cluster.rabbitmq-system.svc.cluster.local:5672//'
+```
+
+---
+
+## 3. 建置與推送 Docker Image
+
+由於我們需要安裝 `fping` 並給予 `NET_RAW` 權限,必須使用客製化的 Docker Image。
+
+### 3.1 準備 Dockerfile
+
+
+```bash
+sudo vi Dockerfile
+```
+
+```dockerfile
+FROM apache/airflow:3.0.2
+USER root
+RUN apt-get update && apt-get install -y --no-install-recommends fping iputils-ping libcap2-bin \
+ && setcap cap_net_raw+ep /usr/bin/fping && setcap cap_net_raw+ep /usr/bin/ping \
+ && apt-get clean && rm -rf /var/lib/apt/lists/*
+USER airflow
+RUN pip install --no-cache-dir ping3==4.0.8
+```
+
+### 3.2 建置並推送
+
+```bash
+# 1. 建置 Image
+podman build -t 10.10.0.85:50000/airflow-custom:1.0 .
+
+# 2. 推送至 Registry
+podman push 10.10.0.85:50000/airflow-custom:1.0
+```
+
+---
+
+## 4. 使用 Helm 部署 Airflow
+
+### 4.1 加入 Airflow Helm Repo
+
+```bash
+helm repo add apache-airflow https://airflow.apache.org
+helm repo update
+```
+
+### 4.2 準備 Values 檔案
+
+建立 `values.yml`,內容如下(請務必檢查資料庫與 Broker 連線資訊):
+
+```yaml
+fullnameOverride: "airflow"
+
+useStandardNaming: true
+
+images:
+ airflow:
+ repository: 10.10.0.85:50000/airflow-custom
+ tag: "1.0"
+ pullPolicy: Always
+
+executor: "CeleryExecutor"
+
+postgresql:
+ enabled: false
+redis:
+ enabled: false
+
+data:
+ metadataConnection:
+ user: "airflow"
+ pass: "airflow"
+ protocol: postgresql
+ host: "10.10.0.83"
+ port: 5000
+ db: "airflow_db"
+ sslmode: disable
+ brokerUrl: "amqp://airflow:airflow@airflow-rabbitmq-cluster:5672/"
+ resultBackendConnection:
+ protocol: postgresql
+ host: "10.10.0.83"
+ port: 5000
+ db: "airflow_db"
+ user: "airflow"
+ pass: "airflow"
+ sslmode: disable
+
+migrateDatabaseJob:
+ nodeSelector:
+ role: worker
+
+webserverSecretKey: "this-must-be-a-long-random-string-fixed-for-ha"
+fernetKey: "rv638BORYwOheHEXB6JoROvDgR3r9vdrOHnYcQfl0gs="
+
+dags:
+ persistence:
+ enabled: true
+ existingClaim: airflow-dags-pvc
+logs:
+ persistence:
+ enabled: true
+ existingClaim: airflow-logs-pvc
+
+# ✅ 保留 apiServer 配置(你的環境需要它)
+apiServer:
+ replicas: 3
+ service:
+ type: NodePort
+ ports:
+ - name: airflow-ui
+ port: 8080
+ nodePort: 30080
+ nodeSelector:
+ node-role.kubernetes.io/control-plane: ""
+ tolerations:
+ - key: "node-role.kubernetes.io/control-plane"
+ operator: "Exists"
+ effect: "NoSchedule"
+
+scheduler:
+ replicas: 1
+ nodeSelector:
+ node-role.kubernetes.io/control-plane: ""
+ tolerations:
+ - key: "node-role.kubernetes.io/control-plane"
+ operator: "Exists"
+ effect: "NoSchedule"
+
+workers:
+ podManagementPolicy: Parallel
+ replicas: 4
+ nodeSelector:
+ role: worker
+ resources:
+ requests:
+ cpu: 1
+ memory: 1Gi
+ limits:
+ cpu: 2
+ memory: 2Gi
+ persistence:
+ enabled: true
+ size: 5Gi
+ storageClassName: "nfs-airflow"
+ env:
+ - name: TZ
+ value: "Asia/Taipei"
+ securityContexts:
+ container:
+ capabilities:
+ add:
+ - NET_RAW
+
+flower:
+ enabled: true
+ nodeSelector:
+ node-role.kubernetes.io/control-plane: ""
+ tolerations:
+ - key: "node-role.kubernetes.io/control-plane"
+ operator: "Exists"
+ effect: "NoSchedule"
+ service:
+ type: NodePort
+
+dagProcessor:
+ nodeSelector:
+ role: worker
+
+triggerer:
+ nodeSelector:
+ role: worker
+ persistence:
+ enabled: false
+
+config:
+ core:
+ max_map_length: 100000
+ webserver:
+ base_url: "http://10.10.0.83:8080"
+ enable_proxy_fix: "True"
+ cookie_secure: 'False'
+ cookie_samesite: 'Lax'
+ session_backend: 'database'
+
+ celery:
+ worker_concurrency: 4
+ task_acks_late: "True"
+ worker_prefetch_multiplier: 1
+
+```
+
+### 4.3 部署
+
+使用上述 `values.yml` 進行部署。
+
+```bash
+helm upgrade --install airflow apache-airflow/airflow \
+ --namespace airflow \
+ --version 1.18.0 \
+ -f values_celery.yml \
+ --set images.airflow.repository=10.10.0.85:50000/airflow-custom \
+ --set images.airflow.tag=1.0 \
+ --set images.airflow.pullPolicy=Always \
+ --debug
+```
+> **注意**: `--version` 請根據 Airflow 版本對應表選擇合適的 Chart 版本。
+
+---
+
+## 5. 驗證部署
+
+### 5.1 檢查 Pod 狀態
+
+```bash
+kubectl get pods -n airflow -o wide -w
+```
+確認所有 Pod (Webserver, Scheduler, Worker, Redis/RabbitMQ) 都處於 `Running` 狀態。
+
+### 5.2 存取 Web UI
+
+Airflow Webserver 使用瀏覽器直接存取:
+
+* **URL**: `http://10.10.0.83:8080`
+* **帳號/密碼**: 預設為 `admin` / `admin`
+
+### 5.3 驗證 Airflow 運作
+
+1. 登入 Web UI。
+2. 確認首頁正常顯示,且無錯誤訊息。
+3. 確認 Cluster Activity 或 DAGs 列表正常載入。
+
diff --git a/infra/airflow/Dockerfile b/infra/airflow/Dockerfile
new file mode 100644
index 0000000..ccde080
--- /dev/null
+++ b/infra/airflow/Dockerfile
@@ -0,0 +1,17 @@
+FROM apache/airflow:3.0.2
+
+USER airflow
+RUN pip install --no-cache-dir ping3==4.0.8
+
+USER root
+RUN apt-get update \
+ && apt-get install -y --no-install-recommends fping iputils-ping libcap2-bin \
+ && setcap cap_net_raw+ep /usr/bin/fping \
+ && setcap cap_net_raw+ep /usr/bin/ping \
+ && apt-get clean \
+ && rm -rf /var/lib/apt/lists/*
+
+RUN PYTHON_BIN=$(which python3) && \
+ setcap cap_net_raw+ep $PYTHON_BIN
+
+USER airflow
diff --git a/infra/airflow/airflow-dags-storage.yml b/infra/airflow/airflow-dags-storage.yml
new file mode 100644
index 0000000..2de462c
--- /dev/null
+++ b/infra/airflow/airflow-dags-storage.yml
@@ -0,0 +1,29 @@
+apiVersion: v1
+kind: PersistentVolume
+metadata:
+ name: airflow-dags-pv
+spec:
+ capacity:
+ storage: 10Gi
+ volumeMode: Filesystem
+ accessModes:
+ - ReadWriteMany
+ persistentVolumeReclaimPolicy: Retain
+ storageClassName: nfs-airflow
+ nfs:
+ path: /srv/nfs/airflow/dags
+ server: 10.10.0.85
+---
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+ name: airflow-dags-pvc
+ namespace: airflow
+spec:
+ accessModes:
+ - ReadWriteMany
+ storageClassName: nfs-airflow
+ volumeName: airflow-dags-pv
+ resources:
+ requests:
+ storage: 10Gi
diff --git a/infra/airflow/airflow-logs-storage.yml b/infra/airflow/airflow-logs-storage.yml
new file mode 100644
index 0000000..5c79b93
--- /dev/null
+++ b/infra/airflow/airflow-logs-storage.yml
@@ -0,0 +1,29 @@
+apiVersion: v1
+kind: PersistentVolume
+metadata:
+ name: airflow-logs-pv
+spec:
+ capacity:
+ storage: 20Gi
+ volumeMode: Filesystem
+ accessModes:
+ - ReadWriteMany
+ persistentVolumeReclaimPolicy: Retain
+ storageClassName: nfs-airflow
+ nfs:
+ path: /srv/nfs/airflow/logs
+ server: 10.10.0.85
+---
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+ name: airflow-logs-pvc
+ namespace: airflow
+spec:
+ accessModes:
+ - ReadWriteMany
+ storageClassName: nfs-airflow
+ volumeName: airflow-logs-pv
+ resources:
+ requests:
+ storage: 20Gi
diff --git a/infra/airflow/dags/06_ping_to_doris_standard_ping.py b/infra/airflow/dags/06_ping_to_doris_standard_ping.py
new file mode 100644
index 0000000..2025307
--- /dev/null
+++ b/infra/airflow/dags/06_ping_to_doris_standard_ping.py
@@ -0,0 +1,168 @@
+from __future__ import annotations
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.providers.mysql.hooks.mysql import MySqlHook
+from airflow.utils.trigger_rule import TriggerRule
+from datetime import datetime, timedelta
+from concurrent.futures import ThreadPoolExecutor, as_completed
+import subprocess
+import re
+import time
+
+TOTAL_IPS = 100000
+BATCH_SIZE = 5000
+PING_TIMEOUT_SEC = 60
+DB_EXEC_STEP = 2000
+MAX_WORKERS = 1000 # 标准 ping 开销略大,调低一点并发以免过载
+
+PING_POOL = "ping_pool"
+PING_POOL_SLOTS_PER_TASK = 1
+
+default_args = {
+ "owner": "admin",
+ "retries": 1,
+ "retry_delay": timedelta(minutes=1),
+}
+
+# 提取 time=xx.x ms 中的数值
+LATENCY_RE = re.compile(r"time=(\d+\.?\d*)\s*ms")
+
+
+def _chunk_ranges(total: int, size: int) -> list[dict]:
+ return [{"start": s, "end": min(s + size, total)} for s in range(0, total, size)]
+
+
+def _gen_ips_by_range(start: int, end: int) -> list[str]:
+ ips = []
+ for i in range(start, end):
+ subnet = i // 255
+ host = (i % 255) + 1
+ ips.append(f"10.10.{subnet}.{host}")
+ return ips
+
+
+with DAG(
+ dag_id="06_ping_to_doris_standard_ping",
+ default_args=default_args,
+ start_date=datetime(2023, 1, 1),
+ catchup=False,
+ tags=["monitor", "doris", "ping", "parallel"],
+ max_active_runs=1,
+ max_active_tasks=20,
+) as dag:
+ @task
+ def make_batches() -> list[dict]:
+ batches = _chunk_ranges(TOTAL_IPS, BATCH_SIZE)
+ print(f"Generated {len(batches)} batches")
+ return batches
+
+
+ @task(
+ pool=PING_POOL,
+ pool_slots=PING_POOL_SLOTS_PER_TASK,
+ execution_timeout=timedelta(seconds=PING_TIMEOUT_SEC * 3),
+ )
+ def ping_and_load_batch(batch: dict) -> dict:
+ start, end = int(batch["start"]), int(batch["end"])
+ ip_batch = _gen_ips_by_range(start, end)
+
+ now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+ rows: list[tuple] = []
+ alive_cnt = 0
+ dead_cnt = 0
+
+ def ping_single_ip(ip: str) -> tuple:
+ """使用标准 ping 指令单个 IP"""
+ try:
+ # -c 1: 发送一个包
+ # -W 2: 等待 2 秒超时
+ cmd = ["/usr/bin/ping", "-c", "1", "-W", "2", ip]
+ proc = subprocess.run(
+ cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ text=True,
+ timeout=5,
+ check=False
+ )
+ output = proc.stdout
+
+ if proc.returncode == 0:
+ m = LATENCY_RE.search(output)
+ if m:
+ latency = float(m.group(1))
+ return (now, ip, 1, latency, 0)
+
+ return (now, ip, 0, -1, 100)
+
+ except Exception:
+ return (now, ip, 0, -1, 100)
+
+ start_time = time.time()
+
+ with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+ future_to_ip = {executor.submit(ping_single_ip, ip): ip for ip in ip_batch}
+
+ for future in as_completed(future_to_ip):
+ result = future.result()
+ rows.append(result)
+ if result[2] == 1:
+ alive_cnt += 1
+ else:
+ dead_cnt += 1
+
+ elapsed = time.time() - start_time
+ print(f"[Batch {start}-{end}] Pinged {len(rows)} IPs in {elapsed:.2f}s with {MAX_WORKERS} threads")
+
+ # 批量写入数据库
+ if rows:
+ try:
+ mysql_hook = MySqlHook(mysql_conn_id="doris_db")
+ conn = mysql_hook.get_conn()
+ cur = conn.cursor()
+
+ sql = """
+ INSERT INTO ping_results
+ (monitor_time, target_ip, is_alive, latency_ms, packet_loss_rate)
+ VALUES (%s, %s, %s, %s, %s) \
+ """
+
+ for i in range(0, len(rows), DB_EXEC_STEP):
+ cur.executemany(sql, rows[i: i + DB_EXEC_STEP])
+
+ conn.commit()
+ cur.close()
+ conn.close()
+
+ print(f"[Batch {start}-{end}] Written {len(rows)} records: {alive_cnt} alive, {dead_cnt} dead")
+ except Exception as e:
+ print(f"[DB Error] {e}")
+ raise
+
+ return {
+ "start": start,
+ "end": end,
+ "count": end - start,
+ "alive": alive_cnt,
+ "dead": dead_cnt,
+ "duration": elapsed,
+ }
+
+
+ @task(trigger_rule=TriggerRule.ALL_DONE)
+ def summarize(stats: list[dict]) -> None:
+ total = sum(x.get("count", 0) for x in stats)
+ alive = sum(x.get("alive", 0) for x in stats)
+ dead = sum(x.get("dead", 0) for x in stats)
+ total_duration = sum(x.get("duration", 0) for x in stats)
+ avg_duration = total_duration / len(stats) if stats else 0
+
+ alive_pct = (alive * 100 // total) if total > 0 else 0
+ print(f"[SUMMARY] Total: {total} | Alive: {alive} ({alive_pct}%) | Dead: {dead}")
+ print(f"[SUMMARY] Batches: {len(stats)} | Avg duration: {avg_duration:.2f}s | Total: {total_duration:.2f}s")
+
+
+ batches = make_batches()
+ stats = ping_and_load_batch.expand(batch=batches)
+ summarize(stats)
diff --git a/infra/airflow/dags/07_ping_to_doris_python_ping.py b/infra/airflow/dags/07_ping_to_doris_python_ping.py
new file mode 100644
index 0000000..2dde65e
--- /dev/null
+++ b/infra/airflow/dags/07_ping_to_doris_python_ping.py
@@ -0,0 +1,158 @@
+from __future__ import annotations
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.providers.mysql.hooks.mysql import MySqlHook
+from airflow.utils.trigger_rule import TriggerRule
+from datetime import datetime, timedelta
+from concurrent.futures import ThreadPoolExecutor, as_completed
+import time
+
+# 注意: 此 DAG 需要在 Airflow Worker 环境安装 ping3 套件
+# pip install ping3
+
+TOTAL_IPS = 100000
+BATCH_SIZE = 5000
+PING_TIMEOUT_SEC = 60
+DB_EXEC_STEP = 2000
+MAX_WORKERS = 1000
+
+PING_POOL = "ping_pool"
+PING_POOL_SLOTS_PER_TASK = 1
+
+default_args = {
+ "owner": "admin",
+ "retries": 1,
+ "retry_delay": timedelta(minutes=1),
+}
+
+
+def _chunk_ranges(total: int, size: int) -> list[dict]:
+ return [{"start": s, "end": min(s + size, total)} for s in range(0, total, size)]
+
+
+def _gen_ips_by_range(start: int, end: int) -> list[str]:
+ ips = []
+ for i in range(start, end):
+ subnet = i // 255
+ host = (i % 255) + 1
+ ips.append(f"10.10.{subnet}.{host}")
+ return ips
+
+
+with DAG(
+ dag_id="07_ping_to_doris_python_ping",
+ default_args=default_args,
+ start_date=datetime(2023, 1, 1),
+ catchup=False,
+ tags=["monitor", "doris", "python", "parallel"],
+ max_active_runs=1,
+ max_active_tasks=20,
+) as dag:
+ @task
+ def make_batches() -> list[dict]:
+ batches = _chunk_ranges(TOTAL_IPS, BATCH_SIZE)
+ print(f"Generated {len(batches)} batches")
+ return batches
+
+
+ @task(
+ pool=PING_POOL,
+ pool_slots=PING_POOL_SLOTS_PER_TASK,
+ execution_timeout=timedelta(seconds=PING_TIMEOUT_SEC * 3),
+ )
+ def ping_and_load_batch(batch: dict) -> dict:
+ from ping3 import ping # 在任務內部導入,避免 Webserver 加載失敗
+ import logging
+
+ start, end = int(batch["start"]), int(batch["end"])
+ ip_batch = _gen_ips_by_range(start, end)
+
+ now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+ rows: list[tuple] = []
+ alive_cnt = 0
+ dead_cnt = 0
+
+ def ping_single_ip(ip: str) -> tuple:
+ """使用 ping3 套件單個 IP"""
+ try:
+ # ✅ ping3.ping() 返回秒數,失敗時返回 False
+ latency_sec = ping(ip, timeout=2)
+
+ if latency_sec: # ✅ 簡潔的成功檢查
+ latency_ms = latency_sec * 1000
+ return (now, ip, 1, latency_ms, 0)
+
+ return (now, ip, 0, -1, 100)
+
+ except Exception as e:
+ logging.warning(f"Ping failed for {ip}: {str(e)}")
+ return (now, ip, 0, -1, 100)
+
+ start_time = time.time()
+
+ with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+ future_to_ip = {executor.submit(ping_single_ip, ip): ip for ip in ip_batch}
+
+ for future in as_completed(future_to_ip):
+ result = future.result()
+ rows.append(result)
+ if result[2] == 1:
+ alive_cnt += 1
+ else:
+ dead_cnt += 1
+
+ elapsed = time.time() - start_time
+ print(f"[Batch {start}-{end}] Pinged {len(rows)} IPs in {elapsed:.2f}s - Alive: {alive_cnt}, Dead: {dead_cnt}")
+
+ # 批量写入数据库
+ if rows:
+ try:
+ mysql_hook = MySqlHook(mysql_conn_id="doris_db")
+ conn = mysql_hook.get_conn()
+ cur = conn.cursor()
+
+ sql = """
+ INSERT INTO ping_results
+ (monitor_time, target_ip, is_alive, latency_ms, packet_loss_rate)
+ VALUES (%s, %s, %s, %s, %s) \
+ """
+
+ for i in range(0, len(rows), DB_EXEC_STEP):
+ cur.executemany(sql, rows[i: i + DB_EXEC_STEP])
+
+ conn.commit()
+ cur.close()
+ conn.close()
+
+ print(f"[Batch {start}-{end}] Written {len(rows)} records: {alive_cnt} alive, {dead_cnt} dead")
+ except Exception as e:
+ print(f"[DB Error] {e}")
+ raise
+
+ return {
+ "start": start,
+ "end": end,
+ "count": end - start,
+ "alive": alive_cnt,
+ "dead": dead_cnt,
+ "duration": elapsed,
+ }
+
+
+ @task(trigger_rule=TriggerRule.ALL_DONE)
+ def summarize(stats: list[dict]) -> None:
+ total = sum(x.get("count", 0) for x in stats)
+ alive = sum(x.get("alive", 0) for x in stats)
+ dead = sum(x.get("dead", 0) for x in stats)
+ total_duration = sum(x.get("duration", 0) for x in stats)
+ avg_duration = total_duration / len(stats) if stats else 0
+
+ alive_pct = (alive * 100 // total) if total > 0 else 0
+ print(f"[SUMMARY] Total: {total} | Alive: {alive} ({alive_pct}%) | Dead: {dead}")
+ print(f"[SUMMARY] Batches: {len(stats)} | Avg duration: {avg_duration:.2f}s | Total: {total_duration:.2f}s")
+
+
+ batches = make_batches()
+ stats = ping_and_load_batch.expand(batch=batches)
+ summarize(stats)
diff --git a/infra/airflow/dags/ping_to_doris.py b/infra/airflow/dags/ping_to_doris.py
new file mode 100644
index 0000000..645e805
--- /dev/null
+++ b/infra/airflow/dags/ping_to_doris.py
@@ -0,0 +1,236 @@
+import os
+import glob
+from airflow import DAG
+from airflow.decorators import task
+from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
+from airflow.providers.mysql.hooks.mysql import MySqlHook
+from kubernetes.client import models as k8s
+from datetime import datetime, timedelta
+
+# --- 設定參數 ---
+TOTAL_IPS = 1000 # 總目標:1000 台
+BATCH_SIZE = 250 # 每個 Pod 負責 250 台
+
+# 設定路徑
+# 1. NFS 上的實際路徑 (我們用來清空舊檔)
+NFS_REAL_PATH = "/srv/nfs/dags/ping_results"
+# 2. Airflow Worker 讀取路徑 (Airflow 預設把 DAGs 掛載在 /opt/airflow/dags)
+AIRFLOW_READ_PATH = "/opt/airflow/dags/ping_results"
+# 3. Pod 內部的寫入路徑 (我們等一下會掛載進去)
+POD_MOUNT_PATH = "/mnt/dags/ping_results"
+
+default_args = {
+ 'owner': 'admin',
+ 'retries': 1,
+ 'retry_delay': timedelta(minutes=1),
+}
+
+with DAG(
+ '03_ping_to_doris',
+ default_args=default_args,
+ description='Ping 1000台 -> 存CSV -> 匯入 Doris',
+ # schedule=timedelta(minutes=5),
+ start_date=datetime(2023, 1, 1),
+ catchup=False,
+ tags=['monitor', 'doris', 'production'],
+) as dag:
+ # 0. 準備工作:確保資料夾存在,並清空上一輪的 CSV
+ @task
+ def prepare_environment():
+ # 因為 Airflow Worker 本身也有掛載 DAGs 資料夾,我們直接操作
+ if not os.path.exists(AIRFLOW_READ_PATH):
+ os.makedirs(AIRFLOW_READ_PATH, exist_ok=True)
+
+ # 刪除舊的 .csv 檔案,避免重複匯入
+ # 注意:真實生產環境可能會將舊檔搬移到 backup 資料夾,這裡示範直接刪除
+ files = glob.glob(f"{AIRFLOW_READ_PATH}/*.csv")
+ for f in files:
+ try:
+ os.remove(f)
+ except OSError:
+ pass
+ print(f"環境準備完成,已清理 {len(files)} 個舊檔案")
+
+
+ # 1. 生成 IP
+ @task
+ def generate_target_ips():
+ ip_list = []
+ for i in range(TOTAL_IPS):
+ subnet = i // 255
+ host = (i % 255) + 1
+ ip_list.append(f"10.10.{subnet}.{host}")
+ return ip_list
+
+
+ # 2. 切分批次
+ @task
+ def chunk_ips(all_ips):
+ return [all_ips[i:i + BATCH_SIZE] for i in range(0, len(all_ips), BATCH_SIZE)]
+
+
+ # 3. 定義 K8s Volume (讓 Pod 可以寫入 NFS)
+ nfs_vol = k8s.V1Volume(
+ name="nfs-storage",
+ persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
+ claim_name="airflow-dags-pvc" # 使用存放 DAG 的那個 PVC
+ )
+ )
+ nfs_mount = k8s.V1VolumeMount(
+ name="nfs-storage",
+ mount_path="/mnt/dags" # 掛載到 Pod 裡的 /mnt/dags
+ )
+
+ # # 4. 執行 Ping 並寫檔
+ # ping_worker = KubernetesPodOperator.partial(
+ # task_id="fping_worker",
+ # name="fping-pod",
+ # namespace="airflow",
+ # image="alpine:3.18",
+ # # 掛載 NFS
+ # volumes=[nfs_vol],
+ # volume_mounts=[nfs_mount],
+ # # --- 核心邏輯 ---
+ # # 1. 安裝 fping
+ # # 2. 定義檔名: 使用隨機數避免多個 Pod 檔名衝突
+ # # 3. fping 執行
+ # # 4. awk 解析:
+ # # - 如果第3欄是數字 (例如 12.34) -> 存活 (1), 延遲=$3, 掉包=0
+ # # - 否則 -> 死亡 (0), 延遲=-1, 掉包=100
+ # # 5. 結果寫入 CSV 檔案
+ # # 6. || true 確保任務永遠綠燈
+ # cmds=["/bin/sh", "-c", f"""
+ # apk add --no-cache fping && \
+ # filename="{POD_MOUNT_PATH}/batch_$(date +%s)_$RANDOM.csv" && \
+ # echo "正在寫入: $filename" && \
+ # fping -c 1 -q -A $@ 2>&1 | \
+ # awk '{{
+ # if ($3 ~ /^[0-9]+(\.[0-9]+)?$/)
+ # print $1 ",1," $3 ",0";
+ # else
+ # print $1 ",0,-1,100";
+ # }}' > $filename || true
+ # """],
+ # is_delete_operator_pod=True,
+ # in_cluster=True,
+ # )
+
+ # 4. 執行 Ping 並寫檔
+ ping_worker = KubernetesPodOperator.partial(
+ task_id="fping_worker",
+ name="fping-pod",
+ namespace="airflow",
+ image="alpine:3.18",
+ # 建議開啟 Host Network,這對監控最準 (如果您之前沒加,建議加上)
+ hostnetwork=True,
+ dnspolicy="ClusterFirstWithHostNet",
+
+ volumes=[nfs_vol],
+ volume_mounts=[nfs_mount],
+
+ # --- 修正後的指令 ---
+ # fping 參數調整:
+ # -c 1 : 發送 1 個封包
+ # -r 1 : 如果失敗,重試 1 次 (這是關鍵!避免 ARP 延遲導致誤判)
+ # -t 1000 : 超時等待 1000 毫秒 (預設 500 太短)
+ # cmds=["/bin/sh", "-c", f"""
+ # apk add --no-cache fping && \
+ # filename="{POD_MOUNT_PATH}/batch_$(date +%s)_$RANDOM.csv" && \
+ # echo "正在寫入: $filename" && \
+ # fping -c 1 -r 1 -t 1000 -q -A $@ 2>&1 | \
+ # awk '{{
+ # # fping 輸出邏輯很特殊,成功時輸出延遲,失敗時無輸出(因-q)或輸出統計
+ # # 我們用簡單邏輯:只要 $3 是數字,就是活著
+ # if ($3 ~ /^[0-9]+(\.[0-9]+)?$/)
+ # print $1 ",1," $3 ",0";
+ # else
+ # print $1 ",0,-1,100";
+ # }}' > $filename || true
+ # """]
+ cmds = ["/bin/sh", "-c", f"""
+ apk add --no-cache fping && \
+ filename="{POD_MOUNT_PATH}/batch_$(date +%s)_$RANDOM.csv" && \
+ echo "正在寫入: $filename" && \
+ fping -C 1 -q -A $@ 2>&1 | \
+ awk '{{
+ # 大寫 -C 1 的輸出格式很乾淨:
+ # 成功時:10.10.0.85 : 12.34 (第3欄是數字)
+ # 失敗時:10.10.0.89 : - (第3欄是減號)
+
+ if ($3 ~ /^[0-9]+(\.[0-9]+)?$/)
+ print $1 ",1," $3 ",0";
+ else
+ print $1 ",0,-1,100";
+ }}' > $filename || true
+ """],
+
+ is_delete_operator_pod=True,
+ in_cluster=True,
+ )
+
+
+ # 5. 讀取 CSV 並匯入 Doris
+ @task
+ def load_to_doris():
+ # 1. 搜尋所有 CSV
+ csv_files = glob.glob(f"{AIRFLOW_READ_PATH}/*.csv")
+ print(f"找到 {len(csv_files)} 個結果檔案,準備匯入...")
+
+ if not csv_files:
+ print("沒有檔案需要匯入")
+ return
+
+ all_values = []
+ current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+
+ # 2. 讀取所有檔案內容
+ for file_path in csv_files:
+ with open(file_path, 'r') as f:
+ for line in f:
+ # line 格式: 8.8.8.8,1,12.34,0
+ parts = line.strip().split(',')
+ if len(parts) == 4:
+ ip, alive, latency, loss = parts
+ # 組合 SQL Value
+ all_values.append(
+ f"('{current_time}', '{ip}', {alive}, {latency}, {loss})"
+ )
+
+ # 3. 執行 Batch Insert
+ if all_values:
+ mysql_hook = MySqlHook(mysql_conn_id='doris_db')
+ conn = mysql_hook.get_conn()
+ cursor = conn.cursor()
+
+ print(f"總共 {len(all_values)} 筆資料,開始寫入 DB...")
+
+ # 分批寫入 (避免 SQL 太長),一次 2000 筆
+ batch_size = 2000
+ for i in range(0, len(all_values), batch_size):
+ batch = all_values[i:i + batch_size]
+ sql = f"""
+ INSERT INTO ping_results
+ (monitor_time, target_ip, is_alive, latency_ms, packet_loss_rate)
+ VALUES {','.join(batch)}
+ """
+ cursor.execute(sql)
+ print(f"已寫入批次 {i} ~ {i + len(batch)}")
+
+ conn.commit()
+ cursor.close()
+ conn.close()
+ print("資料匯入完成!")
+ else:
+ print("CSV 檔案是空的,無資料匯入。")
+
+
+ # --- 流程串接 ---
+ # 先清理環境 -> 生成IP -> 切分 -> 平行 Ping -> 最後匯入 DB
+ prepare_env = prepare_environment()
+ ip_data = generate_target_ips()
+ ip_batches = chunk_ips(ip_data)
+
+ # Ping 任務需等待環境準備好
+ ping_task = ping_worker.expand(arguments=ip_batches)
+
+ prepare_env >> ip_data >> ip_batches >> ping_task >> load_to_doris()
\ No newline at end of file
diff --git a/infra/airflow/dags/ping_to_doris_celery.py b/infra/airflow/dags/ping_to_doris_celery.py
new file mode 100644
index 0000000..b801adc
--- /dev/null
+++ b/infra/airflow/dags/ping_to_doris_celery.py
@@ -0,0 +1,161 @@
+from __future__ import annotations
+
+from ftplib import print_line
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.providers.mysql.hooks.mysql import MySqlHook
+from airflow.utils.trigger_rule import TriggerRule
+from datetime import datetime, timedelta
+import subprocess
+import re
+
+TOTAL_IPS = 100000
+BATCH_SIZE = 5000
+FPING_TIMEOUT_SEC = 60
+DB_EXEC_STEP = 2000
+
+PING_POOL = "ping_pool"
+PING_POOL_SLOTS_PER_TASK = 1
+
+default_args = {
+ "owner": "admin",
+ "retries": 1,
+ "retry_delay": timedelta(minutes=1),
+}
+
+# 修正: 匹配 "1.06 ms" 格式
+LATENCY_RE = re.compile(r"(\d+\.?\d*)\s*ms")
+
+
+def _chunk_ranges(total: int, size: int) -> list[dict]:
+ return [{"start": s, "end": min(s + size, total)} for s in range(0, total, size)]
+
+
+def _gen_ips_by_range(start: int, end: int) -> list[str]:
+ ips = []
+ for i in range(start, end):
+ subnet = i // 255
+ host = (i % 255) + 1
+ ips.append(f"10.10.{subnet}.{host}")
+ return ips
+
+
+with DAG(
+ dag_id="05_ping_to_doris_celery",
+ default_args=default_args,
+ start_date=datetime(2023, 1, 1),
+ catchup=False,
+ tags=["monitor", "doris", "celery", "latest"],
+ max_active_runs=1,
+ max_active_tasks=8,
+) as dag:
+ @task
+ def make_batches() -> list[dict]:
+ return _chunk_ranges(TOTAL_IPS, BATCH_SIZE)
+
+
+ @task(
+ pool=PING_POOL,
+ pool_slots=PING_POOL_SLOTS_PER_TASK,
+ execution_timeout=timedelta(seconds=FPING_TIMEOUT_SEC * 2),
+ )
+ def ping_and_load_batch(batch: dict) -> dict:
+ start, end = int(batch["start"]), int(batch["end"])
+ ip_batch = _gen_ips_by_range(start, end)
+
+ cmd = ["/opt/tools/bin/fping", "-C", "1", "-A"] + ip_batch
+ now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+
+ rows: list[tuple] = []
+ alive_cnt = 0
+ dead_cnt = 0
+
+ try:
+ proc = subprocess.run(
+ cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ text=True,
+ timeout=FPING_TIMEOUT_SEC,
+ )
+ output = proc.stdout.splitlines()
+
+ for line in output:
+ m = LATENCY_RE.search(line)
+ m = LATENCY_RE.search(line)
+ # 範例輸出:
+ # 活的: "10.10.0.33 : [0], 64 bytes, 1.06 ms (1.06 avg, 0% loss)"
+ # 死的: "10.10.0.1 : [0], timed out"
+ # 或: "10.10.0.2 : -"
+
+ # 提取 IP
+ parts = line.split(":")
+ if len(parts) < 2:
+ continue
+
+ ip = parts[0].strip()
+ rest = ":".join(parts[1:]) # 剩餘部分
+
+ # 檢查是否有延遲資訊
+ m = LATENCY_RE.search(rest)
+ if m and "timed out" not in rest:
+ # 活著且有延遲資訊
+ latency = float(m.group(1))
+ alive_cnt += 1
+ rows.append((now, ip, 1, latency, 0))
+ # print(f"✓ {ip}: {latency} ms")
+ else:
+ # 死掉或 timeout
+ dead_cnt += 1
+ rows.append((now, ip, 0, -1, 100))
+ # print(f"✗ {ip}: dead")
+
+ except Exception as e:
+ print(f"[ping_and_load_batch] exception={repr(e)} range=({start},{end}) size={len(ip_batch)}")
+ dead_cnt = len(ip_batch)
+ for ip in ip_batch:
+ rows.append((now, ip, 0, -1, 100))
+
+ # 寫入 Doris
+ if rows:
+ mysql_hook = MySqlHook(mysql_conn_id="doris_db")
+ conn = mysql_hook.get_conn()
+ cur = conn.cursor()
+
+ sql = """
+ INSERT INTO ping_results
+ (monitor_time, target_ip, is_alive, latency_ms, packet_loss_rate)
+ VALUES (%s, %s, %s, %s, %s) \
+ """
+
+ for i in range(0, len(rows), DB_EXEC_STEP):
+ cur.executemany(sql, rows[i: i + DB_EXEC_STEP])
+
+ conn.commit()
+ cur.close()
+ conn.close()
+
+ print(f"[Batch {start}-{end}] Written {len(rows)} records to Doris")
+
+ return {
+ "start": start,
+ "end": end,
+ "count": end - start,
+ "alive": alive_cnt,
+ "dead": dead_cnt,
+ }
+
+
+ @task(trigger_rule=TriggerRule.ALL_DONE)
+ def summarize(stats: list[dict]) -> None:
+ total = sum(x.get("count", 0) for x in stats)
+ alive = sum(x.get("alive", 0) for x in stats)
+ dead = sum(x.get("dead", 0) for x in stats)
+ alive_pct = (alive * 100 // total) if total > 0 else 0
+ print(f"[SUMMARY] Total: {total} | Alive: {alive} ({alive_pct}%) | Dead: {dead} | Batches: {len(stats)}")
+
+
+ batches = make_batches()
+ stats = ping_and_load_batch.expand(batch=batches)
+ summarize(stats)
\ No newline at end of file
diff --git a/infra/airflow/dags/ping_to_doris_celery_parallel.py b/infra/airflow/dags/ping_to_doris_celery_parallel.py
new file mode 100644
index 0000000..311b258
--- /dev/null
+++ b/infra/airflow/dags/ping_to_doris_celery_parallel.py
@@ -0,0 +1,165 @@
+from __future__ import annotations
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.providers.mysql.hooks.mysql import MySqlHook
+from airflow.utils.trigger_rule import TriggerRule
+from datetime import datetime, timedelta
+from concurrent.futures import ThreadPoolExecutor, as_completed
+import subprocess
+import re
+
+TOTAL_IPS = 100000
+BATCH_SIZE = 5000
+FPING_TIMEOUT_SEC = 60
+DB_EXEC_STEP = 2000
+MAX_WORKERS = 5000 # ✅ 每个任务内部并发数
+
+PING_POOL = "ping_pool"
+PING_POOL_SLOTS_PER_TASK = 1
+
+default_args = {
+ "owner": "admin",
+ "retries": 1,
+ "retry_delay": timedelta(minutes=1),
+}
+
+LATENCY_RE = re.compile(r"(\d+\.?\d*)\s*ms")
+
+
+def _chunk_ranges(total: int, size: int) -> list[dict]:
+ return [{"start": s, "end": min(s + size, total)} for s in range(0, total, size)]
+
+
+def _gen_ips_by_range(start: int, end: int) -> list[str]:
+ ips = []
+ for i in range(start, end):
+ subnet = i // 255
+ host = (i % 255) + 1
+ ips.append(f"10.10.{subnet}.{host}")
+ return ips
+
+
+with DAG(
+ dag_id="05_ping_to_doris_celery_parallel",
+ default_args=default_args,
+ start_date=datetime(2023, 1, 1),
+ catchup=False,
+ tags=["monitor", "doris", "celery", "parallel"],
+ max_active_runs=1,
+ max_active_tasks=20,
+) as dag:
+ @task
+ def make_batches() -> list[dict]:
+ batches = _chunk_ranges(TOTAL_IPS, BATCH_SIZE)
+ print(f"Generated {len(batches)} batches")
+ return batches
+
+
+ @task(
+ pool=PING_POOL,
+ pool_slots=PING_POOL_SLOTS_PER_TASK,
+ execution_timeout=timedelta(seconds=FPING_TIMEOUT_SEC * 2),
+ )
+ def ping_and_load_batch(batch: dict) -> dict:
+ start, end = int(batch["start"]), int(batch["end"])
+ ip_batch = _gen_ips_by_range(start, end)
+
+ now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+ rows: list[tuple] = []
+ alive_cnt = 0
+ dead_cnt = 0
+
+ def ping_single_ip(ip: str) -> tuple:
+ """ping 单个 IP"""
+ try:
+ cmd = ["/usr/bin/fping", "-C", "1", "-A", ip]
+ proc = subprocess.run(
+ cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ text=True,
+ timeout=5,
+ check=False
+ )
+ output = proc.stdout
+
+ m = LATENCY_RE.search(output)
+ if m and "timed out" not in output:
+ latency = float(m.group(1))
+ return (now, ip, 1, latency, 0)
+ else:
+ return (now, ip, 0, -1, 100)
+
+ except Exception:
+ return (now, ip, 0, -1, 100)
+
+ # ✅ 并行 ping
+ import time
+ start_time = time.time()
+
+ with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+ future_to_ip = {executor.submit(ping_single_ip, ip): ip for ip in ip_batch}
+
+ for future in as_completed(future_to_ip):
+ result = future.result()
+ rows.append(result)
+ if result[2] == 1:
+ alive_cnt += 1
+ else:
+ dead_cnt += 1
+
+ elapsed = time.time() - start_time
+ print(f"[Batch {start}-{end}] Pinged {len(rows)} IPs in {elapsed:.2f}s with {MAX_WORKERS} threads")
+
+ # 批量写入数据库
+ if rows:
+ try:
+ mysql_hook = MySqlHook(mysql_conn_id="doris_db")
+ conn = mysql_hook.get_conn()
+ cur = conn.cursor()
+
+ sql = """
+ INSERT INTO ping_results
+ (monitor_time, target_ip, is_alive, latency_ms, packet_loss_rate)
+ VALUES (%s, %s, %s, %s, %s) \
+ """
+
+ for i in range(0, len(rows), DB_EXEC_STEP):
+ cur.executemany(sql, rows[i: i + DB_EXEC_STEP])
+
+ conn.commit()
+ cur.close()
+ conn.close()
+
+ print(f"[Batch {start}-{end}] Written {len(rows)} records: {alive_cnt} alive, {dead_cnt} dead")
+ except Exception as e:
+ print(f"[DB Error] {e}")
+ raise
+
+ return {
+ "start": start,
+ "end": end,
+ "count": end - start,
+ "alive": alive_cnt,
+ "dead": dead_cnt,
+ "duration": elapsed,
+ }
+
+
+ @task(trigger_rule=TriggerRule.ALL_DONE)
+ def summarize(stats: list[dict]) -> None:
+ total = sum(x.get("count", 0) for x in stats)
+ alive = sum(x.get("alive", 0) for x in stats)
+ dead = sum(x.get("dead", 0) for x in stats)
+ total_duration = sum(x.get("duration", 0) for x in stats)
+ avg_duration = total_duration / len(stats) if stats else 0
+
+ alive_pct = (alive * 100 // total) if total > 0 else 0
+ print(f"[SUMMARY] Total: {total} | Alive: {alive} ({alive_pct}%) | Dead: {dead}")
+ print(f"[SUMMARY] Batches: {len(stats)} | Avg duration: {avg_duration:.2f}s | Total: {total_duration:.2f}s")
+
+
+ batches = make_batches()
+ stats = ping_and_load_batch.expand(batch=batches)
+ summarize(stats)
diff --git a/infra/airflow/nfs-airflow-storage-class.yml b/infra/airflow/nfs-airflow-storage-class.yml
new file mode 100644
index 0000000..6a7da0a
--- /dev/null
+++ b/infra/airflow/nfs-airflow-storage-class.yml
@@ -0,0 +1,10 @@
+apiVersion: storage.k8s.io/v1
+kind: StorageClass
+metadata:
+ name: nfs-airflow
+provisioner: nfs.csi.k8s.io
+parameters:
+ server: 10.10.0.85
+ share: /srv/nfs/airflow
+reclaimPolicy: Retain
+volumeBindingMode: Immediate
\ No newline at end of file
diff --git a/infra/airflow/nfs/06-nfs-install-guide.md b/infra/airflow/nfs/06-nfs-install-guide.md
new file mode 100644
index 0000000..d03a20b
--- /dev/null
+++ b/infra/airflow/nfs/06-nfs-install-guide.md
@@ -0,0 +1,91 @@
+# NFS install
+
+本文件說明如何在 Ubuntu 環境下部署 NFS以及如何做 HA
+
+## 1. NFS Server 安裝
+
+安裝 nfs-kernel-server
+```shell
+sudo apt update
+sudo apt install -y nfs-kernel-server
+
+systemctl status nfs-server
+```
+
+建立共享目錄並給權限
+```shell
+sudo mkdir -p /srv/nfs/dags
+sudo mkdir -p /srv/nfs/logs
+
+sudo chown -R nobody:nogroup /srv/nfs
+sudo chmod -R 0775 /srv/nfs
+
+sudo mkdir -p /srv/nfs/airflow
+sudo chown nobody:nogroup /srv/nfs/airflow
+sudo chmod 777 /srv/nfs/airflow
+```
+
+
+設定 exports
+```shell
+sudo vi /etc/exports
+```
+/etc/exports:
+```
+/srv/nfs/dags 10.10.0.0/16(rw,sync,no_subtree_check,no_root_squash)
+/srv/nfs/logs 10.10.0.0/16(rw,sync,no_subtree_check,no_root_squash)
+
+/srv/nfs/airflow 10.10.0.0/16(rw,sync,no_subtree_check,no_root_squash)
+```
+
+套用設定
+```shell
+sudo exportfs -ra
+```
+
+防火牆設定(如果有)
+- 2049/tcp(一定要)
+- bind(111)
+```shell
+sudo ufw allow from 10.10.0.0/16 to any port 2049
+sudo ufw allow 111
+```
+---
+
+
+## 2. NFS Client 安裝
+
+安裝 nfs-common (以便在 Worker 節點上進行除錯或掛載測試)
+```shell
+sudo apt install -y nfs-common
+```
+
+> **注意**: DAGs 與 Logs 的 PV/PVC 設定已整合至 **[Airflow 部署指南](../10-airflow-install-guide.md)** 中,此處不再重複。
+
+
+
+## 3. HA 建置
+
+NFS 本身不提供資料層 HA,
+最簡單方式是透過「VIP 作為存取入口」與「資料同步機制」實現可維運的高可用。
+
+- 切換server ip為先前設置的VIP
+- dags 添加同步機制
+ - Git
+ - 所有 NFS 都 git pull
+ - 切換前補一次 pull 即可
+ - 部署時 rsync
+ - 你發佈 DAG 時,順手同步另一台
+ - 沒有背景機制、最好理解
+ - 定期 rsync(10~60 秒)
+ - 半自動
+ - RPO = 同步週期
+
+- 以部署時rsync為範例:
+```shell
+for host in 10.10.0.85 10.10.0.87 10.10.0.89; do
+ rsync -az --delete /srv/nfs/dags/ ${host}:/srv/nfs/dags/
+done
+```
+
+- Airflow logs 屬於過程性資料, 在 NFS 切換後重新產生即可,不強制做資料同步。 若有長期保存需求,應導入集中式日誌系統(ELK / Loki)。
\ No newline at end of file
diff --git a/infra/airflow/rabbitmq/08-rabbitmq-install-guide.md b/infra/airflow/rabbitmq/08-rabbitmq-install-guide.md
new file mode 100644
index 0000000..4ada673
--- /dev/null
+++ b/infra/airflow/rabbitmq/08-rabbitmq-install-guide.md
@@ -0,0 +1,197 @@
+# Rabbitmq on k8s deployment
+
+本文件說明如何在 k8s 上部署 RabbitMQ Cluster(使用官方 RabbitMQ Cluster Kubernetes Operator)
+
+
+
+## 1. 安裝 RabbitMQ Cluster Operator
+
+使用官方 manifest:
+```shell
+kubectl apply -f "https://github.com/rabbitmq/cluster-operator/releases/latest/download/cluster-operator.yml"
+```
+執行後可以看到 Operator 的 Deployment 跑起來:
+```shell
+kubectl get all -n rabbitmq-system
+```
+會看到:
+```
+deployment.apps/rabbitmq-cluster-operator 1/1 Running
+```
+
+## 2. 配置動態存儲(StorageClass + PVC)
+
+[//]: # (使用 NFS Server 搭配 nfs-subdir-external-provisioner 來實作 動態 NFS StorageClass。)
+
+[//]: # ()
+[//]: # (編輯 /etc/exports)
+
+[//]: # (```shell)
+
+[//]: # ( sudo vi /etc/exports)
+
+[//]: # (```)
+
+[//]: # ()
+[//]: # (添加以下字段)
+
+[//]: # ()
+[//]: # (```)
+
+[//]: # (/srv/nfs/rabbitmq 10.10.0.0/16(rw,sync,no_subtree_check,no_root_squash))
+
+[//]: # (```)
+
+[//]: # ()
+[//]: # (套用設定)
+
+[//]: # ()
+[//]: # (```shell)
+
+[//]: # (sudo exportfs -ra)
+[//]: # (```)
+
+使用 Helm 安裝官方的 NFS 動態 Provisioner:
+
+```shell
+helm repo add nfs-subdir-external-provisioner https://kubernetes-sigs.github.io/nfs-subdir-external-provisioner/
+helm repo update
+```
+執行安裝:
+```shell
+helm install nfs-rabbitmq-airflow \
+ nfs-subdir-external-provisioner/nfs-subdir-external-provisioner \
+ --namespace airflow \
+ --set nfs.server=10.10.0.85 \
+ --set nfs.path=/srv/nfs/airflow/rabbitmq \
+ --set storageClass.name=nfs-airflow-rabbitmq \
+ --set storageClass.reclaimPolicy=Retain
+```
+說明:
+- nfs.server:NFS Server IP
+- nfs.path:NFS export 的根目錄
+- storageClass.name:建立的 StorageClass 名稱
+- reclaimPolicy=Delete:
+ - PVC 刪除時,對應的 PV 與 NFS 子目錄會一併刪除
+ - 適合 RabbitMQ 這類可重建的服務
+- reclaimPolicy=Retain:
+ - PVC 刪除時,對應的 PV 與 NFS 子目錄會保留
+ - 正式環境建議採用手動刪除
+
+
+
+驗證 StorageClass 是否建立成功
+
+```shell
+kubectl get storageclass
+```
+
+應該看到類似結果:
+```
+NAME PROVISIONER RECLAIMPOLICY VOLUMEBINDINGMODE
+nfs-airflow-rabbitmq cluster.local/nfs-rabbitmq-airflow-nfs-subdir-external-provisioner Retain Immediate true 13s
+```
+
+代表:
+- Kubernetes 已具備可用的 動態 NFS StorageClass
+- 之後只要 PVC 指定 storageClassName: nfs-airflow-rabbitmq → 就會自動建立對應的 PV
+
+
+## 3. 建立 RabbitMQ Cluster
+
+
+```shell
+sudo vi airflow/rabbitmq-cluster.yml
+```
+
+```yaml
+apiVersion: rabbitmq.com/v1beta1
+kind: RabbitmqCluster
+metadata:
+ name: rabbitmq-cluster
+ namespace: rabbitmq-system
+spec:
+ replicas: 3
+
+ persistence:
+ storageClassName: "nfs-airflow-rabbitmq"
+ storage: 10Gi
+
+ override:
+ statefulSet:
+ spec:
+ template:
+ spec:
+ # 排到 master/control-plane
+ nodeSelector:
+ node-role.kubernetes.io/control-plane: ""
+
+ tolerations:
+ - key: "node-role.kubernetes.io/control-plane"
+ operator: "Exists"
+ effect: "NoSchedule"
+ - key: "node-role.kubernetes.io/master"
+ operator: "Exists"
+ effect: "NoSchedule"
+ # 3個 Pod 分散在不同節點
+ affinity:
+ podAntiAffinity:
+ requiredDuringSchedulingIgnoredDuringExecution:
+ - labelSelector:
+ matchExpressions:
+ - key: app.kubernetes.io/name
+ operator: In
+ values:
+ - rabbitmq-cluster
+ topologyKey: kubernetes.io/hostname
+
+ # CRD 需要 containers 一起出現
+ containers:
+ - name: rabbitmq
+---
+apiVersion: v1
+kind: Service
+metadata:
+ name: rabbitmq-mgmt-nodeport
+ namespace: rabbitmq-system
+spec:
+ type: NodePort
+ selector:
+ app.kubernetes.io/name: rabbitmq-cluster
+ app.kubernetes.io/component: rabbitmq
+ ports:
+ - name: management
+ port: 15672
+ targetPort: 15672
+ nodePort: 31672
+```
+
+```shell
+kubectl apply -f airflow/rabbitmq-cluster.yml
+```
+
+```shell
+kubectl get pods -n airflow
+```
+會看到:
+```
+my-rabbitmq-cluster-server-0 Running
+my-rabbitmq-cluster-server-1 Running
+my-rabbitmq-cluster-server-2 Running
+```
+
+
+## 4. 建立給 airflow 用的帳號
+```shell
+kubectl exec -n airflow airflow-rabbitmq-cluster-server-0 -c rabbitmq -- \
+ rabbitmqctl add_user airflow airflow
+
+kubectl exec -n airflow airflow-rabbitmq-cluster-server-0 -c rabbitmq -- \
+ rabbitmqctl set_user_tags airflow management
+
+kubectl exec -n airflow airflow-rabbitmq-cluster-server-0 -c rabbitmq -- \
+ rabbitmqctl set_permissions -p / airflow ".*" ".*" ".*"
+```
+
+
+
diff --git a/infra/airflow/rabbitmq/airflow-rabbitmq-cluster.yml b/infra/airflow/rabbitmq/airflow-rabbitmq-cluster.yml
new file mode 100644
index 0000000..66ea29e
--- /dev/null
+++ b/infra/airflow/rabbitmq/airflow-rabbitmq-cluster.yml
@@ -0,0 +1,49 @@
+apiVersion: rabbitmq.com/v1beta1
+kind: RabbitmqCluster
+metadata:
+ name: airflow-rabbitmq-cluster
+ namespace: airflow
+spec:
+ replicas: 3
+ persistence:
+ storageClassName: "nfs-airflow-rabbitmq"
+ storage: 10Gi
+ override:
+ statefulSet:
+ spec:
+ template:
+ spec:
+ nodeSelector:
+ node-role.kubernetes.io/control-plane: ""
+ tolerations:
+ - key: "node-role.kubernetes.io/control-plane"
+ operator: "Exists"
+ effect: "NoSchedule"
+ affinity:
+ podAntiAffinity:
+ requiredDuringSchedulingIgnoredDuringExecution:
+ - labelSelector:
+ matchExpressions:
+ - key: app.kubernetes.io/name
+ operator: In
+ values:
+ - rabbitmq-cluster
+ topologyKey: kubernetes.io/hostname
+ containers:
+ - name: rabbitmq
+---
+apiVersion: v1
+kind: Service
+metadata:
+ name: airflow-rabbitmq-mgmt
+ namespace: airflow
+spec:
+ type: NodePort
+ selector:
+ app.kubernetes.io/name: airflow-rabbitmq-cluster
+ app.kubernetes.io/component: rabbitmq
+ ports:
+ - name: management
+ port: 15672
+ targetPort: 15672
+ nodePort: 31672
\ No newline at end of file
diff --git a/infra/airflow/rabbitmq/airflow-rabbitmq-user.yml b/infra/airflow/rabbitmq/airflow-rabbitmq-user.yml
new file mode 100644
index 0000000..8c8d8fa
--- /dev/null
+++ b/infra/airflow/rabbitmq/airflow-rabbitmq-user.yml
@@ -0,0 +1,37 @@
+apiVersion: v1
+kind: Secret
+metadata:
+ name: airflow-rabbitmq-credentials
+ namespace: airflow
+type: Opaque
+stringData:
+ username: airflow
+ password: airflow
+---
+apiVersion: rabbitmq.com/v1beta1
+kind: User
+metadata:
+ name: airflow
+ namespace: airflow
+spec:
+ rabbitmqClusterReference:
+ name: airflow-rabbitmq-cluster
+ tags:
+ - management
+ credentials:
+ secretName: airflow-rabbitmq-credentials
+---
+apiVersion: rabbitmq.com/v1beta1
+kind: Permission
+metadata:
+ name: airflow-permission
+ namespace: airflow
+spec:
+ rabbitmqClusterReference:
+ name: airflow-rabbitmq-cluster
+ user: airflow
+ vhost: /
+ permissions:
+ configure: ".*"
+ write: ".*"
+ read: ".*"
\ No newline at end of file
diff --git a/infra/airflow/values.yml b/infra/airflow/values.yml
new file mode 100644
index 0000000..b6492f0
--- /dev/null
+++ b/infra/airflow/values.yml
@@ -0,0 +1,156 @@
+fullnameOverride: "airflow"
+
+useStandardNaming: true
+
+images:
+ airflow:
+ repository: 10.10.0.85:50000/airflow-custom
+ tag: "1.0"
+ pullPolicy: Always
+
+executor: "CeleryExecutor"
+
+postgresql:
+ enabled: false
+redis:
+ enabled: false
+
+data:
+ metadataConnection:
+ user: "airflow"
+ pass: "airflow"
+ protocol: postgresql
+ host: "10.10.0.83"
+ port: 5000
+ db: "airflow_db"
+ sslmode: disable
+ brokerUrl: "amqp://airflow:airflow@airflow-rabbitmq-cluster:5672/"
+ resultBackendConnection:
+ protocol: postgresql
+ host: "10.10.0.83"
+ port: 5000
+ db: "airflow_db"
+ user: "airflow"
+ pass: "airflow"
+ sslmode: disable
+
+migrateDatabaseJob:
+ nodeSelector:
+ role: worker
+
+webserverSecretKey: "this-must-be-a-long-random-string-fixed-for-ha"
+fernetKey: "rv638BORYwOheHEXB6JoROvDgR3r9vdrOHnYcQfl0gs="
+
+dags:
+ persistence:
+ enabled: true
+ existingClaim: airflow-dags-pvc
+logs:
+ persistence:
+ enabled: true
+ existingClaim: airflow-logs-pvc
+
+# ✅ 保留 apiServer 配置(你的環境需要它)
+apiServer:
+ replicas: 3
+ service:
+ type: NodePort
+ ports:
+ - name: airflow-ui
+ port: 8080
+ nodePort: 30080
+ nodeSelector:
+ node-role.kubernetes.io/control-plane: ""
+ tolerations:
+ - key: "node-role.kubernetes.io/control-plane"
+ operator: "Exists"
+ effect: "NoSchedule"
+
+scheduler:
+ replicas: 1
+ nodeSelector:
+ node-role.kubernetes.io/control-plane: ""
+ tolerations:
+ - key: "node-role.kubernetes.io/control-plane"
+ operator: "Exists"
+ effect: "NoSchedule"
+ securityContexts:
+ pod:
+ runAsUser: 0
+ runAsNonRoot: false
+ containers:
+ runAsUser: 0
+ runAsNonRoot: false
+ allowPrivilegeEscalation: true
+ capabilities:
+ add:
+ - NET_RAW
+
+workers:
+ podManagementPolicy: Parallel
+ replicas: 4
+ nodeSelector:
+ role: worker
+ resources:
+ requests:
+ cpu: 1
+ memory: 1Gi
+ limits:
+ cpu: 2
+ memory: 2Gi
+ persistence:
+ enabled: true
+ size: 5Gi
+ storageClassName: "nfs-airflow"
+ env:
+ - name: TZ
+ value: "Asia/Taipei"
+ securityContexts:
+ pod:
+ runAsUser: 0
+ runAsNonRoot: false
+ containers:
+ runAsUser: 0
+ runAsNonRoot: false
+ allowPrivilegeEscalation: true
+ capabilities:
+ add:
+ - NET_RAW
+
+flower:
+ enabled: true
+ nodeSelector:
+ node-role.kubernetes.io/control-plane: ""
+ tolerations:
+ - key: "node-role.kubernetes.io/control-plane"
+ operator: "Exists"
+ effect: "NoSchedule"
+ service:
+ type: NodePort
+
+dagProcessor:
+ nodeSelector:
+ role: worker
+
+triggerer:
+ nodeSelector:
+ role: worker
+ persistence:
+ enabled: false
+
+config:
+ core:
+ max_map_length: 100000
+ webserver:
+ base_url: "http://10.10.0.83:8080"
+ enable_proxy_fix: "True"
+ cookie_secure: 'False'
+ cookie_samesite: 'Lax'
+ session_backend: 'database'
+
+ celery:
+ worker_concurrency: 4
+ task_acks_late: "True"
+ worker_prefetch_multiplier: 1
+
+
diff --git a/infra/ha/keepalived&haproxy.md b/infra/keepalived_haproxy/04-keepalived&haproxy-install-guide.md
similarity index 82%
rename from infra/ha/keepalived&haproxy.md
rename to infra/keepalived_haproxy/04-keepalived&haproxy-install-guide.md
index 426d2d5..b36a5bd 100644
--- a/infra/ha/keepalived&haproxy.md
+++ b/infra/keepalived_haproxy/04-keepalived&haproxy-install-guide.md
@@ -179,6 +179,81 @@ backend backend_ro
server f01 10.10.0.85:5432 check port 8008
server f02 10.10.0.87:5432 check port 8008
server f03 10.10.0.89:5432 check port 8008
+
+ frontend airflow_web
+ bind *:8080
+ mode http
+ option httplog
+ default_backend airflow_web_nodes
+
+backend airflow_web_nodes
+ mode http
+ balance roundrobin
+ option httpchk GET /api/v2/monitor/health
+ http-check expect status 200
+
+ # 必須轉發這三個 header
+ http-request set-header Host %[req.hdr(host)]
+ http-request set-header X-Forwarded-For %[src]
+
+ # 永遠設定 proto
+ http-request set-header X-Forwarded-Proto https if { ssl_fc }
+ http-request set-header X-Forwarded-Proto http if !{ ssl_fc }
+ http-request set-header X-Forwarded-For %[src]
+ http-request set-header Host %[req.hdr(host)]
+
+ server k8s-master-1 10.10.0.85:30080 check
+ server k8s-master-2 10.10.0.87:30080 check
+ server k8s-master-3 10.10.0.89:30080 check
+
+frontend doris_mysql
+ bind *:9031
+ mode tcp
+ option tcplog
+ default_backend doris_mysql_backend
+
+backend doris_mysql_backend
+ mode tcp
+ option tcp-check
+ tcp-check connect port 9030
+
+ server fe1 10.10.0.85:9030 check
+ server fe2 10.10.0.87:9030 check
+ server fe3 10.10.0.89:9030 check
+
+frontend doris_fe_http
+ bind *:8031
+ mode http
+ default_backend doris_fe_http_backend
+
+backend doris_fe_http_backend
+ mode http
+ cookie FEID insert indirect nocache
+ option httpchk GET /api/bootstrap
+ http-check expect status 200
+
+ http-request set-header X-Forwarded-Proto http
+ http-request set-header X-Forwarded-For %[src]
+
+ server fe1 10.10.0.85:8030 check cookie fe1
+ server fe2 10.10.0.87:8030 check cookie fe2
+ server fe3 10.10.0.89:8030 check cookie fe3
+
+frontend fe_rabbitmq_mgmt
+ bind *:15672
+ mode http
+ default_backend be_rabbitmq_mgmt
+
+backend be_rabbitmq_mgmt
+ mode http
+ balance roundrobin
+ option httpchk GET /
+ http-check expect status 200
+
+ # 換成你的 master node IP
+ server master1 10.10.0.85:31672 check
+ server master2 10.10.0.87:31672 check
+ server master3 10.10.0.89:31672 check
```
### 1.3 啟動與檢查
@@ -326,3 +401,6 @@ ip a
sudo systemctl start haproxy
```
5. 確認 VIP 是否搶回 f01 (因為 f01 權重較高)。
+
+
+
diff --git a/infra/keepalived_haproxy/haproxy/haproxy.cfg b/infra/keepalived_haproxy/haproxy/haproxy.cfg
new file mode 100644
index 0000000..2dd92ef
--- /dev/null
+++ b/infra/keepalived_haproxy/haproxy/haproxy.cfg
@@ -0,0 +1,136 @@
+global
+ log /dev/log local0
+ log /dev/log local1 notice
+ chroot /var/lib/haproxy
+ stats socket /run/haproxy/admin.sock mode 660 level admin
+ stats timeout 30s
+ user haproxy
+ group haproxy
+ daemon
+
+ # Default SSL material locations
+ ca-base /etc/ssl/certs
+ crt-base /etc/ssl/private
+
+ # See: https://ssl-config.mozilla.org/#server=haproxy&server-version=2.0.3&config=intermediate
+ ssl-default-bind-ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384
+ ssl-default-bind-ciphersuites TLS_AES_128_GCM_SHA256:TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256
+ ssl-default-bind-options ssl-min-ver TLSv1.2 no-tls-tickets
+
+defaults
+ log global
+ mode http
+ option httplog
+ option dontlognull
+ timeout connect 5000
+ timeout client 50000
+ timeout server 50000
+ errorfile 400 /etc/haproxy/errors/400.http
+ errorfile 403 /etc/haproxy/errors/403.http
+ errorfile 408 /etc/haproxy/errors/408.http
+ errorfile 500 /etc/haproxy/errors/500.http
+ errorfile 502 /etc/haproxy/errors/502.http
+ errorfile 503 /etc/haproxy/errors/503.http
+ errorfile 504 /etc/haproxy/errors/504.http
+
+listen stats
+ bind *:8404 # 監控頁面 Port
+ stats enable
+ stats uri /stats # 網址路徑
+ stats refresh 10s # 刷新頻率
+ stats auth admin:password # 登入帳號:密碼 (請自行修改)
+
+frontend kubernetes-api
+ bind *:6444
+ mode tcp
+ option tcplog
+ default_backend k8s_masters
+
+backend k8s_masters
+ mode tcp
+ option tcp-check
+ balance roundrobin
+ # 若要更 aggressive 的健康檢查,可加:
+ # tcp-check connect port 6443
+ server master-A 10.10.0.85:6443 check fall 3 rise 2
+ server master-B 10.10.0.87:6443 check fall 3 rise 2
+ server master-C 10.10.0.89:6443 check fall 3 rise 2
+
+frontend postgres_rw
+ bind *:5000
+ mode tcp
+ option tcplog
+ default_backend backend_rw
+
+backend backend_rw
+ mode tcp
+ option httpchk GET /primary
+ http-check expect status 200
+ server f01 10.10.0.85:5432 check port 8008
+ server f02 10.10.0.87:5432 check port 8008
+ server f03 10.10.0.89:5432 check port 8008
+
+frontend postgres_ro
+ bind *:5001
+ mode tcp
+ option tcplog
+ default_backend backend_ro
+
+backend backend_ro
+ mode tcp
+ balance roundrobin
+ option httpchk GET /read-only
+ http-check expect status 200
+
+ server f01 10.10.0.85:5432 check port 8008
+ server f02 10.10.0.87:5432 check port 8008
+ server f03 10.10.0.89:5432 check port 8008
+
+frontend airflow_web
+ bind *:8080
+ mode http
+ option httplog
+ default_backend airflow_web_nodes
+
+backend airflow_web_nodes
+ mode http
+ balance roundrobin
+ option httpchk GET /api/v2/monitor/health
+ http-check expect status 200
+
+ # 永遠設定 proto
+ http-request set-header X-Forwarded-Proto https if { ssl_fc }
+ http-request set-header X-Forwarded-Proto http if !{ ssl_fc }
+ http-request set-header X-Forwarded-For %[src]
+ http-request set-header Host %[req.hdr(host)]
+
+ server k8s-master-1 10.10.0.85:30080 check
+ server k8s-master-2 10.10.0.87:30080 check
+ server k8s-master-3 10.10.0.89:30080 check
+
+frontend doris_mysql
+ bind *:9031
+ default_backend doris_mysql_backend
+
+backend doris_mysql_backend
+ balance roundrobin
+ option tcp-check
+ server fe1 10.10.0.85:9030 check
+ server fe2 10.10.0.87:9030 check
+ server fe3 10.10.0.89:9030 check
+
+frontend fe_rabbitmq_mgmt
+ bind *:15672
+ mode http
+ default_backend be_rabbitmq_mgmt
+
+backend be_rabbitmq_mgmt
+ mode http
+ balance roundrobin
+ option httpchk GET /
+ http-check expect status 200
+
+ # 換成你的 master node IP
+ server master1 10.10.0.85:31672 check
+ server master2 10.10.0.87:31672 check
+ server master3 10.10.0.89:31672 check
\ No newline at end of file
diff --git a/infra/keepalived_haproxy/keepalived/keepalived.conf.backup b/infra/keepalived_haproxy/keepalived/keepalived.conf.backup
new file mode 100644
index 0000000..2560404
--- /dev/null
+++ b/infra/keepalived_haproxy/keepalived/keepalived.conf.backup
@@ -0,0 +1,35 @@
+global_defs {
+ # 當上 Master 後,延遲 5 秒發送 GARP
+ garp_master_delay 5
+ # 之後每 1 秒發一次
+ garp_master_refresh 1
+ script_user gjadmin
+ enable_script_security
+}
+
+vrrp_script check_haproxy {
+ script "/usr/bin/pgrep haproxy"
+ interval 2
+ weight -20
+}
+
+vrrp_instance VI_1 {
+ state BACKUP # 角色:備機 (注意這裡)
+ interface enp1s0 # 確認網卡名稱是否正確
+ virtual_router_id 51 # 必須跟 Master 一樣
+ priority 90 # 權重:比 Master 低 (例如 90)
+ advert_int 1
+
+ authentication {
+ auth_type PASS
+ auth_pass 1111
+ }
+
+ virtual_ipaddress {
+ 10.10.0.83 # VIP
+ }
+
+ track_script {
+ check_haproxy
+ }
+}
\ No newline at end of file
diff --git a/infra/keepalived_haproxy/keepalived/keepalived.conf.master b/infra/keepalived_haproxy/keepalived/keepalived.conf.master
new file mode 100644
index 0000000..59eeec9
--- /dev/null
+++ b/infra/keepalived_haproxy/keepalived/keepalived.conf.master
@@ -0,0 +1,37 @@
+# 定義檢查腳本:檢查 HAProxy 是否活著
+global_defs {
+ # 當上 Master 後,延遲 5 秒發送 GARP
+ garp_master_delay 5
+ # 之後每 1 秒發一次
+ garp_master_refresh 1
+ script_user gjadmin
+ enable_script_security
+}
+
+vrrp_script check_haproxy {
+ script "/usr/bin/pgrep haproxy" # 檢查是否有 haproxy 進程
+ interval 2 # 每 2 秒檢查一次
+ weight -20 # 如果檢查失敗,權重扣 20
+}
+
+# 定義虛擬路由實體
+vrrp_instance VI_1 {
+ state MASTER # 角色:主機
+ interface enp1s0 # 網卡名稱 (請用 `ip a` 確認你的網卡是 eth0 還是 ens33 等)
+ virtual_router_id 51 # ID:兩台機器必須一致
+ priority 100 # 權重:數值高的當老大 (Master設100)
+ advert_int 1 # 心跳包頻率 (1秒)
+
+ authentication {
+ auth_type PASS
+ auth_pass 1111 # 密碼:兩台必須一致
+ }
+
+ virtual_ipaddress {
+ 10.10.0.83 # 這裡填寫 VIP
+ }
+
+ track_script {
+ check_haproxy # 綁定上面的檢查腳本
+ }
+}
\ No newline at end of file
diff --git a/infra/kubernetes/k8s-install-guide.md b/infra/kubernetes/05-k8s-install-guide.md
similarity index 100%
rename from infra/kubernetes/k8s-install-guide.md
rename to infra/kubernetes/05-k8s-install-guide.md
diff --git a/infra/kubernetes/install-k8s-master.sh b/infra/kubernetes/install-k8s-master.sh
new file mode 100644
index 0000000..b37b972
--- /dev/null
+++ b/infra/kubernetes/install-k8s-master.sh
@@ -0,0 +1,75 @@
+#!/bin/bash
+set -e
+
+# 1. 關閉 swap
+sudo swapoff -a
+sudo sed -i '/ swap / s/^/#/' /etc/fstab
+
+# 2. 啟用橋接網路所需 kernel module + ip forward
+cat < /dev/null 2>&1; do
+ echo " 尚未 ready,等待 5 秒..."
+ sleep 5
+done
+
+# 8. 安裝 Flannel CNI (Pod network)
+kubectl apply -f https://raw.githubusercontent.com/coreos/flannel/master/Documentation/kube-flannel.yml
+
+echo "=== 安裝 containerd + kubeadm/kubelet/kubectl + 初始化 master + 安裝 Flannel 完成 ==="
\ No newline at end of file
diff --git a/infra/kubernetes/install-k8s-worker.sh b/infra/kubernetes/install-k8s-worker.sh
new file mode 100644
index 0000000..7c3b29c
--- /dev/null
+++ b/infra/kubernetes/install-k8s-worker.sh
@@ -0,0 +1,52 @@
+#!/bin/bash
+set -e
+
+echo "=== 開始安裝 Worker node 必要元件 ==="
+
+# 1. 關閉 swap
+sudo swapoff -a
+sudo sed -i '/ swap / s/^/#/' /etc/fstab
+
+# 2. 啟用橋接網路所需 kernel module + ip forward
+cat < ⚠️ **注意**: 此版本無認證保護,僅適用於內部網路測試環境
+> 生產環境請參考:[091-registry-install-guide-prod.md](091-registry-install-guide-prod.md)
+
+---
+
+## 環境說明
+
+* **部署節點**: `10.10.0.85`
+* **服務 Port**: `50000`
+* **數據目錄**: `/srv/registry`
+* **認證**: 無 (開放存取)
+
+---
+
+## 安裝步驟
+
+### 1. 建立數據目錄
+
+在 `10.10.0.85` 上執行:
+
+```bash
+sudo mkdir -p /srv/registry
+```
+
+### 2. 啟動 Registry 容器
+
+```bash
+sudo podman run -d \
+ --name registry \
+ --restart=always \
+ -p 50000:5000 \
+ -v /srv/registry:/var/lib/registry \
+ docker.io/library/registry:2
+```
+
+### 3. 驗證服務
+
+```bash
+# 檢查容器狀態
+sudo podman ps | grep registry
+
+# 測試 API
+curl http://10.10.0.85:50000/v2/
+# 應回傳: {}
+```
+
+---
+
+## Kubernetes 節點配置
+
+在所有 K8s 節點上 (`doris-f01` ~ `f03`, `doris-b01` ~ `b04`) 執行:
+
+### 方法 1: 修改 config.toml(推薦)
+
+```bash
+# 編輯 Containerd 配置檔
+sudo vi /etc/containerd/config.toml
+
+# 在檔案最後加入以下內容:
+[plugins."io.containerd.grpc.v1.cri".registry.mirrors."10.10.0.85:50000"]
+ endpoint = ["http://10.10.0.85:50000"]
+
+# 重啟 Containerd
+sudo systemctl restart containerd
+```
+
+### 方法 2: 使用 hosts.toml
+
+```bash
+sudo mkdir -p /etc/containerd/certs.d/10.10.0.85:50000
+sudo tee /etc/containerd/certs.d/10.10.0.85:50000/hosts.toml < 💡 **提示**: 方法 1 更簡單且已驗證可用
+
+---
+
+## 測試驗證
+
+### 推送測試映像
+
+```bash
+# 標記映像
+podman tag alpine:latest 10.10.0.85:50000/test-alpine:latest
+
+# 推送映像
+podman push 10.10.0.85:50000/test-alpine:latest --tls-verify=false
+
+# 驗證
+curl http://10.10.0.85:50000/v2/_catalog
+# 應回傳: {"repositories":["test-alpine"]}
+```
+
+### 從 K8s 節點拉取
+
+```bash
+# 在任一 K8s 節點上
+sudo crictl pull 10.10.0.85:50000/test-alpine:latest
+
+# 檢查映像列表
+sudo crictl images | grep 10.10.0.85
+```
+
+---
+
+## 常見問題
+
+### 無法拉取映像
+
+```bash
+# 檢查節點配置
+sudo cat /etc/containerd/certs.d/10.10.0.85:50000/hosts.toml
+
+# 重啟 Containerd
+sudo systemctl restart containerd
+```
+
+### 檢查 Registry 日誌
+
+```bash
+sudo podman logs registry | tail -20
+```
+
+---
+
+**生產環境部署**: 請參考 [091-registry-install-guide-prod.md](091-registry-install-guide-prod.md)
+
diff --git a/infra/registry/091-registry-install-guide-prod.md b/infra/registry/091-registry-install-guide-prod.md
new file mode 100644
index 0000000..163ec7c
--- /dev/null
+++ b/infra/registry/091-registry-install-guide-prod.md
@@ -0,0 +1,386 @@
+# Container Registry 部署指南(生產環境 - 帶認證)
+
+生產環境私有 Container Registry 完整部署指南,包含 Basic Authentication 認證保護。
+
+> ✅ **推薦用於生產環境**
+> 測試環境無認證版本請參考:[09-registry-install-guide.md](09-registry-install-guide.md)
+
+---
+
+## 環境說明
+
+* **部署節點**: `10.10.0.85`
+* **服務 Port**: `50000`
+* **數據目錄**: `/srv/registry`
+* **認證方式**: Basic Auth (htpasswd)
+* **預設帳號**: `admin/password` (請自行更改密碼)
+
+---
+
+## 安裝步驟
+
+### 1. 安裝必要工具
+
+```bash
+# 在 10.10.0.85 上執行
+sudo apt-get update
+sudo apt-get install -y apache2-utils
+```
+
+### 2. 建立目錄與認證檔
+
+```bash
+# 建立基礎目錄
+sudo mkdir -p /srv/registry/auth
+
+# 建立認證檔案(設定密碼)
+sudo htpasswd -Bc /srv/registry/auth/htpasswd admin
+# 輸入密碼(建議 16+ 字元)
+
+# 驗證檔案
+sudo cat /srv/registry/auth/htpasswd
+# 應看到: admin:$2y$05$xxxx...
+```
+
+### 3. 啟動 Registry 容器
+
+```bash
+sudo podman run -d \
+ --name registry \
+ --restart=always \
+ -p 50000:5000 \
+ -v /srv/registry:/var/lib/registry \
+ -v /srv/registry/auth:/auth \
+ -e "REGISTRY_AUTH=htpasswd" \
+ -e "REGISTRY_AUTH_HTPASSWD_REALM=Registry Realm" \
+ -e "REGISTRY_AUTH_HTPASSWD_PATH=/auth/htpasswd" \
+ docker.io/library/registry:2
+```
+
+### 4. 驗證服務
+
+```bash
+# 檢查容器狀態
+sudo podman ps | grep registry
+
+# 測試無認證存取(應失敗)
+curl http://10.10.0.85:50000/v2/
+# 應回傳: {"errors":[{"code":"UNAUTHORIZED",...}]}
+
+# 測試有認證存取(應成功)
+curl -u admin: http://10.10.0.85:50000/v2/_catalog
+# 應回傳: {"repositories":[]}
+```
+
+---
+
+## Kubernetes 節點配置
+
+在所有 K8s 節點上 (`doris-f01` ~ `f03`, `doris-b01` ~ `b04`) 執行:
+
+### 方法 1: 修改 config.toml(推薦)
+
+```bash
+# 編輯 Containerd 配置檔
+sudo vi /etc/containerd/config.toml
+
+# 在檔案最後加入以下內容:
+[plugins."io.containerd.grpc.v1.cri".registry.mirrors."10.10.0.85:50000"]
+ endpoint = ["http://10.10.0.85:50000"]
+
+# 若需要認證,還需在 configs 區塊加入:
+[plugins."io.containerd.grpc.v1.cri".registry.configs."10.10.0.85:50000".auth]
+ username = "admin"
+ password = ""
+
+# 重啟 Containerd
+sudo systemctl restart containerd
+```
+
+### 方法 2: 使用 hosts.toml
+
+```bash
+sudo mkdir -p /etc/containerd/certs.d/10.10.0.85:50000
+sudo tee /etc/containerd/certs.d/10.10.0.85:50000/hosts.toml < \
+ -n airflow
+
+# 驗證 Secret
+kubectl get secret airflow-registry-secret -n airflow
+```
+
+在 Pod 中使用:
+
+```yaml
+apiVersion: v1
+kind: Pod
+metadata:
+ name: test-pod
+spec:
+ containers:
+ - name: test
+ image: 10.10.0.85:50000/airflow-custom:1.0
+ imagePullSecrets:
+ - name: airflow-registry-secret
+```
+
+---
+
+## 測試驗證
+
+### 推送測試映像
+
+```bash
+# 登入 Registry
+podman login 10.10.0.85:50000 --tls-verify=false
+# Username: admin
+# Password:
+
+# 標記映像
+podman tag alpine:latest 10.10.0.85:50000/test-alpine:secure
+
+# 推送映像
+podman push 10.10.0.85:50000/test-alpine:secure --tls-verify=false
+
+# 驗證
+curl -u admin: http://10.10.0.85:50000/v2/_catalog
+# 應回傳: {"repositories":["test-alpine"]}
+```
+
+### 從 K8s 節點拉取
+
+```bash
+# 在任一 K8s 節點上
+sudo crictl pull 10.10.0.85:50000/test-alpine:secure
+
+# 檢查映像列表
+sudo crictl images | grep 10.10.0.85
+```
+
+---
+
+## 進階配置
+
+### 啟用 HTTPS/TLS
+
+```bash
+# 生成自簽證書
+sudo mkdir -p /srv/registry/certs
+sudo openssl req -newkey rsa:4096 -nodes -sha256 \
+ -keyout /srv/registry/certs/domain.key \
+ -x509 -days 365 \
+ -out /srv/registry/certs/domain.crt \
+ -subj "/CN=10.10.0.85"
+
+# 重新啟動 Registry 啟用 TLS
+sudo podman stop registry
+sudo podman rm registry
+
+sudo podman run -d \
+ --name registry \
+ --restart=always \
+ -p 50000:5000 \
+ -v /srv/registry:/var/lib/registry \
+ -v /srv/registry/auth:/auth \
+ -v /srv/registry/certs:/certs \
+ -e "REGISTRY_HTTP_TLS_CERTIFICATE=/certs/domain.crt" \
+ -e "REGISTRY_HTTP_TLS_KEY=/certs/domain.key" \
+ -e "REGISTRY_AUTH=htpasswd" \
+ -e "REGISTRY_AUTH_HTPASSWD_REALM=Registry Realm" \
+ -e "REGISTRY_AUTH_HTPASSWD_PATH=/auth/htpasswd" \
+ docker.io/library/registry:2
+```
+
+### 啟用垃圾回收
+
+```bash
+sudo podman stop registry
+sudo podman rm registry
+
+sudo podman run -d \
+ --name registry \
+ --restart=always \
+ -p 50000:5000 \
+ -v /srv/registry:/var/lib/registry \
+ -v /srv/registry/auth:/auth \
+ -e "REGISTRY_AUTH=htpasswd" \
+ -e "REGISTRY_AUTH_HTPASSWD_REALM=Registry Realm" \
+ -e "REGISTRY_AUTH_HTPASSWD_PATH=/auth/htpasswd" \
+ -e "REGISTRY_STORAGE_DELETE_ENABLED=true" \
+ docker.io/library/registry:2
+```
+
+---
+
+## 維護操作
+
+### 更換密碼
+
+```bash
+# 更新密碼
+sudo htpasswd -B /srv/registry/auth/htpasswd admin
+
+# 重啟 Registry
+sudo podman restart registry
+
+# 更新 K8s 節點配置
+# 在每個節點上更新 /etc/containerd/certs.d/10.10.0.85:50000/hosts.toml
+# 然後執行: sudo systemctl restart containerd
+```
+
+### 添加新使用者
+
+```bash
+# 添加新使用者
+sudo htpasswd -B /srv/registry/auth/htpasswd developer
+
+# 重啟 Registry
+sudo podman restart registry
+```
+
+### 備份與恢復
+
+```bash
+# 備份數據
+sudo tar -czf /backup/registry-$(date +%Y%m%d).tar.gz \
+ /srv/registry/docker \
+ /srv/registry/auth
+
+# 恢復
+sudo tar -xzf /backup/registry-20260130.tar.gz -C /
+sudo podman restart registry
+```
+
+### 垃圾回收
+
+```bash
+# 執行垃圾回收(需先啟用 STORAGE_DELETE_ENABLED)
+sudo podman exec registry bin/registry garbage-collect \
+ /etc/docker/registry/config.yml
+
+# 查看清理效果
+du -sh /srv/registry/docker/registry/v2/*
+```
+
+---
+
+## 常見問題
+
+### 認證失敗
+
+```bash
+# 檢查認證檔案
+sudo cat /srv/registry/auth/htpasswd
+
+# 測試認證
+curl -u admin: http://10.10.0.85:50000/v2/_catalog
+
+# 重新建立認證
+sudo htpasswd -Bc /srv/registry/auth/htpasswd admin
+sudo podman restart registry
+```
+
+### 節點無法拉取映像
+
+```bash
+# 檢查節點配置
+sudo cat /etc/containerd/certs.d/10.10.0.85:50000/hosts.toml
+
+# 確認密碼正確
+grep password /etc/containerd/certs.d/10.10.0.85:50000/hosts.toml
+
+# 重啟 Containerd
+sudo systemctl restart containerd
+
+# 手動測試
+sudo crictl pull 10.10.0.85:50000/test-alpine:latest
+```
+
+### 檢查容器日誌
+
+```bash
+# 查看 Registry 日誌
+sudo podman logs registry | tail -50
+
+# 查看認證相關日誌
+sudo podman logs registry | grep -i auth
+```
+
+---
+
+## 安全性建議
+
+1. **使用強密碼**: 建議 16+ 字元隨機密碼
+ ```bash
+ # 生成隨機密碼
+ openssl rand -base64 24
+ ```
+
+2. **定期更換密碼**: 每 90 天更換一次
+
+3. **啟用 HTTPS**: 生產環境務必使用 TLS
+
+4. **限制網路訪問**:
+ ```bash
+ # 設定防火牆
+ sudo ufw allow from 10.10.0.0/24 to any port 50000
+ sudo ufw deny 50000
+ ```
+
+5. **定期備份**: 自動化備份腳本
+
+6. **監控磁碟空間**: 設定告警
+
+7. **審計日誌**: 定期檢查存取日誌
+
+---
+
+## 升級與遷移
+
+### 從無認證版本升級
+
+```bash
+# 1. 停止舊容器
+sudo podman stop registry
+sudo podman rm registry
+
+# 2. 建立認證檔案
+sudo mkdir -p /srv/registry/auth
+sudo htpasswd -Bc /srv/registry/auth/htpasswd admin
+
+# 3. 啟動新容器(帶認證)
+sudo podman run -d \
+ --name registry \
+ --restart=always \
+ -p 50000:5000 \
+ -v /srv/registry:/var/lib/registry \
+ -v /srv/registry/auth:/auth \
+ -e "REGISTRY_AUTH=htpasswd" \
+ -e "REGISTRY_AUTH_HTPASSWD_REALM=Registry Realm" \
+ -e "REGISTRY_AUTH_HTPASSWD_PATH=/auth/htpasswd" \
+ docker.io/library/registry:2
+
+# 4. 更新所有 K8s 節點配置
+# 在每個節點上添加認證資訊到 hosts.toml
+```
\ No newline at end of file
diff --git a/infra/registry/harbor-install-guide.md b/infra/registry/harbor-install-guide.md
new file mode 100644
index 0000000..5c943a2
--- /dev/null
+++ b/infra/registry/harbor-install-guide.md
@@ -0,0 +1,210 @@
+# Harbor HA on Kubernetes Installation Guide
+
+本文件說明如何使用 Helm 在 Kubernetes 上部署高可用 (HA) 的 Harbor Registry,並整合現有的 PostgreSQL Cluster 與 NFS Storage。
+
+---
+
+## 1. 架構說明
+
+* **部署方式**: Helm Chart (`goharbor/harbor`)
+* **Database**: 外部 PostgreSQL Cluster (`10.10.0.83` VIP)
+* **Redis**: 內部 Redis Cluster (由 Helm 管理)
+* **Storage**: NFS (`nfs-airflow` StorageClass)
+* **Ingress**: NodePort + 外部 HAProxy (`10.10.0.83`)
+
+---
+
+## 2. 前置準備
+
+### 2.1 建立資料庫
+
+Harbor 需要多個資料庫。請在 PostgreSQL Primary 節點上執行:
+
+```bash
+# 連線至 DB
+psql -h 10.10.0.83 -p 5000 -U postgres
+```
+
+```sql
+-- 建立使用者
+CREATE USER harbor WITH PASSWORD 'harbor_password';
+
+-- 建立資料庫
+CREATE DATABASE registry OWNER harbor;
+CREATE DATABASE notary_server OWNER harbor;
+CREATE DATABASE notary_signer OWNER harbor;
+CREATE DATABASE trivy OWNER harbor;
+
+-- 授權 (若有需要)
+GRANT ALL PRIVILEGES ON DATABASE registry TO harbor;
+GRANT ALL PRIVILEGES ON DATABASE notary_server TO harbor;
+GRANT ALL PRIVILEGES ON DATABASE notary_signer TO harbor;
+GRANT ALL PRIVILEGES ON DATABASE trivy TO harbor;
+```
+
+### 2.2 安裝 Helm Chart Repo
+
+```bash
+helm repo add harbor https://helm.goharbor.io
+helm repo update
+```
+
+---
+
+## 3. 配置 Values.yaml
+
+建立 `values-harbor.yml`,配置高可用參數與外部連線。
+
+```bash
+vi values-harbor.yml
+```
+
+```yaml
+expose:
+ type: nodePort
+ tls:
+ enabled: true
+ autoRedirect: true
+ # 指定 NodePort,方便 HAProxy 轉發 (範圍需在 K8s NodePort range 內 30000-32767)
+ nodePort:
+ http: 30002
+ https: 30003
+
+externalURL: https://10.10.0.83:443 # HAProxy VIP
+
+persistence:
+ persistentVolumeClaim:
+ registry:
+ storageClass: "nfs-airflow" # 使用 Airflow 建立的 SC
+ size: 50Gi
+ accessMode: ReadWriteMany
+ jobservice:
+ storageClass: "nfs-airflow"
+ size: 1Gi
+ accessMode: ReadWriteMany
+ database:
+ storageClass: "nfs-airflow" # 若使用內建 DB 才需要
+ size: 1Gi
+ redis:
+ storageClass: "nfs-airflow"
+ size: 1Gi
+ trivy:
+ storageClass: "nfs-airflow"
+ size: 5Gi
+
+# 使用外部 PostgreSQL
+database:
+ type: external
+ external:
+ host: "10.10.0.83"
+ port: "5000"
+ username: "harbor"
+ password: "harbor_password"
+ coreDatabase: "registry"
+ # Notary 相關功能若啟用需配置以下 DB
+ # notaryServerDatabase: "notary_server"
+ # notarySignerDatabase: "notary_signer"
+
+# 使用內建 Redis (HA)
+redis:
+ type: internal
+ internal:
+ image:
+ repository: goharbor/redis-photon
+ tag: v2.5.0
+ nodeSelector: {}
+
+# 元件複本數 (HA)
+portal:
+ replicas: 2
+core:
+ replicas: 2
+jobservice:
+ replicas: 2
+registry:
+ replicas: 2
+
+# 關閉內建 DB/Redis 的持久化 (若希望完全無狀態)
+# 但 Redis 建議還是要持久化
+```
+
+---
+
+## 4. 部署 Harbor
+
+```bash
+# 建立 Namespace
+kubectl create namespace harbor
+
+# 安裝
+helm install harbor harbor/harbor \
+ --namespace harbor \
+ -f values-harbor.yml \
+ --version 1.12.0 # 建議指定穩定版本
+```
+
+檢查 Pod 狀態:
+```bash
+kubectl get pods -n harbor -w
+```
+等待所有 Pod 狀態為 `Running`。
+
+---
+
+## 5. 配置 HAProxy
+
+為了讓外部能透過 VIP 存取 Harbor,需在 **所有 HAProxy 節點** (`/etc/haproxy/haproxy.cfg`) 加入轉發規則。
+
+### 5.1 修改 `haproxy.cfg`
+
+新增以下 Listener:
+
+```haproxy
+# Harbor HTTP
+frontend harbor_http
+ bind *:80
+ mode tcp
+ default_backend harbor_http_back
+
+backend harbor_http_back
+ mode tcp
+ balance roundrobin
+ server node1 10.10.0.85:30002 check
+ server node2 10.10.0.87:30002 check
+ server node3 10.10.0.89:30002 check
+
+# Harbor HTTPS
+frontend harbor_https
+ bind *:443
+ mode tcp
+ default_backend harbor_https_back
+
+backend harbor_https_back
+ mode tcp
+ balance roundrobin
+ server node1 10.10.0.85:30003 check
+ server node2 10.10.0.87:30003 check
+ server node3 10.10.0.89:30003 check
+```
+
+### 5.2 重啟 HAProxy
+
+```bash
+sudo systemctl restart haproxy
+```
+
+---
+
+## 6. 驗證
+
+1. 開啟瀏覽器存取 `https://10.10.0.83`。
+2. 預設帳號: `admin`,預設密碼: `Harbor12345` (可於 values.yaml 修改)。
+3. 測試 Docker Login:
+ ```bash
+ docker login 10.10.0.83
+ ```
+4. 推送 Image 測試:
+ ```bash
+ docker tag nginx:alpine 10.10.0.83/library/nginx:hah
+ docker push 10.10.0.83/library/nginx:hah
+ ```