SeaTunnelは分散型、高拡張性データ統合プラットフォームで、さまざまなデータストレージシステム間でのデータ転送に使用されます。SeaTunnelは、構成が容易なデータフロータスクプラグイン設定を提供しており、ユーザーはデータ処理および変換のプロセスを柔軟に定義できます。SeaTunnelに関する詳細は、SeaTunnelの GitHubアドレスを参照してください。
前提条件
データ同期を行う前に、以下の情報を確認してください。
- SeaTunnelがダウンロード済みであること。
- Java 1.8以降のバージョンがインストール済みであること。
- MySQLドライバーは
libディレクトリにダウンロード済みであること。 - oblogproxyがインストール済みであること。
SeaTunnelのMySQL CDC Source Connectorを使用して、OceanBaseデータベースのMySQLテナントでSQLクエリを実行する方法を説明します。connector-cdc-mysql は、SeaTunnelのプラグインの1つで、MySQLデータベースのリアルタイムな変更データ(Change Data Capture、CDC)をキャッチし、他システムへストリーム転送します。
操作手順
以下の手順に従って、OceanBaseデータベースのMySQLテナント間でのデータ同期を実行する:
SeaTunnelユーザーを作成します。
CREATE USER 'ユーザー名'@'%' IDENTIFIED BY 'パスワード'; GRANT SELECT ON *.* TO 'ユーザー名';config/plugin_configを設定します。connector-cdc-mysqlを指定して、sh bin/install-plugin.sh 2.3.3を実行します。SeaTunnelを設定します。
設定ファイル
config/v2.streaming.conf.templateを修正します:source { # This is a example source plugin **only for test and demonstrate the feature source plugin** MySQL-CDC { result_table_name = "fake" parallelism = 1 server-id = 5656 username = "ユーザー名" password = "パスワード" table-names = ["seatunnel.full_types"] base-url = "jdbc:mysql://xx.xxx.x.xx:2881/seatunnel" } # If you would like to get more information about how to configure SeaTunnel and see full list of source plugins, # please go to https://seatunnel.apache.org/docs/category/source-v2 }SeaTunnelを起動します。
以下のコマンドを実行して、SeaTunnelを起動します:
./bin/seatunnel.sh --config ./config/v2.streaming.conf.template -e local
SeaTunnelのJDBC OceanBase Source ConnectorとJDBC OceanBase Sink Connectorを使用してデータを同期する方法について説明します。JDBC OceanBase Source Connectorは、OceanBaseの単一テーブルデータのフルロード読み取りと書き込みをサポートします。詳細については、JDBC OceanBase Source Connectorを参照してください。JDBC OceanBase Sink Connectorは、OceanBaseの複数テーブルデータに対するフルロード読み取りと書き込みをサポートします。詳細については、JDBC OceanBase Sink Connectorを参照してください。
操作手順
以下の手順に従って、OceanBaseデータベースのMySQLテナント間でのデータ同期を実行する:
SeaTunnelユーザーを作成します。
CREATE USER 'ユーザー名'@'%' IDENTIFIED BY 'パスワード'; GRANT SELECT, INSERT, UPDATE, DELETE ON *.* TO 'ユーザー名';config/plugin_configを設定します。OceanBase JDBCドライバーを
libディレクトリにダウンロードします。詳細については、OceanBase Clientを参照してください。SeaTunnelを設定します。
設定ファイル
config/ob_to_obを修正します:env { parallelism = 1 job.mode = "BATCH" } source { # This is a example source plugin **only for test and demonstrate the feature source plugin** Jdbc { driver = com.oceanbase.jdbc.Driver url = "jdbc:oceanbase://e2e_oceanbase_mysql:2881/seatunnel?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&serverTimezone=UTC" user = "ユーザー名" password = "パスワード" query = "SELECT c_bit_1, c_bit_8 FROM source" compatible_mode = "mysql" } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource } sink { Jdbc { driver = com.oceanbase.jdbc.Driver url = "jdbc:oceanbase://e2e_oceanbase_mysql:2881/seatunnel?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&serverTimezone=UTC" user = "ユーザー名" password = "パスワード" query = "insert into sink(c_bit_1, c_bit_8) values (?, ?);" compatible_mode = "mysql" } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, # please go to https://seatunnel.apache.org/docs/connector-v2/sink }SeaTunnelを起動します。
以下のコマンドを実行して、SeaTunnelを起動します:
./bin/seatunnel.sh --config ./config/ob_to_ob -e local
注意
OceanBaseデータベースはSETタイプをサポートしていません。
Caused by: org.apache.seatunnel.api.table.catalog.DataTypeConvertException: ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data type] - Convert type: SET to SeaTunnel data type error.
詳細については、SeaTunnel MySQL CDCドキュメントを参照してください。