OceanBase Spark Connectorはバージョン1.1からSpark Catalogを全面的にサポートしています。Spark Catalogを使用することで、ユーザーはより簡潔かつ一貫性のある方法でOceanBaseデータベースにアクセスし、操作できます。
このドキュメントでは、OceanBase Spark Catalogの設定と使用を迅速に完了する方法を説明します。このチュートリアルを通じて、Spark Catalogを使用してOceanBaseデータベースにアクセスし、操作する方法を習得できます。
前提条件
OceanBaseデータベースのデプロイが完了し、MySQLモードのユーザーテナントが作成されていること。テナント作成の詳細については、テナントの作成を参照してください。
手順
ステップ1:データベース接続情報を取得する
OceanBaseデータベースのデプロイ担当者または管理者から、該当するデータベース接続文字列を取得します。例:
obclient -h$host -P$port -u$user_name -p$password -D$database_name
パラメータ説明:
$host:OceanBaseデータベースへの接続IPアドレス。OceanBaseデータベースプロキシ(OceanBase Database Proxy、ODP)接続方式ではODPアドレスを使用し、直接接続方式ではOBServerノードのIPアドレスを使用します。$port:OceanBaseデータベースへの接続ポート。ODP接続方式のデフォルトポートは2883で、ODPデプロイ時にカスタマイズ可能です。直接接続方式のデフォルトポートは2881で、OceanBaseデータベースのデプロイ時にカスタマイズ可能です。$database_name:アクセス対象のデータベース名。注意
テナントに接続するユーザーは、データベースに対する
CREATE、INSERT、DROP、およびSELECT権限が付与されていなければなりません。その他のユーザー権限の詳細については、MySQLモードの権限分類を参照してください。$user_name:テナントの接続アカウント。ODP接続の一般的な形式:ユーザー名@テナント名#クラスタ名またはクラスタ名:テナント名:ユーザー名。直接接続方式の形式:ユーザー名@テナント名。$password:アカウントのパスワード。
その他の接続文字列の詳細については、OBClientを使用したOceanBaseテナントへの接続を参照してください。
ステップ2:Spark環境の準備
Spark 3.4.4をダウンロードし、指定のディレクトリに解凍します。以下のコマンドを使用してSparkディレクトリに移動し、SPARK_HOME 環境変数をSparkの解凍後のディレクトリに設定します。
以下の手順に従ってSparkをダウンロードし、設定してください:
Spark 3.4.4をダウンロードします。
wget https://dlcdn.apache.org/spark/spark-3.4.4/spark-3.4.4-bin-hadoop3.tgzspark-3.4.4-bin-hadoop3.tgzを指定のディレクトリに解凍します。tar -zxvf spark-3.4.4-bin-hadoop3.tgz -C $SPARK_HOME例:
spark-3.4.4-bin-hadoop3.tgzを指定のディレクトリ/home/admin/test_spark_catalogに解凍します。tar -zxvf spark-3.4.4-bin-hadoop3.tgz -C /home/admin/test_spark_catalogSPARK_HOME環境変数をSparkの解凍後のディレクトリに設定します。export SPARK_HOME=$(pwd)例:
export SPARK_HOME=/home/admin/test_spark_catalog/spark-3.4.4-bin-hadoop3
ステップ3:OceanBase Catalogを設定する
OceanBase Spark Connectorをダウンロードします。
対応するspark-connector-oceanbase Jarパッケージをダウンロードします。
このJarパッケージをSpark Homeのjarsディレクトリに移動します:
cp spark-connector-oceanbase-3.4_2.12-1.1.jar $SPARK_HOME/jars/
または、
spark-3.4.4-bin-hadoop3.tgzの解凍先ディレクトリから直接spark-connector-oceanbase Jarパッケージをダウンロードすることもできます。Spark Homeのjarsディレクトリに移動します。
例:
cd /home/admin/test_spark_catalog/spark-3.4.4-bin-hadoop3/jarsspark-connector-oceanbaseをダウンロードします。
wget https://repo1.maven.org/maven2/com/oceanbase/spark-connector-oceanbase-3.4_2.12/1.1/spark-connector-oceanbase-3.4_2.12-1.1.jar
MySQLドライバーをダウンロードします。
MySQLドライバー Jarパッケージをダウンロードします。
このJarパッケージをSpark Homeのjarsディレクトリに移動します:
cp mysql-connector-j-8.2.0.jar $SPARK_HOME/jars/
または、
spark-3.4.4-bin-hadoop3.tgzの解凍先ディレクトリから直接MySQLドライバーJarパッケージをダウンロードすることもできます。Spark Homeのjarsディレクトリに移動します。
例:
cd /home/admin/test_spark_catalog/spark-3.4.4-bin-hadoop3/jarsspark-connector-oceanbaseをダウンロードします。
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.2.0/mysql-connector-j-8.2.0.jar
Spark設定ファイルを編集します。デフォルトは
$SPARK_HOME/conf/spark-defaults.confです。例:
説明
例のIPアドレスとパスワードはマスキングされています。検証時にはご自身のデータベースの実環境に合わせて入力してください。
$SPARK_HOME/confディレクトリに移動します。cd /home/admin/test_spark_catalog/spark-3.4.4-bin-hadoop3/confOceanBase Catalogを設定します。
vi spark-defaults.conf以下の内容を入力します:
spark.sql.catalog.ob=com.oceanbase.spark.catalog.OceanBaseCatalog spark.sql.catalog.ob.url=jdbc:mysql://10.10.10.1:2881 spark.sql.catalog.ob.username=root@mysql001 spark.sql.catalog.ob.password=****** spark.sql.catalog.ob.schema-name=test
Spark SQL CLIを起動します。
$SPARK_HOME/bin/spark-sql例:
/home/admin/test_spark_catalog/spark-3.4.4-bin-hadoop3/bin/spark-sql実行結果は次のとおりです:
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 25/04/07 15:52:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 25/04/07 15:52:05 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist 25/04/07 15:52:05 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist 25/04/07 15:52:09 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0 25/04/07 15:52:09 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore admin@10.10.10.1 25/04/07 15:52:09 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException Spark master: local[*], Application Id: local-1744012324296 spark-sql (default)>その後、Spark SQL CLIで
use ob;を入力し、手順3で設定したOceanBase catalogに切り替えます。spark-sql (default)> use ob;実行結果は次のとおりです:
25/04/07 16:11:06 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException Time taken: 2.544 seconds spark-sql (test)>
ステップ4:Spark CLIを使用してOceanBase Catalogを利用する
Spark-SQLを使用してOceanBaseデータベースのテーブルを作成する
Spark SQL CLIに以下のテーブル作成ステートメントを入力します:
spark-sql (test)> CREATE TABLE orders (
order_id INT COMMENT 'order id',
order_date TIMESTAMP,
customer_name STRING,
price DOUBLE,
product_id INT,
order_status BOOLEAN);
実行結果は次のとおりです:
25/04/07 16:14:25 WARN OceanBaseMySQLDialect: Ignored unsupported table property: owner
Time taken: 0.085 seconds
実行が成功した後、OBClientを使用してOceanBaseデータベースに接続すると、
ordersテーブルが作成されていることが確認できます。$obclient -h10.10.10.1 -P2881 -uroot@mysql001 -p****** -A test Welcome to the OceanBase. Commands end with ; or \g. Your OceanBase connection id is 3221487689 Server version: OceanBase 4.2.1.8 (r108020012024111712-585a11c3514ac7882b041453a529050ac62c6180) (Built Nov 17 2024 12:49:45) Copyright (c) 2000, 2018, OceanBase and/or its affiliates. All rights reserved. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. obclient [test]> show tables; +----------------+ | Tables_in_test | +----------------+ | orders | +----------------+ 1 row in setSpark SQL CLIで
show tables;を入力すると、先ほど作成したordersテーブルを確認できます:spark-sql (test)> show tables;実行結果は次のとおりです:
orders Time taken: 0.044 seconds, Fetched 1 row(s)
Spark-SQLを使用してOceanBaseデータベースのテーブルにデータを書き込む
Spark SQL CLIで以下の書き込みステートメントを実行します:
spark-sql (test)> INSERT INTO orders VALUES
(1, now(), 'zs', 12.2, 12, true),
(2, now(), 'ls', 121.2, 12, true),
(3, now(), 'xx', 123.2, 12, true),
(4, now(), 'jac', 124.2, 12, false),
(5, now(), 'dot', 111.25, 12, true);
結果は次のとおりです:
Time taken: 1.433 seconds
その後、OBClientを使用してOceanBaseデータベースに接続すると、orders テーブルのデータをクエリできます。
obclient [test]> SELECT * FROM orders;
結果は次のとおりです:
+----------+---------------------+---------------+--------+------------+--------------+
| order_id | order_date | customer_name | price | product_id | order_status |
+----------+---------------------+---------------+--------+------------+--------------+
| 1 | 2025-04-07 16:22:34 | zs | 12.2 | 12 | 1 |
| 2 | 2025-04-07 16:22:34 | ls | 121.2 | 12 | 1 |
| 4 | 2025-04-07 16:22:34 | jac | 124.2 | 12 | 0 |
| 5 | 2025-04-07 16:22:34 | dot | 111.25 | 12 | 1 |
| 3 | 2025-04-07 16:22:34 | xx | 123.2 | 12 | 1 |
+----------+---------------------+---------------+--------+------------+--------------+
5 rows in set
Spark-SQLを使用してOceanBaseデータベースのテーブルをクエリする
Spark SQL CLIで上記に書き込んだデータをクエリします。
spark-sql (test)> SELECT * FROM orders;
結果は次のとおりです:
1 2025-04-07 16:22:34 zs 12.2 12 true
2 2025-04-07 16:22:34 ls 121.2 12 true
4 2025-04-07 16:22:34 jac 124.2 12 false
5 2025-04-07 16:22:34 dot 111.25 12 true
3 2025-04-07 16:22:34 xx 123.2 12 true
Time taken: 0.462 seconds, Fetched 5 row(s)
ステップ5:外部システムのデータをOceanBaseデータベースに同期する
Hiveテーブルの作成とデータの挿入
Spark SQL CLIで以下のステートメントを実行してHiveテーブルを作成します。
spark-sql (test)> DROP TABLE spark_catalog.default.orders; Time taken: 0.827 seconds spark-sql (test)> CREATE TABLE spark_catalog.default.orders ( order_id INT, order_date TIMESTAMP, customer_name STRING, price DOUBLE, product_id INT, order_status BOOLEAN ) USING PARQUET;実行結果は次のとおりです。
25/04/07 16:37:54 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory. 25/04/07 16:37:54 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist 25/04/07 16:37:54 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist 25/04/07 16:37:54 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist Time taken: 0.457 secondsSpark SQL CLIで以下のステートメントを実行して、Hiveテーブルにデータを挿入します。
spark-sql (test)> INSERT INTO spark_catalog.default.orders VALUES (1, now(), 'zs', 12.2, 12, true), (2, now(), 'ls', 121.2, 12, true), (3, now(), 'xx', 123.2, 12, true), (4, now(), 'jac', 124.2, 12, false), (5, now(), 'dot', 111.25, 12, true);実行結果は次のとおりです。
Time taken: 1.378 seconds
HiveテーブルデータをOceanBaseデータベースに同期する
Spark SQL CLIで以下の書き込みステートメントを実行します。
spark-sql (test)> INSERT INTO orders
SELECT * FROM spark_catalog.default.orders;
戻り結果は次のとおりです:
Time taken: 0.523 seconds
OBClientを使用してOceanBaseデータベースに接続し、orders テーブルのデータをクエリすると、データが正常に同期されたことが確認できます。
obclient [test]> SELECT * FROM orders;
戻り結果は次のとおりです:
+----------+---------------------+---------------+--------+------------+--------------+
| order_id | order_date | customer_name | price | product_id | order_status |
+----------+---------------------+---------------+--------+------------+--------------+
| 1 | 2025-04-07 16:22:34 | zs | 12.2 | 12 | 1 |
| 2 | 2025-04-07 16:22:34 | ls | 121.2 | 12 | 1 |
| 4 | 2025-04-07 16:22:34 | jac | 124.2 | 12 | 0 |
| 5 | 2025-04-07 16:22:34 | dot | 111.25 | 12 | 1 |
| 3 | 2025-04-07 16:22:34 | xx | 123.2 | 12 | 1 |
| 4 | 2025-04-07 16:42:04 | jac | 124.2 | 12 | 0 |
| 5 | 2025-04-07 16:42:04 | dot | 111.25 | 12 | 1 |
| 1 | 2025-04-07 16:42:04 | zs | 12.2 | 12 | 1 |
| 2 | 2025-04-07 16:42:04 | ls | 121.2 | 12 | 1 |
| 3 | 2025-04-07 16:42:04 | xx | 123.2 | 12 | 1 |
+----------+---------------------+---------------+--------+------------+--------------+
10 rows in set
環境のクリーンアップ
チュートリアル終了後、必要に応じて以下のコマンドを使用してSpark-SQL CLIを停止できます。
spark-sql (test)> quit;