Flink CDC(Apache Flink用CDCコネクタ)は、Apache Flinkの一連のソースコネクタであり、ほとんどのデータベースから既存の履歴データと増分変更データをリアルタイムで読み取ることをサポートします。Flink CDCは、データベースのフルデータおよび増分データをメッセージキューおよびデータウェアハウスに同期できます。また、Flink CDCはリアルタイムデータ統合にも使用でき、データベースデータをリアルタイムでデータレイクやデータウェアハウスにインポートするために利用できます。さらに、Flink CDCはデータ処理もサポートしており、そのSQLクライアントを使用してデータベースデータに対してリアルタイムの関連付け、幅広げ、集計を行い、結果をさまざまなストレージに書き込むことができます。CDC(Change Data Capture、つまり変更データキャプチャ)は、データベースの変更を監視し、キャプチャするのに役立ちます。CDCが提供するデータは、履歴データベースの作成、準リアルタイムキャッシュの作成、メッセージキュー(MQ)への提供、ユーザーによるMQの消費と分析・監査など、多くの用途に活用できます。
以下では、Flink CDCを使用してMySQLデータベースからOceanBaseデータベースへデータを同期する方法について説明します。
Flink CDC環境の準備
Flinkと必要な依存パッケージをダウンロードします:
ダウンロードURLからFlinkをダウンロードします。本記事ではFlink 1.15.3を使用し、ディレクトリ
/FLINK_HOME/flink-1.15.3に解凍します。以下に記載されている依存パッケージをダウンロードし、ディレクトリ
/FLINK_HOME/flink-1.15.3/lib/に配置します。
データの準備
MySQLデータベースのデータ準備
OceanBaseデータベースにインポートするためのソースデータとして、MySQLデータベースにテストデータを準備します。
MySQLデータベースにアクセスします。
[xxx@xxx /...] $mysql -hxxx.xxx.xxx.xxx -P3306 -uroot -p****** <Omit echo information> MySQL [(none)]>データベース
test_mysql_to_ob、テーブルtbl1およびtbl2を作成し、データを挿入します。MySQL [(none)]> CREATE DATABASE test_mysql_to_ob; Query OK, 1 row affected MySQL [(none)]> USE test_mysql_to_ob; Database changed MySQL [test_mysql_to_ob]> CREATE TABLE tbl1(col1 INT PRIMARY KEY, col2 VARCHAR(20),col3 INT); Query OK, 0 rows affected MySQL [test_mysql_to_ob]> INSERT INTO tbl1 VALUES(1,'China',86),(2,'Taiwan',886),(3,'Hong Kong',852),(4,'Macao',853),(5,'North Korea',850); Query OK, 5 rows affected Records: 5 Duplicates: 0 Warnings: 0 MySQL [test_mysql_to_ob]> CREATE TABLE tbl2(col1 INT PRIMARY KEY,col2 VARCHAR(20)); Query OK, 0 rows affected MySQL [test_mysql_to_ob]> INSERT INTO tbl2 VALUES(86,'+86'),(886,'+886'),(852,'+852'),(853,'+853'),(850,'+850'); Query OK, 5 rows affected Records: 5 Duplicates: 0 Warnings: 0
OceanBaseデータベースのデータの準備
OceanBaseデータベースに、ソースデータを格納するテーブルを作成します。
OceanBaseデータベースにログインします。
user001ユーザーでクラスタのmysql001テナントにログインします。[xxx@xxx /...] $obclient -h10.10.10.2 -P2881 -uuser001@mysql001 -p -A Enter password: Welcome to the OceanBase. Commands end with ; or \g. Your OceanBase connection id is 3221536981 Server version: OceanBase 4.0.0.0 (r100000302022111120-7cef93737c5cd03331b5f29130c6e80ac950d33b) (Built Nov 11 2022 20:38:33) Copyright (c) 2000, 2018, OceanBase and/or its affiliates. All rights reserved. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. obclient [(none)]>データベース
test_mysql_to_obとテーブルmysql_tbl1_and_tbl2を作成します。obclient [(none)]> CREATE DATABASE test_mysql_to_ob; Query OK, 1 row affected obclient [(none)]> USE test_mysql_to_ob; Database changed obclient [test_mysql_to_ob]> CREATE TABLE mysql_tbl1_and_tbl2(col1 INT PRIMARY KEY,col2 INT,col3 VARCHAR(20),col4 VARCHAR(20)); Query OK, 0 rows affected
FlinkクラスタとFlink SQL CLIの起動
以下のコマンドを使用して、Flinkディレクトリに移動します。
[xxx@xxx /FLINK_HOME] #cd flink-1.15.3以下のコマンドを使用して、Flinkクラスタを起動します。
[xxx@xxx /FLINK_HOME/flink-1.15.3] #./bin/start-cluster.sh起動が成功した場合、
http://localhost:8081/でFlink Web UIにアクセスできます。以下のように表示されます:
説明
./bin/start-cluster.shを実行した後、次のメッセージが表示された場合:bash: ./bin/start-cluster.sh: Permission denied。flink-1.15.3ディレクトリ内のすべての-rw-rw-r--権限を持つファイルの権限を-rwxrwxrwx権限に設定する必要があります。例:
[xxx@xxx /FLINK_HOME/flink-1.15.3] # chmod -R 777 /FLINK_HOME/flink-1.15.3/*以下のコマンドを使用して、Flink SQL CLIを起動します。
[xxx@xxx /FLINK_HOME/flink-1.15.3] #./bin/sql-client.sh起動が成功すると、次のページが表示されます:

