DataXは、Alibaba Cloud DataWorksのデータ統合機能のオープンソース版であり、Alibabaグループ内で広く利用されているオフラインデータ同期ツール/プラットフォームです。DataXは、MySQL、Oracle、SQL Server、PostgreSQL、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS、OceanBaseなど、さまざまな異種データソース間で効率的なデータ同期機能を実現しています。
OceanBaseデータベースCommunity Editionのお客様は、DataXオープンソースサイトからソースコードをダウンロードし、ご自身でコンパイルすることができます。コンパイル時には、必要に応じてpom.xml内で使用しないデータベースプラグインを除外することができます。そうしない場合、コンパイルされたバンドルは非常に大きくなります。
フレームワーク設計
DataXはオフラインデータ同期フレームワークとして、「Framework + Plugin」モデルで構築されています。データソースの読み取りと書き込みをReader/Writerプラグインとして抽象化し、同期フレームワーク全体に組み込んでいます。
Readerはデータ収集モジュールとして、データソースからデータを収集し、データをFrameworkに送信します。
Writerはデータ書き込みモジュールとして、Frameworkからデータを継続的に取得し、データを宛先に書き込みます。
FrameworkはReaderとWriterを接続し、両者間のデータ転送チャネルとして機能するとともに、バッファリング、ストリーミング制御、並行処理、データ変換などのコア技術的課題を処理します。
DataXはタスク単位でデータを移行します。各タスクは1つのテーブルのみを処理し、各タスクにはjson形式の設定ファイルがあり、その設定ファイルにはreaderとwriterの2部分が含まれます。readerとwriterはそれぞれ、DataXがサポートするデータベースの読み書きプラグインです。例えば、OceanBaseのテーブルデータをOracleデータベースに移行する場合、OceanBaseデータベースからデータを読み取ってOracleデータベースに書き込むため、使用するプラグインはOceanBaseデータベース用のoceanbasev10readerプラグインとOracleデータベース用のoraclewriterプラグインを組み合わせて使用します。ここではoceanbasev10readerとoraclewriterプラグインについて説明します。
oceanbasev10readerプラグイン
実装原理に関して、oceanbasev10readerはDataXフレームワークを通じてReaderが生成したプロトコルデータを取得し、INSERT文を生成します。データ書き込み時に主キーまたは一意キーの競合が発生した場合、OceanBaseデータベースのMySQLテナントはreplaceモードを使用してテーブル内のすべてのフィールドを更新できます。一方、OceanBaseデータベースのOracleテナントでは現在、Insert方式しか利用できません。
oceanbasev10readerプラグインは、OceanBaseデータベースからデータを読み取る機能を実装しています。下層の実装において、oceanbasev10readerプラグインはJavaクライアント(下層のMySQL JDBCまたはOceanBase Client)を使用してobproxy経由でOceanBaseデータベースにリモート接続し、対応するSQL文を実行してOceanBaseデータベースからデータをSELECTします。
簡単に言えば、OceanBaseデータベースはJavaクライアント(下層のMySQL JDBCまたはOceanBase Client)を使用してobproxy経由でOceanBaseデータベースにリモート接続し、ユーザー設定情報に基づいてクエリ文を生成してから、リモートのOceanBaseデータベースに送信します。リモートのOceanBaseデータベースは、そのSQL文の実行結果をDataX独自のデータ型を用いて抽象化されたデータセットとして組み立て、次のWriter処理に渡します。
oraclewriterプラグイン
oraclewriterプラグインは、データをOracleメインデータベースのターゲットテーブルに書き込む機能を実装しています。下層の実装において、oraclewriterプラグインはJDBCを使用してリモートのOracleデータベースに接続し、対応するinsert into ...文を実行してデータをOracleデータベースに書き込みます。Oracleデータベース内部では、データはバッチごとにコミットされます。
実装原理に関して、OracleWriterはDataXフレームワークを通じてReaderが生成したプロトコルデータを取得し、ユーザーの設定に基づいて対応するSQL文insert into...を生成して、データをOracleメインデータベースのターゲットテーブルに挿入します。
詳細な機能とパラメータの説明については、公式ドキュメントを参照してください:oraclewriterプラグイン。
DataX設定ファイル
設定ファイルの例:
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello、你好、世界-DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 2
}
}
}
}
注意
DataXはテーブルデータのみを移行します。ターゲット側に対応するテーブルオブジェクト構造を事前に作成しておく必要があります。
json 設定ファイルをDataXのディレクトリ job またはカスタムパスに配置します。実行方法は以下のとおりです:
$bin/datax.py job/stream2stream.json
出力情報:
<.....>
2021-08-26 11:06:09.217 [job-0] INFO JobContainer - PerfTrace not enable!
2021-08-26 11:06:09.218 [job-0] INFO StandAloneJobContainerCommunicator - Total 20 records, 380 bytes | Speed 38B/s, 2 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2021-08-26 11:06:09.223 [job-0] INFO JobContainer -
タスク開始時刻 : 2021-08-26 11:05:59
タスク終了時刻 : 2021-08-26 11:06:09
タスク合計時間 : 10s
タスク平均流量 : 38B/s
レコード書き込み速度 : 2rec/s
読み取りレコード総数 : 20
読み書き失敗総数 : 0
DataXタスクの実行終了時には、上記の出力に含まれる平均流量、書き込み速度、読み書き失敗総数などを含む簡単なタスクレポートが生成されます。
DataXの job のパラメータ settings では、速度パラメータやエラーレコードの許容度などを指定できます。
"setting": {
"speed": {
"channel": 10
},
"errorLimit": {
"record": 10,
"percentage": 0.1
}
}
パラメータの説明:
errorLimitはエラー報告レコード数の許容度を示し、この制限を超えた場合、タスクは中断して終了します。channelは並列数であり、理論的には並列数が多いほど移行性能が向上します。しかし、実際の運用ではソース側の読み取り負荷、ネットワーク転送性能、およびターゲット側の書き込み性能も考慮する必要があります。
環境準備
tarパッケージのダウンロードURL:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
インストールファイルの解凍:
tar zxvf datax.tar.gz
cd datax
ディレクトリ構成は以下のとおりです:
$tree -L 1 --filelimit 30
.
├── bin
├── conf
├── job
├── lib
├── log
├── log_perf
├── plugin
├── script
└── tmp
インストールファイル内には、以下のいくつかのディレクトリがあります。
| ディレクトリ名 | 説明 |
|---|---|
| bin | 実行可能ファイルのディレクトリ。このディレクトリ内のdatax.pyは、DataXタスクの起動スクリプトです。 |
| conf | ログファイル設定ディレクトリ。このディレクトリには、dataxとタスクに関係ない設定ファイルが保存されています。 |
| lib | 実行時に依存するパッケージ。このディレクトリには、DataXの実行に必要なグローバルjarファイルが保存されています。 |
| job | このディレクトリには、dataxのインストールをテスト検証するためのタスク設定ファイルが1つあります。 |
| log | ログファイルディレクトリ。このディレクトリには、dataxタスクの実行ログが保存されています。dataxの実行時、デフォルトではログが標準出力に出力されると同時にlogディレクトリにも書き込まれます。 |
| plugin | プラグインファイルディレクトリ。このディレクトリには、DataXがサポートするさまざまなデータソースプラグインが保存されています。 |
DataXを使用してOceanBaseデータをOracleデータベースに移行する例
OceanBaseデータをOracleデータベースに移行する際、ソース側とターゲット側が同時にDataXサーバーとネットワーク接続できない場合は、CSVファイルを経由して移行できます。ソース側のデータベースとターゲット側のデータベースが同時にDataXサーバーと接続できる場合は、DataXを使用して直接ソースからターゲットへデータを移行できます。
例:OceanBaseからtest.t1テーブルのデータをOracleデータベースのtest.t1に移行します。
myjob.json設定ファイルは以下のとおりです:
{
"job": {
"setting": {
"speed": {
"channel": 4
},
"errorLimit": {
"record": 0,
"percentage": 0.1
}
},
"content": [
{
"reader": {
"name": "oceanbasev10reader",
"parameter": {
"username": "******",
"password": "******",
"column": ["*"],
"connection": [
{
"table": ["t1"],
"jdbcUrl": ["jdbc:oceanbase://172.30.xxx.xxx:2883/test"]
}
]
}
},
"writer": {
"name": "oraclewriter",
"parameter": {
"obWriteMode": "insert",
"column": ["*"],
"preSql": ["truncate table t1"],
"connection": [
{
"jdbcUrl": "jdbc:oracle:thin:@172.30.xxx.xxx:1521:test",
"table": ["TEST.T1"]
}
],
"username": "******",
"password":"******",
"writerThreadCount":10,
"batchSize": 1000,
"memstoreThreshold": "0.9"
}
}
}
]
}
}
パラメータの説明
| パラメータ | 説明 |
|---|---|
| name | データベースに接続するためのリーダーまたはライターに対応するデータベースプラグインの名前を記述します。そのうち、MySQLのリーダープラグインはmysqlreader、OceanBaseのライタープラグインはoceanbasev10writerです。具体的なリーダーとライターのプラグインについては、dataxのドキュメントを参照してください:DataXデータソースガイド。 |
| jdbcUrl | 接続先のデータベースへのJDBC情報を記述し、JSON配列で表現します。また、1つのデータベースに複数の接続アドレスを入力することも可能です。JSON配列には、1つのJDBC接続情報を入力するだけで構いません。jdbcUrlはMySQL公式仕様に従っており、接続属性の制御情報も入力できます。詳細については、MySQL公式ドキュメントを参照してください。
注意
|
| username | データソースのユーザー名 |
| password | データソースの指定されたユーザー名のパスワード |
| table | 同期するテーブルを選択します。JSON配列で表現するため、複数のテーブルを同時に抽出することができます。複数のテーブルを設定する場合、ユーザー自身が複数のテーブルが同一のスキーマ構造であることを確認する必要があります。MySQLReaderは、テーブルが同一の論理テーブルであるかどうかをチェックしません。
注意tableはconnection設定ユニットに含める必要があります。
|
| column | 設定したテーブル内で同期する列名の集合を、JSON配列でフィールド情報を記述します。columnsは['*']として設定することは推奨されません。これは、テーブル構造に変更があるとこの設定も変更されてしまうためです。推奨される設定方法は、具体的な列名を指定することです。列のカットオフ、つまり一部の列のみをエクスポートすることがサポートされています。列の並べ替え、つまりテーブルのスキーマ情報に従わずに列をエクスポートすることもサポートされています。定数設定もサポートされており、ユーザーはMySQL SQL構文に従って設定する必要があります:["id", "`table`", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"]。
説明
|
| where | フィルタ条件。MySQLReaderは指定されたcolumn、table、where条件に基づいてSQLを連結し、このSQLに基づいてデータ抽出を行います。実際のビジネスシナリオでは、通常当日のデータを同期することを選択します。その場合、where条件をgmt_create > $bizdateと指定できます。
注意
|
jobファイルを設定した後、そのjobを実行します。コマンドは以下のとおりです:
python datax.py ../job/myjob.json
詳細情報
DataXのオープンソースコードおよびその他の詳細については、DataXを参照してください。