最近在弄畫像標簽每天ETL的調(diào)度事情,這篇文章分享一下一個開源的ETL工具Airflow。
一、基礎(chǔ)概念
Airflow是Airbnb內(nèi)部發(fā)起并開源的一個ETL管理平臺,使用Python編寫實現(xiàn)的任務管理、調(diào)度、監(jiān)控工作流平臺。這是其官方文檔地址:Apache Airflow (incubating) Documentation,關(guān)于airflow產(chǎn)品的使用,里面有詳細的介紹。
Airflow的調(diào)度依賴于crontab命令,與crontab相比airflow可以直觀的看到任務執(zhí)行情況、任務之間的邏輯依賴關(guān)系、可以設定任務出錯時郵件提醒、可以查看任務執(zhí)行日志。
而crontab命令管理的方式存在以下幾方面的弊端:
1、在多任務調(diào)度執(zhí)行的情況下,難以理清任務之間的依賴關(guān)系;
2、不便于查看當前執(zhí)行到哪一個任務;
3、任務執(zhí)行失敗時不便于查看執(zhí)行日志,也即不方便定位報錯的任務和錯誤原因;
4、不便于查看調(diào)度流下每個任務執(zhí)行的起止消耗時間,這對于優(yōu)化task作業(yè)是非常重要的;
5、不便于記錄歷史調(diào)度任務的執(zhí)行情況,而這對于優(yōu)化作業(yè)和錯誤排查是很重要的;
Airflow中有兩個最基本的概念:DAG和task,下面主要介紹一下。
DAG是什么:
DAG是Directed Acyclic Graph的縮寫,即有向無環(huán)圖。是所有要執(zhí)行任務腳本(即task)的集合,在這個DAG中定義了各個task的依賴關(guān)系、調(diào)度時間、失敗重啟機制等。通過DAGid來標識每個DAG任務
每個DAG是由1到多個task組成
task是什么:
task是具體執(zhí)行的任務腳本,可以是一個命令行(BashOperator),也可以是python腳本等。
二、主要功能鍵介紹
1、DAG管理
在airflow的主頁,可以看到當前所有的DAG列表(通俗點說就是所有的調(diào)度任務列表),中間“Task by State”那一列顯示任務的執(zhí)行狀態(tài)。深綠色的表示已執(zhí)行成功的task,淺綠色的表示當前正在執(zhí)行的task。
右側(cè)“Links”那一列可以鏈接查看當前DAG任務的依賴關(guān)系、執(zhí)行時間、執(zhí)行腳本等情況。
當點擊具體某一個DAG任務時,就可以進去查看該DAG的調(diào)度依賴、執(zhí)行時長、調(diào)度腳本等具體執(zhí)行情況
2、調(diào)度依賴查看
通過“Graph View”選項可以查看當前調(diào)度任務的依賴關(guān)系,當調(diào)度作業(yè)較為復雜時,這種圖形化方式展示的依賴關(guān)系可以幫助用戶迅速理清。
在用戶畫像的調(diào)度管理中,每天需要執(zhí)行cookieid和userid兩個維度的畫像腳本,因此可以設定并行執(zhí)行任務,讓cookieid和userid的腳本同時執(zhí)行調(diào)度作業(yè)
3、執(zhí)行狀態(tài)
通過“Tree View”選項可以查看當前任務的執(zhí)行狀態(tài),包括當前執(zhí)行到哪一個task,還有哪些task未執(zhí)行。哪些task執(zhí)行成功,哪些task執(zhí)行失敗。
也可以查看歷史上該DAG下面各task的執(zhí)行情況。
4、各task執(zhí)行時間
通過“Gantt”選項可以查看各task任務的執(zhí)行起止時間的甘特圖。
了解各task執(zhí)行的時間可以有針對性地優(yōu)化執(zhí)行時間長的task對應腳本。
5、DAG調(diào)度腳本
通過“Code”選項,可以查看當前DAG調(diào)度的腳本。腳本里面定義了需要執(zhí)行的task、執(zhí)行順序及依賴、調(diào)度時間、失敗發(fā)送郵件或重調(diào)機制等方法
三、腳本實例
在開發(fā)過程中,task腳本是需要被調(diào)度的腳本,在Airflow中主要需要開發(fā)的是DAG腳本,即管理task任務的腳本。通過一個DAG腳本,將各個調(diào)度作業(yè)腳本串起來,按照業(yè)務邏輯去執(zhí)行。
1、DAG腳本
下面通過一個具體DAG腳本實例來了解一下:
from airflow.operators.bash_operator import BashOperator import airflow from airflow.models import DAG from airflow import operators from airflow.contrib.hooks import SSHHook from airflow.models import BaseOperator from airflow.contrib.operators import SSHExecuteOperator from airflow.operators.latest_only_operator import LatestOnlyOperator import os import sys from datetime import timedelta,date,datetime import pendulum from airflow.utils.trigger_rule import TriggerRule default_args = { 'owner': 'superuserprofile', 'depends_on_past': False, 'start_date': datetime(2018, 06, 01), 'email': ['administer@testemail.com'], 'email_on_failure': True , 'email_on_retry': True, 'retries': 1, 'retry_delay': timedelta(minutes=1), } os.environ['SPARK_HOME'] = '/usr/local/spark-2.1.1-bin-hadoop2.6' sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))
該段腳本定義了需要引入的包,以及默認的DAG參數(shù)配置,包括task是否依賴上游任務,首次調(diào)度時間、任務失敗接收郵箱、任務失敗是否重新調(diào)起等
dag = DAG( 'superuserprofile', default_args=default_args, description='A userprofile test', schedule_interval='00 08 * * *' )
該段腳本實例化了DAG,設置了DAGid,調(diào)度執(zhí)行時間
gender_task = BashOperator( task_id='gender', bash_command=' sudo -E -H -u userprofile spark-submit --master yarn --deploy-mode client --driver-memory 1g --executor-memory 8g --executor-cores 2 --num-executors 200 /airflow/userprofile_gender.py {{ ds_nodash }} ', dag=dag, trigger_rule=TriggerRule.ALL_DONE ) country_task = BashOperator( task_id='country', bash_command=' sudo -E -H -u userprofile spark-submit --master yarn --deploy-mode client --driver-memory 1g --executor-memory 4g --executor-cores 2 --num-executors 200 /airflow/userprofile_country.py {{ ds_nodash }} ', dag=dag, trigger_rule=TriggerRule.ALL_DONE )
該段腳本設置了兩個需要執(zhí)行的task任務(userprofile_gender.py和userprofile_country.py)的實例化。
task直接的調(diào)度依賴關(guān)系可以通過set_upstream、set_downstream命令或符號>> 、<<來建立。
gender_task .set_upstream(country_task) 命令指gender_task 任務將依賴country_task任務;反之同理
gender_task >> country_task 命令指country_task 任務將依賴gender_task 任務先執(zhí)行完,反之同理
2、命令行執(zhí)行
Airflow通過可視化界面的方式實現(xiàn)了調(diào)度管理的界面操作,但在測試腳本或界面操作失敗的時候,可通過命令行的方式調(diào)起任務。下面介紹幾個常用命令
命令1:airflow list_tasksuserprofile
該命令用于查看當前DAG任務下的所有task的列表
其中userprofile是DAGid,加粗的airflow list_tasks是關(guān)鍵字命令
-----------------------------------------------------------------------
命令2:airflow testuserprofile gender_task 20180601
該命令用于單獨執(zhí)行DAG下面的某個task
其中userprofile是DAGid,gender_task是要具體某個taskid,20180601是執(zhí)行日期。加粗部分是關(guān)鍵字命令
-----------------------------------------------------------------------
命令3:airflow backfill -s2018-06-01-e2018-06-02 userprofile
該命令用于調(diào)起整個DAG腳本執(zhí)行
其中2018-06-01是執(zhí)行腳本的開始日期, 2018-06-02是結(jié)束日期,userprofile是DAGid,加粗部分是關(guān)鍵字命令。
-
開源
+關(guān)注
關(guān)注
3文章
3680瀏覽量
43815 -
python
+關(guān)注
關(guān)注
56文章
4827瀏覽量
86707
原文標題:用戶畫像—Airflow作業(yè)調(diào)度(ETL)
文章出處:【微信號:AI_shequ,微信公眾號:人工智能愛好者社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
【美洲認證系列】美國ETL認證介紹
美國ETL認證簡介
UL和ETL的區(qū)別是什么?
BI軟件的ETL用開源的好,還是自研的好?
路燈ETL認證
如何使用openssl制作一個開源C簽名工具?
一種金融系統(tǒng)專用ETL工具的研究與實現(xiàn)
基于元數(shù)據(jù)的ETL工具集成研究
基于數(shù)據(jù)質(zhì)量監(jiān)管的ETL設計
用于數(shù)據(jù)分析的各類主流ETL 工具比較,哪種最適合你
一款用于Windows的開源反rookit (ARK)工具

多數(shù)據(jù)源數(shù)據(jù)轉(zhuǎn)換和同步的ETL工具推薦
上線 Airflow 官方!DolphinDB 帶來數(shù)據(jù)管理新體驗

評論