您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
 
 
 
 

166 行
5.1 KiB

  1. from __future__ import annotations
  2. from airflow import DAG
  3. from airflow.decorators import task
  4. from airflow.providers.mysql.hooks.mysql import MySqlHook
  5. from airflow.utils.trigger_rule import TriggerRule
  6. from datetime import datetime, timedelta
  7. from concurrent.futures import ThreadPoolExecutor, as_completed
  8. import subprocess
  9. import re
  10. TOTAL_IPS = 100000
  11. BATCH_SIZE = 5000
  12. FPING_TIMEOUT_SEC = 60
  13. DB_EXEC_STEP = 2000
  14. MAX_WORKERS = 5000 # ✅ 每个任务内部并发数
  15. PING_POOL = "ping_pool"
  16. PING_POOL_SLOTS_PER_TASK = 1
  17. default_args = {
  18. "owner": "admin",
  19. "retries": 1,
  20. "retry_delay": timedelta(minutes=1),
  21. }
  22. LATENCY_RE = re.compile(r"(\d+\.?\d*)\s*ms")
  23. def _chunk_ranges(total: int, size: int) -> list[dict]:
  24. return [{"start": s, "end": min(s + size, total)} for s in range(0, total, size)]
  25. def _gen_ips_by_range(start: int, end: int) -> list[str]:
  26. ips = []
  27. for i in range(start, end):
  28. subnet = i // 255
  29. host = (i % 255) + 1
  30. ips.append(f"10.10.{subnet}.{host}")
  31. return ips
  32. with DAG(
  33. dag_id="05_ping_to_doris_celery_parallel",
  34. default_args=default_args,
  35. start_date=datetime(2023, 1, 1),
  36. catchup=False,
  37. tags=["monitor", "doris", "celery", "parallel"],
  38. max_active_runs=1,
  39. max_active_tasks=20,
  40. ) as dag:
  41. @task
  42. def make_batches() -> list[dict]:
  43. batches = _chunk_ranges(TOTAL_IPS, BATCH_SIZE)
  44. print(f"Generated {len(batches)} batches")
  45. return batches
  46. @task(
  47. pool=PING_POOL,
  48. pool_slots=PING_POOL_SLOTS_PER_TASK,
  49. execution_timeout=timedelta(seconds=FPING_TIMEOUT_SEC * 2),
  50. )
  51. def ping_and_load_batch(batch: dict) -> dict:
  52. start, end = int(batch["start"]), int(batch["end"])
  53. ip_batch = _gen_ips_by_range(start, end)
  54. now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  55. rows: list[tuple] = []
  56. alive_cnt = 0
  57. dead_cnt = 0
  58. def ping_single_ip(ip: str) -> tuple:
  59. """ping 单个 IP"""
  60. try:
  61. cmd = ["/usr/bin/fping", "-C", "1", "-A", ip]
  62. proc = subprocess.run(
  63. cmd,
  64. stdout=subprocess.PIPE,
  65. stderr=subprocess.STDOUT,
  66. text=True,
  67. timeout=5,
  68. check=False
  69. )
  70. output = proc.stdout
  71. m = LATENCY_RE.search(output)
  72. if m and "timed out" not in output:
  73. latency = float(m.group(1))
  74. return (now, ip, 1, latency, 0)
  75. else:
  76. return (now, ip, 0, -1, 100)
  77. except Exception:
  78. return (now, ip, 0, -1, 100)
  79. # ✅ 并行 ping
  80. import time
  81. start_time = time.time()
  82. with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
  83. future_to_ip = {executor.submit(ping_single_ip, ip): ip for ip in ip_batch}
  84. for future in as_completed(future_to_ip):
  85. result = future.result()
  86. rows.append(result)
  87. if result[2] == 1:
  88. alive_cnt += 1
  89. else:
  90. dead_cnt += 1
  91. elapsed = time.time() - start_time
  92. print(f"[Batch {start}-{end}] Pinged {len(rows)} IPs in {elapsed:.2f}s with {MAX_WORKERS} threads")
  93. # 批量写入数据库
  94. if rows:
  95. try:
  96. mysql_hook = MySqlHook(mysql_conn_id="doris_db")
  97. conn = mysql_hook.get_conn()
  98. cur = conn.cursor()
  99. sql = """
  100. INSERT INTO ping_results
  101. (monitor_time, target_ip, is_alive, latency_ms, packet_loss_rate)
  102. VALUES (%s, %s, %s, %s, %s) \
  103. """
  104. for i in range(0, len(rows), DB_EXEC_STEP):
  105. cur.executemany(sql, rows[i: i + DB_EXEC_STEP])
  106. conn.commit()
  107. cur.close()
  108. conn.close()
  109. print(f"[Batch {start}-{end}] Written {len(rows)} records: {alive_cnt} alive, {dead_cnt} dead")
  110. except Exception as e:
  111. print(f"[DB Error] {e}")
  112. raise
  113. return {
  114. "start": start,
  115. "end": end,
  116. "count": end - start,
  117. "alive": alive_cnt,
  118. "dead": dead_cnt,
  119. "duration": elapsed,
  120. }
  121. @task(trigger_rule=TriggerRule.ALL_DONE)
  122. def summarize(stats: list[dict]) -> None:
  123. total = sum(x.get("count", 0) for x in stats)
  124. alive = sum(x.get("alive", 0) for x in stats)
  125. dead = sum(x.get("dead", 0) for x in stats)
  126. total_duration = sum(x.get("duration", 0) for x in stats)
  127. avg_duration = total_duration / len(stats) if stats else 0
  128. alive_pct = (alive * 100 // total) if total > 0 else 0
  129. print(f"[SUMMARY] Total: {total} | Alive: {alive} ({alive_pct}%) | Dead: {dead}")
  130. print(f"[SUMMARY] Batches: {len(stats)} | Avg duration: {avg_duration:.2f}s | Total: {total_duration:.2f}s")
  131. batches = make_batches()
  132. stats = ping_and_load_batch.expand(batch=batches)
  133. summarize(stats)