Apache NiFiは、システム間で効率的かつ信頼性の高いデータ伝送を実現するための自動化されたデータストリーム処理プラットフォームです。OceanBaseデータベースのBinlogデータを読み取り、データをファイルシステム、メッセージキュー(Kafkaなど)、またはHTTPエンドポイントなどのターゲットに配信できます。
前提条件
OceanBaseデータベースのデプロイが完了し、MySQLモードのユーザーテナントが作成されていること。テナント作成の詳細については、テナントの作成を参照してください。
- 作成したMySQL互換モードのテナントでBinlogサービスが有効になっていること。詳細については、OceanBase Binlogサービスを参照してください。
お使いの実行環境がJava Development Kit (JDK) 1.8以上を満たしていること。
Apache NiFiインストールパッケージをダウンロード済みであること。
手順
ステップ1:データベース接続情報を取得する
OceanBaseデータベースのデプロイ担当者または管理者から、該当するデータベース接続文字列を取得します。例:
obclient -h$host -P$port -u$user_name -p$password -D$database_name
パラメータ説明:
$host:OceanBaseデータベースへの接続IPアドレス。OceanBaseデータベースプロキシ(OceanBase Database Proxy、ODP)接続方式ではODPアドレスを使用し、直接接続方式ではOBServerノードのIPアドレスを使用します。$port:OceanBaseデータベースへの接続ポート。ODP接続方式のデフォルトポートは2883で、ODPデプロイ時にカスタマイズ可能です。直接接続方式のデフォルトポートは2881で、OceanBaseデータベースのデプロイ時にカスタマイズ可能です。$database_name:アクセス対象のデータベース名。注意
テナントに接続するユーザーは、データベースに対する
CREATE、INSERT、DROP、およびSELECT権限が付与されていなければなりません。その他のユーザー権限の詳細については、MySQLモードの権限分類を参照してください。$user_name:テナントの接続アカウント。ODP接続の一般的な形式:ユーザー名@テナント名#クラスタ名またはクラスタ名:テナント名:ユーザー名。直接接続方式の形式:ユーザー名@テナント名。$password:アカウントのパスワード。
その他の接続文字列の詳細については、OBClientを使用してOceanBaseテナントに接続するを参照してください。
ステップ2:NiFi実行環境を構成する
NiFiインストールパッケージをターゲットディレクトリに解凍します。
以下の方法で
conf/nifi.properties設定ファイルを変更します(本記事ではHTTP接続を例とします):# HTTPSを無効化し、HTTPアクセスを設定する nifi.web.https.host= nifi.web.https.port= nifi.web.http.host=0.0.0.0 nifi.web.http.port=<カスタムポート番号> # 非セキュアなリモート接続を許可する nifi.remote.input.secure=falseMySQLドライバをNiFiサーバーにアップロードします。
NiFiインストールディレクトリで以下のコマンドを実行してサービスを起動します。サービスを停止する場合は、
bin/nifi.sh stopを実行します。# サービスを起動する bin/nifi.sh start
ステップ3:データストリーム処理パイプラインの設定
ブラウザで
http://<nifi_server_ip>:<port>/nifi/にアクセスして、NiFiコンソールに入ります。CaptureChangeMySQL Processorを設定します。
ページ上部のProcessorアイコンをキャンバスにドラッグし、
CaptureChangeMySQLを検索して選択してから、ADD をクリックします。
CaptureChangeMySQL Processorを右クリックして Configure を選択し、PROPERTIES タブで関連パラメータを設定した後、APPLY をクリックします。
プロパティ名説明MySQL Nodes OceanBaseデータベースのアドレスとポート。 MySQL Driver Class Name JDBCドライバークラス名。 com.mysql.cj.jdbc.Driverを指定します。MySQL Driver Location アップロードしたドライバーJARファイルがあるディレクトリです。 Username 作成したデータベースユーザーのユーザー名。 Password データベースユーザーのパスワード。 Database/Schema Name Pattern Binlogを読み取るデータベース(正規表現をサポートします)。 Table Name Pattern Binlogを読み取るテーブル(正規表現をサポートします)。 Include DDL Events DDL変更イベントをキャプチャするには、 trueに設定する必要があります。
PutFile Processorを設定します。
再度ページ上部のProcessorアイコンをキャンバスにドラッグし、
PutFileを検索して選択してから、ADD をクリックします。PutFile Processorを右クリックして Configure を選択し、PROPERTIES タブで
Directoryプロパティを入力します。Binlogファイルはこのディレクトリに出力されます。完了後、APPLY をクリックします。
データストリーム接続を確立します。
キャンバスに戻り、
CaptureChangeMySQLの矢印がPutFileを指すようにクリックして、両者を接続します。表示された Create Connection ポップアップウィンドウで、ADD をクリックします。
キャンバスで、PutFile Processorを右クリックして Configure を選択し、RELATIONSHIPS タブで
failureとsuccessの動作ポリシーを設定します。どちらも terminate を選択します。完了後、APPLY をクリックします。
データ処理フローを開始します。
キャンバスに戻り、CaptureChangeMySQL Processorを右クリックして Start を選択します。
PutFile Processorでも同様に右クリックして Start を選択します。
ステップ4:データ同期を検証する
OceanBaseデータベースにデータを挿入します。
PutFileディレクトリにファイルが生成されているか確認します。

ファイルの内容を確認すると、実行されたSQLステートメントの関連情報を見ることができます。
