めもめも

データエンジニアリング、機械学習について書いてます

AirflowというかComposerで困ったときのメモ

概要

AirflowのマネージドサービスであるComposerの実装で困ったことと、その解決策をまとめました。 随時更新予定です。

composerのジョブを停止する時の注意点

airflowのwebUIからDAGを停止(Pause)することができますが、DAGを再起動(UnPause)した際に気をつけるべきことがあります。

それは、DAGの停止期間が長かった場合、再起動時にDAGが複数回実行される可能性があることです。 停止期間中に、DAGのscheduled_intervalが複数回経過した場合、その回数分のDAGが溜まります。

再起動時のDAGの実行回数は、DAGのcatchupオプションによって制御可能です。

catchupオプションは、以下のような挙動をします。 - catchup=tureにすると、溜まっているDAGが全て動く - catuchup= falseにすると、溜まっているDAGの内、最も新しいものが1つだけ動く

DAGの実行タイミングはこれがわかりやすい。https://cloud.google.com/composer/docs/triggering-dags#more_scheduling_parameter_examples

composer環境内へのファイルコピーに時間がめっちゃかかる問題

Composer のBashOperatorでGCSバケットにデータをコピーするgsutil cpコマンドを行ったら、転送先が実質的には同じであるにもかかわらず指定方法によってパフォーマンスに大きな差が生まれるという事象に遭遇した。

具体的には、以下のような結果になった、 gsutil cp gs://ソースGCSバケット/ソースファイル /home/airflow/gcs/dataの場合、7m51s gsutil cp gs://ソースGCSバケット/ソースファイル gs://バケット名/data の場合、13s

つまり、Composer内にデータをコピーする必要がある場合は、パスではなく、gsutil URIで指定するべきである。

DAGの追加は、「gcloud composer」と「gsutil cp」どっちが良いか

composerの環境を作ると、DAG用の/DAGというGCSバケットが自動で作成されます。 このGCSバケットに、DAGを定義したpythonファイルをアップロードすることで、DAGがcomposerに登録されます。

composerには、DAGを登録するための「gcloud composer environments storage dags import」という専用コマンドが用意されています。 また、GCSバケットにアップロードできれば良いので「gsutil cp」でも対応可能であると公式ドキュメントに記載されています。

どちらでも良いなら、処理の内容が明確になるので、「gcloud composer environments storage dags import」コマンドを使うべきでしょう。

composer自体にデータ加工の処理を実行させても良いか?

結論としては、composerはあくまでもワークフロー管理ツールなので、ETL機能を持たせるべきではないと思います。ETL処理にコンピューティングリソースを取られ、本来のワークフロー管理にリソースが割けない可能性があるからです。

裏返すと、コンピューティングリソースを消費しないような小規模なデータ加工ならcomposerで実行するのはありだと思います。 私の場合だと、文字コードの変換処理(iconvコマンド)をBashOperatorで実装しました。

タスク間のデータの受け渡しはどうやるの?

タスク間でデータを共有するための仕組みとして、xcom(cross comunicationの意味)という仕組みがあります。

しかし、公式ドキュメントによると、xcomの最大サイズは49KBまでのようです。

つまり、xcomは、サイズの小さなメタデータなどの受け渡しには使えるが、BigQueryから取得した大きめのテーブルデータを渡すなどの要件に適していないということです。

大きいサイズのデータをタスク間で受け渡しするには、GCSを経由するか、そもそも大きいサイズのデータの処理をComposerでやるべきなのか考えると良いでしょう。