温馨提示×

如何在Airflow中编写执行MySQL查询的任务

小樊
89
2024-08-10 22:38:44
栏目: 云计算

在Airflow中编写执行MySQL查询的任务可以通过使用PythonOperator来执行查询的Python函数。以下是一个简单的示例:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.mysql_operator import MySqlOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1
}

dag = DAG('mysql_query_dag', default_args=default_args, schedule_interval='@daily')

def execute_mysql_query():
    # 连接MySQL数据库
    conn = MySQLdb.connect(host="localhost", user="root", passwd="password", db="database")
    cursor = conn.cursor()
    
    # 执行查询
    cursor.execute("SELECT * FROM table")
    
    # 获取结果
    rows = cursor.fetchall()
    for row in rows:
        print(row)
    
    # 关闭连接
    conn.close()

mysql_task = PythonOperator(
    task_id='execute_mysql_query',
    python_callable=execute_mysql_query,
    dag=dag
)

mysql_task

在这个例子中,首先创建了一个DAG,并定义了一个Python函数execute_mysql_query,该函数连接到MySQL数据库,执行查询并打印结果。然后使用PythonOperator来执行这个函数,并将其添加到DAG中。当DAG运行时,该任务将连接到MySQL数据库执行查询。

0