Debeziumは、データベースの変更を監視し、データ変動イベントをキャプチャして、イベントストリームとしてさまざまなコンシューマーにエクスポートするためのオープンソースの分散型プラットフォームです。DebeziumはApache Kafkaをベースに実装されており、多様なデータベースシステムをサポートしています。
前提条件
Debeziumを使用する前に、oblogproxyがインストールされていることを確認してください。
使用例
本記事では、Dockerを使用してDebeziumをOceanBaseに接続し、データを取得する方法を簡単に説明します。このデプロイ方法は本番環境での直接使用には適していません。以下の手順に従って操作してください:
Zookeeperを起動します。
docker run -it -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:2.4Kafkaを起動します。
docker run -it -d --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:2.4Kafka Connectサービスを起動します。このサービスは、Debezium MySQLコネクタを管理するためのREST APIを公開します。
docker run -it -d --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link kafka:kafka quay.io/debezium/connect:2.4payload.jsonファイルを新規作成し、以下のサンプル設定を記述します。{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "xxxx.cn-hangzhou.oceanbase.aliyuncs.com", "database.port": "3306", "database.user": "root", "database.password": "xxxx", "database.server.id": "1", "topic.prefix": "observer1", "database.include.list": "debe", "table.include.list":"debe.earthquake", "schema.history.internal.store.only.captured.tables.ddl":true, "schema.history.internal.skip.unparseable.ddl":true, "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "schema-changes.inventory", "snapshot.locking.mode": "none" } }
注意
- MySQLコネクタはOceanBaseの
PURGE TABLEなどの構文を認識できません。schema.history.internal.skip.unparseable.ddlオプションを設定することで回避できます。 - OceanBase独自の仮想テーブルやビューを同期したくない場合があります。
table.include.list または table.exclude.listパラメータを使用してフィルタリングできます。 - curlコマンドを使用して、Kafka ConnectサービスのAPIにPOSTリクエストを送信し、/connectorsリソースに上記の新しいコネクタのJSONドキュメントを添付します。
cat payload.json | tr -d "\n" | curl -X POST -H "Content-Type: application/json" -d @- localhost:8083/connectors/ - コネクタを登録すると、Kafka Connectコンテナ内で大量のログ出力が生成されます。これらの出力から、コネクタが作成されてからOceanBaseのBinlogの読み取りを開始するまでのプロセスを確認できます。
docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka quay.io/debezium/kafka:2.4 watch-topic -a -k observer1.debe.earthquake