Airflow 的運作邏輯是由以下三個核心概念組成的:
from airflow import DAG
from airflow.decorators import task
from datetime import datetime
import json
from pathlib import Path
with DAG(
dag_id="taskflow_realistic_etl_path_xcom",
start_date=datetime(2024, 1, 1),
# schedule="0 2 * * *",
catchup=False,
tags=["example", "taskflow", "xcom"],
) as dag:
@task
def extract() -> str:
# 實務上:抓 API/DB 後落地成檔案(回傳「路徑」最推薦)
out = Path("/tmp/airflow_demo_extract.json")
out.write_text(json.dumps({"rows": 3, "data": [1, 2, 3]}))
return str(out) # 這個字串會自動進 XCom
@task
def transform(path: str) -> str:
p = Path(path)
payload = json.loads(p.read_text())
# 假裝做轉換:把每個值 * 10
payload["data"] = [x * 10 for x in payload["data"]]
out = Path("/tmp/airflow_demo_transform.json")
out.write_text(json.dumps(payload))
return str(out)
@task
def load(path: str) -> None:
payload = json.loads(Path(path).read_text())
# 實務上:寫入 Doris / Postgres / S3 / Kafka...
print(f"LOAD rows={payload['rows']} data={payload['data']}")
load(transform(extract()))
# e = extract()
# t = transform(e)
# l = load(t)
# e >> t >> l
| 傳統痛點 | Airflow 解決方案 |
|---|---|
| GUI 黑箱作業 流程邏輯藏在圖形介面與設定檔深處,版控困難,難以 Code Review。 |
Configuration as Code 流程即代碼。邏輯透明、可版控、可測試、可多人協作開發。 |
| **依賴地獄 (Dependency Hell)** 任務 A 失敗,任務 B 卻繼續跑;或跨系統依賴難以管理。 |
DAG 狀態管理 嚴格的依賴控制,支援複雜的邏輯判斷 (Branching) 與跨 DAG 觸發。 |
| 授權費昂貴 商用軟體以 Agent 或 Job 數量計費,擴充成本極高。 |
Open Source 開源免費,無 Agent 數量限制,適合大規模雲端動態擴展。 |
| 人才斷層 年輕工程師不想學專有的商用工具指令。 |
Python 標準 使用通用的 Python 語言,人才庫龐大且易於招募。 |
| 比較維度 | Apache Airflow 3.0 | BMC Control-M |
|---|---|---|
| 核心哲學 | **Code-First (代碼優先)** 適合開發者 (Dev) 與資料工程師。 |
**Config-First (設定優先)** 適合維運人員 (Ops),以 GUI 拖拉為主。 |
| 版控與協作 | 🏆 Git Flow 整合 天生支援 Branch、PR、Code Review 流程。 |
弱 依賴匯出 XML/JSON 進行版控,難以多人同時開發。 |
| 雲端與生態 | 🏆 **雲原生 (Cloud Native)** AWS/GCP/Azure 整合極深,容器化支援佳。 |
傳統強項 強項在 Mainframe (大型主機) 與地端 Legacy 系統。 |
| 成本結構 | 硬體與人力成本 軟體免費,但需投入工程人力維護。 |
高昂授權費 按 Job 數或處理器核心計費,擴充昂貴。 |
| 客製化能力 | 無限 任何 Python 能寫的邏輯都能跑。 |
受限 需依賴原廠提供的 Plugin 或自行開發複雜腳本。 |
兩者在實務上常並存,依工作負載性質選擇合適工具。
Airflow 能夠實現: