OceanBaseデータベースを使用する際、即時インポートや遅延インポートなど、さまざまなデータインポートのニーズに対して、異なるデータインポート戦略を採用する必要があります。本記事では、T+0とT+1の2つのシナリオにおいて、OceanBaseデータベースにデータをインポートする方法を詳しく説明し、移行におけるベストプラクティスのサンプルを提供します。
背景
T+0とは、データ生成後に即座に(リアルタイムで)処理・インポートを行うことです。これは通常、システムに対して高同時実行データ書き込み操作のサポートと、データのリアルタイム性と即時可用性の保証を要求します。
T+1とは、データ生産後の翌営業日(「T+1」は取引日プラス1日を表す)に処理・インポートを行うことです。T+0と比較して、T+1シナリオではリアルタイム性の要求が低くなります。
ベースラインデータのインポート
ベースラインデータとは、特定の時点や条件下におけるシステムやプロジェクトの初期データセットを指します。これは、システムの開始状態やパフォーマンスレベルを表し、将来の変更の参照や比較に使用されます。OceanBaseデータベースでは、以下のベースラインデータインポートソリューションを提供しています:
データベース間のオンライン移行
データがOMSでサポートされているソース側にある場合、OMSを使用してデータをインポートすることを推奨します。OMSがデータソースをサポートしていない場合は、DataXを使用してデータをインポートするか、オフラインインポートを使用できます。
OMSの詳細については、OMS使用ドキュメントを参照してください。
DataXの詳細については、DataX使用ドキュメントを参照してください。
ファイルからデータベースへのオフライン移行
OMSがデータソースをサポートしていない場合、データをオフラインファイルとしてエクスポートし、obloader、DataX、またはLOAD DATAを使用してインポートできます。その中でも、obloaderはダイレクトロードをサポートしており、データインポートのパフォーマンスを向上させることができます。
- obloaderの詳細については、obloader使用ドキュメントを参照してください。
- DataXの詳細については、DataX使用ドキュメントを参照してください。
- LOAD DATA LOCALの使用方法については、LOAD DATA (Oracleモード)および LOAD DATA (MySQLモード)を参照してください。
増分データのインポート
T + 0シナリオ
T+0とは、データ生成後に即座に(リアルタイムで)処理・インポートを行うことです。OMSのインクリメンタル移行機能を使用してデータをリアルタイムでOceanBaseデータベースにインポートしたり、FlinkやDataXなどのサードパーティ統合ツールを使用してインポートしたりすることもできます。このうち、DataXとOMSはデータインポートのみをサポートします。Flinkは、リアルタイムデータ計算処理(データのワイドニングや集約など)、ダウンストリームのストレージ、分析、サービスをサポートします。
T + 1シナリオ
T+1とは、データが生成後の翌営業日に処理・インポートを行うことです。T+1データインポートシナリオでは、バッチ処理方式を採用してデータインポートパフォーマンスを最適化できます:
オフラインデータ処理
1日に生成されたデータをまとめて処理し、ピーク時間帯以外にバッチ処理でOceanBaseデータベースにインポートすることで、ビジネスへの影響を軽減します。OMSのインクリメンタル移行機能を使用して翌営業日にデータをOceanBaseデータベースにインポートしたり、FlinkやDataXなどのサードパーティ統合ツールを使用してインポートしたりすることもできます。例えば、DataXを使用してビジネスデータベースに対して日単位で定期的にフルデータ同期を実行できます。
パーティション交換
ターゲットテーブルがパーティションテーブルで、既にデータが存在している場合があります。例えば、日単位でパーティションになっている場合などです。このような場合、ターゲットテーブルと全く同じ構造でパーティションのないテーブルを作成します。データを空のテーブルに挿入します。この時点で、パーティション交換コマンドを実行してデータを交換します。
既存のデータを持つパーティションテーブルに新しいデータを直接インポートすると、パフォーマンスが低下する可能性があります。データのインポート効率を向上させるために、パーティション交換(Partition Exchange)機能を使用できます。パーティション交換の手順は以下のとおりです:
- ターゲットのパーティションテーブルと全く同じ構造の非パーティション一時テーブルを作成します。
- インポート対象のデータをこの一時テーブルに挿入します。
- パーティション交換コマンドを使用して、一時テーブルのデータとターゲットのパーティションテーブルの特定パーティションのデータを高速で交換します。
詳細については、パーティション交換(MySQLモード)および パーティション交換(Oracleモード)ドキュメントを参照してください。
ベストプラクティス
DataXとFlink SQLベースのリアルタイムデータウェアハウスの構築
データ移行方式:
履歴データ
DataXを使用して履歴データをCSVファイルにエクスポートした後、再度DataXを使用してCSVファイルをOceanBaseデータベースにインポートします。データインポートのプロセスで、DataXを使用してCSVファイルをエクスポートする場合、設定ファイルで2881ポートを使用してOceanBaseデータベースに直接接続することを推奨します。2883ポート(OBProxyプロキシ)を使用すると、コマンドによっては別のマシンに配信される可能性があるためです。このような時、別のマシンにDataXがデプロイされておらずCSVファイルがない場合、ファイルが見つからないことがあります。
リアルタイムデータ
Flink SQLを使用してリアルタイムデータを抽出し、データをOceanBaseデータベースに書き込んで、ミリ秒レベルの応答を実現します。テスト結果によれば、データ生成からOceanBaseデータベースへのデータ格納まで、1秒以内に完了できます。
Flink CDCとOceanBaseデータベースベースのリアルタイムデータウェアハウスの構築
このセクションでは、Streaming OLAPソリューションを使用してOceanBaseデータベースのリアルタイムデータウェアハウスを構築する方法を紹介します。 OceanBaseデータベースは、行列混合ストレージのHTAP特性をサポートしているため、1つのシステムでトランザクション処理と複雑な分析をサポートできます。

