在Airflow中编写执行MySQL查询的任务可以通过使用PythonOperator
来执行查询的Python函数。以下是一个简单的示例:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.mysql_operator import MySqlOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1
}
dag = DAG('mysql_query_dag', default_args=default_args, schedule_interval='@daily')
def execute_mysql_query():
# 连接MySQL数据库
conn = MySQLdb.connect(host="localhost", user="root", passwd="password", db="database")
cursor = conn.cursor()
# 执行查询
cursor.execute("SELECT * FROM table")
# 获取结果
rows = cursor.fetchall()
for row in rows:
print(row)
# 关闭连接
conn.close()
mysql_task = PythonOperator(
task_id='execute_mysql_query',
python_callable=execute_mysql_query,
dag=dag
)
mysql_task
在这个例子中,首先创建了一个DAG,并定义了一个Python函数execute_mysql_query
,该函数连接到MySQL数据库,执行查询并打印结果。然后使用PythonOperator
来执行这个函数,并将其添加到DAG中。当DAG运行时,该任务将连接到MySQL数据库执行查询。