Flink CDC(Apache Flink用CDCコネクタ)は、Apache Flinkの一連のソースコネクタであり、ほとんどのデータベースから既存の履歴データと増分変更データをリアルタイムで読み取ることをサポートしています。Flink CDCは、データベースのフルデータおよび増分データをメッセージキューおよびデータウェアハウスに同期できます。また、Flink CDCはリアルタイムデータ統合にも使用でき、データベースデータをリアルタイムでデータレイクやデータウェアハウスにインポートするために利用できます。さらに、Flink CDCはデータ処理もサポートしており、そのSQLクライアントを使用してデータベースデータに対してリアルタイムの関連付け、幅広げ、集計を行い、結果をさまざまなストレージに書き込むことができます。CDC(Change Data Capture、変更データキャプチャ)は、データベースの変更を監視およびキャプチャするのに役立ちます。CDCが提供するデータは、履歴データベースの作成、準リアルタイムキャッシュの作成、メッセージキュー(MQ)への提供、ユーザーによるMQの消費と分析・監査など、多くの用途に利用できます。
本記事では、Flink CDCを使用してOceanBaseデータベースからMySQLデータベースへデータを移行する方法について説明します。
環境準備
OceanBase Binlogサービスの設定
Binlogクラスタの作成を参照して、OceanBase Binlogサービスのインストールとデプロイを完了してください。
Flink環境の設定
Flinkと必要な依存パッケージをダウンロードします:
ダウンロードURLからFlinkをダウンロードします。このドキュメントでは、Flink 1.15.3を使用し、ディレクトリ
flink-1.15.3に解凍します。以下に示す依存パッケージをダウンロードし、それらをディレクトリ
flink-1.15.3/lib/に配置します。
データの準備
OceanBaseデータベースのデータ準備
OceanBaseデータベースにテストデータを準備し、MySQLデータベースへのインポート元とします。
OceanBaseデータベースにログインします。
rootユーザーでクラスタのmysql001テナントにログインします。[xxx@xxx /home/admin] $obclient -h10.10.10.2 -P2881 -uroot@mysql001 -p****** -A Welcome to the OceanBase. Commands end with ; or \g. Your OceanBase connection id is 3221536981 Server version: OceanBase 4.0.0.0 (r100000302022111120-7cef93737c5cd03331b5f29130c6e80ac950d33b) (Built Nov 11 2022 20:38:33) Copyright (c) 2000, 2018, OceanBase and/or its affiliates. All rights reserved. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. obclient [(none)]>データベース
test_ob_to_mysql、テーブルtbl1およびtbl2を作成し、データを挿入します。obclient [(none)]> CREATE DATABASE test_ob_to_mysql; Query OK, 1 row affected obclient [(none)]> USE test_ob_to_mysql; Database changed obclient [test_ob_to_mysql]> CREATE TABLE tbl1(col1 INT PRIMARY KEY, col2 VARCHAR(20),col3 INT); Query OK, 0 rows affected obclient [test_ob_to_mysql]> INSERT INTO tbl1 VALUES(1,'China',86),(2,'Taiwan',886),(3,'Hong Kong',852),(4,'Macao',853),(5,'North Korea',850); Query OK, 5 rows affected Records: 5 Duplicates: 0 Warnings: 0 obclient [test_ob_to_mysql]> CREATE TABLE tbl2(col1 INT PRIMARY KEY,col2 VARCHAR(20)); Query OK, 0 rows affected obclient [test_ob_to_mysql]> INSERT INTO tbl2 VALUES(86,'+86'),(886,'+886'),(852,'+852'),(853,'+853'),(850,'+850'); Query OK, 5 rows affected Records: 5 Duplicates: 0 Warnings: 0
MySQLデータベースの準備
MySQLデータベースに、ソースデータを格納するテーブルを作成します。
MySQLデータベースにアクセスします。
[xxx@xxx /home/admin] $mysql -hxxx.xxx.xxx.xxx -P3306 -uroot -p****** <Omit echo information> MySQL [(none)]>データベース
test_ob_to_mysqlとテーブルob_tbl1_and_tbl2を作成します。MySQL [(none)]> CREATE DATABASE test_ob_to_mysql; Query OK, 1 row affected MySQL [(none)]> USE test_ob_to_mysql; Database changed MySQL [test_ob_to_mysql]> CREATE TABLE ob_tbl1_and_tbl2(col1 INT PRIMARY KEY,col2 INT,col3 VARCHAR(20),col4 VARCHAR(20)); Query OK, 0 rows affected
FlinkクラスタとFlink SQL CLIの起動
以下のコマンドを使用して、Flinkディレクトリに移動します。
[xxx@xxx /FLINK_HOME] #cd flink-1.15.3以下のコマンドを使用して、Flinkクラスタを起動します。
[xxx@xxx /FLINK_HOME/flink-1.15.3] #./bin/start-cluster.sh起動が成功した場合、
http://localhost:8081/でFlink Web UIにアクセスできます。以下のように表示されます:
説明
./bin/start-cluster.shを実行した後、次のメッセージが表示された場合:bash: ./bin/start-cluster.sh: Permission denied。flink-1.15.3ディレクトリ内のすべての-rw-rw-r--権限を持つファイルの権限を-rwxrwxrwx権限に設定する必要があります。例:
[xxx@xxx /.../flink-1.15.3] # chmod -R 777 /FLINK_HOME/flink-1.15.3/*以下のコマンドを使用して、Flink SQL CLIを起動します。
[xxx@xxx /FLINK_HOME/flink-1.15.3] #./bin/sql-client.sh起動が成功すると、次のようなページが表示されます:

