概要
RisingWaveは、クラウド環境向けに設計された分散SQLストリーム処理データベースであり、リアルタイムアプリケーションの開発コストを削減することを目指しています。 本記事では、RisingWaveのMySQL-CDC(Change Data Capture)機能を利用してOceanBaseデータベースに接続する方法について説明します。CDC技術により、RisingWaveはOceanBase MySQLデータベース内のデータ変更イベントをリアルタイムでキャプチャし、これらの変更をストリーム形式で処理および分析することができます。
バージョン互換性
- OceanBaseデータベースバージョン:V4.2.1 BP10以降、V4.2.5 BP1以降。
使用上の制限
RisingWave CDCを使用してOceanBaseデータにアクセスする際は、以下の制限にご注意ください:
- 権限要件:OceanBaseデータベースに接続するユーザーは、
SELECT、REPLICATION SLAVE、REPLICATION CLIENT権限が付与されていなければなりません。 - server.id の一意性:
server.idパラメータはデータベース内で一意である必要があります。通常は1000以上のランダムな数値を指定します。 - データ型のマッピング:OceanBaseのデータ型とRisingWaveのデータ型の間にはマッピング関係があり、一部の型は調整が必要になる場合があります。
前提条件
RisingWaveを使用する前に、次のことを確認してください:
- OceanBaseデータベースのデプロイが完了し、MySQLモードのユーザーテナントが作成されていること。テナント作成の詳細については、テナントの作成を参照してください。
- 作成したMySQL互換モードのテナントでBinlogサービスが有効になっていること。詳細については、OceanBase Binlogサービスを参照してください。
- PostgreSQLコマンドラインクライアントツールであるpsqlがインストール済みであり、そのバージョンがターゲットデータベースと互換性があること。インストール状態は端末で
psql --versionを実行して確認できます。 - Dockerのデプロイが完了し、Dockerサービスが実行中であり、現在のユーザーがdockerコマンドを実行する権限を持っていること(docker infoで確認可能)。
手順
ステップ1:データベース接続文字列を取得する
OceanBaseデータベースのデプロイ担当者から接続文字列を取得します。例:
obclient -h$host -P$port -u$user_name -p$password -D$database_name
パラメータ説明:
$host:接続IPアドレス。ODP接続の場合はODPアドレスを使用し、直接接続の場合はOBServer IPを使用します。$port:接続ポート。ODPのデフォルトは2883、直接接続のデフォルトは2881です。$database_name:データベース名。注意
テナントに接続するユーザーには、データベースに対する
CREATE、INSERT、DROP、REPLICATION SLAVE、REPLICATION CLIENT、およびSELECT権限が付与されていなければなりません。ユーザー権限の詳細については、MySQLモードの権限分類を参照してください。$user_name:接続アカウント。ODP形式:ユーザー@テナント#クラスタまたはクラスタ:テナント:ユーザー。直接接続形式:ユーザー@テナント。$password:アカウントのパスワード。
接続文字列の詳細については、OBClientを使用してOceanBaseテナントに接続するを参照してください。
例:
obclient -hxxx.xxx.xxx.xxx -P2881 -utest_user001@mysql001 -p****** -Dtest
ステップ2:RisingWaveのデプロイ
Dockerを使用してRisingWaveをデプロイします:
sudo docker run -d --pull=always -p 4566:4566 -p 5691:5691 risingwavelabs/risingwave:latest single_node
# RisingWaveが正常に起動したかどうかを確認します。
sudo netstat -ntlp | grep 4566
このコマンドは、RisingWaveの単一ノードインスタンスを起動し、ポート4566(RisingWaveサービス)と5691(監視インターフェース)をリッスンします。
ステップ3:OceanBaseデータベースのデータ準備
OceanBaseデータベースで、CDC用のユーザーとテストテーブルを作成します。
CREATE USER '$rs_name'@'%' IDENTIFIED BY '$rs_password';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '$rs_name'@'%';
FLUSH PRIVILEGES;
create table tbl_t1 (
c01 int,
c02 char,
c03 varchar(10),
c04 text,
c05 bigint,
c06 long,
c07 numeric(10, 2),
c08 decimal(20, 8),
c09 date,
c10 datetime,
c11 datetime(3),
c12 timestamp(6),
primary key (c01, c02)
);
insert into tbl_t1 (c01, c02, c03, c04, c05, c06, c07, c08, c09, c10, c11, c12)
values (1, 'a', 'abc', 'abcdefg', 123456789000, 99999999999999, 123.456, 12345.6789,
current_date,
current_timestamp,
'2024-01-01 18:18:18.888',
'2024-01-01 06:06:06.666666');
select * from tbl_t1;
ステップ4:RisingWaveをOceanBaseに接続する
psql を使用してRisingWaveに接続します:
psql -h 127.0.0.1 -p 4566 -d dev -U root
RisingWaveでスキーマとMySQL-CDCソースを作成します:
-- スキーマを作成
create schema rwtest;
-- MySQL-CDCソースを作成
drop source if exists rwtest.mysql_cdc cascade;
create source rwtest.mysql_cdc
with (
connector = 'mysql-cdc',
hostname = '$host',
port = '$port',
username = '$rs_name',
password = '$rs_password',
database.name = '$database_name',
server.id = 1001, -- 通常は1000以上のランダムな数値を指定します。データベース内でserver.idが一意であることを確認してください。
--DebeZiumパラメータの設定を指定できます。例えば、未知のDDLステートメントをスキップするように設定できます。
debezium.schema.history.internal.store.only.captured.tables.ddl = 'true'
);
show sources from rwtest;
-- OceanBaseのテーブルにマッピングするためのCDCテーブルを作成
DROP TABLE IF EXISTS rwtest.test_mysql_cdc;
CREATE TABLE rwtest.test_mysql_cdc (
c01 int,
c02 string, -- 文字列型は精度を定義する必要はありません
c03 string,
c04 string,
c05 bigint,
c06 string,
c07 numeric, -- 数字は精度を定義する必要はありません
c08 numeric,
c09 date,
c10 timestamp, -- タイムスタンプ型は精度を定義する必要はありません
c11 timestamp,
c12 timestamptz,
PRIMARY KEY (c01, c02)
) FROM rwtest.mysql_cdc TABLE '$database_name.tbl_t1';
select * from rwtest.test_mysql_cdc;
結果の検証
CDCテーブルをクエリしてデータ同期結果を検証します。
dev=> select * from rwtest.test_mysql_cdc;
c01 | c02 | c03 | c04 | c05 | c06 | c07 | c08 | c09 | c10 | c11 |
c12
-----+-----+-----+---------+--------------+----------------+--------+----------------+------------+---------------------+-------------------------+-------
---------------------------
1 | a | abc | abcdefg | 123456789000 | 99999999999999 | 123.46 | 12345.67890000 | 2026-02-05 | 2026-02-05 14:17:17 | 2024-01-01 18:18:18.888 | 2023-1
2-31 22:06:06.666666+00:00
(1 row)
関連ドキュメント
その他の使用方法については、RisingWave公式ドキュメントを参照してください。