このドキュメントでは、Flink-OMTを使用してStarRocksデータベースからOceanBaseデータベースへデータを同期する方法について説明します。サンプル環境の準備、データの準備、タスクの設定、検証など、完全なプロセスが含まれます。
背景
Flink-OMT(flink-oceanbase-migrate-tool)は、OceanBaseが提供するApache Flinkベースのデータ移行ツールであり、StarRocksデータベースからOceanBaseデータベースへのリアルタイムデータ同期をサポートしています。Flink-OMTはエンドツーエンドのデータ統合フレームワークを提供し、テーブル構造の自動同期機能を備えており、複数のデータベースおよびテーブル間の同期やルーティングによる複数のデータベースおよびテーブル間の同期をサポートしています。これにより、StarRocksからOceanBaseへのデータ移行に対して効率的かつ信頼性の高いソリューションを提供します。
使用中に何か問題が発生した場合は、GitHub Issuesページにてフィードバックをお寄せください。
環境準備
システム要件
- オペレーティングシステム:LinuxまたはMacOS
Flink Standaloneクラスタ環境の構築
Flink 1.19.1をダウンロードします。解凍後、以下のコマンドを使用して
flink-1.19.1ディレクトリを開き、FLINK_HOMEをflink-1.19.1が存在するディレクトリに設定します。cd flink-1.19.1conf/flink-conf.yaml設定ファイルにチェックポイントメカニズムを有効にするための以下のパラメータを追加し、チェックポイント間隔を3000ミリ秒に設定します。execution.checkpointing.interval: 3000conf/flink-conf.yaml設定ファイルで以下のパラメータを調整し、各TaskManagerのスロット数を増やします。taskmanager: numberOfTaskSlots: 3Flinkクラスタを起動します。
./bin/start-cluster.sh起動に成功すると、次の図のように
http://localhost:8081/からFlinkページにアクセスできます。
Flink-OMTのダウンロード
flink-omt JARパッケージをダウンロードします。
説明
Flink-OMT GitHubページにアクセスすると、より多くのバージョン更新情報を入手できます。
JARパッケージを
FLINK_HOMEのlibディレクトリに移動します。
データベース環境の準備
このチュートリアルでは、Docker Composeを使用してStarRocksとOceanBaseデータベースをデプロイしてデモを行います。
StarRocksとOceanBaseのDockerサンプル環境の構築
StarRocksとOceanBaseを含むDocker Compose設定ファイルを作成します:
services:
StarRocks:
image: starrocks/allin1-ubuntu
ports:
- "8030:8030"
- "8040:8040"
- "9030:9030"
- "9060:9060"
OceanBase:
image: oceanbase/oceanbase-ce
ports:
- "2881:2881"
- "2882:2882"
environment:
- OB_TENANT_PASSWORD=123456
StarRocksとOceanBaseサービスの起動
docker-compose.ymlファイルがあるディレクトリで、以下のコマンドを実行してコンテナを起動します:
docker-compose up -d
このコマンドは、設定されたすべてのコンテナをdetachedモードで起動します。docker psコマンドを使用して、コンテナが正常に起動したかどうか確認できます。
サンプルデータの準備
StarRocksデータベースのデータ準備
StarRocksデータベースにテストデータベースとテーブルを作成し、サンプルデータを挿入します。
StarRocksデータベースに接続します。
この例では、DBeaverを使用してStarRocksに接続しますが、他のデータベースクライアントツールを使用することもできます。接続情報は以下のとおりです:
接続方法:ホスト
サーバーのアドレス:
localhostポート:
9030ユーザー名:
root
サンプルデータベースとテーブル構造を作成します。
-- パスワードの設定 SET PASSWORD = PASSWORD('123456'); -- データベースの作成 CREATE DATABASE test1; CREATE DATABASE test2; -- test1_orders1テーブルの作成 CREATE TABLE IF NOT EXISTS test1_orders1 ( order_id INT DEFAULT '1' COMMENT 'order id', order_date DATETIME, customer_name VARCHAR(225) DEFAULT 'default', price DOUBLE, product_id INT, order_status BOOLEAN ) DISTRIBUTED BY HASH(order_id) PROPERTIES ( "replication_num" = "1" ); -- test1_orders2テーブルの作成 CREATE TABLE IF NOT EXISTS test1_orders2 ( order_id INT COMMENT 'order id', order_date DATETIME, customer_name VARCHAR(1048576), price DOUBLE, product_id INT, order_status INT DEFAULT '0' ) PRIMARY KEY (order_id) DISTRIBUTED BY HASH(order_id) PROPERTIES ( "replication_num" = "1" ); -- test1_orders3テーブルの作成 CREATE TABLE IF NOT EXISTS test1_orders3 ( order_id INT COMMENT 'order id', order_date DATETIME, customer_name VARCHAR(1048576), price DOUBLE, product_id INT, order_status INT DEFAULT '0' ) PRIMARY KEY (order_id) DISTRIBUTED BY HASH(order_id) PROPERTIES ( "replication_num" = "1" ); -- test2_orders3テーブルの作成 CREATE TABLE IF NOT EXISTS test2_orders3 ( k1 DATE, k2 INT, k3 SMALLINT, v1 VARCHAR(2048), V0 DATETIME DEFAULT CURRENT_TIMESTAMP, v2 DATETIME DEFAULT "2014-02-04 15:36:00" ) ENGINE = OLAP DUPLICATE KEY(k1, k2, k3) PARTITION BY RANGE (k1) ( PARTITION p1 VALUES LESS THAN ("2014-01-01"), PARTITION p2 VALUES LESS THAN ("2014-06-01"), PARTITION p3 VALUES LESS THAN ("2014-12-01") ) DISTRIBUTED BY HASH(k2) PROPERTIES( "replication_num" = "1", "storage_medium" = "SSD", "storage_cooldown_time" = "2025-06-04 00:00:00" ); -- test2_orders4テーブルの作成 CREATE TABLE IF NOT EXISTS test2_orders4 ( id BIGINT COMMENT 'Bigint column', flag TINYINT(1) COMMENT 'Boolean type example', char_col CHAR(10) NOT NULL COMMENT 'Char type example', date_col DATE NOT NULL COMMENT 'Date type example', datetime_col DATETIME COMMENT 'Datetime type example', decimal_col DECIMAL(18, 4) COMMENT 'Decimal type example', double_col DOUBLE COMMENT 'Double type example', float_col FLOAT COMMENT 'Float type example', int_col INT NOT NULL COMMENT 'Int type example', smallint_col SMALLINT COMMENT 'Smallint type example', string_col STRING COMMENT 'String type example, variable-length string', tinyint_col TINYINT COMMENT 'Tinyint type example', varchar_col VARCHAR(255) COMMENT 'Varchar type example, variable-length string', json_col JSON COMMENT 'Json type example, stores JSON formatted data' ) ENGINE=OLAP DUPLICATE KEY(`id`) PARTITION BY (date_col, char_col) DISTRIBUTED BY HASH(`id`) PROPERTIES ( "replication_num" = "1" );テストデータを挿入します。
-- test1_orders1データの挿入 INSERT INTO test1_orders1 (order_id, order_date, customer_name, price, product_id, order_status) VALUES (1, '2024-12-05 10:28:07', 'xx', 2.3, 1, 1); -- test1_orders2データの挿入 INSERT INTO test1_orders2 (order_id, order_date, customer_name, price, product_id, order_status) VALUES (111, '2024-12-05 10:02:31', 'orders2', 2.3, 1, 1); -- test1_orders3データの挿入 INSERT INTO test1_orders3 (order_id, order_date, customer_name, price, product_id, order_status) VALUES (10, '2024-12-05 10:02:31', 'orders3', 2.3, 1, 1), (11, '2024-12-01 10:03:31', 'orders3-2-route', 2.3, 1, 1), (12, '2024-12-02 10:02:35', 'orders3', 2.3, 1, 1); -- test2_orders4データの挿入 INSERT INTO test2_orders4 (id, flag, char_col, date_col, datetime_col, decimal_col, double_col, float_col, int_col, smallint_col, string_col, tinyint_col, varchar_col, json_col) VALUES (1, TRUE, 'A123456789', '2023-01-01', '2023-01-01 10:10:10', 1234.5678, 1.23456789, 1.2345, 123, 12, 'example string 1', 1, 'example varchar 1', '{"key1": "value1"}'), (2, FALSE, 'B987654321', '2023-02-01', '2023-02-02 11:11:11', 9876.5432, 9.87654321, 9.8765, 456, 34, 'example string 2', 2, 'example varchar 2', '{"key2": "value2"}'), (3, TRUE, 'C102938475', '2023-03-01', '2023-03-03 12:12:12', 5678.1234, 5.67812345, 5.6789, 789, 56, 'example string 3', 3, 'example varchar 3', '{"key3": "value3"}'), (4, FALSE, 'D564738291', '2023-04-01', '2023-04-04 13:13:13', 4321.8765, 4.32187654, 4.3211, 101, 78, 'example string 4', 4, 'example varchar 4', '{"key4": "value4"}'), (5, TRUE, 'E019283746', '2023-05-01', '2023-05-05 14:14:14', 8765.4321, 8.76543210, 8.7654, 202, 90, 'example string 5', 5, 'example varchar 5', '{"key5": "value5"}');
Flink-OMTサンプルタスクの設定
データ同期タスクの設定ファイルを作成します。以下の例は、StarRocksからOceanBaseへのデータベース全体の同期タスクを設定する方法を示しており、データソースの接続、ターゲットデータベースの設定、および同期パラメータの設定が含まれています。
標準同期モードの設定:
################################################################################ # Description: Sync StarRocks all tables to OceanBase ################################################################################ source: type: StarRocks jdbc-url: jdbc:mysql://localhost:9030/sys username: root password: 123456 scan-url: localhost:8030 scan.max-retries: 1 tables: test[1-2].orders[0-9] oceanbase: url: jdbc:mysql://localhost:2881/test username: root@test password: 123456 schema-name: test pipeline: name: Sync StarRocks Database to OceanBase parallelism: 2ダイレクトロードモードの設定:
大規模データのシナリオにおいて、Flink-OMTはダイレクトロード方式でOceanBaseデータベースに書き込むことをサポートしています。この方式では、SQL解析層をバイパスし、直接基盤となるデータファイルに領域を割り当ててデータを書き込むため、インポート性能が大幅に向上します。設定例は以下のとおりです:
################################################################################ # Description: Sync StarRocks all tables to OceanBase ################################################################################ source: type: StarRocks jdbc-url: jdbc:mysql://localhost:9030/sys username: root password: 123456 scan-url: localhost:8030 scan.max-retries: 1 tables: test[1-2].orders[0-9] oceanbase: type: direct-load url: jdbc:mysql://localhost:2881/test username: root@test host: localhost port: 2882 password: 123456 schema-name: test pipeline: name: Sync StarRocks Database to OceanBase parallelism: 2
タスクの送信と実行
以下のコマンドを使用して、Flink Standaloneクラスタにタスクを送信します。
<FLINK_HOME>bin/flink run \
-D execution.checkpointing.interval=10s\
-D parallelism.default=1\
-c com.oceanbase.omt.cli.CommandLineCliFront\
lib/flink-omt-1.0-SNAPSHOT_flink-1.19.jar \
-config StarRocks-to-OceanBase.yaml
タスクをdetachedモードで実行するには、次のコマンドを使用します。
<FLINK_HOME>bin/flink run \
-D execution.checkpointing.interval=10s\
-D parallelism.default=18\
-d \
-c com.oceanbase.omt.cli.CommandLineCliFront\
lib/flink-omt-1.0-SNAPSHOT_flink-1.19.jar \
-config StarRocks-to-OceanBase.yaml
送信が成功すると、次のようなメッセージが表示されます。
============= The following tables will be migrate from StarRocks to oceanbase ============
test2.orders4
test2.orders3
test1.orders3
test1.orders1
test1.orders2
Job has been submitted with JobID 957bd429a36e60f4329e7a0412d4a489
Program execution finished
Job with JobID 957bd429a36e60f4329e7a0412d4a489 has finished.
Job Runtime: 3587 ms
データ同期結果の検証
Flinkページでは、「Sync StarRocks Database to OceanBase」というタスクが実行中であることが確認できます。以下の図を参照してください。
DBeaverでOceanBaseデータベースに接続すると、データテーブルの作成が完了し、データが正常に書き込まれていることが確認できます。以下の図を参照してください。
サンプル環境のクリーンアップ
データ同期タスクが完了した後、システムリソースを解放するためにサンプル環境をクリーンアップすることを推奨します。
docker-compose.ymlファイルがあるディレクトリで以下のコマンドを実行して、すべてのコンテナを停止します:docker-compose downFlinkが配置されているディレクトリ(
flink-1.19.1)で以下のコマンドを実行して、Flinkクラスタを停止します:./bin/stop-cluster.sh