Apache NiFiは、自動化データストリームの処理プラットフォームで、システム間の効率的かつ信頼性の高いデータ転送をサポートします。OceanBaseデータベースのBinlogデータを読み込むことで、データをファイルシステム、メッセージキュー(Kafkaなど)、HTTPエンドポイントなどのターゲットに配信できます。
前提条件
OceanBaseデータベースがデプロイ済みで、MySQLモードのユーザーテナントが作成されていること。ユーザーテナントの作成の詳細については、テナントの作成を参照してください。
- 作成したMySQL互換モードのテナントでBinlogサービスを有効化します。
実行環境は、Java Development Kit (JDK) 1.8以上が必要です。
Apache NiFiインストールパッケージがダウンロード済みであること。
操作手順
ステップ1:データベース接続情報を取得する
OceanBaseデータベースのデプロイ担当者または管理者から、該当するデータベース接続文字列を取得します。例:
obclient -h$host -P$port -u$user_name -p$password -D$database_name
パラメータの説明:
$host:OceanBaseデータベースへの接続IPアドレスを提供します。OceanBaseデータベースプロキシ(OceanBase Database Proxy、ODP)接続方式はODPアドレスを使用し、直接接続方式はOBServerノードのIPアドレスを使用します。$port:OceanBaseデータベースへの接続ポートを提供します。ODP接続方式のデフォルトポートは2883で、ODPデプロイ時にカスタマイズ可能です。直接接続方式のデフォルトポートは2881で、OceanBaseデータベースのデプロイ時にカスタマイズ可能です。$database_name:アクセス対象のデータベース名。注意
テナントに接続するユーザーに、データベースに対する
CREATE、INSERT、DROP、およびSELECT権限が付与されていなければなりません。ユーザー権限の詳細については、MySQLモードの権限分類を参照してください。$user_name:テナントの接続アカウント。ODP接続の一般的な形式:ユーザー名@テナント名#クラスタ名またはクラスタ名:テナント名:ユーザー名。直接接続方式の形式:ユーザー名@テナント名。$password:アカウントのパスワード。
接続文字列の詳細については、OBClientを使用してOceanBaseテナントに接続するを参照してください。
ステップ2:NiFi実行環境を設定する
NiFiインストールパッケージをターゲットディレクトリに解凍します。
conf/nifi.properties設定ファイルは、以下のとおり修正します (本記事ではHTTP接続を例として説明します):# HTTPSを無効にしHTTPアクセスを設定する nifi.web.https.host= nifi.web.https.port= nifi.web.http.host=0.0.0.0 nifi.web.http.port=<カスタムポート番号> # 非セキュリティリモート接続を許可する nifi.remote.input.secure=falseMySQLドライバーをNiFiサーバーにアップロードします。
NiFiのインストールディレクトリで、以下のとおりコマンドを実行してサービスを起動します。サービスを停止するには、
bin/nifi.sh stopを実行します。# サービスの起動 bin/nifi.sh start
ステップ3:データフロー処理パイプラインを設定する
ブラウザから
http://<nifi_server_ip>:<port>/nifi/にアクセスして、NiFiコンソールにログインします。CaptureChangeMySQL Processorを設定します。
ページ上部のProcessorアイコンをキャンバスにドラッグし、
CaptureChangeMySQLを検索して選択し、ADD をクリックします。
CaptureChangeMySQL Processorを右クリックして Configure を選択し、PROPERTIES タブで関連するパラメータを設定します。設定が完了したら APPLY をクリックします。
プロパティ名 説明 MySQL Nodes OceanBaseデータベースのアドレスおよびポート。 MySQL Driver Class Name JDBCドライバークラス名は com.mysql.cj.jdbc.Driverです。MySQL Driver Location アップロードされたドライバーJARファイルのディレクトリ。 Username 作成したデータベースユーザーのユーザー名。 Password データベースユーザーのパスワード。 Database/Schema Name Pattern Binlogを読み取る必要があるデータベース(正規表現をサポート)。 Table Name Pattern 読み取る必要があるBinlogテーブル(正規表現をサポート)。 Include DDL Events DDL変更イベントをキャッチするには、 trueに設定する必要があります。
PutFile Processorを設定します。
再びページ上部のProcessorアイコンをキャンバスにドラッグし、
PutFileを検索して選択し、ADD をクリックします。PutFile Processorを右クリックして Configure を選択し、PROPERTIES タブに
Directoryプロパティを入力します。Binlogファイルはこのディレクトリに出力されます。設定が完了したら APPLY をクリックします。
データフロー接続を確立します。
キャンバスに戻り、
CaptureChangeMySQLの矢印をクリックしてPutFileを指し、両者を接続します。表示された Create Connection ポップアップウィンドウで、ADD をクリックします。
キャンバス内で、PutFile Processorを再度右クリックして Configure を選択し、RELATIONSHIPS タブで
failureとsuccessの動作ポリシーを設定します。どちらも terminate を選択します。 設定が完了したら APPLY をクリックします。
データ処理プロセスを開始します。
キャンバスに戻り、CaptureChangeMySQL Processorを右クリックし、Start を選択します。
PutFile Processor上でも同様に右クリックし、Start を選択します。
ステップ4:データ同期を検証する
OceanBaseデータベースにデータを挿入します。
PutFileディレクトリにファイルが生成されていることを確認します。

ファイルの内容を確認することで、実行されたSQLステートメントに関する情報を確認できます。
