我就废话不多说了,直接上代码吧!
""" 对于每天存储文件,文件数量过多,占用空间 采用保存最新的三个文件 """ from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.models import Variable from sctetl.airflow.utils import dateutils from datetime import datetime,timedelta import logging import os import shutil """ base_dir = "/data" data_dir = "/gather" "gather下边存在不同的文件夹" "/data/gather/test" "test路径下有以下文件夹" "20180812、20180813、20180814、20180815、20180816" """ base_dir = Variable.get("base_dir") data_dir = Variable.get("data_dir") keep = 3 default_arg = { "owner":"airflow", "depends_on_past":False, "start_date":dateutils.get_start_date_local(2018,8,27,18,5), "email":[''], "email_on_failure":False, "email_on_retry":False, "retries":1, "retry_delay":timedelta(minutes=5) } dag = DAG(dag_id="keep_three_day",default_args=default_arg,schedule_interval=dateutils.get_schedule_interval_local(18,5)) def keep_three_day(): path = os.path.join(base_dir, data_dir) date_cates = os.listdir(path) for cate in date_cates: p = os.path.join(base_dir, data_dir, cate) if os.path.isdir(p): dir_names = os.listdir(p) dir_names.sort() for i in dir_names[:-keep]: logging.info("删除目录 {path}".format(path=os.path.join(p, i))) shutil.rmtree(os.path.join(p, i)) with dag: keep_three_file = PythonOperator(task_id="keep_three_file",python_callable=keep_three_day(),dag=dag) keep_three_file
以上这篇python 实现保存最新的三份文件,其余的都删掉就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持亿速云。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。