Debeziumは、データベースの変更をモニタリングし、データ変更イベントを捉えて、イベントストリーム形式でエクスポートし、さまざまなコンシューマーに配信するためのオープンソースの分散プラットフォームです。DebeziumはApache Kafkaを基盤としており、複数のデータベースシステムをサポートします。
前提条件
Debeziumを使用する前に、oblogproxyがインストールされていることを確認してください。
使用例
本記事では、Dockerを使用してDebeziumをOceanBaseに接続し、データを取得する方法を簡単に示します。このデプロイ方法は、本番環境での直接利用には適していません。以下の手順に従って操作します:
Zookeeperを起動する
docker run -it -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:2.4Kafkaを起動する
docker run -it -d --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:2.4Kafka Connectサービスを起動します。このサービスは、Debezium MySQLコネクタを管理するためのREST APIを公開しています。
docker run -it -d --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link kafka:kafka quay.io/debezium/connect:2.4新たに
payload.jsonファイルを作成し、以下の設定例を記述します。{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "xxxx.cn-hangzhou.oceanbase.aliyuncs.com", "database.port": "3306", "database.user": "root", "database.password": "xxxx", "database.server.id": "1", "topic.prefix": "observer1", "database.include.list": "debe", "table.include.list":"debe.earthquake", "schema.history.internal.store.only.captured.tables.ddl":true, "schema.history.internal.skip.unparseable.ddl":true, "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "schema-changes.inventory", "snapshot.locking.mode": "none" } }
注意
- MySQLコネクタはOceanBaseの
PURGE TABLEなどの構文を認識できません。schema.history.internal.skip.unparseable.ddlオプションを設定することで回避できます。 - OceanBase独自の仮想テーブルとビューを同期させたくない場合は、
table.include.listまたはtable.exclude.listパラメータを使用してフィルタリングできます。 - curlコマンドを使用して、Kafka ConnectサービスのAPIに対して /connectorsリソースへのPOSTリクエストを送信し、上記の新規コネクタのJSONドキュメントを添付します。
cat payload.json | tr -d "\n" | curl -X POST -H "Content-Type: application/json" -d @- localhost:8083/connectors/ - コネクタを登録すると、Kafka Connectコンテナで大量のログが出力されます。これらの出力から、コネクタがOceanBaseのBinlogの作成から読み込みを開始するまでのプロセスを観察できます。
docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka quay.io/debezium/kafka:2.4 watch-topic -a -k observer1.debe.earthquake