OceanBaseデータベースを使用する際、リアルタイムインポートや遅延インポートなど、さまざまなデータインポートのニーズに直面します。これらのシナリオでは、異なるデータインポート戦略を採用する必要があります。本記事では、T+0およびT+1の2つのシナリオにおいて、OceanBaseデータベースへのデータインポート方法を詳しく説明し、最適な移行プラクティスの例を提示します。
背景
T+0とは、データが生成された直後に処理およびインポートを行うことです。これには通常、システムが高並行なデータ書き込み操作をサポートし、データのリアルタイム性と即時利用可能性を保証できることが求められます。
T+1とは、データが生成された翌営業日(「T+1」は取引日に加えて1日を意味します)に処理およびインポートを行うことです。T+0に比べ、T+1の場合はリアルタイム性に対する要求が低くなります。
ベースラインデータのインポート
ベースラインデータとは、特定の時点または条件下でのシステムやプロジェクトの初期データセットを指します。これはシステムの開始状態やパフォーマンスレベルを表し、将来の変更における参照および比較に使用されます。OceanBaseデータベースでは、以下の方法でベースラインデータをインポートできます:
データベース間のオンライン移行
データがOMSがサポートするソースにある場合は、OMSを使用してデータをインポートすることを推奨します。OMSがデータソースをサポートしない場合は、DataXを使用してデータをインポートするか、オフラインでインポートできます。
- DataXの詳細については、DataXドキュメントを参照してください。
ファイルからデータベースへのオフライン移行
OMSがデータソースをサポートしない場合、データをオフラインファイルとしてエクスポートし、obloader、DataX、またはLOAD DATAを使用してインポートできます。その中で、obloaderはダイレクトロードをサポートしており、データインポートのパフォーマンスを向上させることができます。
- DataXの詳細については、DataXドキュメントを参照してください。
- LOAD DATA LOCALの使用方法については、LOAD DATA(Oracleモード)およびLOAD DATA(MySQLモード)を参照してください。
増分データのインポート
T + 0シナリオ
T+0とは、データが生成された直後に(リアルタイムで)処理およびインポートを行うことです。OMSの増分移行機能を使用してOceanBaseデータベースにデータをリアルタイムにインポートするか、FlinkやDataXなどのサードパーティ統合ツールを使用してインポートできます。その中で、DataXとOMSはデータのインポートのみをサポートします。Flinkは、データのワイドニングや集計などのリアルタイムデータ計算処理、および下流のストレージ、分析、サービスをサポートします。
T + 1シナリオ
T+1とは、データが生成された翌営業日に処理およびインポートを行うことです。T+1データインポートシナリオでは、バッチ処理を採用してデータインポートのパフォーマンスを最適化できます:
オフラインデータ処理
1日間に生成されたデータを一括処理し、オフピーク時間帯にOceanBaseデータベースに一括インポートすることで、業務への影響を軽減します。OMSの増分移行機能を使用して、翌営業日にデータをOceanBaseデータベースにインポートするか、FlinkやDataXなどのサードパーティ統合ツールを使用してインポートできます。例えば、DataXを使用して、業務データベースとの日単位の定期全量データ同期を実現できます。
パーティション交換
ターゲットテーブルがパーティションテーブルで、既にデータが存在する場合。例えば、毎日が1つのパーティションです。この場合、ターゲットテーブルと全く同じ構造のテーブルを作成しますが、パーティションは作成しません。そして、空のテーブルにデータを挿入します。その後、データを交換します。パーティション交換のコマンドは以下のとおりです。
既存のデータがあるパーティションテーブルに直接新しいデータをインポートすると、パフォーマンスが低下する可能性があります。データインポート効率を向上させるために、パーティション交換(Partition Exchange)機能を使用できます。パーティション交換の手順は以下のとおりです:
- ターゲットのパーティションテーブルと構造が完全に一致する非パーティションの一時テーブルを作成します。
- インポート対象のデータをこの一時テーブルに挿入します。
- パーティション交換コマンドを使用して、一時テーブルのデータとターゲットのパーティションテーブルの特定のパーティションを迅速に交換します。
詳細については、パーティション交換(MySQLモード)およびパーティション交換(Oracleモード)のドキュメントを参照してください。
ベストプラクティス
DataXとFlink SQLによるリアルタイムデータウェアハウスの構築
データ移行方法:
過去データ
DataXを使用して過去データをCSVファイルにエクスポートし、その後DataXを使用してCSVファイルをOceanBaseデータベースにインポートします。データインポートプロセスにおいて、DataXでCSVファイルをエクスポートする際は、設定ファイルで2881ポートを使用してOceanBaseデータベースに直接接続することを推奨します。2883ポート、すなわちOBProxyプロキシを使用すると、異なるコマンドが別のマシンに分散される可能性があるためです。その場合、もしその別のマシンにDataXがデプロイされておらず、CSVファイルも存在しなければ、ファイルが見つからないことがあります。
リアルタイムデータ
Flink SQLを使用してリアルタイムデータを抽出し、OceanBaseデータベースに書き込むことで、ミリ秒単位の応答を実現します。テストの結果、データ生成からOceanBaseデータベースへのデータ投入まで、1秒以内に完了できます。
Flink CDCとOceanBaseデータベースによるリアルタイムデータウェアハウスの構築
このセクションでは、ストリーミングOLAPプランを使用してOceanBaseデータベースのリアルタイムデータウェアハウスを構築する方法について説明します。OceanBaseデータベースは行と列を混在させたHTAP機能をサポートしているため、単一のシステムでトランザクション処理と複雑な分析をサポートできます。
Flink CDCを使用して、MySQLの全量データと増分データをOceanBaseデータベースに同期し、ODSデータ層を形成して下流のサブスクリプションに供給します。サブスクライブしながらデータを読み取り、加工およびワイドニングを行い、その後下流のOceanBaseデータベースに書き込んでDWDデータ層を形成します。集計を経てDWSデータ層を形成すると、OceanBaseデータベースはクエリサービスとデータ消費を提供できる状態になります。このプランでは、OceanBaseデータベースがKVサービス、分析サービス、Kafkaなどのコンポーネントに取って代わります。さらに、OceanBaseデータベースの各層は検索可能であり、更新および修正も可能です。したがって、特定の層のデータに問題が発生した場合、その層のテーブルデータを直接調査して修正できるため、問題の調査がより効率的になります。
以下の例では、注文明細表を集計し、DWS層の統計表に書き込んだ後、各店舗の1日あたりの販売量を取得する必要があります。
OceanBase CDCのデータソースを定義します。これは、ordersから取得したテーブルです。
CREATE TABLE dwd_orders ( order_id BIGINT, order_user_id BIGINT, order_shop_id BIGINT, order_product_id VARCHAR, order_fee DECIMAL(20,2), order_updated_time TIMESTAMP(3), pay_id BIGINT, pay_create_time TIMESTAMP(3), PRIMARY KEY (order id) NOT ENFORCED ) WITH ( 'connector' = 'oceanbase-cdc', 'scan.startup.mode' = 'initial', 'username' = 'user@test_tenant', 'password' = 'pmsd', 'tenant-name' = 'test_tenant', 'database-name' = 'test_db', 'table-name' = 'orders', 'hostname' = '127.0.0.1', 'port' = '2881', 'rootserver-list' = '127.0.0.1:2882:2881', 'logproxy.host' = '127.0.0.1', 'logproxy.port' = '2983' );Flink CDCを使用して店舗の売上高を集計し、JDBCテーブルをOceanBaseデータベースのテーブルに書き込むことで、店舗指標の統計層を形成します。
CREATE TABLE dws_shops ( shop_id BIGINT, ds STRING, -- 当日完了支払総額 paid_buy_fee_sum DECIMAL(20, 2), PRIMARY KEY(shop_id,ds) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/mydatabase', 'table-name' = 'shops', 'username' = 'user@test_tenant', 'password' = 'pswd' );注文明細層(dwd_orders)の全量データと増分データをリアルタイムで読み取り、リアルタイムで集計・加工します。その後、下流のOceanBaseデータベース(dws_shops)に書き込みます。dws_shopsテーブルは別のFlinkによって読み取り・加工され、次の層の結果テーブルが形成されます。これにより、ストリーミングデータウェアハウス全体の階層構造が構築されます。
INSERT INTO dws_shops SELECT order_shop_id, date_format(pay_create_time, 'yyyyMMdd') AS ds, -- 日付フォーマットを修正 SUM(order_fee) AS paid_buy_fee_sum FROM dwd_orders WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL GROUP BY order_shop_id, date_format(pay_create_time, 'yyyyMMdd');
注意点
データをインポートした後、クエリのパフォーマンスを向上させるために、フルコンパクションを実行することを推奨します。
バッチインポート
インポート完了後に1回メジャーコンパクションを実行します。
リアルタイムインポート
リアルタイムインポート完了後にメジャーコンパクションを実行します。リアルタイムインポートがヘッドレスで終わりのない継続的なインポートであり、人為的な介入によるメジャーコンパクションのタイミング決定が不要な場合は、システムによる自動スケジューリングで十分です。