|
- from __future__ import annotations
-
- from airflow import DAG
- from airflow.decorators import task
- from airflow.providers.mysql.hooks.mysql import MySqlHook
- from airflow.utils.trigger_rule import TriggerRule
- from datetime import datetime, timedelta
- from concurrent.futures import ThreadPoolExecutor, as_completed
- import subprocess
- import re
-
# Number of target addresses enumerated by _gen_ips_by_range (indices 0..TOTAL_IPS-1).
TOTAL_IPS = 100000
# Number of IP indices handled by one mapped ping task.
BATCH_SIZE = 5000
# Base for the ping task's execution_timeout (the task doubles it).
FPING_TIMEOUT_SEC = 60
# Rows per executemany() call when writing results to Doris.
DB_EXEC_STEP = 2000
# Concurrent ping threads inside one task. Each thread forks its own fping
# process, so this is also the number of simultaneous child processes.
# The previous value (5000 == BATCH_SIZE) launched the whole batch at once,
# which can exhaust thread/process/file-descriptor limits on a worker.
MAX_WORKERS = 256

# Airflow pool used to throttle ping tasks across the deployment; presumably
# created out of band (Admin > Pools) — confirm it exists.
PING_POOL = "ping_pool"
PING_POOL_SLOTS_PER_TASK = 1

default_args = {
    "owner": "admin",
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
}

# Captures the first "<number> ms" latency figure in fping output.
LATENCY_RE = re.compile(r"(\d+\.?\d*)\s*ms")
-
-
- def _chunk_ranges(total: int, size: int) -> list[dict]:
- return [{"start": s, "end": min(s + size, total)} for s in range(0, total, size)]
-
-
- def _gen_ips_by_range(start: int, end: int) -> list[str]:
- ips = []
- for i in range(start, end):
- subnet = i // 255
- host = (i % 255) + 1
- ips.append(f"10.10.{subnet}.{host}")
- return ips
-
-
with DAG(
    dag_id="05_ping_to_doris_celery_parallel",
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=["monitor", "doris", "celery", "parallel"],
    max_active_runs=1,
    max_active_tasks=20,
) as dag:

    @task
    def make_batches() -> list[dict]:
        """Split indices ``0..TOTAL_IPS-1`` into ``BATCH_SIZE``-sized ranges.

        Each returned ``{"start": int, "end": int}`` dict becomes the input of
        one mapped ``ping_and_load_batch`` task instance.
        """
        batches = _chunk_ranges(TOTAL_IPS, BATCH_SIZE)
        print(f"Generated {len(batches)} batches")
        return batches

    @task(
        pool=PING_POOL,
        pool_slots=PING_POOL_SLOTS_PER_TASK,
        execution_timeout=timedelta(seconds=FPING_TIMEOUT_SEC * 2),
    )
    def ping_and_load_batch(batch: dict) -> dict:
        """Ping every IP of one batch and bulk-insert the results into Doris.

        Args:
            batch: half-open index range ``{"start": int, "end": int}`` as
                produced by ``make_batches``.

        Returns:
            Per-batch stats with keys ``start``, ``end``, ``count``, ``alive``,
            ``dead`` and ``duration`` (seconds spent pinging).

        Raises:
            OSError: if fping cannot be launched (missing binary, process or
                file-descriptor limits). Failing the task is preferable to
                writing every host as "dead".
            Exception: any database error is printed and re-raised.
        """
        import time
        from contextlib import closing

        start, end = int(batch["start"]), int(batch["end"])
        ip_batch = _gen_ips_by_range(start, end)

        # One timestamp for the whole batch so all its rows share monitor_time.
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        def ping_single_ip(ip: str) -> tuple:
            """Probe one IP with ``fping -C 1`` and build its result row.

            Returns ``(monitor_time, target_ip, is_alive, latency_ms,
            packet_loss_rate)``: ``(now, ip, 1, <ms>, 0)`` when a latency is
            parsed from the output, otherwise ``(now, ip, 0, -1, 100)``.
            """
            try:
                proc = subprocess.run(
                    ["/usr/bin/fping", "-C", "1", "-A", ip],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    text=True,
                    timeout=5,
                    check=False,
                )
            except subprocess.TimeoutExpired:
                # fping did not finish within 5 s: report the host as unreachable.
                return (now, ip, 0, -1, 100)
            # OSError (fping missing, fork/FD exhaustion) is deliberately not
            # caught: it says nothing about the host and must not be stored as
            # a "dead" result.

            output = proc.stdout
            m = LATENCY_RE.search(output)
            if m and "timed out" not in output:
                return (now, ip, 1, float(m.group(1)), 0)
            return (now, ip, 0, -1, 100)

        # Never start more threads than there are IPs, and always at least one
        # (ThreadPoolExecutor rejects max_workers=0).
        workers = max(1, min(MAX_WORKERS, len(ip_batch)))
        started = time.monotonic()  # monotonic: immune to wall-clock jumps
        with ThreadPoolExecutor(max_workers=workers) as executor:
            rows: list[tuple] = list(executor.map(ping_single_ip, ip_batch))
        elapsed = time.monotonic() - started

        alive_cnt = sum(1 for row in rows if row[2] == 1)
        dead_cnt = len(rows) - alive_cnt
        print(f"[Batch {start}-{end}] Pinged {len(rows)} IPs in {elapsed:.2f}s with {workers} threads")

        # Bulk-write the results.
        if rows:
            sql = """
                INSERT INTO ping_results
                (monitor_time, target_ip, is_alive, latency_ms, packet_loss_rate)
                VALUES (%s, %s, %s, %s, %s)
            """
            try:
                mysql_hook = MySqlHook(mysql_conn_id="doris_db")
                # closing() releases the cursor and connection even when
                # executemany() or commit() raises.
                with closing(mysql_hook.get_conn()) as conn, closing(conn.cursor()) as cur:
                    for i in range(0, len(rows), DB_EXEC_STEP):
                        cur.executemany(sql, rows[i : i + DB_EXEC_STEP])
                    conn.commit()
                print(f"[Batch {start}-{end}] Written {len(rows)} records: {alive_cnt} alive, {dead_cnt} dead")
            except Exception as e:
                print(f"[DB Error] {e}")
                raise

        return {
            "start": start,
            "end": end,
            "count": end - start,
            "alive": alive_cnt,
            "dead": dead_cnt,
            "duration": elapsed,
        }

    @task(trigger_rule=TriggerRule.ALL_DONE)
    def summarize(stats: list[dict]) -> None:
        """Aggregate per-batch stats and print a run summary.

        Uses ``ALL_DONE`` so a summary is still printed when some ping tasks
        failed.
        """
        # Materialise the (possibly lazy) XCom sequence once, skipping empty
        # entries. NOTE(review): it is not confirmed whether failed mapped
        # instances show up as None or are simply omitted — this guards both.
        results = [x for x in stats if x]
        total = sum(x.get("count", 0) for x in results)
        alive = sum(x.get("alive", 0) for x in results)
        dead = sum(x.get("dead", 0) for x in results)
        total_duration = sum(x.get("duration", 0) for x in results)
        avg_duration = total_duration / len(results) if results else 0

        alive_pct = (alive * 100 // total) if total > 0 else 0
        print(f"[SUMMARY] Total: {total} | Alive: {alive} ({alive_pct}%) | Dead: {dead}")
        print(f"[SUMMARY] Batches: {len(results)} | Avg duration: {avg_duration:.2f}s | Total: {total_duration:.2f}s")

    batches = make_batches()
    stats = ping_and_load_batch.expand(batch=batches)
    summarize(stats)
|