概要
Apache Flinkは、オープンソースの分散ストリーム処理フレームワークであり、リアルタイムおよびバッチデータ処理シナリオで広く利用されています。OceanBase Flink DirectLoadコネクタ(flink-connector-oceanbase-directloadは、OceanBaseデータベース専用に設計された高性能なデータインポートツールです。OceanBaseデータベースのダイレクトロード技術を活用し、極めて高いスループットで大量のデータを迅速にデータベースに書き込むことができます。
ダイレクトロードは、OceanBaseが提供する高性能なデータインポート方法です。従来のSQL文INSERT方式とは異なり、ダイレクトロードはSQL層の解析やトランザクション処理のプロセスをバイパスし、データを直接ストレージ層に書き込むことで、より高いインポート性能を実現します。この方法は、一度に大量のデータをインポートする必要があるシナリオに特に適しています。
DirectLoadコネクタと通常のJDBCコネクタの比較
比較項目 |
DirectLoadコネクタ |
JDBCコネクタ |
|---|---|---|
| 書き込み方式 | ダイレクトロードによるストレージ層への直接書き込み | SQLステートメント INSERT による書き込み |
| スループット | 非常に高く、大量データに適しています | 相対的に低い |
| 適用シナリオ | バッチデータインポート、データ移行 | リアルタイムストリーミング書き込み、CDC同期 |
| データストリームタイプ | 界付きストリーム(Bounded)のみサポート | 無限ストリーム(Unbounded)をサポート |
| インポート中のテーブル状態 | ターゲットテーブルがロックされ、クエリのみサポート | テーブルは通常の読み書き可能 |
| サポート操作 | INSERT、UPDATE_AFTER |
INSERT、UPDATE、DELETE |
主な利点
- 超高スループット:従来のJDBC方式と比較して、パフォーマンスが数倍から数十倍向上します。
- 並列書き込み:複数ノードでの並列書き込みをサポートし、クラスタリソースを最大限に活用します。
- シンプルで使いやすい:Flink SQLを通じて利用でき、複雑な設定は不要です。
- 柔軟な競合処理:さまざまな主キー競合処理戦略(停止、置換、無視)をサポートします。
使用方法と制限
DirectLoadコネクタを使用する前に、以下の特性と制限を必ず理解しておいてください。
サポートされる機能
- バッチデータ書き込み:大量データのインポートに特化しており、数千万~数億レベルのデータの高速な書き込みをサポートします。
- 有界ストリーム処理:ファイルやデータベーススナップショットなどの有界データソースをサポートします。
- 複数のインポートモード:
full(フル)、inc(増分)、inc_replace(増分置換)をサポートします。 - 柔軟な競合戦略:主キー競合処理戦略として
STOP_ON_DUP、REPLACE、IGNOREの3種類を提供します。 - 並列書き込み:Flinkの高並列度書き込みをサポートし、クラスタリソースを最大限活用します。
- 互換性:OceanBaseのMySQLモードとOracleモードをサポートします。
使用上の制限
- 有界ストリームのみサポート:データソースは有界(Bounded Stream)である必要があり、連続する無界ストリームの書き込みはサポートされません。
- インポート中のテーブルロック:ダイレクトロード実行中、ターゲットテーブルはロックされます。この時点では
SELECTクエリ操作のみ許可され、そのテーブルに対するINSERT、UPDATE、DELETEなどの書き込み操作はできません。 - DELETE操作はサポートされていない:FlinkのTable/SQL側は
INSERTとUPDATE_AFTERの変更タイプの書き込みのみをサポートし、DELETE、UPDATE_BEFOREはサポートされていません。 - バッチ処理シナリオ:Flinkバッチモードの使用を推奨します。リアルタイムストリーミング書き込みシナリオには適していません。
注意
リアルタイムストリーミング書き込みや無界データストリームの処理が必要な場合は、プロジェクト内の flink-connector-oceanbase コネクタを使用してください。これはJDBCに基づいて実装されており、無界ストリームとリアルタイム書き込みをサポートします。
適用シナリオ
DirectLoadコネクタは、以下のシナリオに適しています:
- データ移行:他のデータベースやデータウェアハウスからOceanBaseデータベースへの一括データ移行。
- オフラインETL:日次データ集計など、定期的なデータのバッチ処理とインポート。
- 履歴データのインポート:一度に大量の履歴データをインポートする場合。
- データ初期化:新しいシステムへのデータの一括初期化。
適用外シナリオ
以下のシナリオでは、DirectLoadコネクタの使用は推奨されません:
- リアルタイムストリーミング書き込み:継続的にデータを書き込む必要があるリアルタイムシナリオ。
- CDCデータ同期:データベース変更をリアルタイムで同期する必要があるシナリオ。
- 低遅延要件:書き込み遅延に対する厳格な要件があるシナリオ。
- 高頻度・小バッチ書き込み:頻繁な小バッチデータ書き込み。
バージョン要件と依存関係
ソフトウェア |
バージョン要件 |
説明 |
|---|---|---|
| Apache Flink | 1.15 以降のバージョン | より優れたパフォーマンスと安定性を得るには、1.15 以降のバージョンを推奨します |
| JDK | 8 以降のバージョン | — |
| OceanBase | いずれかの以下のバージョン範囲を満たすこと: • 4.3.0 BP1 以降 • 4.2.4 - 4.3.0 • 4.2.1 BP7 - 4.2.2.0 |
MySQLモードとOracleモードをサポートしています |
| OceanBase (増分モード) | 4.3.2 以降のバージョン | inc または inc_replace インポートモードを使用する場合が必要です |
クイックスタート
このセクションでは、DirectLoadコネクタの使用方法を手早く習得できるように、コネクタの取得から初回データインポートの完了までの手順を説明します。
ステップ1:ローカルFlinkクラスタのクイックデプロイ(単一マシン版)
Apache Flinkのダウンロード
Flink公式ダウンロードページにアクセスし、Stable Releaseを選択します(1.15+を推奨、例:1.18または1.19)。
例(Linux/macOSターミナルコマンド):
# Flinkのダウンロード(1.18.1を例として)
wget https://archive.apache.org/dist/flink/flink-1.19.3/flink-1.19.3-bin-scala_2.12.tgz
# 解凍
tar -xzf flink-1.19.3-bin-scala_2.12.tgz
# ディレクトリに移動
cd flink-1.19.3
# 環境変数の設定(オプションですが推奨されます)
export FLINK_HOME=$(pwd)
ローカルFlinkクラスタの起動
Flinkディレクトリで以下を実行します:
# クラスタの起動(JobManager + TaskManagerを含む)
$FLINK_HOME/bin/start-cluster.sh
起動に成功すると、次のような出力が表示されます:
Starting cluster.
Starting standalonesession daemon on host your-hostname.
Starting taskexecutor daemon on host your-hostname.
起動に失敗した場合は、ログを確認できます:
# JobManagerログの確認
tail -f $FLINK_HOME/log/flink-*-standalonesession-*.log
# TaskManagerログの確認
tail -f $FLINK_HOME/log/flink-*-taskexecutor-*.log
Flinkが正常に稼働しているか確認する
ブラウザを開いてアクセスします:http://localhost:8081, Flink Dashboardが表示され、1つのTaskManagerと利用可能なスロット数が1以上であることが確認できます。
ステップ2:OceanBase DirectLoadコネクタJARのデプロイ
コネクタJARのダウンロード
Maven Centralにアクセスします:https://repo1.maven.org/maven2/com/oceanbase/flink-sql-connector-oceanbase-directload/
バージョン(例:1.5.0)を選択し、対応するJARファイルをダウンロードします:
# 例:1.5.0バージョンをダウンロード
wget https://repo1.maven.org/maven2/com/oceanbase/flink-sql-connector-oceanbase-directload/1.0.0/flink-sql-connector-oceanbase-directload-1.5.0.jar
注意:ファイル名は flink-sql-connector-oceanbase-directload-<version>.jar である必要があります。これはFlink SQLがコネクタを認識するための鍵となります。
JARをFlinkのlib/ディレクトリに配置
# JARをFlink libディレクトリにコピー
cp flink-sql-connector-oceanbase-directload-1.5.0.jar $FLINK_HOME/lib/
重要なヒント:Flinkは起動時にlib/ディレクトリ内のすべてのJARパッケージを自動的に読み込むため、コネクタは自動的に登録されます。
Flinkクラスタの再起動(JARの有効化)
# まず停止
$FLINK_HOME/bin/stop-cluster.sh
# その後起動
$FLINK_HOME/bin/start-cluster.sh
ステップ3:データベース接続情報を取得する
OceanBaseデータベースのデプロイ担当者または管理者から、該当するデータベース接続文字列を取得します。例:
obclient -h$host -P$port -u$user_name -p$password -D$database_name
パラメータ説明:
$host:OceanBaseデータベースへの接続IPアドレス。OceanBaseデータベースプロキシ(OceanBase Database Proxy、ODP)接続方式ではODPアドレスを使用し、直接接続方式ではOBServerノードのIPアドレスを使用します。$port:OceanBaseデータベースへの接続ポート。ODP接続方式のデフォルトポートは2883で、ODPデプロイ時にカスタマイズ可能です。直接接続方式のデフォルトポートは2881で、OceanBaseデータベースのデプロイ時にカスタマイズ可能です。$database_name:アクセス対象のデータベース名。注意
テナントに接続するユーザーは、データベースに対する
CREATE、INSERT、DROP、およびSELECT権限が付与されていなければなりません。ユーザー権限の詳細については、MySQLモードの権限分類を参照してください。$user_name:テナントの接続アカウント。ODP接続の一般的な形式:ユーザー名@テナント名#クラスタ名またはクラスタ名:テナント名:ユーザー名。直接接続方式の形式:ユーザー名@テナント名。$password:アカウントのパスワード。
接続文字列の詳細については、OBClientを使用してOceanBaseテナントに接続するを参照してください。
注意
OBServerサーバーに接続する際は、sysテナントでシステムビュー DBA_OB_SERVERS をクエリすることで、OBServerのRPCポート番号を取得できます。
ネットワーク接続性の確認
Flinkジョブを実行する環境がOceanBaseデータベースにアクセスできることを確認します:
テストコマンド:
# RPCポートの接続性をテストする
nc -zv <oceanbase-host> <rpc-port>
ステップ4:OceanBaseにターゲットテーブルを作成する
例:
-- OceanBaseデータベースに接続
USE test;
-- ターゲットテーブルを作成
CREATE TABLE `t_user` (
`id` INT NOT NULL,
`username` VARCHAR(50) DEFAULT NULL,
`age` INT DEFAULT NULL,
`score` DECIMAL(10,2) DEFAULT NULL,
PRIMARY KEY (`id`)
) COMMENT 'ユーザー情報テーブル';
ステップ5:Flink SQLクライアントを起動してテストする
# SQLクライアントを起動する
$FLINK_HOME/bin/sql-client.sh
正常に起動すると、Flink SQLクライアントの対話型コマンドラインインターフェースが表示されます。
▒▓██▓██▒
▓████▒▒█▓▒▓███▓▒
▓███▓░░ ▒▒▒▓██▒ ▒
░██▒ ▒▒▓▓█▓▓▒░ ▒████
██▒ ░▒▓███▒ ▒█▒█▒
░▓█ ███ ▓░▒██
▓█ ▒▒▒▒▒▓██▓░▒░▓▓█
█░ █ ▒▒░ ███▓▓█ ▒█▒▒▒
████░ ▒▓█▓ ██▒▒▒ ▓███▒
░▒█▓▓██ ▓█▒ ▓█▒▓██▓ ░█░
▓░▒▓████▒ ██ ▒█ █▓░▒█▒░▒█▒
███▓░██▓ ▓█ █ █▓ ▒▓█▓▓█▒
░██▓ ░█░ █ █▒ ▒█████▓▒ ██▓░▒
███░ ░ █░ ▓ ░█ █████▒░░ ░█░▓ ▓░
██▓█ ▒▒▓▒ ▓███████▓░ ▒█▒ ▒▓ ▓██▓
▒██▓ ▓█ █▓█ ░▒█████▓▓▒░ ██▒▒ █ ▒ ▓█▒
▓█▓ ▓█ ██▓ ░▓▓▓▓▓▓▓▒ ▒██▓ ░█▒
▓█ █ ▓███▓▒░ ░▓▓▓███▓ ░▒░ ▓█
██▓ ██▒ ░▒▓▓███▓▓▓▓▓██████▓▒ ▓███ █
▓███▒ ███ ░▓▓▒░░ ░▓████▓░ ░▒▓▒ █▓
█▓▒▒▓▓██ ░▒▒░░░▒▒▒▒▓██▓░ █▓
██ ▓░▒█ ▓▓▓▓▒░░ ▒█▓ ▒▓▓██▓ ▓▒ ▒▒▓
▓█▓ ▓▒█ █▓░ ░▒▓▓██▒ ░▓█▒ ▒▒▒░▒▒▓█████▒
██░ ▓█▒█▒ ▒▓▓▒ ▓█ █░ ░░░░ ░█▒
▓█ ▒█▓ ░ █░ ▒█ █▓
█▓ ██ █░ ▓▓ ▒█▓▓▓▒█░
█▓ ░▓██░ ▓▒ ▓█▓▒░░░▒▓█░ ▒█
██ ▓█▓░ ▒ ░▒█▒██▒ ▓▓
▓█▒ ▒█▓▒░ ▒▒ █▒█▓▒▒░░▒██
░██▒ ▒▓▓▒ ▓██▓▒█▒ ░▓▓▓▓▒█▓
░▓██▒ ▓░ ▒█▓█ ░░▒▒▒
▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓ ▓░▒█░
______ _ _ _ _____ ____ _ _____ _ _ _ BETA
| ____| (_) | | / ____|/ __ \| | / ____| (_) | |
| |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_
| __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __|
| | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_
|_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Flink SQL>
ステップ6:最初のFlink SQLインデックス
Flink SQLクライアントの起動
cd $FLINK_HOME
./bin/sql-client.sh
Batchモードへの設定
-- オプション:実行モードをBATCHに設定することで、パフォーマンスが向上します。
SET 'execution.runtime-mode' = 'BATCH';
-- オプション:並列度を設定します(データ量とクラスタリソースに応じて調整してください)。
SET 'parallelism.default' = '4';
DirectLoad Sinkテーブルの作成
Flink SQLで対応するターゲットテーブルを作成し、OceanBaseのt_userテーブルにマッピングします。
CREATE TABLE t_user (
id INT,
username STRING,
age INT,
score DECIMAL(10,2),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = 'xxx.x.x.x', -- OceanBaseホストアドレス
'port' = 'xxxx', -- RPCポート
'tenant-name' = 'your_tenant-name', -- テナント名
'username' = 'your_username', -- ユーザー名
'password' = 'your_password', -- パスワード
'schema-name' = 'test', -- データベース名
'table-name' = 't_user' -- テーブル名
);
パラメータ説明:
connector:固定値 oceanbase-directloadhostとport:OceanBaseのホストアドレスとRPCポート番号。OBServerサーバー側に接続する際は、sysテナントでシステムビューDBA_OB_SERVERSをクエリすることでOBServerのRPCポート番号を取得できます。tenant-name:テナント名usernameとpassword:データベースユーザー認証情報(注意:ユーザー名には @tenant のサフィックスが含まれません)schema-nameとtable-name:ターゲットデータベースとテーブル名
説明
上記は最小限のパラメータセットです。より詳細なパラメータ設定については、後続の章「-パラメータの詳細な設定」を参照してください。
テストデータの挿入
-- いくつかのテストデータを挿入します
INSERT INTO t_user
VALUES
(1, 'Alice', 25, 95.5),
(2, 'Bob', 30, 88.0),
(3, 'Charlie', 28, 92.3),
(4, 'David', 35, 87.8),
(5, 'Eve', 22, 96.0);
実行後、Flinkはジョブを送信し、データのインポートを開始します。コンソールには次のような出力が表示されます:
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
ジョブの実行について:
- ジョブはバックグラウンドで実行されます。Flink Web UI(http://localhost:8081)でジョブの進捗状況を確認できます。
- ジョブの状態は以下のように変化します:
INITIALIZING→RUNNING→FINISHED。 - DirectLoadの最終的な送信はジョブ終了時に行われます。データを検証する前に、必ずジョブの状態が
FINISHEDになるのを待ってください。
データ書き込みの検証
ジョブが正常に完了した後、OceanBaseでクエリを実行して検証します。
SELECT * FROM test.t_user ORDER BY id;
クエリ結果は次のとおりです。
+----+----------+------+-------+
| id | username | age | score |
+----+----------+------+-------+
| 1 | Alice | 25 | 95.50 |
| 2 | Bob | 30 | 88.00 |
| 3 | Charlie | 28 | 92.30 |
| 4 | David | 35 | 87.80 |
| 5 | Eve | 22 | 96.00 |
+----+----------+------+-------+
5 rows in set
ステップ7:クラスタの停止とクリーンアップ(オプション)
テスト終了後、Flinkクラスタを停止できます。
# Flinkクラスタを停止する
cd $FLINK_HOME
./bin/stop-cluster.sh
SQL Clientでログアウトします。
-- SQL Clientで実行する
QUIT;
または、Ctrl+Dキーを押してSQL Clientから直接ログアウトすることもできます。
重要なヒント
- ジョブ完了の待機:ダイレクトロードの最終コミットはジョブ終了段階で行われます。データの検証を行う前に、ジョブの状態が
"FINISHED"に変わるまで必ず待ってください。 - インポート中のテーブルロック:
INSERTステートメントの実行中、ターゲットテーブルt_userはロックされます。この間は、そのテーブルに対するクエリのみ可能で、書き込み操作はできません。 - Batchモードの使用を推奨:DirectLoadコネクタは有界ストリームのみをサポートしています。Batchモードの使用を推奨し、データソースが有界であることを確認してください。
- エラー処理:ジョブが失敗した場合は、ログ内のエラー情報を確認してください。一般的な問題には、ネットワーク接続失敗、アカウント権限不足、パラメータ設定エラーなどが含まれます。
設定パラメータの詳細
このセクションでは、DirectLoadコネクタのすべての設定パラメータについて詳しく説明します。* マークが付いているパラメータは 必須 です。
接続パラメータ
パラメータ名 |
必須 |
デフォルト値 |
型 |
説明 |
|---|---|---|---|---|
connector* |
はい | なし | String | コネクタタイプ。固定値:oceanbase-directload |
host* |
はい | なし | String | OceanBaseデータベースのホストアドレス。例:127.0.0.1またはドメイン名 |
port* |
はい | 2882 |
Integer | ダイレクトロードで使用するRPCポート。デフォルトは2882 |
tenant-name* |
はい | なし | String | テナント名。例:test |
username* |
はい | なし | String | データベースのユーザー名。例:root。注意:ここに入力するのは純粋なユーザー名であり、接続文字列形式(例: root@test)ではありません |
password* |
はい | なし | String | データベースのパスワード |
schema-name* |
はい | なし | String | データベース名(スキーマ)。例:test |
table-name* |
はい | なし | String | ターゲットテーブル名。例:t_user |
インポート動作パラメータ
パラメータ名 |
必須 |
デフォルト値 |
型 |
説明 |
|---|---|---|---|---|
load-method |
いいえ | full |
String | インポートモード。サポートされているモードは以下のとおりです:full、inc、inc_replace詳細については、「 load-method の詳細」を参照してください。 |
dup-action |
いいえ | REPLACE |
String | 主キー競合処理ポリシー。サポートされているオプションは以下のとおりです: - STOP_ON_DUP:競合が発生するとインポートを直ちに停止します- REPLACE:既存のレコードを置き換えます- IGNORE:競合レコードを無視します詳細については、「 dup-action の詳細」を参照してください。 |
max-error-rows |
いいえ | 0 |
Long | 容認可能な最大誤り行数。このしきい値を超えた場合、インポートは失敗します。0 に設定すると、いかなるエラーも容認しません。 |
パフォーマンスチューニングパラメータ
パラメータ名 |
必須 |
デフォルト値 |
型 |
説明 |
|---|---|---|---|---|
parallel |
いいえ | 8 |
Integer | サーバー側の並列度。このパラメータは、OceanBaseサーバーがこのインポートタスクを処理するために使用するCPUリソース量を決定します。 詳細は後述の「 parallel の詳細」を参照してください |
buffer-size |
いいえ | 1024 |
Integer | 書き込みバッファサイズ(単位:行数)。この行数に達すると、1回のフラッシュ書き込みがトリガーされます。 詳細は後述の「 buffer-size のチューニング推奨事項」を参照してください |
timeout |
いいえ | 7d |
Duration | 単一のダイレクトロードタスクのタイムアウト時間。サポート形式:1d、12h、30m、3600s |
heartbeat-timeout |
いいえ | 60s |
Duration | クライアントハートビートのタイムアウト時間 |
heartbeat-interval |
いいえ | 10s |
Duration | クライアントハートビートの間隔時間 |
主要パラメータの詳細説明
load-method の詳細
load-method パラメータは、ダイレクトロードのモードを決定します。異なるモードは異なるシナリオに適用されます。
full(フルインポート)
- デフォルト値、空のテーブルやデータが少ないテーブルに適しています。
- インポートされたデータは直接メジャースタテブルに書き込まれます。
- カラムストアテーブルの利点:インポート完了後、データはすぐにカラムストア形式となり、クエリパフォーマンスが最適化され、追加のメジャーコンパクションは不要です。
- 使用シナリオ:
- 初めて空のテーブルにデータをインポートする場合
- ターゲットテーブルのデータ量が非常に少なく、再構築を許容できる場合
- 最適なカラムストアクエリパフォーマンスを追求する場合
-- fullモードの例
CREATE TABLE t_sink (...) WITH (
'connector' = 'oceanbase-directload',
'load-method' = 'full',
...
);
inc(通常の増分インポート)
- 空ではないテーブルにデータを追加する場合に適しています。
- 主キー競合チェックを実行し、競合が検出された場合は
dup-action戦略に従って処理します。 - バージョン要件:OceanBase 4.3.2以降のバージョン
- 制限:
dup-actionをREPLACEに設定することは現在サポートされていません。 - カラムストアテーブルへの注意:データ書き込みはダンプに転送されます(カラムストアは現在ダンプをサポートしていないため)、インポート後のクエリパフォーマンスは行ストアのパフォーマンスとなります。カラムストアクエリパフォーマンスを達成するには、一度のメジャーコンパクションを待つ必要があります。
-- incモードの例
CREATE TABLE t_sink (...) WITH (
'connector' = 'oceanbase-directload',
'load-method' = 'inc',
'dup-action' = 'STOP_ON_DUP', -- REPLACEは使用できません
...
);
inc_replace(増分置換インポート)
- 空ではないテーブルにデータをインポートし、既存のレコードを上書きする必要がある場合に適しています。
- 主キー競合チェックを行わず、既存の主キーを持つデータを直接上書きします(
REPLACEの効果と同じ)。 - バージョン要件:OceanBase 4.3.2以降のバージョン
dup-actionパラメータは無視されます(競合チェックを行わないため)。- カラムストアテーブルへの注意:
incモードと同様、メジャーコンパクション後にカラムストアクエリパフォーマンスが得られます。
-- inc_replaceモードの例
CREATE TABLE t_sink (...) WITH (
'connector' = 'oceanbase-directload',
'load-method' = 'inc_replace', -- dup-actionパラメータは無効です
...
);
モード選択の推奨事項
シナリオ |
推奨モード |
理由 |
|---|---|---|
| 空のテーブルへの初回インポート | full |
パフォーマンスが最適で、カラムストアテーブルのクエリ効果が最良 |
| 空ではないテーブルへのインポート。競合が発生する可能性があり、上書きが必要な場合 | inc_replace |
古いデータを自動的に上書きします |
| 空ではないテーブルへのインポート。競合は許可されない場合 | inc + dup-action=STOP_ON_DUP |
データの一貫性を厳密に制御します |
dup-action の詳細
dup-action パラメータは、主キー競合が発生した場合の処理方法を指定します(load-method=full または inc の場合にのみ有効)。
STOP_ON_DUP(インポートの停止)
- 主キー競合が発生すると、インポートを直ちに停止し、ジョブ全体が失敗します。
- 適用シナリオ:
- データの一貫性に対する要求が厳しい場合
- 主キー競合を許容できない場合
- データ品質の問題を迅速に検出する必要がある場合
'dup-action' = 'STOP_ON_DUP'
REPLACE(置換)
- 主キー競合が発生すると、古いレコードを新しいレコードで置き換えます。
- 適用シナリオ:
- 既存のデータを上書きすることを許可する場合
- インポートするのが最新バージョンのデータの場合
- 注意:
load-method=incの場合は現在サポートされていません。
'dup-action' = 'REPLACE'
IGNORE(無視)
- 主キー競合が発生すると、古いレコードを保持し、新しいレコードを無視します。
- 適用シナリオ:
- 古いデータを優先する場合
- 増分インポート時に既存のレコードを上書きしないようにする場合
'dup-action' = 'IGNORE'
parallel の詳細
parallel パラメータはサーバー側の並列度であり、OceanBase サーバーがこのインポートタスクの処理にどれだけの CPU リソースを使用するかを決定します。
重要な特性
- クライアントの並行性とは無関係:
parallelはサーバー側のパラメータであり、Flink の並列度とは独立した概念です。 - テナント設定による制限を受ける:サーバー側はテナントの CPU 設定に基づいて並列度の上限を自動的に制限します。クライアント側で設定した値がこの範囲を超えてもエラーは発生しません。
- パーティションの分散状況の影響を受ける:実際の並列度は、テーブルのパーティションの分散状況の影響も受けます。
実際の並列度の計算ルール
- 単一ノードでの並列度の上限 =
MIN(テナントCPU数 × 2, parallel 設定値) - 実際の総並列度 =
単一ノードでの並列度の上限 × パーティションが分散しているノード数
例1:単一ノードの場合
- テナント設定:2C
parallel設定:10- テーブルのパーティションが1つのノードにある
- 実際の並列度 =
MIN(2 × 2, 10) × 1 = 4
例2:複数ノードの場合
- テナント設定:2C
parallel設定:10- テーブルのパーティションが2つのノードに均等に分散している
- 実際の並列度 =
MIN(2 × 2, 10) × 2 = 8
チューニングの推奨事項
- データ量が多いインポートでは、適切に
parallelを大きく設定することで、コミット段階の時間を大幅に短縮できます。 - テナントの CPU 設定に応じた妥当な値を設定することを推奨します。値が大きすぎると効果がなく、小さすぎるとパフォーマンスに影響します。
- 一般的に、テナントの CPU 数の 2~4倍 を設定するのが合理的なスタート点です。
-- 例:4C テナントの推奨設定
'parallel' = '8' -- または '16'
buffer-size のチューニング推奨事項
buffer-size はクライアントの書き込みバッファサイズを表し、単位は行数です。この行数に達すると、1回のフラッシュ(flush)がトリガーされ、データが書き込まれます。
チューニング推奨事項
- データ量が多く、各行のサイズが小さい場合:適宜大きく設定することができます(例:2048、4096)。これにより、フラッシュの回数を減らし、スループットを向上させることができます。
- 各行のデータサイズが大きい場合:メモリ負荷を防ぐため、値を大きく設定しすぎないように注意してください。デフォルト値のままにするか、適宜小さく設定することを推奨します。
- メモリが逼迫している場合:
buffer-sizeを小さくして、TaskManager の OOM を回避します。
推奨値
シナリオ |
推奨値 |
|---|---|
| 単一行 < 1KB、メモリ十分な場合 | 2048 – 4096 |
| 単一行 1KB – 10KB | 1024(デフォルト値) |
| 単一行 > 10KB または メモリ不足の場合 | 512 |
-- 例
'buffer-size' = '2048'
使用例
このセクションでは、さまざまなシナリオと設定を網羅した、より完全で実用的な使用例を提供します。
例1:基本的なバッチインポート
これは最も基本的な使用例で、初心者がすぐに始めるのに適しています。
-- 1. Batchモードに設定する
SET 'execution.runtime-mode' = 'BATCH';
-- 2. Sinkテーブルを作成する(最小パラメータセットを使用)
CREATE TABLE orders_sink (
order_id BIGINT,
user_id BIGINT,
product_name STRING,
amount DECIMAL(10, 2),
order_time TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = '127.0.0.1',
'port' = '2882',
'tenant-name' = 'test',
'username' = 'root',
'password' = 'your_password',
'schema-name' = 'test',
'table-name' = 'orders'
);
-- 3. データソースからインポートする(ここでは別のテーブルから読み取ることを想定しています)
INSERT INTO orders_sink
SELECT order_id, user_id, product_name, amount, order_time
FROM orders_source;
例2:空テーブルへのフルインポート(fullモード)
初めて大量のデータを空テーブルにインポートするシナリオに適しています。
SET 'execution.runtime-mode' = 'BATCH';
SET 'parallelism.default' = '8'; -- データ量に応じて調整
CREATE TABLE user_profile_sink (
user_id BIGINT,
username STRING,
email STRING,
age INT,
city STRING,
register_time TIMESTAMP(3),
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = '127.0.0.1',
'port' = '2882',
'tenant-name' = 'prod',
'username' = 'admin',
'password' = 'your_password',
'schema-name' = 'userdb',
'table-name' = 'user_profile',
-- fullモードを使用する(空テーブルに適しています)
'load-method' = 'full',
-- パフォーマンスチューニング
'parallel' = '16', -- サーバー側の並列度
'buffer-size' = '2048'
);
-- CSVファイルやその他のデータソースからインポートする
INSERT INTO user_profile_sink
SELECT * FROM user_profile_csv_source;
例3:増分置換インポート(inc_replaceモード)
古いデータを新しいデータで上書きするシナリオに適しています。
SET 'execution.runtime-mode' = 'BATCH';
CREATE TABLE product_info_sink (
product_id BIGINT,
product_name STRING,
price DECIMAL(10, 2),
stock INT,
update_time TIMESTAMP(3),
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = '127.0.0.1',
'port' = '2882',
'tenant-name' = 'ecommerce',
'username' = 'root',
'password' = 'your_password',
'schema-name' = 'productdb',
'table-name' = 'product_info',
-- inc_replaceモードを使用する(既存データを自動的に上書きする)
'load-method' = 'inc_replace', -- dup-actionパラメータは無視される
-- パフォーマンス設定
'parallel' = '16',
'buffer-size' = '2048'
);
-- 更新データのインポート(古いレコードを自動的に上書きする)
INSERT INTO product_info_sink
SELECT * FROM product_info_latest;
例4:Kafkaから有界データを読み取り、インポートする
この例では、Kafkaから有界データ(boundedモード)を読み取り、OceanBaseにインポートする方法を示します。
SET 'execution.runtime-mode' = 'BATCH';
-- Kafkaソースを作成(有界モード)
CREATE TABLE kafka_source (
event_id BIGINT,
event_type STRING,
user_id BIGINT,
event_data STRING,
event_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user-events',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'flink-import-group',
'scan.startup.mode' = 'earliest-offset',
'scan.bounded.mode' = 'latest-offset', -- 有界モード:最新のoffsetに達したら停止する
'format' = 'json'
);
-- OceanBaseシンクを作成
CREATE TABLE event_sink (
event_id BIGINT,
event_type STRING,
user_id BIGINT,
event_data STRING,
event_time TIMESTAMP(3),
PRIMARY KEY (event_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = '127.0.0.1',
'port' = '2882',
'tenant-name' = 'analytics',
'username' = 'root',
'password' = 'your_password',
'schema-name' = 'eventdb',
'table-name' = 'events',
'load-method' = 'full',
'parallel' = '16'
);
-- データをインポート
INSERT INTO event_sink
SELECT * FROM kafka_source;
例5:ファイルからデータをインポートする
CSVファイルからOceanBaseに一括してデータをインポートします。
SET 'execution.runtime-mode' = 'BATCH';
-- ソースファイルを作成
CREATE TABLE csv_source (
id BIGINT,
name STRING,
age INT,
salary DECIMAL(10, 2)
) WITH (
'connector' = 'filesystem',
'path' = 'file:///data/employees.csv',
'format' = 'csv',
'csv.field-delimiter' = ',',
'csv.ignore-parse-errors' = 'false'
);
-- OceanBase Sinkを作成
CREATE TABLE employee_sink (
id BIGINT,
name STRING,
age INT,
salary DECIMAL(10, 2),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = '192.168.1.100',
'port' = '2882',
'tenant-name' = 'hrms',
'username' = 'admin',
'password' = 'your_password',
'schema-name' = 'hr',
'table-name' = 'employees',
'load-method' = 'full',
'parallel' = '8',
'buffer-size' = '2048',
'max-error-rows' = '10' -- 最大10行のエラーを許容
);
-- インポートを実行
INSERT INTO employee_sink
SELECT * FROM csv_source;
DirectLoadコネクタの使用に関するベストプラクティス
パフォーマンス最適化
Flink並列度の適切な設定
Flinkの並列度は、並列するWriterタスクの数を決定します。適切に設定することで、クラスタリソースを最大限に活用できます。
-- データ量とクラスタリソースに応じて並列度を設定
SET 'parallelism.default' = '8';
parallelパラメータのチューニング(サーバー側並列度)
parallelパラメータは、コミット段階のパフォーマンスに大きな影響を与えます。
チューニング戦略:
- 大量データのインポート時には、
parallelを適切に引き上げることで、コミット段階の時間を大幅に短縮できます。 - テナントCPU数の 2–4倍 に設定することを推奨します。
- 設定が高すぎる心配は不要です。サーバー側が自動的に制限します。
-- 例:8Cテナント
'parallel' = '16' -- または '32'
パフォーマンス比較例
parallel |
コミット段階の所要時間 |
|---|---|
| 8(デフォルト) | 約10分 |
| 16 | 約5分 |
| 32 | 約3分 |
buffer-sizeのチューニング
データ特性に応じてバッファサイズを調整します:
-- 小行データ(各行 < 1KB)
'buffer-size' = '4096'
-- 中等行データ(各行 1–10KB)
'buffer-size' = '1024' -- デフォルト値
-- 大行データ(各行 > 10KB)
'buffer-size' = '512'
本番環境での推奨事項
適切なインポート時間帯の選択
ダイレクトロード中はテーブルがロックされるため、以下を推奨します:
- ビジネスのオフピーク時間帯を選択:例えば深夜や週末
- 関係者への事前通知:他の業務への影響を避けるため
- 適切なタイムアウト時間の設定:インポートタスクが長時間テーブルを占有するのを防ぐため
-- 2時間のタイムアウトを設定(データ量に応じて評価)
'timeout' = '2h'
シナリオに応じたload-methodの選択
シナリオ |
推奨 load-method |
理由 |
|---|---|---|
| 初回インポートでテーブルが空の場合 | full |
パフォーマンスが最適で、カラムストアクエリの効果が最良 |
| 定期的な増分データの追加 | inc |
増分シナリオに適しており、競合チェックをサポート |
| 毎日のディメンションテーブルのフル更新 | inc_replace |
古いデータを自動的に上書きするため、手動で削除する必要はありません |
| 過去データのロールバック | full(一時テーブルへのインポート後に切り替える) |
オンラインテーブルへの影響を避ける |
カラムストアテーブルの使用に関する推奨事項
対象がカラムストアテーブルの場合、異なるload-methodの影響に注意する必要があります:
fullモード(推奨):- データは直接カラムストア形式で書き込まれます。
- インポート完了後のクエリパフォーマンスが最適です。
- 空のテーブル、または再構築を許容できるテーブルにのみ適しています。
inc/inc_replaceモード:- データはダンプ(行ストア形式)に書き込まれます。
- インポート後のクエリパフォーマンスは行ストアのパフォーマンスです。
- カラムストアのパフォーマンスに達するには、メジャーコンパクションを一度待つ必要があります。
- クエリパフォーマンスを重視する場合、マージを手動でトリガーすることを推奨します:
-- OceanBaseでマージを手動でトリガー
ALTER SYSTEM MAJOR FREEZE;
よくある質問
Q1:インポート中に他の書き込み操作が失敗した場合はどうすればよいですか?
問題の概要:ダイレクトロードを実行中に、ターゲットテーブルに対する INSERT/UPDATE/DELETE 操作が失敗します。
原因:これはダイレクトロードの固有の特性です。インポート中、ターゲットテーブルはロックされ、SELECT 操作のみが許可されます。
解決策:
方法1:閑散時にインポートする
- インポートタスクを業務の閑散時間帯(例:深夜)にスケジュールします。
- 事前に業務担当者と連絡を取り、書き込み操作を一時停止してもらいます。
方法2:中間テーブルを使用する
-- 1. 一時テーブルにインポート CREATE TABLE target_table_tmp LIKE target_table; -- DirectLoadを使用してtarget_table_tmpにインポート -- 2. テーブル名を切り替える RENAME TABLE target_table TO target_table_old, target_table_tmp TO target_table;- まず一時テーブルにインポートします。
- インポート完了後、テーブル切り替えまたはデータマージ操作を実行します。
方法3:通常のJDBCコネクターを使用する
- テーブルのロックを受け入れられない場合は、
flink-connector-oceanbaseコネクターに切り替えてください。 - テーブルの可用性を優先し、パフォーマンスを一部犠牲にします。
- テーブルのロックを受け入れられない場合は、
Q2:ジョブが終了しない / データがコミットされない場合はどうすればよいですか?
問題の概要:Flinkジョブが常に RUNNING 状態であり、OceanBaseではデータを検索できません。
原因:DirectLoadコネクターの最終コミットは、入力終了(end-of-input)段階で発生します。入力が無限ストリームの場合、ジョブは終了せず、データもコミットされません。
トラブルシューティング手順:
バッチモードが設定されていないか確認します:
SET 'execution.runtime-mode' = 'BATCH';データソースが有限ストリームであるか確認します:
- ファイルデータソース:本来有限
- Kafka:
scan.bounded.modeを設定する必要があります。 - JDBC:本来有限
- カスタムSource:end-of-input信号が正しく送信されているか確認します。
Flink Web UIを確認します:
- ジョブの状態を確認します。
- バックプレッシャーが発生していないか確認します。
- 各演算子の処理進捗を確認します。
Q3:load-method はどのように選択しますか?
意思決定ツリー:
空のテーブルへの初回インポートですか?
├─ はい → fullモードを使用する(パフォーマンス最適、カラムストア効果最良)
└─ いいえ(非空のテーブルへのインポート)
└─ 既存のレコードを上書きする必要がありますか?
├─ はい → inc_replaceモードを使用する
└─ いいえ → incモードを使用する
└─ 主キー競合を許容しますか?
├─ いいえ → dup-action = STOP_ON_DUP
├─ 許容(新規データを保持)→ dup-action = REPLACE
└─ 許容(古いデータを保持)→ dup-action = IGNORE
特殊なシナリオ:
- カラムストアテーブルで最適なクエリパフォーマンスを求める場合:
fullモードを優先的に使用します。 - OceanBaseバージョン < 4.3.2:
fullモードのみ使用可能です。 - 頻繁な増分インポートが必要な場合:
incまたはinc_replaceモードを使用します。
Q4:カラムストアテーブルで inc/inc_replace モードを使用した後、クエリが遅い場合はどうすればよいですか?
問題の概要:inc または inc_replace モードでカラムストアテーブルにデータをインポートした後、クエリパフォーマンスが期待通りになりません。
原因:inc/inc_replace モードではデータ書き込みがダンプ(行ストア形式)され、カラムストア形式に変換するにはメジャーコンパクションを一度待つ必要があります。
解決策:
手動でメジャーコンパクションをトリガーする(推奨):
-- OceanBaseで実行 ALTER SYSTEM MAJOR FREEZE; -- マージの進捗を確認する SELECT * FROM oceanbase.DBA_OB_MAJOR_COMPACTION;自動マージを待つ: OceanBaseは定期的に自動でマージをトリガーしますが、時間がかかる場合があります(設定による)。
Q5:インポート速度が非常に遅い場合、どのように最適化しますか?
考えられる原因と解決策:
Flinkの並列度が低すぎる:
SET 'parallelism.default' = '16'; -- 並列度を増やすサーバー側の並列度が低すぎる:
'parallel' = '16' -- サーバー側の並列度を増やすbuffer-sizeが小さすぎる:
'buffer-size' = '2048' -- バッファサイズを大きくするOceanBaseのリソース不足:
- OceanBaseクラスタの負荷を確認する
- スケールアウトを検討するか、オフピーク時にインポートを実行する