OceanBase Spark Connectorは、バージョン1.1からSpark Catalogを全面的にサポートしています。Spark Catalogを使用することで、ユーザーはOceanBaseデータベースにアクセスおよび操作するための、より簡潔で一貫性のある方法を利用できます。
本記事では、OceanBase Spark Catalogの設定と使用方法を迅速に完了するためのガイドを提供します。本チュートリアルでは、Spark Catalogを使用してOceanBaseデータベースにアクセスおよび操作する方法を習得します。
前提条件
OceanBaseデータベースがデプロイ済みで、MySQLモードのユーザーテナントが作成されていること。ユーザーテナントの作成の詳細については、テナントの作成を参照してください。
操作手順
ステップ1:データベース接続情報を取得する
OceanBaseデータベースのデプロイ担当者または管理者から、該当するデータベース接続文字列を取得します。例:
obclient -h$host -P$port -u$user_name -p$password -D$database_name
パラメータの説明:
$host:OceanBaseデータベースへの接続IPアドレスを提供します。OceanBaseデータベースプロキシ(OceanBase Database Proxy、ODP)接続方式はODPアドレスを使用し、直接接続方式はOBServerノードのIPアドレスを使用します。$port:OceanBaseデータベースへの接続ポートを提供します。ODP接続方式のデフォルトポートは2883で、ODPデプロイ時にカスタマイズ可能です。直接接続方式のデフォルトポートは2881で、OceanBaseデータベースのデプロイ時にカスタマイズ可能です。$database_name:アクセス対象のデータベース名。注意
テナントに接続するユーザーは、データベースに対する
CREATE、INSERT、DROP、およびSELECT権限が付与されていなければなりません。ユーザー権限の詳細については、MySQLモードの権限分類を参照してください。$user_name:テナントの接続アカウント。ODP接続の一般的な形式:ユーザー名@テナント名#クラスタ名またはクラスタ名:テナント名:ユーザー名。直接接続方式の形式:ユーザー名@テナント名。$password:アカウントのパスワード。
接続文字列の詳細については、OBClientを使用してOceanBaseテナントに接続するを参照してください。
ステップ2:Spark環境を準備する
Spark 3.4.4をダウンロードし、指定されたディレクトリに解凍します。以下のコマンドを使用してSparkのディレクトリに移動し、SPARK_HOME 環境変数をSparkの解凍済みディレクトリに設定します:
以下の手順に従ってSparkをダウンロードし、設定してください。
Spark 3.4.4をダウンロードします。
wget https://dlcdn.apache.org/spark/spark-3.4.4/spark-3.4.4-bin-hadoop3.tgzspark-3.4.4-bin-hadoop3.tgzを指定されたディレクトリに解凍します。tar -zxvf spark-3.4.4-bin-hadoop3.tgz -C $SPARK_HOME例:
spark-3.4.4-bin-hadoop3.tgzを指定されたディレクトリ/home/admin/test_spark_catalogに解凍します。tar -zxvf spark-3.4.4-bin-hadoop3.tgz -C /home/admin/test_spark_catalogSPARK_HOME環境変数をSparkの解凍済みディレクトリに設定します。export SPARK_HOME=$(pwd)例:
export SPARK_HOME=/home/admin/test_spark_catalog/spark-3.4.4-bin-hadoop3
ステップ3:OceanBase Catalogを設定する
OceanBase Spark Connectorをダウンロードします。
対応する spark-connector-oceanbase Jarパッケージをダウンロードします。
このJarパッケージをSpark Homeのjarsディレクトリに移動します。
cp spark-connector-oceanbase-3.4_2.12-1.1.jar $SPARK_HOME/jars/
または、解凍された
spark-3.4.4-bin-hadoop3.tgzの指定ディレクトリで、spark-connector-oceanbase Jarパッケージを直接ダウンロードします。Spark Homeのjarsディレクトリに移動します。
例:
cd /home/admin/test_spark_catalog/spark-3.4.4-bin-hadoop3/jarsspark-connector-oceanbaseをダウンロードします。
wget https://repo1.maven.org/maven2/com/oceanbase/spark-connector-oceanbase-3.4_2.12/1.1/spark-connector-oceanbase-3.4_2.12-1.1.jar
MySQLドライバーをダウンロードします。
MySQLドライバー Jarパッケージをダウンロードします。
このJarパッケージをSpark Homeのjarsディレクトリに移動します。
cp mysql-connector-j-8.2.0.jar $SPARK_HOME/jars/
または、解凍された
spark-3.4.4-bin-hadoop3.tgzディレクトリで、MySQLドライバーJarパッケージを直接ダウンロードします。Spark Homeのjarsディレクトリに移動します。
例:
cd /home/admin/test_spark_catalog/spark-3.4.4-bin-hadoop3/jarsspark-connector-oceanbaseをダウンロードします。
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.2.0/mysql-connector-j-8.2.0.jar
Spark設定ファイルを編集します。デフォルトは
$SPARK_HOME/conf/spark-defaults.confです。例:
説明
例に示すIPアドレスとパスワードは匿名化処理されており、検証時には自身のデータベースの実際の環境に合わせて入力する必要があります。
$SPARK_HOME/confディレクトリに移動します。cd /home/admin/test_spark_catalog/spark-3.4.4-bin-hadoop3/confOceanBase Catalogを設定します。
vi spark-defaults.conf以下の内容を入力します:
spark.sql.catalog.ob=com.oceanbase.spark.catalog.OceanBaseCatalog spark.sql.catalog.ob.url=jdbc:mysql://10.10.10.1:2881 spark.sql.catalog.ob.username=root@mysql001 spark.sql.catalog.ob.password=****** spark.sql.catalog.ob.schema-name=test
Spark SQL CLIを起動します。
$SPARK_HOME/bin/spark-sql例:
/home/admin/test_spark_catalog/spark-3.4.4-bin-hadoop3/bin/spark-sql結果は以下のとおりです:
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 25/04/07 15:52:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 25/04/07 15:52:05 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist 25/04/07 15:52:05 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist 25/04/07 15:52:09 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0 25/04/07 15:52:09 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore admin@10.10.10.1 25/04/07 15:52:09 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException Spark master: local[*], Application Id: local-1744012324296 spark-sql (default)>その後、Spark SQL CLIで
use ob;を入力し、ステップ3で設定したOceanBase catalogに切り替えます。spark-sql (default)> use ob;結果は以下のとおりです:
25/04/07 16:11:06 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException Time taken: 2.544 seconds spark-sql (test)>
ステップ4:Spark CLIでOceanBase Catalogを使用する
Spark-SQLを使用したOceanBaseデータベーステーブルの作成
Spark SQL CLIに以下のテーブル作成ステートメントを入力します。
spark-sql (test)> CREATE TABLE orders (
order_id INT COMMENT 'order id',
order_date TIMESTAMP,
customer_name STRING,
price DOUBLE,
product_id INT,
order_status BOOLEAN);
結果は以下のとおりです:
25/04/07 16:14:25 WARN OceanBaseMySQLDialect: Ignored unsupported table property: owner
Time taken: 0.085 seconds
実行が成功したら、OBClientを使用してOceanBaseデータベースに接続すると、
ordersテーブルが作成されていることを確認できます。$obclient -h10.10.10.1 -P2881 -uroot@mysql001 -p****** -A test Welcome to the OceanBase. Commands end with ; or \g. Your OceanBase connection id is 3221487689 Server version: OceanBase 4.2.1.8 (r108020012024111712-585a11c3514ac7882b041453a529050ac62c6180) (Built Nov 17 2024 12:49:45) Copyright (c) 2000, 2018, OceanBase and/or its affiliates. All rights reserved. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. obclient [test]> show tables; +----------------+ | Tables_in_test | +----------------+ | orders | +----------------+ 1 row in setSpark SQL CLIに
show tables;を入力することで、作成したordersテーブルを確認することもできます。spark-sql (test)> show tables;結果は以下のとおりです:
orders Time taken: 0.044 seconds, Fetched 1 row(s)
Spark-SQLを使用したOceanBaseデータベーステーブルへのデータの書き込み
Spark SQL CLIで以下の書き込みステートメントを実行します。
spark-sql (test)> INSERT INTO orders VALUES
(1, now(), 'zs', 12.2, 12, true),
(2, now(), 'ls', 121.2, 12, true),
(3, now(), 'xx', 123.2, 12, true),
(4, now(), 'jac', 124.2, 12, false),
(5, now(), 'dot', 111.25, 12, true);
結果は以下のとおりです:
Time taken: 1.433 seconds
その後、OBClientを使用してOceanBaseデータベースに接続して、orders テーブルのデータをクエリできます。
obclient [test]> SELECT * FROM orders;
結果は以下のとおりです:
+----------+---------------------+---------------+--------+------------+--------------+
| order_id | order_date | customer_name | price | product_id | order_status |
+----------+---------------------+---------------+--------+------------+--------------+
| 1 | 2025-04-07 16:22:34 | zs | 12.2 | 12 | 1 |
| 2 | 2025-04-07 16:22:34 | ls | 121.2 | 12 | 1 |
| 4 | 2025-04-07 16:22:34 | jac | 124.2 | 12 | 0 |
| 5 | 2025-04-07 16:22:34 | dot | 111.25 | 12 | 1 |
| 3 | 2025-04-07 16:22:34 | xx | 123.2 | 12 | 1 |
+----------+---------------------+---------------+--------+------------+--------------+
5 rows in set
Spark-SQLを使用したOceanBaseデータベーステーブルのクエリ
Spark SQL CLIで、上記に書き込まれたデータをクエリします。
spark-sql (test)> SELECT * FROM orders;
結果は以下のとおりです:
1 2025-04-07 16:22:34 zs 12.2 12 true
2 2025-04-07 16:22:34 ls 121.2 12 true
4 2025-04-07 16:22:34 jac 124.2 12 false
5 2025-04-07 16:22:34 dot 111.25 12 true
3 2025-04-07 16:22:34 xx 123.2 12 true
Time taken: 0.462 seconds, Fetched 5 row(s)
ステップ5:外部システムのデータをOceanBaseデータベースに同期する
hiveテーブルの作成とデータの挿入
Spark SQL CLIで以下のステートメントを実行して、hiveテーブルを作成します。
spark-sql (test)> DROP TABLE spark_catalog.default.orders; Time taken: 0.827 seconds spark-sql (test)> CREATE TABLE spark_catalog.default.orders ( order_id INT, order_date TIMESTAMP, customer_name STRING, price DOUBLE, product_id INT, order_status BOOLEAN ) USING PARQUET;結果は以下のとおりです:
25/04/07 16:37:54 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory. 25/04/07 16:37:54 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist 25/04/07 16:37:54 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist 25/04/07 16:37:54 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist Time taken: 0.457 secondsSpark SQL CLIで以下のステートメントを実行し、hiveテーブルにデータを挿入します:
spark-sql (test)> INSERT INTO spark_catalog.default.orders VALUES (1, now(), 'zs', 12.2, 12, true), (2, now(), 'ls', 121.2, 12, true), (3, now(), 'xx', 123.2, 12, true), (4, now(), 'jac', 124.2, 12, false), (5, now(), 'dot', 111.25, 12, true);結果は以下のとおりです:
Time taken: 1.378 seconds
hiveテーブルデータのOceanBaseデータベースへの同期
Spark SQL CLIで以下の書き込みステートメントを実行します。
spark-sql (test)> INSERT INTO orders
SELECT * FROM spark_catalog.default.orders;
結果は以下のとおりです:
Time taken: 0.523 seconds
OBClientを使用してOceanBaseデータベースに接続し、orders テーブルのデータをクエリすることで、データが正常に同期されていることを確認できます。
obclient [test]> SELECT * FROM orders;
結果は以下のとおりです:
+----------+---------------------+---------------+--------+------------+--------------+
| order_id | order_date | customer_name | price | product_id | order_status |
+----------+---------------------+---------------+--------+------------+--------------+
| 1 | 2025-04-07 16:22:34 | zs | 12.2 | 12 | 1 |
| 2 | 2025-04-07 16:22:34 | ls | 121.2 | 12 | 1 |
| 4 | 2025-04-07 16:22:34 | jac | 124.2 | 12 | 0 |
| 5 | 2025-04-07 16:22:34 | dot | 111.25 | 12 | 1 |
| 3 | 2025-04-07 16:22:34 | xx | 123.2 | 12 | 1 |
| 4 | 2025-04-07 16:42:04 | jac | 124.2 | 12 | 0 |
| 5 | 2025-04-07 16:42:04 | dot | 111.25 | 12 | 1 |
| 1 | 2025-04-07 16:42:04 | zs | 12.2 | 12 | 1 |
| 2 | 2025-04-07 16:42:04 | ls | 121.2 | 12 | 1 |
| 3 | 2025-04-07 16:42:04 | xx | 123.2 | 12 | 1 |
+----------+---------------------+---------------+--------+------------+--------------+
10 rows in set
環境のクリーンアップ
チュートリアルが完了後、以下のコマンドを使用して、必要に応じてSpark- SQL CLIを停止できます。
spark-sql (test)> quit;