Apache Airflowは、バッチ処理向けのワークフローを開発、スケジューリング、監視するためのオープンソースプラットフォームです。Airflowのすべてのワークフローは、Pythonコードで定義できます。Webインターフェースでワークフローの状態を管理できます。
前提条件
- Apache Airflowがインストール済みであること。詳細については、Apache Airflow公式サイトを参照してください。
- OceanBaseデータベースがインストール済みであり、MySQLモードのテナントが作成されていること。
- 接続対象のOBServerノードのIPと、Airflowがインストールされているマシンとの間でネットワーク接続が確立されていることを確認してください。
データベース接続情報の取得
OceanBaseデータベースのデプロイ担当者または管理者に連絡し、対応するデータベース接続文字列を取得します。例:
obclient -h$host -P$port -u$user_name -p$password -D$database_name
パラメータの説明:
$host:OceanBaseデータベースの接続IPです。OceanBase Database Proxy(ODP)接続方式ではODPアドレスを使用し、直接接続方式ではOBServerノードのIPアドレスを使用します。$port:OceanBaseデータベースの接続ポートです。ODP接続方式のデフォルトは2883で、ODPのデプロイ時にカスタマイズ可能です。直接接続方式のデフォルトは2881で、OceanBaseデータベースのデプロイ時にカスタマイズ可能です。$database_name:アクセスするデータベース名です。$user_name:テナントの接続アカウントです。ODP接続の一般的な形式はユーザ名@テナント名#クラスタ名またはクラスタ名:テナント名:ユーザー名です。直接接続方式の形式はユーザー名@テナント名です。$password:アカウントのパスワードです。
注意
テナントに接続するユーザーは、そのデータベースに対するCREATE、INSERT、DROP、SELECT権限を持っている必要があります。
例:
obclient -hxxx.xxx.xxx.xxx -P2881 -utest_user001@mysql001 -p****** -Dtest
AirflowでOceanBaseデータソースを追加して接続する
AirflowのWeb UIを開きます。
Admin -> Connectionsに移動します。
「+」記号をクリックして、新しい接続を追加します。
以下のフィールドを入力します。
構成パラメータ 説明 Connection Id ob (任意の識別子で構いません)。 Connection Type MySQL Host 接続文字列の -hパラメータから取得した、OceanBaseデータベースの接続IPアドレスです。例:xxx.xxx.xxx.xxx。Schema 接続文字列の -Dパラメータから取得した、アクセスするデータベース名です。Login 接続文字列の -uパラメータから取得した、アカウント名です。例:test_user001@mysql001。Password 接続文字列の -pパラメータから取得した、アカウントのパスワードです。Port 接続文字列の -Pパラメータから取得した、OceanBaseデータベースの接続ポートです。直接接続方式のデフォルトは2881、ODP経由の接続のデフォルトは2883です。作成が成功すると、Airflowのタスクでこの
obというConnection Idを参照してOceanBaseデータベースにアクセスできるようになります。
Airflowタスクの例
AirflowにOceanBaseデータベースを追加した後、以下の内容を作成することで、AirflowがOceanBaseデータベースからデータを読み込んで出力する処理を実装できます。
Airflowのインストールディレクトリ内のdagsフォルダに、query.pyというファイルを新規作成し、以下の内容を記述します。
from airflow import DAG from airflow.utils.dates import days_ago from airflow.providers.mysql.hooks.mysql import MySqlHook from airflow.operators.python import PythonOperator default_args = { 'owner': 'airflow', 'retries': 0, } def fetch_and_print_data(): hook = MySqlHook(mysql_conn_id='ob') sql = "SELECT * FROM person LIMIT 1;" connection = hook.get_conn() cursor = connection.cursor() cursor.execute(sql) rows = cursor.fetchall() for row in rows: print(row) with DAG( dag_id='sql_query', default_args=default_args, schedule_interval='@daily', start_date=days_ago(1), catchup=False, ) as dag: run_and_print = PythonOperator( task_id='run_and_print', python_callable=fetch_and_print_data, ) run_and_printairflow tasks test sql_query run_and_printを実行すると、personテーブルの最初のデータが出力されます。