チェックポイントの設定
Flink SQL CLIでcheckpointを有効にし、3秒ごとにcheckpointを実行します。
Flink SQL> SET execution.checkpointing.interval = 3s;
[INFO] Session property has been set.
OceanBase CDCテーブルの作成
Flink SQL CLIでOceanBaseデータベースに対応するテーブルを作成します。OceanBaseデータベースのtest_ob_to_mysqlテーブルのtbl1とtbl2について、これらの基盤となるデータベーステーブルのデータを同期するために、Flink SQL CLIを使用して対応するテーブルを作成します。
Flink SQL> CREATE TABLE ob_tbl1 (
col1 INT PRIMARY KEY,
col2 VARCHAR(20),
col3 INT)
WITH ('connector' = 'oceanbase-cdc',
'scan.startup.mode' = 'initial',
'tenant-name' = 'mysql001',
'username' = 'root@mysql001',
'password' = '******',
'database-name' = 'test_ob_to_mysql',
'table-name' = 'tbl1',
'hostname' = '10.10.10.2',
'port' = '2881',
'rootserver-list' = '10.10.10.2:2882:2881',
'logproxy.host' = '10.10.10.2',
'logproxy.port' = '2983');
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE ob_tbl2 (col1 INT PRIMARY KEY,
col2 VARCHAR(20))
WITH ('connector' = 'oceanbase-cdc',
'scan.startup.mode' = 'initial',
'tenant-name' = 'mysql001',
'username' = 'root@mysql001',
'password' = '******',
'database-name' = 'test_ob_to_mysql',
'table-name' = 'tbl2',
'hostname' = '10.10.10.2',
'port' = '2881',
'rootserver-list' = '10.10.10.2:2882:2881',
'logproxy.host' = '10.10.10.2',
'logproxy.port' = '2983');
[INFO] Execute statement succeed.
OceanBase CDCのWITHオプションの詳細については、OceanBase CDC Connectorを参照してください。
MySQL CDCテーブルの作成
Flink SQL CLIでMySQLデータベースに対応するテーブルを作成します。同期されたデータをMySQLデータベースに書き込むために、ob_tbl1_and_tbl2 テーブルを作成します。
Flink SQL> CREATE TABLE ob_tbl1_and_tbl2(
col1 INT PRIMARY KEY,
col2 INT,col3 VARCHAR(20),
col4 VARCHAR(20))
WITH ('connector' = 'jdbc',
'url' = 'jdbc:mysql://xxx.xxx.xxx.xxx:3306/test_ob_to_mysql',
'username' = 'root',
'password' = '******',
'table-name' = 'ob_tbl1_and_tbl2');
[INFO] Execute statement succeed.
JDBC SQL Connector WITHオプションの詳細については、JDBC SQL Connectorを参照してください。
Flink SQL CLIでMySQLデータベースにデータを書き込む
Flink SQLを使用して、tbl1テーブルとtbl2テーブルを関連付け、その後の情報をMySQLデータベースに書き込みます。
Flink SQL> INSERT INTO ob_tbl1_and_tbl2
SELECT t1.col1,t1.col3,t1.col2,t2.col2
FROM ob_tbl1 t1,ob_tbl2 t2
WHERE t1.col3=t2.col1;
Flink SQL> INSERT INTO ob_tbl1_and_tbl2
> SELECT t1.col1,t1.col3,t1.col2,t2.col2
> FROM ob_tbl1 t1,ob_tbl2 t2
> WHERE t1.col3=t2.col1;
[INFO] Submitting SQL update statement to the cluster...
Loading class `com.mysql.jdbc.Driver`. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver`. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 9cd180a65cb4e2c4d1a5a91465aa38a3
説明
このドキュメントのテスト例で使用されるMySQLドライバー(com.mysql.jdbc.Driver)は、MySQL Connector/J 5.1.47バージョンです。新しいバージョンのMySQLドライバー(com.mysql.cj.jdbc.Driver)では、MySQL Connector/J 8.xバージョンを使用してください。
関連データのMySQLデータベースへの書き込み状況を確認する
MySQLデータベースにログインし、test_ob_to_mysql データベース内のテーブル ob_tbl1_and_tbl2 のデータを確認します。
MySQL [test_ob_to_mysql]> SELECT * FROM ob_tbl1_and_tbl2;
+------+------+-------------+------+
| col1 | col2 | col3 | col4 |
+------+------+-------------+------+
| 1 | 86 | China | +86 |
| 2 | 886 | Taiwan | +886 |
| 3 | 852 | Hong Kong | +852 |
| 4 | 853 | Macao | +853 |
| 5 | 850 | North Korea | +850 |
+------+------+-------------+------+
5 rows in set