from __future__ import annotations from ftplib import print_line from airflow import DAG from airflow.decorators import task from airflow.providers.mysql.hooks.mysql import MySqlHook from airflow.utils.trigger_rule import TriggerRule from datetime import datetime, timedelta import subprocess import re TOTAL_IPS = 100000 BATCH_SIZE = 5000 FPING_TIMEOUT_SEC = 60 DB_EXEC_STEP = 2000 PING_POOL = "ping_pool" PING_POOL_SLOTS_PER_TASK = 1 default_args = { "owner": "admin", "retries": 1, "retry_delay": timedelta(minutes=1), } # 修正: 匹配 "1.06 ms" 格式 LATENCY_RE = re.compile(r"(\d+\.?\d*)\s*ms") def _chunk_ranges(total: int, size: int) -> list[dict]: return [{"start": s, "end": min(s + size, total)} for s in range(0, total, size)] def _gen_ips_by_range(start: int, end: int) -> list[str]: ips = [] for i in range(start, end): subnet = i // 255 host = (i % 255) + 1 ips.append(f"10.10.{subnet}.{host}") return ips with DAG( dag_id="05_ping_to_doris_celery", default_args=default_args, start_date=datetime(2023, 1, 1), catchup=False, tags=["monitor", "doris", "celery", "latest"], max_active_runs=1, max_active_tasks=8, ) as dag: @task def make_batches() -> list[dict]: return _chunk_ranges(TOTAL_IPS, BATCH_SIZE) @task( pool=PING_POOL, pool_slots=PING_POOL_SLOTS_PER_TASK, execution_timeout=timedelta(seconds=FPING_TIMEOUT_SEC * 2), ) def ping_and_load_batch(batch: dict) -> dict: start, end = int(batch["start"]), int(batch["end"]) ip_batch = _gen_ips_by_range(start, end) cmd = ["/opt/tools/bin/fping", "-C", "1", "-A"] + ip_batch now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") rows: list[tuple] = [] alive_cnt = 0 dead_cnt = 0 try: proc = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, timeout=FPING_TIMEOUT_SEC, ) output = proc.stdout.splitlines() for line in output: m = LATENCY_RE.search(line) m = LATENCY_RE.search(line) # 範例輸出: # 活的: "10.10.0.33 : [0], 64 bytes, 1.06 ms (1.06 avg, 0% loss)" # 死的: "10.10.0.1 : [0], timed out" # 或: "10.10.0.2 : -" # 提取 IP parts = line.split(":") if len(parts) < 2: continue ip = parts[0].strip() rest = ":".join(parts[1:]) # 剩餘部分 # 檢查是否有延遲資訊 m = LATENCY_RE.search(rest) if m and "timed out" not in rest: # 活著且有延遲資訊 latency = float(m.group(1)) alive_cnt += 1 rows.append((now, ip, 1, latency, 0)) # print(f"✓ {ip}: {latency} ms") else: # 死掉或 timeout dead_cnt += 1 rows.append((now, ip, 0, -1, 100)) # print(f"✗ {ip}: dead") except Exception as e: print(f"[ping_and_load_batch] exception={repr(e)} range=({start},{end}) size={len(ip_batch)}") dead_cnt = len(ip_batch) for ip in ip_batch: rows.append((now, ip, 0, -1, 100)) # 寫入 Doris if rows: mysql_hook = MySqlHook(mysql_conn_id="doris_db") conn = mysql_hook.get_conn() cur = conn.cursor() sql = """ INSERT INTO ping_results (monitor_time, target_ip, is_alive, latency_ms, packet_loss_rate) VALUES (%s, %s, %s, %s, %s) \ """ for i in range(0, len(rows), DB_EXEC_STEP): cur.executemany(sql, rows[i: i + DB_EXEC_STEP]) conn.commit() cur.close() conn.close() print(f"[Batch {start}-{end}] Written {len(rows)} records to Doris") return { "start": start, "end": end, "count": end - start, "alive": alive_cnt, "dead": dead_cnt, } @task(trigger_rule=TriggerRule.ALL_DONE) def summarize(stats: list[dict]) -> None: total = sum(x.get("count", 0) for x in stats) alive = sum(x.get("alive", 0) for x in stats) dead = sum(x.get("dead", 0) for x in stats) alive_pct = (alive * 100 // total) if total > 0 else 0 print(f"[SUMMARY] Total: {total} | Alive: {alive} ({alive_pct}%) | Dead: {dead} | Batches: {len(stats)}") batches = make_batches() stats = ping_and_load_batch.expand(batch=batches) summarize(stats)