Apache Flinkは、分散型で高スループット、高性能なリアルタイムデータ処理シナリオ向けに設計されたオープンソースのストリーム処理フレームワークです。OceanBaseは公式にJDBCベースのFlink Connectorを提供しており、Flinkから標準的なJDBCプロトコルを介してOceanBaseに効率的にデータを書き込むことができます。MySQL互換モード と Oracle互換モード をサポートしており、リアルタイムデータ同期やストリーム処理結果の書き込みなどのシナリオに適しています。
コネクターの特長
**Flink Connector OceanBase(JDBCコネクター)**は、JDBCドライバーを介してデータをOceanBaseに書き込みます。主な特長は以下のとおりです:
- 標準JDBCプロトコル:成熟したJDBC標準プロトコルを使用してOceanBaseと通信するため、互換性が高く、統合しやすいです。
- デュアルモードサポート:OceanBaseのMySQL互換モードとOracle互換モードを同時にサポートします。
- 柔軟なドライバー選択:
- MySQL互換モードでは、MySQL JDBCドライバーまたはOceanBase JDBCドライバーを使用できます。
- Oracle互換モードでは、OceanBase JDBCドライバーのみを使用できます。
- 接続プール管理:Druid接続プールに基づいており、接続の再利用とパラメータ最適化をサポートします。
- バッチ書き込みの最適化:バッファリングとバッチコミットをサポートし、書き込みスループットを向上させます。
適用シナリオ:リアルタイムデータ同期(CDC、ストリーム処理結果の書き込み)。
バージョン要件と依存関係
ソフトウェア |
バージョン要件 |
説明 |
|---|---|---|
| Apache Flink | 1.15 以降のバージョン | より優れたパフォーマンスと安定性を得るには、1.15 以降のバージョンを推奨します |
| JDK | 8 以降のバージョン | — |
| OceanBase | V3.x、V4.x 以降を推奨 | MySQLモードとOracleモードをサポートしています。OceanBaseデータベースがインストール済みで、MySQL/Oracleモードのテナントが作成されていることを確認してください。 |
JDBCドライバーについて
使用するOceanBaseモードに応じて、適切なJDBCドライバーを準備する必要があります。
MySQL互換モード
以下のいずれかのドライバーを使用できます:
MySQL JDBCドライバー(MySQL Connector/J 8.0以降のバージョンを推奨)
- ダウンロードURL:https://dev.mysql.com/downloads/connector/j/
OceanBase JDBCドライバー
- 依存関係名:
com.oceanbase:oceanbase-client:2.4.9 - ダウンロードURL:https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client
説明
ダウンロードしたドライバーのJARファイルをFlinkの
libディレクトリに配置する必要があります。具体的な操作手順については、後の章を参照してください。- 依存関係名:
Oracle互換モード(OceanBaseデータベースEnterprise Edition)
OceanBase JDBCドライバーの使用が必須です:
- 依存関係名:
com.oceanbase:oceanbase-client:2.4.9 - ダウンロードURL:https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client
手順
ステップ1:ローカルFlinkクラスタの迅速なデプロイ(単一マシン版)
Apache Flinkのダウンロード
Flink公式ダウンロードページにアクセスし、Stable Release(1.15+を推奨、例:1.18または1.19)を選択します。
例(Linux/macOSターミナルコマンド):
# Flinkのダウンロード(1.19.3を例として)
wget https://archive.apache.org/dist/flink/flink-1.19.3/flink-1.19.3-bin-scala_2.12.tgz
# 解凍
tar -xzf flink-1.19.3-bin-scala_2.12.tgz
# ディレクトリに移動
cd flink-1.19.3
# 環境変数の設定(オプションですが推奨されます)
export FLINK_HOME=$(pwd)
ローカルFlinkクラスタの起動
Flinkディレクトリで以下を実行します:
# クラスタの起動(JobManager + TaskManagerを含む)
$FLINK_HOME/bin/start-cluster.sh
起動に成功すると、次のような出力が表示されます:
Starting cluster.
Starting standalonesession daemon on host your-hostname.
Starting taskexecutor daemon on host your-hostname.
起動に失敗した場合は、ログを確認できます:
# JobManagerログの確認
tail -f $FLINK_HOME/log/flink-*-standalonesession-*.log
# TaskManagerログの確認
tail -f $FLINK_HOME/log/flink-*-taskexecutor-*.log
Flinkが正常に動作しているか確認する
ブラウザを開いてアクセスします:http://localhost:8081,您可以看到 Flink Dashboardには、1つのTaskManagerが表示され、利用可能なslotsが1以上であることを確認できます。
ステップ2:JDBCコネクタをダウンロードしてデプロイする
JDBCベースのコネクタJARファイル
flink-sql-connector-oceanbase-${version}.jarをダウンロードします:- ダウンロードURL:https://repo1.maven.org/maven2/com/oceanbase/flink-sql-connector-oceanbase/
- バージョン(例:1.6.0)を選択し、対応するJARファイルをダウンロードします。
注意
ファイル名は
flink-sql-connector-oceanbase-<version>.jarである必要があります。これはFlink SQLがコネクタを認識するための鍵となります。JARファイルをFlinkの
libディレクトリにコピーします:# ダウンロードしたファイルが現在のディレクトリにあることを仮定します。FLINK_HOMEはFlinkのインストールディレクトリです。 cp flink-sql-connector-oceanbase-1.6.0.jar $FLINK_HOME/lib/
ステップ3:JDBCドライバーの準備とデプロイ
使用しているOceanBaseモードに応じて、対応するJDBCドライバーをダウンロードし、配置します。
方法1:MySQL互換モード
以下のいずれかのドライバーを使用できます:
MySQL JDBCドライバー(8.0以降のバージョンを推奨)
- ダウンロードURL:https://dev.mysql.com/downloads/connector/j/
- Platform Independentバージョン(ZIPまたはTAR.GZ)を選択します。
- 解凍後、
mysql-connector-j-x.x.x.jarファイルを見つけます。
OceanBase JDBCドライバー
- ダウンロードURL:https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client
- 最新バージョンを選択してJARファイルをダウンロードします。
ダウンロードしたドライバーのJARファイルをFlinkの
libディレクトリに配置します:# 例:MySQL JDBCドライバーをコピーする cp mysql-connector-j-8.0.33.jar $FLINK_HOME/lib/
方法2:Oracle互換モード
OceanBase JDBCドライバーのみを使用する必要があります:
- ダウンロードURL:https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client
- 最新バージョン(例:2.4.9)を選択してJARファイルをダウンロードします。
# OceanBase JDBCドライバーをコピーする
cp oceanbase-client-2.4.9.jar $FLINK_HOME/lib/
Flinkクラスターの再起動(JARの有効化)
# まず停止する
$FLINK_HOME/bin/stop-cluster.sh
# その後起動する
$FLINK_HOME/bin/start-cluster.sh
ステップ4:データベース接続情報を取得する
OceanBaseデータベースのデプロイ担当者または管理者からデータベース接続情報を取得し、ログインします。例:
obclient -h$host -P$port -u$user_name -p$password -D$database_name
パラメータ説明:
$host:OceanBaseデータベースの接続IPアドレス。OceanBaseデータベースプロキシ(OceanBase Database Proxy、ODP)接続方式ではODPアドレスを使用し、直接接続方式ではOBServerノードのIPアドレスを使用します。$port:OceanBaseデータベースの接続ポート。ODP接続方式のデフォルトポートは2883で、ODPデプロイ時にカスタマイズ可能です。直接接続方式のデフォルトポートは2881で、OceanBaseデータベースのデプロイ時にカスタマイズ可能です。$database_name:アクセス対象のデータベース名。注意
テナントに接続するユーザーは、データベースに対する
CREATE、INSERT、DROP、およびSELECT権限が付与されていなければなりません。ユーザー権限の詳細については、MySQLモードの権限分類を参照してください。$user_name:テナントの接続アカウント。ODP接続の一般的な形式はユーザー名@テナント名#クラスタ名またはクラスタ名:テナント名:ユーザー名です。直接接続方式の形式はユーザー名@テナント名です。$password:アカウントのパスワード。
接続文字列の詳細については、OBClientを使用してOceanBaseテナントに接続するを参照してください。
ネットワーク接続性を確認する
Flinkジョブを実行する環境がOceanBaseデータベースにアクセスできることを確認します:
テストコマンド:
# RPCポートの接続性をテストする
nc -zv <oceanbase-host> <rpc-port>
-
:接続文字列の $host、すなわちOceanBaseデータベースの接続IPアドレス。OceanBaseデータベースプロキシ(OceanBase Database Proxy、ODP)接続方式ではODPアドレスを使用し、直接接続方式ではOBServerノードのIPアドレスを使用します。 -
:接続文字列の $port、すなわちOceanBaseデータベースの接続ポート。ODP接続方式のデフォルトポートは2883で、ODPデプロイ時にカスタマイズ可能です。直接接続方式のデフォルトポートは2881で、OceanBaseデータベースのデプロイ時にカスタマイズ可能です。
ステップ5:OceanBaseにターゲットテーブルを準備する
OceanBaseデータベースのMySQL互換モードのtestデータベースに、ターゲットテーブルt_sinkを作成します。
USE test;
CREATE TABLE `t_sink`
(
`id` int(10) NOT NULL,
`username` varchar(20) DEFAULT NULL,
`score` int(10) DEFAULT NULL,
PRIMARY KEY (`id`)
);
ステップ6:Flink SQLクライアントを起動してテストする
SQLクライアントを使用してSQLステートメントを実行し、データを書き込みます。
# Flinkのインストールディレクトリに移動します(まだそのディレクトリにいない場合)
cd $FLINK_HOME
# SQLクライアントを起動します
./bin/sql-client.sh
正常に起動すると、Flink SQLクライアントの対話型コマンドラインインターフェースが表示されます。
▒▓██▓██▒
▓████▒▒█▓▒▓███▓▒
▓███▓░░ ▒▒▒▓██▒ ▒
░██▒ ▒▒▓▓█▓▓▒░ ▒████
██▒ ░▒▓███▒ ▒█▒█▒
░▓█ ███ ▓░▒██
▓█ ▒▒▒▒▒▓██▓░▒░▓▓█
█░ █ ▒▒░ ███▓▓█ ▒█▒▒▒
████░ ▒▓█▓ ██▒▒▒ ▓███▒
░▒█▓▓██ ▓█▒ ▓█▒▓██▓ ░█░
▓░▒▓████▒ ██ ▒█ █▓░▒█▒░▒█▒
███▓░██▓ ▓█ █ █▓ ▒▓█▓▓█▒
░██▓ ░█░ █ █▒ ▒█████▓▒ ██▓░▒
███░ ░ █░ ▓ ░█ █████▒░░ ░█░▓ ▓░
██▓█ ▒▒▓▒ ▓███████▓░ ▒█▒ ▒▓ ▓██▓
▒██▓ ▓█ █▓█ ░▒█████▓▓▒░ ██▒▒ █ ▒ ▓█▒
▓█▓ ▓█ ██▓ ░▓▓▓▓▓▓▓▒ ▒██▓ ░█▒
▓█ █ ▓███▓▒░ ░▓▓▓███▓ ░▒░ ▓█
██▓ ██▒ ░▒▓▓███▓▓▓▓▓██████▓▒ ▓███ █
▓███▒ ███ ░▓▓▒░░ ░▓████▓░ ░▒▓▒ █▓
█▓▒▒▓▓██ ░▒▒░░░▒▒▒▒▓██▓░ █▓
██ ▓░▒█ ▓▓▓▓▒░░ ▒█▓ ▒▓▓██▓ ▓▒ ▒▒▓
▓█▓ ▓▒█ █▓░ ░▒▓▓██▒ ░▓█▒ ▒▒▒░▒▒▓█████▒
██░ ▓█▒█▒ ▒▓▓▒ ▓█ █░ ░░░░ ░█▒
▓█ ▒█▓ ░ █░ ▒█ █▓
█▓ ██ █░ ▓▓ ▒█▓▓▓▒█░
█▓ ░▓██░ ▓▒ ▓█▓▒░░░▒▓█░ ▒█
██ ▓█▓░ ▒ ░▒█▒██▒ ▓▓
▓█▒ ▒█▓▒░ ▒▒ █▒█▓▒▒░░▒██
░██▒ ▒▓▓▒ ▓██▓▒█▒ ░▓▓▓▓▒█▓
░▓██▒ ▓░ ▒█▓█ ░░▒▒▒
▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓ ▓░▒█░
______ _ _ _ _____ ____ _ _____ _ _ _ BETA
| ____| (_) | | / ____|/ __ \| | / ____| (_) | |
| |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_
| __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __|
| | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_
|_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Flink SQL>
これでSQLクライアントでSQLステートメントを実行できます。
ステップ7:JDBC接続を設定し、データを書き込む
MySQL互換モード
SQLクライアントでFlinkターゲットテーブルを作成し、OceanBaseのt_sinkテーブルにマッピングします。
CREATE TABLE t_sink
(
id INT,
username VARCHAR,
score INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase',
'url' = 'jdbc:mysql://127.0.0.1:2881/test', ------例です。実際の環境に応じて正しいURLを入力してください。
'schema-name' = 'test',
'table-name' = 't_sink',
'username' = 'root@test#obcluster', ------例です。接続文字列から取得した実際のテナント名を入力してください。
'password' = 'your_password', ------接続文字列から取得した実際のパスワードを入力してください。
'druid-properties' = 'druid.initialSize=10;druid.maxActive=100/',
'buffer-flush.interval' = '1s',
'buffer-flush.buffer-size' = '5000',
'max-retries' = '3'
);
設定説明:
url:OceanBaseのJDBC接続アドレス。形式はjdbc:mysql://host:port/databaseです。schema-nameとtable-name:ターゲットデータベースとテーブル名。usernameとpassword:OceanBaseデータベースのユーザー名とパスワード。- その他のパラメータは、パフォーマンスと信頼性の最適化に使用されます。
テーブルの作成が成功したら、テストデータを挿入します。
INSERT INTO t_sink
VALUES (1, 'Tom', 99),
(2, 'Jerry', 88),
(1, 'Tom', 89);
実行が成功すると、次のような出力が表示されます。
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
データ書き込みの検証
OceanBaseデータベースでクエリを実行し、データが正常に書き込まれたかどうかを検証します。
# OceanBase MySQL互換モードに接続します。以下は例です。接続文字列から取得した接続情報を正しく入力してください。
mysql -h127.0.0.1 -P2881 -uroot@test#obcluster -p
-- データのクエリ
USE test;
SELECT * FROM t_sink;
期待される結果:
+----+----------+-------+
| id | username | score |
+----+----------+-------+
| 1 | Tom | 89 |
| 2 | Jerry | 88 |
+----+----------+-------+
2 rows in set
説明
- 主キーの競合により、
id=1のレコードは最新の値(Tom, 89)に更新されます。 - コネクタはJDBCプロトコルを介してOceanBaseと通信し、標準のUPsertセマンティクスを使用します。
Oracle互換モード
OceanBaseデータベースEnterprise EditionのOracle互換モードでは、OceanBase JDBCドライバーに対応するurlとdriver-class-nameを指定する必要があります。
SQL Clientで実行します:
CREATE TABLE t_sink
(
id INT,
username VARCHAR,
score INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase',
'url' = 'jdbc:oceanbase://127.0.0.1:2881/SYS', ------例です。実際の環境に合わせて正しいURLを入力してください。
'driver-class-name' = 'com.oceanbase.jdbc.Driver',
'schema-name' = 'SYS',
'table-name' = 'T_SINK',
'username' = 'SYS@test#obcluster', ------例です。接続文字列から取得した実際のテナント名を入力してください。
'password' = 'pswd', ------接続文字列から取得した実際のパスワードを入力してください。
'druid-properties' = 'druid.initialSize=10;druid.maxActive=100;',
'buffer-flush.interval' = '1s',
'buffer-flush.buffer-size' = '5000',
'max-retries' = '3'
);
Oracleモードの設定ポイント:
url:jdbc:oceanbase://プロトコルを使用します。driver-class-name:com.oceanbase.jdbc.Driverを指定する必要があります。schema-nameとtable-name:Oracleモードでは通常、大文字を使用します。
テストデータを挿入します:
INSERT INTO t_sink
VALUES (1, 'Tom', 99),
(2, 'Jerry', 88),
(1, 'Tom', 89);
データ書き込みの検証
# OceanBase Oracle互換モードに接続します。以下は例です。接続文字列から取得した接続情報を正しく入力してください。
obclient -h127.0.0.1 -P2881 -uSYS@test#obcluster -p
-- データのクエリ
SELECT * FROM T_SINK;
説明
OracleモードもUPSERTセマンティクスをサポートしており、主キーが競合する場合は最新の値に自動的に更新されます。
ステップ8:クラスタの停止とクリーンアップ(オプション)
テスト終了後、Flinkクラスタを停止できます。
# Flinkクラスタを停止する
cd $FLINK_HOME
./bin/stop-cluster.sh
SQL Clientでログアウトします。
-- SQL Clientで実行する
QUIT;
JDBCコネクタの設定について
以下はJDBCコネクタの主なパラメータです:
パラメータ名 |
必須 |
デフォルト値 |
型 |
説明 |
|---|---|---|---|---|
url |
はい | - | String | データベースのJDBC URL |
username |
はい | - | String | 接続ユーザー名 |
password |
はい | - | String | 接続パスワード |
schema-name |
はい | - | String | 接続するスキーマ名またはデータベース名 |
table-name |
はい | - | String | テーブル名 |
driver-class-name |
いいえ | com.mysql.cj.jdbc.Driver |
String | JDBCドライバークラス名。 MySQL互換モードでは、 'com.mysql.cj.jdbc.Driver'(デフォルト値)または'com.oceanbase.jdbc.Driver'を使用できます。Oracle互換モードでは、 'com.oceanbase.jdbc.Driver'のみを使用できます |
druid-properties |
いいえ | - | String | Druid接続プールのプロパティ。複数の値はセミコロンで区切ります |
sync-write |
いいえ | false |
Boolean | 同期書き込みを有効にするかどうか。trueに設定すると、バッファを使用せずに直接データベースに書き込みます |
buffer-flush.interval |
いいえ | 1s |
Duration | バッファのリフレッシュ間隔です。'0'に設定すると、定期的なリフレッシュが無効になります |
buffer-flush.buffer-size |
いいえ | 1000 |
Integer | バッファサイズ |
max-retries |
いいえ | 3 |
Integer | 失敗時のリトライ回数 |
memstore-check.enabled |
いいえ | true |
Boolean | メモリチェックを有効にするかどうか |
memstore-check.threshold |
いいえ | 0.9 |
Double | メモリ使用量のしきい値が最大制限値に対する割合 |
memstore-check.interval |
いいえ | 30s |
Duration | メモリ使用量チェックの間隔 |
partition.enabled |
いいえ | false |
Boolean | パーティション計算機能を有効にするかどうか。データをパーティション単位で書き込みます。'sync-write'と'direct-load.enabled'の両方がfalseの場合にのみ有効になります |
説明
このコネクタはJDBCプロトコルに基づき、Druid接続プールを介してデータベース接続を管理します。
適用シナリオ
リアルタイムデータ同期
JDBCコネクタを使用してリアルタイムにデータを書き込み、非同期バッチ書き込みの利点を活かしてスループットを向上させます。JDBCコネクタは組み込みのバッファメカニズムにより、複数のレコードをバッチ処理としてOceanBaseにコミットするため、継続的なリアルタイムデータストリームの書き込みに適しています。
典型的なユースケース:
CDCと組み合わせたデータベース同期 JDBCコネクタは、CDCデータソース(例:MySQL CDC)と組み合わせて使用し、データベース間のリアルタイム同期を実現できます。JDBCコネクタはSink側として機能し、標準JDBCプロトコルを通じてCDCがキャプチャした変更データをOceanBaseに書き込みます。
ストリーム処理結果の書き込み Flinkのストリーム処理結果をリアルタイムにOceanBaseに書き込み、複雑なデータ処理と変換をサポートします。
リアルタイム業務データ同期 業務システムのデータ変更をOceanBaseに継続的に同期し、データウェアハウスのリアルタイム更新やリアルタイムレポート作成などのシナリオに利用します。
Flink Connectorの使用に関するベストプラクティス
パラレル度の調整によるパフォーマンス向上
Flinkタスクのパラレル度を調整することで、書き込みパフォーマンスを大幅に向上させることができます。パラレル度が高いほど、同時にOceanBaseに書き込む接続数が増え、スループットも向上します。
設定方法:Flink SQL Clientでグローバルなパラレル度を設定します:
SET 'parallelism.default' = '8';
パラレル度チューニングの推奨事項:
- データ量とOceanBaseクラスタのパフォーマンスに応じて、適切にパラレル度を設定します。
- OceanBaseの負荷状況を監視し、書き込み時の過度な負荷を避けます。
バッチ書き込みのパフォーマンス最適化
buffer-flush.buffer-size と buffer-flush.interval パラメータを調整して、レイテンシとスループットのバランスを取ります。
buffer-flush.buffer-size:バッファサイズ。デフォルトは1000件です。buffer-flush.interval:バッファのリフレッシュ間隔。デフォルトは1秒です。
高スループットシナリオでは、buffer-size を適宜大きくし、interval を延長することができます。
信頼性の確保
max-retriesパラメータを設定して、失敗時のリトライ回数を制御します(デフォルトは3回)。memstore-check.enabledを有効にして、OceanBaseのメモリオーバーフローを防ぎます(デフォルトで有効)。sync-write=trueを使用すると、データのリアルタイム可視性が保証されますが、パフォーマンスは犠牲になります(各レコードがJDBC経由で直接コミットされるため)。
監視の推奨事項
- Flinkタスクのバックプレッシャー状況を監視し、パフォーマンスボトルネックを早期に発見します。
- OceanBaseの書き込みパフォーマンス指標(TPS、QPS、応答時間など)に注目します。
- 各パラレルタスクのデータ偏りを監視し、ロードバランシングを確保します。