チェックポイントの設定
Flink SQL CLIでcheckpointを有効にし、3秒ごとにcheckpointを実行します。
Flink SQL> SET execution.checkpointing.interval = 3s;
[INFO] Session property has been set.
MySQL CDCテーブルの作成
Flink SQL CLIでMySQLデータベースに対応するテーブルを作成します。
MySQLデータベース内のtest_mysql_to_obテーブルのtbl1とtbl2について、これらの下層データベーステーブルのデータを同期するために、Flink SQL CLIを使用して対応するテーブルを作成します。
Flink SQL> CREATE TABLE mysql_tbl1 (
col1 INT PRIMARY KEY,
col2 VARCHAR(20),
col3 INT)
WITH (
'connector' = 'mysql-cdc',
'hostname' = 'xxx.xxx.xxx.xxx',
'port' = '3306',
'username' = 'root',
'password' = '******',
'database-name' = 'test_mysql_to_ob',
'table-name' = 'tbl1');
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE mysql_tbl2 (col1 INT PRIMARY KEY,
col2 VARCHAR(20))
WITH ('connector' = 'mysql-cdc',
'hostname' = 'xxx.xxx.xxx.xxx',
'port' = '3306',
'username' = 'root',
'password' = '******',
'database-name' = 'test_mysql_to_ob',
'table-name' = 'tbl2');
[INFO] Execute statement succeed.
MySQL CDC Connector WITHオプションの詳細については、Connector Optionsを参照してください。
OceanBase CDCテーブルの作成
Flink SQL CLIでOceanBaseデータベースに対応するテーブルを作成します。関連付けたデータをOceanBaseデータベースに書き込むために、mysql_tbl1_and_tbl2テーブルを作成します。
Flink SQL> CREATE TABLE mysql_tbl1_and_tbl2(
col1 INT PRIMARY KEY,
col2 INT,col3 VARCHAR(20),
col4 VARCHAR(20))
WITH ('connector' = 'jdbc',
'url' = 'jdbc:mysql://10.10.10.2:2881/test_mysql_to_ob',
'username' = 'root@mysql001',
'password' = '******',
'table-name' = 'mysql_tbl1_and_tbl2');
[INFO] Execute statement succeed.
JDBC SQL Connector WITHオプションの詳細については、Connector Optionsを参照してください。
Flink SQL CLIを使用してOceanBaseデータベースにデータを書き込む
Flink SQLを使用して、tbl1テーブルとtbl2テーブルを関連付け、その後の情報をOceanBaseデータベースに書き込みます。
Flink SQL> INSERT INTO mysql_tbl1_and_tbl2
SELECT t1.col1,t1.col3,t1.col2,t2.col2
FROM mysql_tbl1 t1,mysql_tbl2 t2
WHERE t1.col3=t2.col1;
[INFO] Submitting SQL update statement to the cluster...
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c5ee92498addf813858e448ec25e85af
説明
このドキュメントのテスト例で使用されているMySQLドライバー(com.mysql.jdbc.Driver)は、MySQL Connector/J 5.1.47バージョンです。新しいバージョンのMySQLドライバー(com.mysql.cj.jdbc.Driver)では、MySQL Connector/J 8.xバージョンを使用してください。
関連データのOceanBaseデータベースへの書き込み状況を確認する
OceanBaseデータベースにログインし、test_mysql_to_ob データベース内のテーブル mysql_tbl1_and_tbl2 のデータを確認します。
obclient [test_mysql_to_ob]> SELECT * FROM mysql_tbl1_and_tbl2;
+------+------+-------------+------+
| col1 | col2 | col3 | col4 |
+------+------+-------------+------+
| 1 | 86 | China | +86 |
| 2 | 886 | Taiwan | +886 |
| 3 | 852 | Hong Kong | +852 |
| 4 | 853 | Macao | +853 |
| 5 | 850 | North Korea | +850 |
+------+------+-------------+------+
5 rows in set
データの更新状況を確認する
MySQLデータベースのテーブル
tbl1とtbl2にそれぞれ1件ずつデータを挿入します。MySQL [test_mysql_to_ob]> INSERT INTO tbl1 VALUES(6,'code',673); Query OK, 1 row affected MySQL [test_mysql_to_ob]> INSERT INTO tbl2 VALUES(673,'+673'); Query OK, 1 row affectedOceanBaseデータベースでデータが同期されているかどうかを確認します。
obclient [test_mysql_to_ob]> SELECT * FROM mysql_tbl1_and_tbl2; +------+------+-------------+------+ | col1 | col2 | col3 | col4 | +------+------+-------------+------+ | 1 | 86 | China | +86 | | 2 | 886 | Taiwan | +886 | | 3 | 852 | Hong Kong | +852 | | 4 | 853 | Macao | +853 | | 5 | 850 | North Korea | +850 | | 6 | 673 | code | +673 | +------+------+-------------+------+ 6 rows in set