概要
Apache Flinkは、オープンソースの分散ストリーム処理フレームワークであり、リアルタイムおよびバッチデータ処理のシナリオで広く利用されています。OceanBase Flink DirectLoad コネクタ( flink-connector-oceanbase-directload)は、OceanBaseデータベース専用の高性能データインポートツールであり、OceanBaseデータベースのDirect Load技術を活用して、非常に高いスループットで大量データを高速にデータベースに書き込むことができます。
Direct Loadは、OceanBaseが提供する高性能なデータインポート方式です。従来のSQLステートメントINSERT方式とは異なり、Direct LoadはSQL層の解析およびトランザクション処理プロセスをバイパスし、データを直接ストレージ層に書き込むことで、より高いインポート性能を実現します。この方式は、一度に大量のデータをインポートするシナリオに特に適しています。
DirectLoadコネクタと通常のJDBCコネクタの比較
| 比較項目 | DirectLoadコネクタ | JDBCコネクタ |
|---|---|---|
| 書き込み方式 | ダイレクトロード、ストレージ層への直接書き込み | SQLステートメントINSERTで書き込み |
| スループット | 非常に高く、大規模データの処理に適しています | 相対的に低い |
| 適用シナリオ | バッチデータのインポート、データ移行 | リアルタイムストリーミング書き込み、CDC同期 |
| データストリームタイプ | 有界ストリーム(Bounded)のみサポート | 無限ストリーム(Unbounded)をサポート |
| インポート中のテーブル状態 | 目標テーブルはロックされ、クエリのみサポート | テーブルは正常に読み書き可能です |
| サポートされる操作 | INSERT、UPDATE_AFTER |
INSERT、UPDATE、DELETE |
コアな強み
- 超高スループット:従来のJDBC方式と比較して、性能が数倍、数十倍向上します。
- 並行書き込み:複数ノードによる並行書き込みをサポートし、クラスタリソースを最大限に活用します。
- シンプルで使いやすい:Flink SQLを使用するだけで、複雑な設定は不要です。
- 柔軟な競合処理:主キー競合時の処理戦略として、停止、置換、無視のいずれかを選択できます。
使用方法と制限
DirectLoadコネクタを使用する前に、以下の特性と制限について必ずご確認ください。
サポートされる機能
- バッチデータの書き込み:大規模なデータインポートに最適化されており、数千万件から数億件のデータを高速に書き込むことができます。
- 有界ストリーム処理:ファイルやデータベースのスナップショットなど、有界(Bounded)データソースをサポートしています。
- 複数のインポートモード:
full(全量)、inc(増分)、inc_replace(増分置換)の3つのモードをサポートしています。 - 柔軟な競合解決戦略:主キー重複時の処理として、
STOP_ON_DUP、REPLACE、IGNOREの3つの戦略を提供しています。 - 並行書き込み:Flinkの並列書き込みをサポートし、クラスタリソースを最大限に活用できます。
- 互換性:OceanBaseのMySQLモードとOracleモードの両方をサポートしています。
使用制限
- 有界ストリームのみサポート:データソースは有界ストリーム(Bounded Stream)である必要があります。無限に続く無界ストリーム(Unbounded Stream)の書き込みはサポートしていません。
- インポート中のテーブルロック:データインポート(一括ロード)の実行中はターゲットテーブルがロックされます。この間、
SELECTクエリのみが許可され、INSERT、UPDATE、DELETEなどの書き込み操作は実行できません。 - DELETE操作は未サポート:Flink Table/SQL側では
INSERTとUPDATE_AFTERの変更タイプのみをサポートしており、DELETE、UPDATE_BEFOREはサポートしていません。 - バッチ処理向け:Flink Batchモードでの使用を推奨します。リアルタイムストリーミング書き込みのシナリオには適していません。
注意
リアルタイムストリーミング書き込みや無界ストリーム処理が必要な場合は、標準の flink-connector-oceanbase コネクタを使用してください。これはJDBCベースであり、無界ストリームおよびリアルタイム書き込みをサポートしています。
適用シナリオ
DirectLoadコネクタは、以下のシナリオに適しています。
- データ移行:他のデータベースやデータウェアハウスからOceanBaseデータベースへ一括データを移行する。
- オフラインETL:定期的に一括でデータを処理し、インポートする。例えば、毎日のデータ集計など。
- 大量の履歴データのインポート:一度に大量の履歴データをインポートする。
- データ初期化:新システムに一括でデータを初期化する。
適用されないシナリオ
以下のシナリオでは、DirectLoadコネクタの使用は推奨されません。
- リアルタイムストリーミング書き込み:継続的にデータを書き込むリアルタイムシナリオ。
- CDCデータ同期:データベースの変更をリアルタイムで同期するシナリオ。
- 低遅延要件:書き込み遅延に厳しい要件があるシナリオ。
- 高頻度の小規模なバッチ書き込み:頻繁な小規模なデータ書き込み。
バージョン要件と依存関係
| ソフトウェア | バージョン要件 | 説明 |
|---|---|---|
| Apache Flink | 1.15 以上 | パフォーマンスと安定性向上のため、1.15以降の使用を推奨します。 |
| JDK | 8 以上 | — |
| OceanBase | 以下のいずれかのバージョン範囲を満たす: • 4.3.0 BP1 以降 • 4.2.4 - 4.3.0 • 4.2.1 BP7 - 4.2.2.0 |
MySQL モードと Oracle モードをサポートしています |
| OceanBase (Incremental Mode) | 4.3.2 以上 | インポートモード inc または inc_replace を使用する場合は必要です |
クイックスタート
このセクションでは、DirectLoad コネクタの使用を迅速に開始する方法を説明します。コネクタの取得から初めてのデータインポートを完了するまで、手順をステップバイステップで説明します。
ステップ1:ローカルFlinkクラスタのクイックデプロイ(単一ノード)
Apache Flinkのダウンロード
Flink公式ダウンロードページにアクセスし、Stable Releaseを選択します(推奨バージョンは1.15以降、例:1.18または1.19)。
例(Linux/macOSのターミナルコマンド):
# Flinkのダウンロード(例:1.18.1)
wget https://archive.apache.org/dist/flink/flink-1.19.3/flink-1.19.3-bin-scala_2.12.tgz
# 解凍
tar -xzf flink-1.19.3-bin-scala_2.12.tgz
# ディレクトリに移動
cd flink-1.19.3
# 環境変数の設定(推奨)
export FLINK_HOME=$(pwd)
ローカルFlinkクラスタの起動
Flinkディレクトリで以下のコマンドを実行します:
# クラスタの起動(JobManager + TaskManagerを含む)
$FLINK_HOME/bin/start-cluster.sh
正常に起動すると、以下のような出力が表示されます:
Starting cluster.
Starting standalonesession daemon on host your-hostname.
Starting taskexecutor daemon on host your-hostname.
起動に失敗した場合は、ログを確認します:
# JobManagerのログを確認
tail -f $FLINK_HOME/log/flink-*-standalonesession-*.log
# TaskManagerのログを確認
tail -f $FLINK_HOME/log/flink-*-taskexecutor-*.log
Flinkの正常な動作を確認する
ブラウザで http://localhost:8081 にアクセスし、Flink Dashboardが表示されることを確認します。1つのTaskManagerが表示され、利用可能なslotが1以上であることを確認します。
ステップ2:OceanBase DirectLoadコネクタJARのデプロイ
コネクタJARのダウンロード
Maven Centralにアクセス:https://repo1.maven.org/maven2/com/oceanbase/flink-sql-connector-oceanbase-directload/
バージョン(例:1.5.0)を選択し、対応するJARファイルをダウンロードします:
# 例:バージョン1.5.0のダウンロード
wget https://repo1.maven.org/maven2/com/oceanbase/flink-sql-connector-oceanbase-directload/1.0.0/flink-sql-connector-oceanbase-directload-1.5.0.jar
注意:ファイル名は flink-sql-connector-oceanbase-directload-<version>.jar でなければなりません。これはFlink SQLがコネクタを認識するための鍵です。
JARファイルをFlinkのlib/ディレクトリにコピー
# JARファイルをFlinkのlibディレクトリにコピー
cp flink-sql-connector-oceanbase-directload-1.5.0.jar $FLINK_HOME/lib/
重要:Flinkは起動時にlib/ディレクトリ内のすべてのJARファイルを自動的に読み込みます。そのため、コネクタは自動的に登録されます。
Flinkクラスタの再起動(JARの有効化)
# 停止
$FLINK_HOME/bin/stop-cluster.sh
# 再起動
$FLINK_HOME/bin/start-cluster.sh
ステップ3:データベース接続情報を取得する
OceanBaseデータベースのデプロイ担当者または管理者から、対応するデータベース接続文字列を取得します。例えば:
obclient -h$host -P$port -u$user_name -p$password -D$database_name
パラメータの説明:
$host:OceanBaseデータベースへの接続IPアドレス。OceanBaseデータベースプロキシ(OceanBase Database Proxy、ODP)接続方式ではODPのアドレスを使用し、直接接続方式ではOBServerノードのIPアドレスを使用します。$port:OceanBaseデータベースへの接続ポート。ODP接続方式のデフォルトポートは2883で、ODPデプロイ時にカスタマイズ可能です。直接接続方式のデフォルトポートは2881で、OceanBaseデータベースのデプロイ時にカスタマイズ可能です。$database_name:アクセスするデータベースの名前。注意
テナントに接続するユーザーには、データベースに対する
CREATE、INSERT、DROP、およびSELECTの権限が必要です。ユーザー権限の詳細については、MySQLモードの権限分類を参照してください。$user_name:テナントの接続アカウント。ODP接続の一般的な形式:ユーザー名@テナント名#クラスタ名またはクラスタ名:テナント名:ユーザー名。直接接続方式の形式:ユーザー名@テナント名。$password:アカウントのパスワード。
接続文字列の詳細については、OBClientを使用してOceanBaseテナントに接続するを参照してください。
注意
OBServerサーバーに接続する際、sysテナントでシステムビュー DBA_OB_SERVERS をクエリすることで、OBServerのRPCポート番号を取得できます。
ネットワーク接続性を確認する
Flinkジョブを実行する環境がOceanBaseデータベースにアクセスできるようにするには、以下のコマンドを実行します:
テストコマンド:
# RPCポートの接続性をテストする
nc -zv <oceanbase-host> <rpc-port>
ステップ4:OceanBaseデータベースにターゲットテーブルを作成する
例:
-- OceanBaseデータベースに接続する
USE test;
-- ターゲットテーブルを作成する
CREATE TABLE `t_user` (
`id` INT NOT NULL,
`username` VARCHAR(50) DEFAULT NULL,
`age` INT DEFAULT NULL,
`score` DECIMAL(10,2) DEFAULT NULL,
PRIMARY KEY (`id`)
) COMMENT 'ユーザー情報テーブル';
ステップ5:Flink SQL Clientを起動してテストする
# SQL Clientを起動する
$FLINK_HOME/bin/sql-client.sh
正常に起動すると、Flink SQL Clientのインタラクティブなコマンドラインインターフェースが表示されます:
▒▓██▓██▒
▓████▒▒█▓▒▓███▓▒
▓███▓░░ ▒▒▒▓██▒ ▒
░██▒ ▒▒▓▓█▓▓▒░ ▒████
██▒ ░▒▓███▒ ▒█▒█▒
░▓█ ███ ▓░▒██
▓█ ▒▒▒▒▒▓██▓░▒░▓▓█
█░ █ ▒▒░ ███▓▓█ ▒█▒▒▒
████░ ▒▓█▓ ██▒▒▒ ▓███▒
░▒█▓▓██ ▓█▒ ▓█▒▓██▓ ░█░
▓░▒▓████▒ ██ ▒█ █▓░▒█▒░▒█▒
███▓░██▓ ▓█ █ █▓ ▒▓█▓▓█▒
░██▓ ░█░ █ █▒ ▒█████▓▒ ██▓░▒
███░ ░ █░ ▓ ░█ █████▒░░ ░█░▓ ▓░
██▓█ ▒▒▓▒ ▓███████▓░ ▒█▒ ▒▓ ▓██▓
▒██▓ ▓█ █▓█ ░▒█████▓▓▒░ ██▒▒ █ ▒ ▓█▒
▓█▓ ▓█ ██▓ ░▓▓▓▓▓▓▓▒ ▒██▓ ░█▒
▓█ █ ▓███▓▒░ ░▓▓▓███▓ ░▒░ ▓█
██▓ ██▒ ░▒▓▓███▓▓▓▓▓██████▓▒ ▓███ █
▓███▒ ███ ░▓▓▒░░ ░▓████▓░ ░▒▓▒ █▓
█▓▒▒▓▓██ ░▒▒░░░▒▒▒▒▓██▓░ █▓
██ ▓░▒█ ▓▓▓▓▒░░ ▒█▓ ▒▓▓██▓ ▓▒ ▒▒▓
▓█▓ ▓▒█ █▓░ ░▒▓▓██▒ ░▓█▒ ▒▒▒░▒▒▓█████▒
██░ ▓█▒█▒ ▒▓▓▒ ▓█ █░ ░░░░ ░█▒
▓█ ▒█▓ ░ █░ ▒█ █▓
█▓ ██ █░ ▓▓ ▒█▓▓▓▒█░
█▓ ░▓██░ ▓▒ ▓█▓▒░░░▒▓█░ ▒█
██ ▓█▓░ ▒ ░▒█▒██▒ ▓▓
▓█▒ ▒█▓▒░ ▒▒ █▒█▓▒▒░░▒██
░██▒ ▒▓▓▒ ▓██▓▒█▒ ░▓▓▓▓▒█▓
░▓██▒ ▓░ ▒█▓█ ░░▒▒▒
▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓ ▓░▒█░
______ _ _ _ _____ ____ _ _____ _ _ _ BETA
| ____| (_) | | / ____|/ __ \| | / ____| (_) | |
| |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_
| __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __|
| | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_
|_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Flink SQL>
ステップ6:最初のFlink SQLサンプル
Flink SQL Clientを起動する
cd $FLINK_HOME
./bin/sql-client.sh
バッチモードに設定する
-- オプション:実行モードをBATCHに設定すると、パフォーマンスが向上します
SET 'execution.runtime-mode' = 'BATCH';
-- オプション:並列度を設定します(データ量とクラスタリソースに応じて調整してください)
SET 'parallelism.default' = '4';
DirectLoad Sinkテーブルを作成する
Flink SQLで対応するターゲットテーブルを作成し、OceanBaseのt_userテーブルにマッピングします。
CREATE TABLE t_user (
id INT,
username STRING,
age INT,
score DECIMAL(10,2),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = 'xxx.x.x.x', -- OceanBaseのホストアドレス
'port' = 'xxxx', -- RPCポート
'tenant-name' = 'your_tenant-name', -- テナント名
'username' = 'your_username', -- ユーザー名
'password' = 'your_password', -- パスワード
'schema-name' = 'test', -- データベース名
'table-name' = 't_user' -- テーブル名
);
パラメータの説明:
connector:固定値 oceanbase-directloadhostとport:OceanBaseのホストアドレスとRPCポート番号。OBServerサービスに接続する場合は、sysテナントでシステムビューDBA_OB_SERVERSをクエリすることで、OBServerのRPCポート番号を取得できます。tenant-name:テナント名usernameとpassword:データベースのユーザー資格情報(注記:ユーザー名には@tenantのサフィックスは含まれません)schema-nameとtable-name:ターゲットのデータベースとテーブル名
説明
上記は最小パラメータセットを使用しています。詳細なパラメータ設定については、後の章の「-パラメータの設定」を参照してください。
テストデータを挿入する
-- テストデータをいくつか挿入します
INSERT INTO t_user
VALUES
(1, 'Alice', 25, 95.5),
(2, 'Bob', 30, 88.0),
(3, 'Charlie', 28, 92.3),
(4, 'David', 35, 87.8),
(5, 'Eve', 22, 96.0);
実行すると、Flinkはジョブを送信し、データのインポートを開始します。コンソールには、次のようないくつかの出力が表示されます。
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
ジョブの実行について:
- ジョブはバックグラウンドで実行され、Flink Web UI(http://localhost:8081)でジョブの進行状況を確認できます。
- ジョブのステータスは、
INITIALIZING→RUNNING→FINISHEDと変化します。 - DirectLoadの最終的なコミットは、ジョブ終了段階で行われます。データの検証を行う前に、ジョブのステータスが
FINISHEDになるのを必ず待ってください。
データの書き込みを検証する
ジョブが正常に完了したら、OceanBaseでクエリを実行して検証します。
SELECT * FROM test.t_user ORDER BY id;
クエリ結果は次のとおりです。
+----+----------+------+-------+
| id | username | age | score |
+----+----------+------+-------+
| 1 | Alice | 25 | 95.50 |
| 2 | Bob | 30 | 88.00 |
| 3 | Charlie | 28 | 92.30 |
| 4 | David | 35 | 87.80 |
| 5 | Eve | 22 | 96.00 |
+----+----------+------+-------+
5 rows in set
ステップ7:停止とクリーンアップ(オプション)
テストが完了したら、Flink クラスターを停止できます。
# Flink クラスターを停止する
cd $FLINK_HOME
./bin/stop-cluster.sh
SQL Client を終了します。
-- SQL Client で実行
QUIT;
または、直接 Ctrl+D を押して SQL Client を終了できます。
注意
- ジョブの完了を待つ:バッチ読み込みの最終的なコミットはジョブの終了段階で行われるため、ジョブのステータスが
"FINISHED"になるのを待ってからデータを確認してください。 - インポート中のテーブルロック:
INSERTステートメントが実行されている間、ターゲットテーブルt_userはロックされ、この期間中はテーブルのクエリは可能です。ただし、書き込み操作は実行できません。 - バッチモードの使用を推奨:DirectLoad コネクタは有界ストリームのみをサポートしているため、バッチモードを使用し、データソースが有界であることを確認してください。
- エラー処理:ジョブが失敗した場合は、ログ内のエラー情報を確認してください。一般的な問題には、ネットワーク接続の失敗、アカウントの権限不足、パラメータ設定の誤りなどが含まれます。
パラメータの詳細
この節では、DirectLoadコネクタのすべての設定パラメータについて詳しく説明します。*マークが付いているパラメータは必須パラメータです。
接続パラメータ
| パラメータ名 | 必須 | デフォルト値 | タイプ | 説明 |
|---|---|---|---|---|
connector* |
はい | なし | String | コネクタタイプ。固定値:oceanbase-directload |
host* |
はい | なし | String | OceanBaseデータベースのホストアドレス。例:127.0.0.1またはドメイン名 |
port* |
はい | 2882 |
Integer | ダイレクトロードで使用されるRPCポート。デフォルトは 2882 |
tenant-name* |
はい | なし | String | テナント名。例:test |
username* |
はい | なし | String | データベースのユーザー名。例:root。注意:ここには接続文字列形式(例: root@test)ではなく、単純なユーザー名を入力します |
password* |
はい | なし | String | データベースのパスワード |
schema-name* |
はい | なし | String | データベース名(Schema)。例:test |
table-name* |
はい | なし | String | 対象のテーブル名。例:t_user |
インポート動作パラメータ
| パラメータ名 | 必須 | デフォルト値 | タイプ | 説明 |
|---|---|---|---|---|
load-method |
いいえ | full |
String | インポートモード。 指定可能な値: full、inc、inc_replace※詳細は後述の「 load-methodの詳細」を参照してください。 |
dup-action |
いいえ | REPLACE |
String | 主キー重複時の処理戦略。 指定可能な値: - STOP_ON_DUP:重複発生時にインポートを停止- REPLACE:既存のレコードを置換- IGNORE:重複レコードを無視(スキップ)※詳細は後述の「 dup-actionの詳細」を参照してください。 |
max-error-rows |
いいえ | 0 |
Long | 許容する最大エラー行数。このしきい値を超えるとインポートが失敗します。0 の場合、エラーは一切許容されません(即座に失敗します)。 |
パフォーマンスチューニングパラメータ
| パラメータ名 | 必須 | デフォルト値 | タイプ | 説明 |
|---|---|---|---|---|
parallel |
いいえ | 8 |
Integer | サーバー側の並列度。このパラメータは、OceanBaseサーバーが今回のインポートタスクを処理するために使用するCPUリソースの数を決定します。 詳細については、下記の「 parallelの詳細」を参照してください |
buffer-size |
いいえ | 1024 |
Integer | 書き込みバッファのサイズ(単位:行数)。この行数に達すると、1回のフラッシュ書き込みがトリガーされます。 詳細については、下記の「 buffer-sizeのチューニング推奨」を参照してください |
timeout |
いいえ | 7d |
Duration | 単一のダイレクトロードタスクのタイムアウト時間。サポート形式:1d、12h、30m、3600s |
heartbeat-timeout |
いいえ | 60s |
Duration | クライアントのハートビートタイムアウト時間 |
heartbeat-interval |
いいえ | 10s |
Duration | クライアントのハートビート間隔時間 |
主要パラメータの詳細
load-methodの詳細
load-methodパラメータは、バッチインポートのモードを決定します。異なるモードは異なるシナリオに適しています。
full(フルインポート)
- デフォルト値で、空のテーブルまたは少量のデータを含むテーブルに適しています
- インポートされたデータは、major sstableに直接書き込まれます
- カラムストアテーブルのメリット:インポート完了後、データはカラムストア形式で保存されるため、クエリ性能が最適化され、追加のマージは不要です
- 使用シナリオ:
- 初回インポートで空のテーブルにデータを追加する
- ターゲットテーブルのデータ量が非常に小さく、再構築が許容される
- 最適なカラムストアクエリ性能を追求する
-- fullモードの例
CREATE TABLE t_sink (...) WITH (
'connector' = 'oceanbase-directload',
'load-method' = 'full',
...
);
inc(通常の増分インポート)
- 空でないテーブルにデータを追加するのに適しています
- 主キーの重複チェックが行われ、重複が発見された場合は、
dup-actionのポリシーに従って処理されます - バージョン要件:OceanBase 4.3.2以降のバージョン
- 制限:
dup-actionがREPLACEの場合には、現在はサポートされていません - カラムストアテーブルの注意点:データはストレージに書き込まれますが、カラムストアは現在サポートされていません。インポート後、クエリ性能は行ストアと同じであり、カラムストアのクエリ性能に達するには一度マージを待つ必要があります
-- incモードの例
CREATE TABLE t_sink (...) WITH (
'connector' = 'oceanbase-directload',
'load-method' = 'inc',
'dup-action' = 'STOP_ON_DUP', -- REPLACEは使用できません
...
);
inc_replace(増分置換インポート)
- 空でないテーブルにデータをインポートし、既存のレコードを上書きする場合に適しています
- 主キーの重複チェックを行わず、既存の主キーのデータを直接上書きします(
REPLACEの効果に相当します) - バージョン要件:OceanBase 4.3.2以降のバージョン
dup-actionパラメータは無視されます(重複チェックを行わないため)- カラムストアテーブルの注意点:
incモードと同じく、カラムストアのクエリ性能に達するにはマージを待つ必要があります
-- inc_replaceモードの例
CREATE TABLE t_sink (...) WITH (
'connector' = 'oceanbase-directload',
'load-method' = 'inc_replace', -- dup-actionパラメータは無効です
...
);
モード選択の推奨
| シナリオ | 推奨モード | 理由 |
|---|---|---|
| 初回インポートで空のテーブルにデータを追加する | full |
性能が最適で、カラムストアテーブルのクエリ効果が最も良好です |
| 空でないテーブルにデータをインポートし、重複が発生した場合に既存のデータを上書きする | inc_replace |
既存のデータを自動的に上書きします |
| 空でないテーブルにデータをインポートし、重複を許容しない | inc + dup-action=STOP_ON_DUP |
データの一貫性を厳密に制御します |
dup-actionの詳細
dup-actionパラメータは、主キーの重複を処理する際に使用されます(load-method=fullまたはincのときのみ有効)。
STOP_ON_DUP(インポートを停止)
- 主キーの重複が発生すると、インポートを即座に停止し、ジョブ全体が失敗します
- 適用シナリオ:
- データの一貫性を厳密に要求する
- 主キーの重複を許容しない
- データの品質問題を迅速に発見する必要がある
'dup-action' = 'STOP_ON_DUP'
REPLACE(置き換え)
- 主キーの重複が発生した場合、新しいレコードで既存のレコードを置き換えます
- 適用シナリオ:
- 既存のデータを上書きすることが許容される
- インポートするデータが最新バージョンである
- 注意:
load-method=incの場合は、現在サポートされていません
'dup-action' = 'REPLACE'
IGNORE(無視)
- 主キーの重複が発生した場合、既存のレコードを保持し、新しいレコードを無視します
- 適用シナリオ:
- 旧データを基準とする
- 増分インポート時に既存のレコードを上書きしないようにする
'dup-action' = 'IGNORE'
parallelの詳細
parallelパラメータはサーバー側の並列度であり、OceanBaseサーバーが今回のインポートタスクを処理するために使用するCPUリソースの量を決定します。
主要な特性
- クライアント側の並行度とは無関係:
parallelはサーバー側のパラメータであり、Flinkの並行度とは独立した概念です。 - テナント設定に制限されます:サーバーはテナントのCPU設定に基づいて並列度の上限を自動的に制限します。クライアント側で設定した値が範囲外であってもエラーは出ません。
- パーティション分布に影響されます:実際の並列度は、テーブルのパーティション分布にも影響されます。
実際の並列度計算ルール
- 単一ノードの並列度上限 =
MIN(テナントCPU数 × 2, parallel設定値) - 実際の総並列度 =
単一ノードの並列度上限 × パーティション分布のノード数
例1:単一ノードのシナリオ
- テナント設定:2C
parallel設定:10- テーブルのパーティションが1つのノードに配置されている
- 実際の並列度 =
MIN(2 × 2, 10) × 1 = 4
例2:複数ノードのシナリオ
- テナント設定:2C
parallel設定:10- テーブルのパーティションが2つのノードに均等に分散されている
- 実際の並列度 =
MIN(2 × 2, 10) × 2 = 8
最適化の推奨事項
- 大規模なデータインポートを行う場合、
parallelを適切に増やすことで、コミットステージの所要時間を大幅に短縮できます。 - テナントのCPU構成に基づいて適切な値を設定することを推奨します。値を過剰に大きくしても効果は限定的であり、逆に小さすぎるとパフォーマンスのボトルネックとなります。
- 一般的に、テナントCPU数の 2~4倍 を設定値の目安とすることが推奨されます。
-- 例:4Cのテナントに推奨される設定
'parallel' = '8' -- または '16'
buffer-sizeの最適化の推奨事項
buffer-sizeはクライアント側の書き込みバッファサイズを表し、単位は行数です。この行数に達すると、1回のフラッシュ書き込みがトリガーされます。
最適化の推奨事項
- データ量が大規模で1行あたりのデータ量が小さい場合:適宜増やす(例:2048、4096)ことで、フラッシュ回数を減らし、スループットを向上させることができます。
- 1行あたりのデータ量が大きい場合:値を大きく設定するとメモリ負荷が高まるため、デフォルト値または適宜小さく設定することを推奨します。
- メモリが不足している場合:
buffer-sizeを減らすことで、TaskManagerのOOMを回避できます。
推奨値
| シナリオ | 推奨値 |
|---|---|
| 1行 < 1KB かつメモリが十分 | 2048 – 4096 |
| 1行 1KB – 10KB | 1024(デフォルト値) |
| 1行 > 10KB またはメモリが不足している場合 | 512 |
-- 例
'buffer-size' = '2048'
使用例
このセクションでは、より完全で実用的な使用例を提供します。さまざまなシナリオと設定をカバーしています。
例1:基本的なバッチインポート
これは最も基本的な使用例であり、初心者が迅速に始めるのに適しています。
-- 1. バッチモードに設定
SET 'execution.runtime-mode' = 'BATCH';
-- 2. Sinkテーブルを作成(最小パラメータセットを使用)
CREATE TABLE orders_sink (
order_id BIGINT,
user_id BIGINT,
product_name STRING,
amount DECIMAL(10, 2),
order_time TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = '127.0.0.1',
'port' = '2882',
'tenant-name' = 'test',
'username' = 'root',
'password' = 'your_password',
'schema-name' = 'test',
'table-name' = 'orders'
);
-- 3. データソースからインポート(ここでは別のテーブルから読み取ると仮定)
INSERT INTO orders_sink
SELECT order_id, user_id, product_name, amount, order_time
FROM orders_source;
例2:空のテーブルへの全量インポート(full モード)
大量のデータを初めて空のテーブルにインポートするシナリオに適しています。
SET 'execution.runtime-mode' = 'BATCH';
SET 'parallelism.default' = '8'; -- データ量に応じて調整
CREATE TABLE user_profile_sink (
user_id BIGINT,
username STRING,
email STRING,
age INT,
city STRING,
register_time TIMESTAMP(3),
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = '127.0.0.1',
'port' = '2882',
'tenant-name' = 'prod',
'username' = 'admin',
'password' = 'your_password',
'schema-name' = 'userdb',
'table-name' = 'user_profile',
-- fullモードを使用(空のテーブルに適しています)
'load-method' = 'full',
-- パフォーマンスのチューニング
'parallel' = '16', -- サーバー側の並列度
'buffer-size' = '2048'
);
-- CSVファイルまたは他のデータソースからインポート
INSERT INTO user_profile_sink
SELECT * FROM user_profile_csv_source;
例3:増分置換インポート(inc_replaceモード)
新しいデータで古いデータを上書きする必要があるシナリオに適用されます。
SET 'execution.runtime-mode' = 'BATCH';
CREATE TABLE product_info_sink (
product_id BIGINT,
product_name STRING,
price DECIMAL(10, 2),
stock INT,
update_time TIMESTAMP(3),
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = '127.0.0.1',
'port' = '2882',
'tenant-name' = 'ecommerce',
'username' = 'root',
'password' = 'your_password',
'schema-name' = 'productdb',
'table-name' = 'product_info',
-- inc_replaceモードを使用(既存のデータを自動的に上書き)
'load-method' = 'inc_replace', -- dup-actionパラメータは無視されます
-- パフォーマンス設定
'parallel' = '16',
'buffer-size' = '2048'
);
-- 更新データをインポート(既存のレコードを自動的に上書き)
INSERT INTO product_info_sink
SELECT * FROM product_info_latest;
例4:Kafkaから有界データを読み取りインポートする
この例では、Kafkaから有界データ(boundedモード)を読み取り、OceanBaseにインポートする方法を示します。
SET 'execution.runtime-mode' = 'BATCH';
-- Kafka Source(有界モード)を作成
CREATE TABLE kafka_source (
event_id BIGINT,
event_type STRING,
user_id BIGINT,
event_data STRING,
event_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user-events',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'flink-import-group',
'scan.startup.mode' = 'earliest-offset',
'scan.bounded.mode' = 'latest-offset', -- 有界モード:最新のoffsetまで読み取ったら停止
'format' = 'json'
);
-- OceanBase Sinkを作成
CREATE TABLE event_sink (
event_id BIGINT,
event_type STRING,
user_id BIGINT,
event_data STRING,
event_time TIMESTAMP(3),
PRIMARY KEY (event_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = '127.0.0.1',
'port' = '2882',
'tenant-name' = 'analytics',
'username' = 'root',
'password' = 'your_password',
'schema-name' = 'eventdb',
'table-name' = 'events',
'load-method' = 'full',
'parallel' = '16'
);
-- データをインポート
INSERT INTO event_sink
SELECT * FROM kafka_source;
例5:ファイルからデータをインポートする
CSVファイルからOceanBaseにデータを一括インポートします。
SET 'execution.runtime-mode' = 'BATCH';
-- Sourceファイルの作成
CREATE TABLE csv_source (
id BIGINT,
name STRING,
age INT,
salary DECIMAL(10, 2)
) WITH (
'connector' = 'filesystem',
'path' = 'file:///data/employees.csv',
'format' = 'csv',
'csv.field-delimiter' = ',',
'csv.ignore-parse-errors' = 'false'
);
-- OceanBase Sinkの作成
CREATE TABLE employee_sink (
id BIGINT,
name STRING,
age INT,
salary DECIMAL(10, 2),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = '192.168.1.100',
'port' = '2882',
'tenant-name' = 'hrms',
'username' = 'admin',
'password' = 'your_password',
'schema-name' = 'hr',
'table-name' = 'employees',
'load-method' = 'full',
'parallel' = '8',
'buffer-size' = '2048',
'max-error-rows' = '10' -- 最大10行のエラーを許容
);
-- インポートの実行
INSERT INTO employee_sink
SELECT * FROM csv_source;
DirectLoad コネクタのベストプラクティス
パフォーマンスの最適化
Flinkの並列度を適切に設定する
Flinkの並列度は、並列で実行されるWriterタスクの数を決定します。適切に設定することで、クラスタリソースを最大限に活用できます。
-- データ量とクラスタリソースに基づいて並列度を設定
SET 'parallelism.default' = '8';
parallel パラメータのチューニング(サーバー側の並列度)
parallel パラメータは、コミットステージのパフォーマンスに大きな影響を与えます。
チューニングの戦略:
- 大規模なデータインポートの際、
parallelを適切に増やすことで、コミットステージの所要時間を大幅に短縮できます。 - タイプのCPU数の 2~4倍 に設定することを推奨します。
- サーバー側は自動的に制限するため、設定値が大きすぎても心配ありません。
-- 例:8コアのテナント
'parallel' = '16' -- または '32'
パフォーマンス比較例
parallel |
コミットステージの所要時間 |
|---|---|
| 8(デフォルト) | 約10分 |
| 16 | 約5分 |
| 32 | 約3分 |
buffer-size のチューニング
データの特性に応じてバッファサイズを調整します:
-- 小行データ(1行 < 1KB)
'buffer-size' = '4096'
-- 中程度の行データ(1行 1~10KB)
'buffer-size' = '1024' -- デフォルト値
-- 大行データ(1行 > 10KB)
'buffer-size' = '512'
プロダクション環境での推奨事項
適切な実行時間帯(ウィンドウ)の選択
ダイレクトロード中はテーブルがロックされるため、以下の点に注意してください:
- 業務のオフピーク時間帯を選びます。例えば、深夜や週末など
- 関係者に事前に通知します。他の業務やアプリケーションへの影響を防ぐため、事前に関係者へ通知を行ってください。
- インポート処理が長時間テーブルを占有し続けないよう、適切なタイムアウト時間を設定してください。
-- データ量に似合わせてタイムアウトを2時間に設定
'timeout' = '2h'
シナリオに応じて load-method を選択する
| シナリオ | 推奨 load-method |
理由 |
|---|---|---|
| 初回インポート(空のテーブル) | full |
パフォーマンスが最適で、カラムストアのクエリ性能も最高です |
| 定期的な増分データ追加 | inc |
増分シナリオに適しており、コンフリクトチェックをサポートします |
| 毎日の全量更新(ディメンションテーブル) | inc_replace |
旧データを自動的に上書きするため、手動での削除は不要です |
| 歴史データのリカバリ | full(一時テーブルにインポートし、切り替える) |
在庫テーブルへの影響を回避します |
カラムストアテーブルの使用に関する推奨事項
ターゲットがカラムストアテーブルの場合、異なる load-method の影響に注意する必要があります:
fullモード(推奨):- データを直接カラムストア形式で書き込みます
- インポート完了後のクエリ性能が最適です
- 空のテーブルまたは再構築を許容できるテーブルにのみ適用されます
inc/inc_replaceモード:- データをダンプ形式(行ストア形式)で書き込みます
- インポート後のクエリ性能は行ストアの性能に相当します
- カラムストアの性能に達するには、1回のメジャーコンパクションを待つ必要があります
- 高いクエリ性能を追求する場合は、手動でメジャーコンパクションをトリガーすることを推奨します:
-- OceanBaseで手動でメジャーコンパクションをトリガー
ALTER SYSTEM MAJOR FREEZE;
よくある質問
Q1:インポート中に他の書き込み操作が失敗した場合はどうすればいいですか?
問題の概要:ダイレクトロードを実行している間に、ターゲットテーブルに対する INSERT/UPDATE/DELETE 操作が失敗しました。
原因:これはダイレクトロードの固有の特性です。インポート中はターゲットテーブルがロックされ、SELECT 操作のみが許可されます。
解決策:
ソリューション1:ピーク時外しインポート
- インポートタスクを業務の低需要時間帯(例えば深夜)にスケジュールします
- 事前に業務担当者と連携し、書き込み操作を一時停止する
ソリューション2:中間テーブルを使用する
-- 1. 一時テーブルにインポート CREATE TABLE target_table_tmp LIKE target_table; -- DirectLoad を使用して target_table_tmp にインポート -- 2. テーブル名を切り替える RENAME TABLE target_table TO target_table_old, target_table_tmp TO target_table;- まず一時テーブルにインポートします
- インポート完了後にテーブル切り替えまたはデータマージ操作を行います
ソリューション3:通常のJDBCコンネクタを使用する
- ロックを許容できない場合は、
flink-connector-oceanbaseコネクタに切り替える - 一部の性能を犠牲にすることで、テーブルの可用性を確保する
- ロックを許容できない場合は、
Q2:ジョブが終了しない / データがコミットされない場合はどうすればいいですか?
問題の概要:Flinkジョブが RUNNING 状態のままになり、OceanBaseでデータが確認できません。
原因:DirectLoadコンネクタの最終コミットは、入力の終了(end-of-input)段階で行われます。入力が無限ストリームの場合、ジョブは終了せず、データもコミットされません。
トラブルシューティング手順:
バッチモードが設定されているか確認します:
SET 'execution.runtime-mode' = 'BATCH';データソースが有界ストリームであるか確認します:
- ファイルデータソース:天然に有界
- Kafka:
scan.bounded.modeを設定する必要があります - JDBC:天然に有界
- カスタムソース:end-of-input 信号が正しく送信されているか確認する
Flink Web UIを確認します:
- ジョブの状態を確認する
- Backpressureが発生していないか確認する
- 各オペレーターの処理進捗を確認する
Q3:load-method をどのように選択すればいいですか?
決定木:
最初のインポートで空のテーブルにデータをインポートする?
├─ はい → full モードを使用する(性能が最適で、カラムストアの効果が最も良い)
└─ いいえ(非空のテーブルにインポートする場合)
└─ 既存のレコードを上書きする必要がありますか?
├─ はい → inc_replace モードを使用する
└─ いいえ → inc モードを使用する
└─ 主キーの重複を許可しますか?
├─ いいえ → dup-action = STOP_ON_DUP
├─ 許可する(新しいデータを保持)→ dup-action = REPLACE
└─ 許可する(古いデータを保持)→ dup-action = IGNORE
特殊なシナリオ:
- カラムストアテーブルで最適なクエリ性能を追求する場合:
fullモードを優先的に使用する - OceanBaseのバージョンが4.3.2未満の場合:
fullモードのみ使用可能 - 頻繁な増分インポートが必要な場合:
incまたはinc_replaceモードを使用する
Q4:カラムストアテーブルで inc/inc_replace モードを使用した後、クエリが遅い場合はどうすればいいですか?
問題の概要:inc または inc_replace モードを使用してカラムストアテーブルにデータをインポートした後、クエリのパフォーマンスが期待ほどではありません。
原因:inc/inc_replace モードで書き込まれたデータは、行ストア形式のダンプとして保存されるため、メジャーコンパクションが完了するまでカラムストア形式に変換されません。
解決策:
メジャーコンパクションを手動でトリガーする(推奨):
-- OceanBaseで実行 ALTER SYSTEM MAJOR FREEZE; -- コンパクションの進捗を確認 SELECT * FROM oceanbase.DBA_OB_MAJOR_COMPACTION;自動コンパクションを待つ: OceanBaseは定期的に自動でコンパクションをトリガーしますが、設定によっては比較的長い時間かかる場合があります。
Q5:インポート速度が遅い場合、どう最適化すればいいですか?
原因と対処法:
Flinkの並行度が低すぎる:
SET 'parallelism.default' = '16'; -- 並行度を増加サーバー側の並行度が低すぎる:
'parallel' = '16' -- サーバー側の並行度を増加buffer-sizeが小さすぎる:
'buffer-size' = '2048' -- バッファを増大OceanBaseのリソース不足:
- OceanBaseクラスタの負荷を確認する
- スケールアップや低ピーク時のインポートを検討する