Airflow 是一个 Airbnb 的 Workflow 开源项目,在Github 上已经有超过两千星。Airflow 使用 Python 写的,支持 Python 2/3 两个版本。 传统 Workflow 通常使用 Text Files (json, xml / etc) 来定义 DAG, 然后 Scheduler 解析这些 DAG 文件形成具体的 Task Object 执行;Airflow 没这么干,它直接用 Python 写 DAG definition, 一下子突破了文本文件表达能力的局限,定义 DAG 变得简单。 另外,Airflow 的权限设计、限流设计、以及 Hook/Plugin 的设计都挺有意思,功能性、扩展性良好。当然,项目里的代码质量感觉比较一般,很多地方函数名和实现不太一致,造成理解障碍;也有很多 Flag 和重复出现的定义,显然是当初没有设计好、后面没有精力 Refactor 转而 Hack 的结果。但总体上,可读性中上,系统的扩展性非常好。
左侧 On/Off 按钮控制 DAG 的运行状态,Off 为暂停状态,On 为运行状态。注意:所有 DAG 脚本初次部署完成时均为 Off 状态。
若 DAG 名称处于不可点击状态,可能为 DAG 被删除或未载入。若 DAG 未载入,可点击右侧刷新按钮进行刷新。注意:由于可以部署若干 WebServer,所以单次刷新可能无法刷新所有 WebServer 缓存,可以尝试多次刷新。
Recent Tasks 会显示最近一次 DAG Run(可以理解为 DAG 的执行记录)中 Task Instances(可以理解为作业的执行记录)的运行状态,如果 DAG Run 的状态为 running,此时显示最近完成的一次以及正在运行的 DAG Run 中所有 Task Instances 的状态。
- Last Run 显示最近一次的 execution date。注意:execution date 并不是真实执行时间,具体细节在下文 DAG 配置中详述。将鼠标移至 execution date 右侧 info 标记上,会显示 start date,start date 为真实运行时间。start date 一般为 execution date 所对应的下次执行时间。
在 DAG 的树状图和 DAG 图中都可以点击对应的 Task Instance 以弹出 Task Instance 模态框,以进行 Task Instance 的相关操作。注意:选择的 Task Instance 为对应 DAG Run 中的 Task Instance。
在作业名字的右边有一个漏斗符号,点击后整个 DAG 的界面将只显示该作业及该作业的依赖作业。当该作业所处的 DAG 较大时,此功能有较大的帮助。
Task Instance Details 显示该 Task Instance 的详情,可以从中得知该 Task Instance 的当前状态,以及处于当前状态的原因。例如,若该 Task Instance 为 no status 状态,迟迟不进入 queued 及 running 状态,此时就可通过 Task Instance Details 中的 Dependency 及 Reason 得知原因。
Rendered 显示该 Task Instance 被渲染后的命令。
Run 指令可以直接执行当前作业。
Clear 指令为清除当前 Task Instance 状态,清除任意一个 Task Instance 都会使当前 DAG Run 的状态变更为 running。注意:如果被清除的 Task Instance 的状态为 running,则会尝试 kill 该 Task Instance 所执行指令,并进入 shutdown 状态,并在 kill 完成后将此次执行标记为 failed(如果 retry 次数没有用完,将标记为 up_for_retry)。Clear 有额外的5个选项,均为多选,这些选项从左到右依次为:
- Past: 同时清除所有过去的 DAG Run 中此 Task Instance 所对应的 Task Instance。
- Future: 同时清除所有未来的 DAG Run 中此 Task Instance 所对应的 Task Instance。注意:仅清除已生成的 DAG Run 中的 Task Instance。
- Upstream: 同时清除该 DAG Run 中所有此 Task Instance 上游的 Task Instance。
- Downstream: 同时清除该 DAG Run 中所有此 Task Instance 下游的 Task Instance。
- Recursive: 当此 Task Instance 为 sub DAG 时,循环清除所有该 sub DAG 中的 Task Instance。注意:若当此 Task Instance 不是 sub DAG 则忽略此选项。
Mark Success 指令为讲当前 Task Instance 状态标记为 success。注意:如果该 Task Instance 的状态为 running,则会尝试 kill 该 Task Instance 所执行指令,并进入 shutdown 状态,并在 kill 完成后将此次执行标记为 failed(如果 retry 次数没有用完,将标记为 up_for_retry)。
跨越时间的 DAG 的树表示。如果 pipeline(管道)延迟了,您可以很快地看到哪里出现了错误的步骤并且辨别出堵塞的进程。
图形视图可能是最全面的一种表现形式了。它可以可视化您的 DAG 依赖以及某个运行实例的当前状态。
过去 N 次运行的不同任务的持续时间。通过此视图,您可以查找异常值并快速了解 DAG 在多次运行中花费的时间。
甘特图可让您分析任务持续时间和重叠情况。您可以快速识别系统瓶颈和哪些特定 DAG 在运行中花费了大量的时间。
透明就是一切。虽然您的 pipeline(管道)代码在源代码管理中,但这是一种快速获取 DAG 代码并提供更多上下文的方法。
从上面的页面(树视图,图形视图,甘特图......)中,始终可以单击任务实例,并进入此丰富的上下文菜单,该菜单可以将您带到更详细的元数据并执行某些操作。
外部系统的连接信息存储在 Airflow 元数据数据库中,并在 UI 中进行管理(Menu -> Admin -> Connections)。在那里定义了conn_idconn_id而无需在任何地方硬编码任何此类信息。
可以定义具有相同conn_id许多连接,并且在这种情况下,并且当挂钩使用来自BaseHook的get_connection方法时,Airflow 将随机选择一个连接,允许在与重试一起使用时进行一些基本的负载平衡和容错。
Airflow 还能够通过操作系统中的环境变量引用连接。但它只支持 URI 格式。如果您需要为连接指定extra信息,请使用 Web UI。
如果在 Airflow 元数据数据库和环境变量中都定义了具有相同conn_id连接,则 Airflow 将仅引用环境变量中的连接(例如,给定conn_idpostgres_master,在开始搜索元数据数据库之前,Airflow 将优先在环境变量中搜索AIRFLOW_CONN_POSTGRES_MASTER并直接引用它)。
许多钩子都有一个默认的conn_id,使用该挂钩的 Operator 不需要提供显式连接 ID。 例如,PostgresHook的默认conn_id是postgres_default
XComs 允许任务交换消息,允许更细微的控制形式和共享状态。该名称是“交叉通信”的缩写。XComs 主要由键,值和时间戳定义,但也跟踪创建 XCom 的任务/DAG 以及何时应该可见的属性。任何可以被 pickle 的对象都可以用作 XCom 值,因此用户应该确保使用适当大小的对象。
变量是将任意内容或设置存储和检索为 Airflow 中的简单键值存储的通用方法。可以从 UI(Admin -> Variables),代码或 CLI 列出,创建,更新和删除变量。此外,json 设置文件可以通过 UI 批量上传。虽然管道代码定义和大多数常量和变量应该在代码中定义并存储在源代码控制中,但是通过 UI 可以访问和修改某些变量或配置项会很有用。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。