import csv
import glob
import ipaddress
import math
import os
from contextlib import closing
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from kubernetes.client import models as k8s

# --- Configuration ---
TOTAL_IPS = 1000  # Total number of targets to ping.
BATCH_SIZE = 250  # Number of targets handled by each ping pod.

# Paths
# 1. Physical path on the NFS server (kept for reference; not used by the tasks below).
NFS_REAL_PATH = "/srv/nfs/dags/ping_results"
# 2. Path the Airflow worker reads (the DAGs volume is mounted at /opt/airflow/dags).
AIRFLOW_READ_PATH = "/opt/airflow/dags/ping_results"
# 3. Mount point of the DAGs PVC inside each ping pod, and the result dir beneath it.
POD_MOUNT_ROOT = "/mnt/dags"
POD_MOUNT_PATH = f"{POD_MOUNT_ROOT}/ping_results"

default_args = {
    'owner': 'admin',
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}


def _to_number(text):
    """Parse ``text`` as an int when possible, otherwise as a finite float.

    Raises:
        ValueError: if ``text`` is not a number, or is NaN/infinity.
    """
    try:
        return int(text)
    except ValueError:
        value = float(text)
    if not math.isfinite(value):
        raise ValueError(f"non-finite number: {text!r}")
    return value


def _parse_ping_row(row):
    """Convert one result-CSV row ``[ip, is_alive, latency_ms, loss]`` into DB values.

    Returns:
        ``(ip, is_alive, latency_ms, packet_loss_rate)`` with numeric fields parsed,
        or ``None`` when the row is malformed (wrong field count, invalid IP address,
        non-numeric value), so garbage lines are skipped instead of reaching the DB.
    """
    if len(row) != 4:
        return None
    ip, alive, latency, loss = (field.strip() for field in row)
    try:
        ipaddress.ip_address(ip)
        return ip, int(alive), _to_number(latency), _to_number(loss)
    except ValueError:
        return None


