from __future__ import annotations from airflow import DAG from airflow.decorators import task from airflow.providers.mysql.hooks.mysql import MySqlHook from airflow.utils.trigger_rule import TriggerRule from datetime import datetime, timedelta from concurrent.futures import ThreadPoolExecutor, as_completed import subprocess import re TOTAL_IPS = 100000 BATCH_SIZE = 5000 FPING_TIMEOUT_SEC = 60 DB_EXEC_STEP = 2000 MAX_WORKERS = 5000 # ✅ 每个任务内部并发数 PING_POOL = "ping_pool" PING_POOL_SLOTS_PER_TASK = 1 default_args = { "owner": "admin", "retries": 1, "retry_delay": timedelta(minutes=1), } LATENCY_RE = re.compile(r"(\d+\.?\d*)\s*ms") def _chunk_ranges(total: int, size: int) -> list[dict]: return [{"start": s, "end": min(s + size, total)} for s in range(0, total, size)] def _gen_ips_by_range(start: int, end: int) -> list[str]: ips = [] for i in range(start, end): subnet = i // 255 host = (i % 255) + 1 ips.append(f"10.10.{subnet}.{host}") return ips with DAG( dag_id="05_ping_to_doris_celery_parallel", default_args=default_args, start_date=datetime(2023, 1, 1), catchup=False, tags=["monitor", "doris", "celery", "parallel"], max_active_runs=1, max_active_tasks=20, ) as dag: @task def make_batches() -> list[dict]: batches = _chunk_ranges(TOTAL_IPS, BATCH_SIZE) print(f"Generated {len(batches)} batches") return batches @task( pool=PING_POOL, pool_slots=PING_POOL_SLOTS_PER_TASK, execution_timeout=timedelta(seconds=FPING_TIMEOUT_SEC * 2), ) def ping_and_load_batch(batch: dict) -> dict: start, end = int(batch["start"]), int(batch["end"]) ip_batch = _gen_ips_by_range(start, end) now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") rows: list[tuple] = [] alive_cnt = 0 dead_cnt = 0 def ping_single_ip(ip: str) -> tuple: """ping 单个 IP""" try: cmd = ["/usr/bin/fping", "-C", "1", "-A", ip] proc = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, timeout=5, check=False ) output = proc.stdout m = LATENCY_RE.search(output) if m and "timed out" not in output: latency = float(m.group(1)) return (now, ip, 1, latency, 0) else: return (now, ip, 0, -1, 100) except Exception: return (now, ip, 0, -1, 100) # ✅ 并行 ping import time start_time = time.time() with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: future_to_ip = {executor.submit(ping_single_ip, ip): ip for ip in ip_batch} for future in as_completed(future_to_ip): result = future.result() rows.append(result) if result[2] == 1: alive_cnt += 1 else: dead_cnt += 1 elapsed = time.time() - start_time print(f"[Batch {start}-{end}] Pinged {len(rows)} IPs in {elapsed:.2f}s with {MAX_WORKERS} threads") # 批量写入数据库 if rows: try: mysql_hook = MySqlHook(mysql_conn_id="doris_db") conn = mysql_hook.get_conn() cur = conn.cursor() sql = """ INSERT INTO ping_results (monitor_time, target_ip, is_alive, latency_ms, packet_loss_rate) VALUES (%s, %s, %s, %s, %s) \ """ for i in range(0, len(rows), DB_EXEC_STEP): cur.executemany(sql, rows[i: i + DB_EXEC_STEP]) conn.commit() cur.close() conn.close() print(f"[Batch {start}-{end}] Written {len(rows)} records: {alive_cnt} alive, {dead_cnt} dead") except Exception as e: print(f"[DB Error] {e}") raise return { "start": start, "end": end, "count": end - start, "alive": alive_cnt, "dead": dead_cnt, "duration": elapsed, } @task(trigger_rule=TriggerRule.ALL_DONE) def summarize(stats: list[dict]) -> None: total = sum(x.get("count", 0) for x in stats) alive = sum(x.get("alive", 0) for x in stats) dead = sum(x.get("dead", 0) for x in stats) total_duration = sum(x.get("duration", 0) for x in stats) avg_duration = total_duration / len(stats) if stats else 0 alive_pct = (alive * 100 // total) if total > 0 else 0 print(f"[SUMMARY] Total: {total} | Alive: {alive} ({alive_pct}%) | Dead: {dead}") print(f"[SUMMARY] Batches: {len(stats)} | Avg duration: {avg_duration:.2f}s | Total: {total_duration:.2f}s") batches = make_batches() stats = ping_and_load_batch.expand(batch=batches) summarize(stats)