|
- import os
- import glob
- from airflow import DAG
- from airflow.decorators import task
- from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
- from airflow.providers.mysql.hooks.mysql import MySqlHook
- from kubernetes.client import models as k8s
- from datetime import datetime, timedelta
-
# --- Configuration ---
TOTAL_IPS = 1000  # Total number of targets to ping.
BATCH_SIZE = 250  # Targets per fping pod; ceil(TOTAL_IPS / BATCH_SIZE) pods run (4 here).

# One NFS-backed results directory, seen under three different mount points.
_RESULTS_DIR_NAME = "ping_results"
# 1. Location on the NFS server itself. Informational only: the cleanup task
#    deletes old files through AIRFLOW_READ_PATH, not through this path.
NFS_REAL_PATH = f"/srv/nfs/dags/{_RESULTS_DIR_NAME}"
# 2. Location as seen by the Airflow worker, which mounts the DAGs folder at
#    /opt/airflow/dags; used for cleanup and for reading results into Doris.
AIRFLOW_READ_PATH = f"/opt/airflow/dags/{_RESULTS_DIR_NAME}"
# 3. Location inside each fping pod, where the DAGs PVC is mounted at
#    /mnt/dags; the pods write their CSV files here.
POD_MOUNT_PATH = f"/mnt/dags/{_RESULTS_DIR_NAME}"

# Applied to every task: one retry after a one-minute pause.
default_args = {
    'owner': 'admin',
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}
-
# Usable unicast hosts of a /24 are .1-.254: .0 is the network address and
# .255 the broadcast address, so neither is generated as a ping target.
_HOSTS_PER_SUBNET = 254

# Rows per executemany() call when loading into Doris (keeps statements bounded).
_INSERT_BATCH_SIZE = 2000

_INSERT_SQL = (
    "INSERT INTO ping_results "
    "(monitor_time, target_ip, is_alive, latency_ms, packet_loss_rate) "
    "VALUES (%s, %s, %s, %s, %s)"
)

# Shell script run by each fping pod through `/bin/sh -c`.
#  - The batch's IPs arrive as the positional parameters "$@" (see `cmds` below).
#  - `fping -C 1 -q` reports one summary line per target (merged via 2>&1):
#        10.10.0.85 : 12.34    -> reply, latency in ms
#        10.10.0.89 : -        -> no reply
#    Only lines whose 2nd field is ":" are parsed, so fping error/diagnostic
#    lines can no longer turn into bogus rows.
#  - Each row is written as ip,alive,latency_ms,loss_pct.
#  - Output goes to a .tmp file that is renamed at the end, so a pod killed
#    mid-write never leaves a partial *.csv behind for load_to_doris.
#  - `|| true` keeps the ping step itself from failing the pod (the original
#    intent), but a failed `apk add` now fails the pod so Airflow retries
#    instead of silently producing no data.
_FPING_SCRIPT = f"""
apk add --no-cache fping || exit 1
filename="{POD_MOUNT_PATH}/batch_$(date +%s)_$RANDOM.csv"
echo "正在寫入: $filename"
fping -C 1 -q -A "$@" 2>&1 | awk '
$2 == ":" {{
    if ($3 ~ /^[0-9]+(\\.[0-9]+)?$/) {{ print $1 ",1," $3 ",0" }} else {{ print $1 ",0,-1,100" }}
}}' > "$filename.tmp" || true
mv "$filename.tmp" "$filename"
"""


def _build_target_ips(count, prefix="10.10", hosts_per_subnet=_HOSTS_PER_SUBNET):
    """Return ``count`` target IPs of the form ``{prefix}.{subnet}.{host}``.

    Hosts run from 1 to ``hosts_per_subnet`` inside each subnet, then the third
    octet advances: 10.10.0.1 ... 10.10.0.254, 10.10.1.1, ...

    Raises:
        ValueError: if ``hosts_per_subnet`` is outside 1..254, or ``count``
            would need a third octet above 255.
    """
    if not 1 <= hosts_per_subnet <= 254:
        raise ValueError(f"hosts_per_subnet must be in 1..254, got {hosts_per_subnet}")
    if count > 256 * hosts_per_subnet:
        raise ValueError(f"{count} targets do not fit in {prefix}.0.0-{prefix}.255.255")
    return [
        f"{prefix}.{i // hosts_per_subnet}.{i % hosts_per_subnet + 1}"
        for i in range(count)
    ]


def _chunk(items, size):
    """Split ``items`` into consecutive lists of at most ``size`` elements."""
    if size <= 0:
        raise ValueError(f"batch size must be positive, got {size}")
    return [items[i:i + size] for i in range(0, len(items), size)]


def _parse_result_line(line):
    """Parse one ``ip,alive,latency,loss`` line written by the fping pods.

    Returns ``(ip, alive, latency, loss)`` with the numeric fields converted
    (int, float, int), or ``None`` for blank/malformed lines so they are
    skipped instead of breaking the batch insert.
    """
    parts = line.strip().split(",")
    if len(parts) != 4:
        return None
    ip, alive, latency, loss = parts
    if not ip:
        return None
    try:
        return ip, int(alive), float(latency), int(loss)
    except ValueError:
        return None


with DAG(
    '03_ping_to_doris',
    default_args=default_args,
    description='Ping 1000台 -> 存CSV -> 匯入 Doris',
    # To run periodically, set e.g. schedule=timedelta(minutes=5).
    start_date=datetime(2023, 1, 1),
    catchup=False,
    # All runs share one results directory: overlapping runs would delete or
    # double-import each other's CSV files, so run one at a time.
    max_active_runs=1,
    tags=['monitor', 'doris', 'production'],
) as dag:

    # 0. Preparation: make sure the results directory exists and remove the
    #    previous run's files.
    @task
    def prepare_environment():
        """Create the results directory and delete files left by earlier runs.

        The worker reaches the shared directory at AIRFLOW_READ_PATH. Leftover
        *.csv files must be removed, otherwise load_to_doris would import them
        again. Leftover *.csv.tmp files come from pods killed mid-write.
        """
        os.makedirs(AIRFLOW_READ_PATH, exist_ok=True)

        stale = glob.glob(os.path.join(AIRFLOW_READ_PATH, "*.csv"))
        stale += glob.glob(os.path.join(AIRFLOW_READ_PATH, "*.csv.tmp"))
        for path in stale:
            try:
                os.remove(path)
            except FileNotFoundError:
                # Already gone, which is the state we want.
                pass
            # Any other OSError (e.g. permissions) propagates on purpose: a
            # CSV that cannot be deleted would be re-imported as duplicates.
        print(f"環境準備完成,已清理 {len(stale)} 個舊檔案")

    # 1. Generate the target IPs.
    @task
    def generate_target_ips():
        """Return TOTAL_IPS target addresses of the form 10.10.<subnet>.<1-254>."""
        return _build_target_ips(TOTAL_IPS)

    # 2. Split the targets into batches.
    @task
    def chunk_ips(all_ips):
        """Split the target list into BATCH_SIZE-sized batches, one per fping pod."""
        return _chunk(all_ips, BATCH_SIZE)

    # 3. Kubernetes volume so the pods can write to the NFS share.
    nfs_vol = k8s.V1Volume(
        name="nfs-storage",
        persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
            claim_name="airflow-dags-pvc"  # The same PVC that holds the DAGs.
        ),
    )
    nfs_mount = k8s.V1VolumeMount(
        name="nfs-storage",
        mount_path="/mnt/dags",  # Mounted at /mnt/dags inside the pod.
    )

    # 4. Ping each batch in its own pod and write one CSV per pod.
    ping_worker = KubernetesPodOperator.partial(
        task_id="fping_worker",
        name="fping-pod",
        namespace="airflow",
        image="alpine:3.18",
        # Host networking is recommended for the most accurate monitoring.
        hostnetwork=True,
        dnspolicy="ClusterFirstWithHostNet",
        volumes=[nfs_vol],
        volume_mounts=[nfs_mount],
        # Kubernetes runs `command + args`, i.e.
        #     /bin/sh -c SCRIPT fping-worker IP1 IP2 ...
        # With `sh -c`, the first word after SCRIPT becomes $0 and is NOT part
        # of "$@". The "fping-worker" placeholder fills $0, so every IP passed
        # via `arguments` lands in "$@". Without it, the first IP of every batch
        # was never pinged.
        cmds=["/bin/sh", "-c", _FPING_SCRIPT, "fping-worker"],
        # NOTE(review): newer cncf.kubernetes providers replace this flag with
        # on_finish_action="delete_pod"; check the installed provider version.
        is_delete_operator_pod=True,
        in_cluster=True,
    )

    # 5. Read the CSV files and load them into Doris.
    @task
    def load_to_doris():
        """Load every result CSV in AIRFLOW_READ_PATH into ping_results.

        All rows share one monitor_time. Values are passed as query parameters
        rather than spliced into the SQL text, and malformed lines are skipped.
        """
        csv_files = sorted(glob.glob(os.path.join(AIRFLOW_READ_PATH, "*.csv")))
        print(f"找到 {len(csv_files)} 個結果檔案,準備匯入...")

        if not csv_files:
            print("沒有檔案需要匯入")
            return

        # NOTE(review): naive worker-local time taken at load time, not at ping
        # time; confirm this matches what monitor_time is expected to hold.
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        rows = []
        for file_path in csv_files:
            with open(file_path, encoding="utf-8") as f:
                for line in f:
                    parsed = _parse_result_line(line)
                    if parsed is not None:
                        rows.append((current_time, *parsed))

        if not rows:
            print("CSV 檔案是空的,無資料匯入。")
            return

        conn = MySqlHook(mysql_conn_id='doris_db').get_conn()
        try:
            cursor = conn.cursor()
            try:
                print(f"總共 {len(rows)} 筆資料,開始寫入 DB...")
                # Presumably the MySQL driver (mysqlclient / mysql-connector)
                # folds executemany() of a single INSERT ... VALUES into one
                # multi-row statement per batch; verify for the configured driver.
                for start in range(0, len(rows), _INSERT_BATCH_SIZE):
                    batch = rows[start:start + _INSERT_BATCH_SIZE]
                    cursor.executemany(_INSERT_SQL, batch)
                    print(f"已寫入批次 {start} ~ {start + len(batch)}")
                conn.commit()
            finally:
                cursor.close()
        finally:
            conn.close()
        print("資料匯入完成!")

    # --- Task wiring ---
    # Clean up -> generate IPs -> split -> parallel ping -> load into the DB.
    prepare_env = prepare_environment()
    ip_data = generate_target_ips()
    ip_batches = chunk_ips(ip_data)

    # One mapped pod per batch; the batch's IPs become the container args.
    ping_task = ping_worker.expand(arguments=ip_batches)

    prepare_env >> ip_data >> ip_batches >> ping_task >> load_to_doris()
|