Flink CDCを使用して、MySQLのフルデータと増分データをOceanBaseデータベースに同期し、ダウンストリームがサブスクライブするODSデータ層を形成します。サブスクライブと同時に、データの読み取りと加工・ワイドニングを行い、ダウンストリームのOceanBaseデータベースに書き込んでDWDデータ層を形成し、集約によってDWSデータ層を形成します。この時、OceanBaseデータベースはクエリサービスとデータ消費を提供します。このソリューションでは、OceanBaseデータベースがKVサービス、分析サービス、Kafkaなどのコンポーネントを置き換えます。また、OceanBaseデータベースの各層は照会、更新、修正が可能です。そのため、あるデータ層で問題が発生した場合、その層のテーブルデータを直接調査して修正でき、より効率的な調査が可能となります。
以下の例では、注文明細テーブルを集約し、DWS層の統計テーブルに書き込んでから、各店舗の1日あたりの売上高を取得します。
OceanBase CDCのデータソースを定義します。これはordersテーブルに由来するものです。
CREATE TABLE dwd_orders ( order_id BIGINT, order_user_id BIGINT, order_shop_id BIGINT, order_product_id VARCHAR, order_fee DECIMAL(20,2), order_updated_time TIMESTAMP(3), pay_id BIGINT, pay_create_time TIMESTAMP(3), PRIMARY KEY (order id) NOT ENFORCED ) WITH ( 'connector' = 'oceanbase-cdc', 'scan.startup.mode' = 'initial', 'username' = 'user@test_tenant', 'password' = 'pmsd', 'tenant-name' = 'test_tenant', 'database-name' = 'test_db', 'table-name' = 'orders', 'hostname' = '127.0.0.1', 'port' = '2881', 'rootserver-list' = '127.0.0.1:2882:2881', 'logproxy.host' = '127.0.0.1', 'logproxy.port' = '2983' );FlinkCDCを使用して店舗売上高を統計し、JDBCテーブルをOceanBaseデータベースのテーブルに書き込んで、店舗指標の統計層を形成します。
CREATE TABLE dws_shops ( shop_id BIGINT, ds STRING, -- 当日の支払い総額 paid_buy_fee_sum DECIMAL(20, 2), PRIMARY KEY(shop_id,ds) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/mydatabase', 'table-name' = 'shops', 'username' = 'user@test_tenant', 'password' = 'pswd' );注文明細層(dwd_orders)のフルデータと増分データをリアルタイムで読み取り、リアルタイムで集約・加工を行います。その後、ダウンストリームのOceanBaseデータベース(dws_shops)に書き込みます。dwd_shopsテーブルは、別のFlinkによって読み取り・加工され、次の層の結果テーブルを形成します。これにより、ストリーミングデータウェアハウス全体の階層構造を構築します。
INSERT INTO dws_shops SELECT order_shop_id, date_format(pay_create_time, 'yyyyMMdd') AS ds, -- 日付フォーマットの修正 SUM(order_fee) AS paid_buy_fee_sum FROM dwd_orders WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL GROUP BY order_shop_id, date_format(pay_create_time, 'yyyyMMdd');
注意点
データのインポート後は、クエリパフォーマンスを向上させるために、フルマージを一度実行することを推奨します。
バッチインポート
インポートの完了後に一度マージします。
リアルタイムインポート
リアルタイムインポートの完了後にマージします。リアルタイムインポートが継続的なインポートで、人為的な介入によるマージのタイミングがない場合は、システムによる自動スケジューリングで対応します。