with DAG(
    '03_ping_to_doris',
    default_args=default_args,
    description='Ping 1000台 -> 存CSV -> 匯入 Doris',
    # schedule=timedelta(minutes=5),  # uncomment to run every 5 minutes
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=['monitor', 'doris', 'production'],
) as dag:

    # 0. Preparation: make sure the result directory exists and clear last run's files.
    @task
    def prepare_environment():
        """Create the result directory and delete result files from the previous run.

        The worker mounts the same DAGs volume the ping pods write to, so the files
        can be managed directly. Old files are removed so they are not imported twice.
        Deletion is best effort: a file that cannot be removed is reported, not fatal.
        """
        os.makedirs(AIRFLOW_READ_PATH, exist_ok=True)

        # NOTE: a production setup might move old files to a backup folder instead.
        # "*.csv.part" covers half-written files left behind by interrupted pods.
        stale_files = glob.glob(os.path.join(AIRFLOW_READ_PATH, "*.csv")) + glob.glob(
            os.path.join(AIRFLOW_READ_PATH, "*.csv.part")
        )
        removed = 0
        for path in stale_files:
            try:
                os.remove(path)
            except FileNotFoundError:
                continue  # already gone; nothing to clean up
            except OSError as err:
                # A leftover .csv would be imported again by load_to_doris, so say so.
                print(f"無法刪除舊檔案 {path}: {err}")
                continue
            removed += 1
        print(f"環境準備完成,已清理 {removed} 個舊檔案")

    # 1. Generate target IPs.
    @task
    def generate_target_ips():
        """Return TOTAL_IPS addresses of the form ``10.10.<block>.<host>``.

        Hosts run 1..255 within each third-octet block, then roll over to the next one.
        NOTE(review): host 255 is the broadcast address if these are /24 networks —
        confirm that including it is intended.
        """
        return [f"10.10.{i // 255}.{i % 255 + 1}" for i in range(TOTAL_IPS)]

    # 2. Split the targets into per-pod batches.
    @task
    def chunk_ips(all_ips):
        """Split ``all_ips`` into consecutive lists of at most BATCH_SIZE addresses."""
        return [all_ips[i:i + BATCH_SIZE] for i in range(0, len(all_ips), BATCH_SIZE)]

    # 3. Kubernetes volume so the ping pods can write to the shared NFS-backed PVC.
    nfs_vol = k8s.V1Volume(
        name="nfs-storage",
        persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
            claim_name="airflow-dags-pvc"  # the same PVC that stores the DAGs
        ),
    )

    nfs_mount = k8s.V1VolumeMount(
        name="nfs-storage",
        mount_path=POD_MOUNT_ROOT,  # mounted at /mnt/dags inside the pod
    )

    # 4. Ping one batch and write the results to a CSV file.
    # The batch's IPs arrive as positional parameters ("$@").
    #   - `fping -C 1 -q -A`: one probe per target; the per-target summary goes to stderr,
    #     e.g. "10.10.0.85 : 12.34" on success, "10.10.0.89 : -" on failure.
    #   - awk keeps only those summary lines ($2 == ":") so any other stderr output
    #     (e.g. ICMP error messages) never becomes a bogus row, and emits
    #     "ip,is_alive,latency_ms,loss" — alive: "ip,1,<ms>,0", dead: "ip,0,-1,100".
    #   - Output goes to a ".part" file that is renamed when complete, so the loader never
    #     sees a half-written file; a kernel UUID keeps names unique across pods.
    #   - "|| true" keeps the task green no matter what (deliberate best effort; note it
    #     also hides a failed "apk add").
    ping_script = rf"""
apk add --no-cache fping && \
filename="{POD_MOUNT_PATH}/batch_$(date +%s)_$(cat /proc/sys/kernel/random/uuid).csv" && \
echo "正在寫入: $filename" && \
fping -C 1 -q -A "$@" 2>&1 | \
awk '$2 == ":" {{
    if ($3 ~ /^[0-9]+(\.[0-9]+)?$/)
        print $1 ",1," $3 ",0";
    else
        print $1 ",0,-1,100";
}}' > "$filename.part" && \
mv "$filename.part" "$filename" || true
"""

    ping_worker = KubernetesPodOperator.partial(
        task_id="fping_worker",
        name="fping-pod",
        namespace="airflow",
        image="alpine:3.18",
        # Host networking: probes originate from the node's own network stack.
        hostnetwork=True,
        dnspolicy="ClusterFirstWithHostNet",
        volumes=[nfs_vol],
        volume_mounts=[nfs_mount],
        # Kubernetes executes command + args: `sh -c SCRIPT fping-batch IP1 IP2 ...`.
        # The first word after SCRIPT becomes $0, so "fping-batch" is a placeholder;
        # without it the first IP of every batch would be swallowed and never pinged.
        cmds=["/bin/sh", "-c", ping_script, "fping-batch"],
        # NOTE(review): newer cncf.kubernetes providers replace this flag with
        # on_finish_action="delete_pod" — confirm against the installed provider version.
        is_delete_operator_pod=True,
        in_cluster=True,
    )

    # 5. Read the CSV files and load them into Doris.
    @task
    def load_to_doris():
        """Insert every row from the result CSVs into the Doris ``ping_results`` table.

        All rows share one ``monitor_time``: the worker's local time when this task runs.
        Malformed rows are skipped and counted. Values are passed as query parameters,
        never spliced into the SQL text. Rows are sent in chunks of 2000 per statement.
        """
        # 1. Find all result files.
        csv_files = sorted(glob.glob(os.path.join(AIRFLOW_READ_PATH, "*.csv")))
        print(f"找到 {len(csv_files)} 個結果檔案,準備匯入...")

        if not csv_files:
            print("沒有檔案需要匯入")
            return

        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        # 2. Read and validate every row. Expected line format: 8.8.8.8,1,12.34,0
        rows = []
        skipped = 0
        for file_path in csv_files:
            with open(file_path, newline="", encoding="utf-8") as f:
                for record in csv.reader(f):
                    if not record:
                        continue  # blank line
                    parsed = _parse_ping_row(record)
                    if parsed is None:
                        skipped += 1
                        continue
                    rows.append((current_time, *parsed))
        if skipped:
            print(f"略過 {skipped} 筆格式錯誤的資料")

        if not rows:
            print("CSV 檔案是空的,無資料匯入。")
            return

        # 3. Batched, parameterised insert (keeps each statement a manageable size).
        sql = (
            "INSERT INTO ping_results "
            "(monitor_time, target_ip, is_alive, latency_ms, packet_loss_rate) "
            "VALUES (%s, %s, %s, %s, %s)"
        )
        batch_size = 2000
        mysql_hook = MySqlHook(mysql_conn_id='doris_db')
        # closing() guarantees cursor and connection are released even if a batch fails.
        with closing(mysql_hook.get_conn()) as conn, closing(conn.cursor()) as cursor:
            print(f"總共 {len(rows)} 筆資料,開始寫入 DB...")
            for start in range(0, len(rows), batch_size):
                batch = rows[start:start + batch_size]
                cursor.executemany(sql, batch)
                print(f"已寫入批次 {start} ~ {start + len(batch)}")
            conn.commit()
        print("資料匯入完成!")

    # --- Task wiring ---
    # Clean up -> generate IPs -> split -> ping in parallel -> load into the DB.
    prepare_env = prepare_environment()
    ip_data = generate_target_ips()
    ip_batches = chunk_ips(ip_data)

    # One mapped pod per batch; runs only after the environment is prepared.
    ping_task = ping_worker.expand(arguments=ip_batches)

    prepare_env >> ip_data >> ip_batches >> ping_task >> load_to_doris()