Airflow: ExternalTaskSensor untuk Dags anda
Bagaimana anda hendak pastikan dags yang anda bergantung berjaya sebelum trigger Dag baru
Untuk pengorkestraan pipeline, airflow banyak digunakan. Sebab apa? Mungkin sebab banyak module yang disupport oleh airflow dan ia mudah untuk menyusun task dependency.
Sebagai contoh jika anda ingin membuat dependency seperti ini
Dags file anda
staging_offload = KubernetesPodOperator(
name="staging_offload",
<configuration lain>
dag=dag,
)
clean_offload = KubernetesPodOperator(
name="clean_offload",
<configuration lain>
dag=dag,
)
clean2_offload = KubernetesPodOperator(
name="clean2_offload",
<configuration lain>
dag=dag,
)
summary_offload = KubernetesPodOperator(
name="summary_offload",
<configuration lain>
dag=dag,
)
Anda akan set task flow sepert ini
staging_offload >> clean_offload
Atau anda ingin jalankan task and secara serentak seperti ini
task flow adalah seperti berikut
staging_offload >> [clean_offload,clean2_offload] >> summary_offload
Tetapi bagaimana jika dependency anda dari pipeline/dags yang lain? Sebagai contoh pipeline anda memerlukan offload_fasa_1
untuk complete untuk membolehkan anda trigger offload_fasa_2
:
Ianya boleh dicapai dengan menggunakan ExternalTaskSensor
. Berikut adalah perkara yang anda perlu tambah
Dags file anda versi 2:
upstream_dag = ExternalTaskSensor(
task_id="wait-fasa-1-finish",
external_dag_id="offload_fasa_1",
mode="reschedule",
execution_delta=timedelta(minutes=25),
timeout=60*60
)
staging_offload = KubernetesPodOperator(
name="staging_offload",
<configuration lain>
dag=dag,
)
clean_offload = KubernetesPodOperator(
name="clean_offload",
<configuration lain>
dag=dag,
)
clean2_offload = KubernetesPodOperator(
name="clean2_offload",
<configuration lain>
dag=dag,
)
summary_offload = KubernetesPodOperator(
name="summary_offload",
<configuration lain>
dag=dag,
)
Jadi task flow sepatutnya:
upstream_dag >> staging_offload >> [clean_offload,clean2_offload] >> summary_offload
Itu sahaja. Selamat mencuba