You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

236 lines
8.4 KiB

  1. import os
  2. import glob
  3. from airflow import DAG
  4. from airflow.decorators import task
  5. from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
  6. from airflow.providers.mysql.hooks.mysql import MySqlHook
  7. from kubernetes.client import models as k8s
  8. from datetime import datetime, timedelta
  # --- Configuration ---
  TOTAL_IPS = 1000  # overall target: 1000 hosts
  BATCH_SIZE = 250  # each Pod handles 250 hosts

  # Paths
  # 1. Real path on the NFS server (the directory whose old files get cleared)
  NFS_REAL_PATH = "/srv/nfs/dags/ping_results"
  # 2. Path the Airflow worker reads from
  #    (per the original note, Airflow mounts the DAGs under /opt/airflow/dags)
  AIRFLOW_READ_PATH = "/opt/airflow/dags/ping_results"
  # 3. Path the Pod writes to (the volume is mounted further down in the file)
  POD_MOUNT_PATH = "/mnt/dags/ping_results"

  # Defaults applied to every task of the DAG.
  default_args = dict(
      owner='admin',
      retries=1,
      retry_delay=timedelta(minutes=1),
  )
  24. with DAG(
  25. '03_ping_to_doris',
  26. default_args=default_args,
  27. description='Ping 1000台 -> 存CSV -> 匯入 Doris',
  28. # schedule=timedelta(minutes=5),
  29. start_date=datetime(2023, 1, 1),
  30. catchup=False,
  31. tags=['monitor', 'doris', 'production'],
  32. ) as dag:
  # 0. Preparation: make sure the results folder exists and clear last run's CSVs
  @task
  def prepare_environment():
      """Create the results directory if needed and delete leftover CSV files.

      Deletion stays best-effort: a file that cannot be removed is reported
      and skipped instead of failing the task. The final log line reports the
      number of files actually removed, not the number found.
      """
      # The Airflow worker has the DAGs folder mounted itself, so operate on it
      # directly. exist_ok=True already covers the "directory exists" case.
      os.makedirs(AIRFLOW_READ_PATH, exist_ok=True)

      # Delete old .csv files so they are not imported a second time.
      # NOTE: a production setup might move them to a backup folder instead.
      files = glob.glob(f"{AIRFLOW_READ_PATH}/*.csv")
      removed = 0
      for f in files:
          try:
              os.remove(f)
          except FileNotFoundError:
              # Already gone between the glob and the remove; nothing to do.
              continue
          except OSError as exc:
              # Keep going, but make the failure visible in the task log.
              print(f"WARNING: could not remove {f}: {exc}")
          else:
              removed += 1
      print(f"環境準備完成,已清理 {removed} 個舊檔案")
  # 1. Generate the target IPs
  @task
  def generate_target_ips():
      """Return TOTAL_IPS addresses of the form 10.10.<subnet>.<host>.

      Index i maps to subnet ``i // 255`` and host ``(i % 255) + 1``, so the
      host part runs from 1 through 255 before the subnet advances.
      """
      return [
          f"10.10.{third_octet}.{offset + 1}"
          for third_octet, offset in (
              divmod(index, 255) for index in range(TOTAL_IPS)
          )
      ]
  # 2. Split into batches
  @task
  def chunk_ips(all_ips):
      """Split ``all_ips`` into consecutive slices of at most BATCH_SIZE items."""
      batches = []
      for start in range(0, len(all_ips), BATCH_SIZE):
          batches.append(all_ips[start:start + BATCH_SIZE])
      return batches
  # 3. Kubernetes volume definition (lets the Pod write to NFS)
  dags_pvc_source = k8s.V1PersistentVolumeClaimVolumeSource(
      claim_name="airflow-dags-pvc",  # the PVC that stores the DAGs
  )
  nfs_vol = k8s.V1Volume(
      persistent_volume_claim=dags_pvc_source,
      name="nfs-storage",
  )
  nfs_mount = k8s.V1VolumeMount(
      mount_path="/mnt/dags",  # mounted at /mnt/dags inside the Pod
      name="nfs-storage",
  )
  72. # # 4. 執行 Ping 並寫檔
  73. # ping_worker = KubernetesPodOperator.partial(
  74. # task_id="fping_worker",
  75. # name="fping-pod",
  76. # namespace="airflow",
  77. # image="alpine:3.18",
  78. # # 掛載 NFS
  79. # volumes=[nfs_vol],
  80. # volume_mounts=[nfs_mount],
  81. # # --- 核心邏輯 ---
  82. # # 1. 安裝 fping
  83. # # 2. 定義檔名: 使用隨機數避免多個 Pod 檔名衝突
  84. # # 3. fping 執行
  85. # # 4. awk 解析:
  86. # # - 如果第3欄是數字 (例如 12.34) -> 存活 (1), 延遲=$3, 掉包=0
  87. # # - 否則 -> 死亡 (0), 延遲=-1, 掉包=100
  88. # # 5. 結果寫入 CSV 檔案
  89. # # 6. || true 確保任務永遠綠燈
  90. # cmds=["/bin/sh", "-c", f"""
  91. # apk add --no-cache fping && \
  92. # filename="{POD_MOUNT_PATH}/batch_$(date +%s)_$RANDOM.csv" && \
  93. # echo "正在寫入: $filename" && \
  94. # fping -c 1 -q -A $@ 2>&1 | \
  95. # awk '{{
  96. # if ($3 ~ /^[0-9]+(\.[0-9]+)?$/)
  97. # print $1 ",1," $3 ",0";
  98. # else
  99. # print $1 ",0,-1,100";
  100. # }}' > $filename || true
  101. # """],
  102. # is_delete_operator_pod=True,
  103. # in_cluster=True,
  104. # )
  # 4. Run the ping and write the result file
  #
  # Each mapped task instance receives its batch of IPs through `arguments`;
  # Kubernetes appends those after `cmds` when it starts the container, so the
  # container effectively runs: /bin/sh -c <script> <$0> <ip> <ip> ...
  ping_worker = KubernetesPodOperator.partial(
      task_id="fping_worker",
      name="fping-pod",
      namespace="airflow",
      image="alpine:3.18",
      # Host networking is recommended here for the most accurate monitoring.
      hostnetwork=True,
      dnspolicy="ClusterFirstWithHostNet",
      volumes=[nfs_vol],
      volume_mounts=[nfs_mount],
      # Script steps:
      #   1. install fping
      #   2. pick an output file name (epoch seconds + $RANDOM) under the mount
      #   3. fping -C 1 -q -A: one probe per target, summary only, shown by
      #      address; the summary goes to stderr, hence 2>&1
      #   4. awk turns "<ip> : <latency>" into "ip,1,latency,0" and anything
      #      else (e.g. "<ip> : -") into "ip,0,-1,100"
      #   5. `|| true` keeps the task green regardless of the chain's result
      #      NOTE(review): this also hides a failed `apk add` — confirm intended.
      # The regex backslash is doubled so Python passes `\.` through to awk
      # without an invalid-escape warning.
      cmds=[
          "/bin/sh",
          "-c",
          f"""
          apk add --no-cache fping && \
          filename="{POD_MOUNT_PATH}/batch_$(date +%s)_$RANDOM.csv" && \
          echo "正在寫入: $filename" && \
          fping -C 1 -q -A "$@" 2>&1 | \
          awk '{{
              # 大寫 -C 1 的輸出格式很乾淨:
              # 成功時:10.10.0.85 : 12.34 (第3欄是數字)
              # 失敗時:10.10.0.89 : - (第3欄是減號)
              if ($3 ~ /^[0-9]+(\\.[0-9]+)?$/)
                  print $1 ",1," $3 ",0";
              else
                  print $1 ",0,-1,100";
          }}' > $filename || true
          """,
          # Placeholder for $0. With `sh -c script a b c` the first word after
          # the script becomes $0, not $1; without this entry the first IP of
          # every batch would be swallowed as $0 and never pinged.
          "fping-worker",
      ],
      is_delete_operator_pod=True,
      in_cluster=True,
  )
  # 5. Read the CSV files and load them into Doris
  @task
  def load_to_doris():
      """Load every result CSV under AIRFLOW_READ_PATH into ``ping_results``.

      Each CSV line is expected to look like ``ip,alive,latency,loss``
      (e.g. ``8.8.8.8,1,12.34,0``). Lines with a different field count are
      ignored; lines whose numeric fields do not parse are reported and
      skipped. All rows share one timestamp taken when the task runs.

      Rows are sent with parameterized ``executemany`` calls rather than by
      splicing file contents into the SQL text, and the cursor/connection are
      closed even when an insert fails.
      """
      # 1. Find all CSV files
      csv_files = glob.glob(f"{AIRFLOW_READ_PATH}/*.csv")
      print(f"找到 {len(csv_files)} 個結果檔案,準備匯入...")
      if not csv_files:
          print("沒有檔案需要匯入")
          return

      rows = []
      current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

      # 2. Read and validate the contents of every file
      for file_path in csv_files:
          with open(file_path, 'r') as f:
              for line in f:
                  parts = line.strip().split(',')
                  if len(parts) != 4:
                      continue
                  ip, alive, latency, loss = parts
                  try:
                      row = (current_time, ip, int(alive), float(latency), int(loss))
                  except ValueError:
                      print(f"WARNING: skipping malformed line in {file_path}: {line.strip()!r}")
                      continue
                  rows.append(row)

      if not rows:
          print("CSV 檔案是空的,無資料匯入。")
          return

      # 3. Batch insert with driver-side parameter binding
      insert_sql = """
          INSERT INTO ping_results
          (monitor_time, target_ip, is_alive, latency_ms, packet_loss_rate)
          VALUES (%s, %s, %s, %s, %s)
      """
      mysql_hook = MySqlHook(mysql_conn_id='doris_db')
      conn = mysql_hook.get_conn()
      try:
          cursor = conn.cursor()
          try:
              print(f"總共 {len(rows)} 筆資料,開始寫入 DB...")
              # Write in slices of 2000 rows to keep each statement bounded.
              batch_size = 2000
              for i in range(0, len(rows), batch_size):
                  batch = rows[i:i + batch_size]
                  cursor.executemany(insert_sql, batch)
                  print(f"已寫入批次 {i} ~ {i + len(batch)}")
              conn.commit()
          finally:
              cursor.close()
      finally:
          conn.close()
      print("資料匯入完成!")
  # --- Wiring the flow ---
  # Clean the environment -> generate IPs -> split -> ping in parallel -> load into the DB
  prepare_env = prepare_environment()
  ip_data = generate_target_ips()
  ip_batches = chunk_ips(ip_data)
  # One mapped ping Pod per batch; it must wait for the environment to be ready
  ping_task = ping_worker.expand(arguments=ip_batches)
  load_task = load_to_doris()

  # Same linear dependency chain, declared one edge at a time.
  prepare_env >> ip_data
  ip_data >> ip_batches
  ip_batches >> ping_task
  ping_task >> load_task