Intermediate • Last updated: 2026-04-09 • 5 sections
Essential Airflow CLI commands, DAG patterns, operators, and best practices for orchestrating data pipelines.
| Command | Description | Example |
|---|---|---|
| airflow dags list | List all DAGs | airflow dags list -o table |
| airflow dags trigger | Manually trigger a DAG | airflow dags trigger my_dag --conf '{"key":"val"}' |
| airflow dags pause | Pause a DAG | airflow dags pause my_dag |
| airflow dags unpause | Unpause a DAG | airflow dags unpause my_dag |
| airflow tasks test | Test a single task (no state) | airflow tasks test my_dag my_task 2026-01-01 |
| airflow tasks run | Run a task with state tracking | airflow tasks run my_dag my_task 2026-01-01 |
| airflow tasks list | List tasks in a DAG | airflow tasks list my_dag --tree |
| airflow db init | Initialize Airflow metadata DB (superseded by `airflow db migrate` in 2.7+) | airflow db init |
| airflow db upgrade | Upgrade metadata DB schema (superseded by `airflow db migrate` in 2.7+) | airflow db upgrade |
| airflow connections list | List all connections | airflow connections list -o table |
| airflow variables get | Get a variable value | airflow variables get my_variable |
| airflow info | Show Airflow config info | airflow info |
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from datetime import datetime, timedelta

def extract_from_api():
    """Pull raw data from the source API (implement for your source)."""
    ...

def run_data_quality_checks():
    """Row-count and null checks on the output (implement for your data)."""
    ...

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['[email protected]'],
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='daily_etl_pipeline',
    default_args=default_args,
    description='Daily ETL from sources to warehouse',
    schedule_interval='0 6 * * *',  # 6 AM daily ('schedule' in Airflow 2.4+)
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['etl', 'daily', 'production'],
    max_active_runs=1,
) as dag:
    extract = PythonOperator(
        task_id='extract_data',
        python_callable=extract_from_api,
    )
    transform = SnowflakeOperator(
        task_id='transform_data',
        snowflake_conn_id='snowflake_prod',
        sql='sql/transform.sql',
    )
    validate = PythonOperator(
        task_id='validate_output',
        python_callable=run_data_quality_checks,
    )

    extract >> transform >> validate
```

| Operator | Use Case | Key Parameters |
|---|---|---|
| PythonOperator | Run Python functions | python_callable, op_args, op_kwargs |
| BashOperator | Run shell commands | bash_command, env |
| SnowflakeOperator | Execute Snowflake SQL | snowflake_conn_id, sql, warehouse |
| S3ToSnowflakeOperator | Load S3 files into Snowflake | s3_keys, table, schema, file_format |
| EmailOperator | Send email notifications | to, subject, html_content |
| BranchPythonOperator | Conditional branching | python_callable (must return task_id) |
| TriggerDagRunOperator | Trigger another DAG | trigger_dag_id, conf, wait_for_completion |
| ShortCircuitOperator | Skip downstream if False | python_callable (returns True/False) |
| DummyOperator (EmptyOperator in recent versions) | No-op for DAG structure | task_id (used for join points) |
| TaskGroup | Visual grouping of tasks | group_id, tooltip |
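A BranchPythonOperator callable picks the execution path at runtime by returning the task_id (or list of task_ids) to follow; unreturned branches are skipped. A minimal sketch of such a callable — the task names and the weekday rule here are hypothetical:

```python
from datetime import datetime

def choose_branch(logical_date: datetime) -> str:
    """Return the task_id of the branch to follow (hypothetical task names).

    Weekday runs take the full refresh path; weekend runs take a
    lighter incremental path.
    """
    if logical_date.weekday() < 5:  # Monday-Friday
        return 'full_refresh'
    return 'incremental_load'
```

In a DAG this would be wired as `BranchPythonOperator(task_id='branch', python_callable=choose_branch)`, with both `full_refresh` and `incremental_load` set as downstream tasks.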
| Schedule | Cron Expression | Preset String |
|---|---|---|
| Every minute | * * * * * | (custom cron) |
| Single run on unpause | (no cron) | @once |
| Hourly | 0 * * * * | @hourly |
| Daily at midnight | 0 0 * * * | @daily |
| Weekly (Sunday) | 0 0 * * 0 | @weekly |
| Monthly (1st) | 0 0 1 * * | @monthly |
| Yearly (Jan 1) | 0 0 1 1 * | @yearly |
| Weekdays 6 AM | 0 6 * * 1-5 | (custom cron) |
| Every 15 min | */15 * * * * | (custom cron) |
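As a sanity check on the "weekdays at 6 AM" row above, the cron fields `0 6 * * 1-5` translate to a simple datetime predicate (note cron numbers Monday as 1, while Python's `weekday()` numbers it 0):

```python
from datetime import datetime

def matches_weekday_6am(dt: datetime) -> bool:
    """True when dt falls on the cron schedule '0 6 * * 1-5'."""
    return dt.minute == 0 and dt.hour == 6 and dt.weekday() < 5  # Mon-Fri
```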
schedule_interval (renamed `schedule` in Airflow 2.4+) accepts cron expressions or preset strings (@daily, @hourly) and runs at fixed intervals. Timetables (Airflow 2.2+) are Python classes that allow complex custom schedules like "business days only" or "last Friday of each month". Use a timetable when a cron expression can't express your schedule.
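For example, the "business days only" rule such a timetable would encode boils down to skipping weekends when computing the next run date. A pure-Python sketch of that core logic (a real Timetable subclass would wrap this inside `next_dagrun_info()`):

```python
from datetime import date, timedelta

def next_business_day(after: date) -> date:
    """Next Monday-Friday date strictly after `after` — the heart of a
    hypothetical 'business days only' custom timetable."""
    d = after + timedelta(days=1)
    while d.weekday() >= 5:  # skip Saturday (5) and Sunday (6)
        d += timedelta(days=1)
    return d
```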
Set catchup=False explicitly for most DAGs (older Airflow versions default it to True) — the DAG then runs only for the most recent interval. Use catchup=True when you need Airflow to backfill historical runs, such as when deploying a new DAG that must process past data. Be careful: a start_date months in the past with catchup=True will queue hundreds of runs.
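The run count is easy to estimate: for a daily schedule, catchup=True queues roughly one run per elapsed day between start_date and now. A quick sketch of that arithmetic:

```python
from datetime import date

def daily_catchup_runs(start: date, today: date) -> int:
    """Approximate number of runs a daily DAG with catchup=True
    will queue when first unpaused on `today`."""
    return max((today - start).days, 0)
```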
Use XComs ("cross-communications") to pass small pieces of data between tasks. Tasks push values explicitly with xcom_push() or implicitly by returning them (auto-pushed); downstream tasks pull with xcom_pull(task_ids='upstream_task'). For large data, write to shared storage (S3, GCS) and pass only the file path through XCom.
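The push/pull pattern can be illustrated with a plain dict standing in for Airflow's XCom table (the task names and stage path are hypothetical; in a real DAG, Airflow persists XComs in the metadata DB for you):

```python
# Hypothetical in-memory stand-in for Airflow's XCom storage.
xcom_store: dict[str, str] = {}

def extract_task() -> None:
    # A real task would write the data to S3/GCS, then push only the path.
    path = 's3://my-bucket/staging/2026-01-01/data.parquet'  # hypothetical
    xcom_store['extract_data'] = path  # like xcom_push() or a return value

def transform_task() -> str:
    # Downstream equivalent of xcom_pull(task_ids='extract_data').
    path = xcom_store['extract_data']
    return f'COPY INTO staging FROM {path}'

extract_task()
print(transform_task())
```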