pyobvectorはOceanBaseベクトルストレージ機能のpython SDKであり、2つの使用モードを提供します。
pymilvus互換モード:MilvusLikeClientオブジェクトを使用してデータベースを操作し、軽量なMilvusClientと互換性のある一般的なインターフェースを提供します。
SQLAlchemy拡張モード:ObVecClientオブジェクトを使用してデータベースを操作し、リレーショナルデータベース用のpython SDK拡張機能を提供します。
本記事では、2つのモードにおける使用インターフェースと例をそれぞれ紹介します。
MilvusLikeClient
コンストラクター
def __init__(
self,
uri: str = "127.0.0.1:2881",
user: str = "root@test",
password: str = "",
db_name: str = "test",
**kwargs,
)
collection関連インターフェース
| API | パラメータ記述 | 例 |
|---|---|---|
def create_schema(self, **kwargs) -> CollectionSchema: |
|
|
|
テーブルを作成します:
|
|
|
テーブルのレコードの数を取得します
|
|
|
テーブルが存在しているかどうかを判断します
|
|
|
リネームテーブル
|
|
|
テーブルのメタデータをSQLAlchemyメタデータキャッシュに読み込みます
|
CollectionSchema & FieldSchema
MilvusLikeClientは、CollectionSchemaを使用してテーブルのモード定義を記述します。1つのCollectionSchemaは複数のFieldSchemaを含み、FieldSchemaは1つのテーブルの列モードを記述します。
MilvusLikeClientのcreate_schemaを使用してCollectionSchemaを作成します
def __init__(
self,
fields: Optional[List[FieldSchema]] = None,
partitions: Optional[ObPartition] = None,
description: str = "", # ignored in oceanbase
**kwargs,
)
パラメータの説明は以下のとおりです:
fields:FieldSchemaのオプションのセット。
partitions:パーティションルール(詳細についてはObPartitionを使用したパーティションルールの定義に関する章を参照してください)。
description:Milvusと互換性がありますが、OceanBaseでは現時点で実用的な役割はありません。
FieldSchemaを作成してCollectionSchemaに登録する
def add_field(self, field_name: str, datatype: DataType, **kwargs)
field_name:列名。
datatype:列データ型。サポートされているデータ型の詳細については、 互換性に関する説明を参照してください。
kwargs:その他のパラメータは列属性の設定に使用されます。例は以下のとおりです:
def __init__( self, name: str, dtype: DataType, description: str = "", is_primary: bool = False, auto_id: bool = False, nullable: bool = False, **kwargs, )パラメータの説明は以下のとおりです:
is_primary:主キーであるかどうか。
auto_id:自動インクリメント列かどうか。
nullable:空を許可するかどうか。
使用例
schema = self.client.create_schema()
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="title", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(
field_name="title_vector", datatype=DataType.FLOAT_VECTOR, dim=768
)
schema.add_field(field_name="link", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="reading_time", datatype=DataType.INT64)
schema.add_field(
field_name="publication", datatype=DataType.VARCHAR, max_length=512
)
schema.add_field(field_name="claps", datatype=DataType.INT64)
schema.add_field(field_name="responses", datatype=DataType.INT64)
self.client.create_collection(
collection_name="medium_articles_2020", schema=schema
)
インデックス関連
| API | パラメータ記述 | 例または備考 |
|---|---|---|
|
作成済みのIndexParamsに基づいてベクトルインデックステーブルを作成する(このインターフェースのIndexParamsに関する使用の詳細については、prepare_index_paramsとadd_indexインターフェースを参照してください)
|
|
|
インデックステーブル削除
|
|
|
読み取りの性能を向上させるためにベクトルインデックステーブルをリフレッシュします。これは増分データの移行とみなすことができます
|
OceanBaseで追加されたインターフェース Milvusとの互換性はありません |
|
読み取りの性能を向上させるためにベクトルインデックステーブルを再構築します。これは増分データをベースラインインデックスデータにマージすることとみなすことができます
|
OceanBaseに追加されたインターフェース Milvusとの互換性はありません |
|
ベクトル近似近傍検索を実行します
レコードのリストで、各レコードは辞典です column_nameから列値へのマッピングを表しています。 |
|
|
指定されたフィルタ条件を使用してデータレコードを読み取ります
レコードのリストで、各レコードは辞典です column_nameから列値へのマッピングを表しています。 |
|
|
指定された主キー ids のレコードを取得します:
レコードのリストで、各レコードは辞典です column_nameから列値へのマッピングを表しています。 |
|
|
集合内のデータを削除します
|
|
|
テーブルにデータを挿入します
|
|
|
テーブルのデータを更新します。主キーが重複した場合、それを交換します
|
|
|
SQLステートメントを直接実行します
SQLAlchemyが提供する結果セットのイテレータを返します |
ObVecClient
コンストラクター
def __init__(
self,
uri: str = "127.0.0.1:2881",
user: str = "root@test",
password: str = "",
db_name: str = "test",
**kwargs,
)
テーブルモード関連操作
| API | パラメータ記述 | 例または備考 |
|---|---|---|
|
検査テーブルが存在しているかどうか
|
|
|
テーブル作成
|
|
|
IndexParams対象を作成してベクトルインデックステーブルのモード定義を記録します(列名、インデックス名)のtupleからIndexParam構造へのマッピングを保持します IndexParamタイプの構造関数は:
prepare_index_params を通じてIndexParamsを取得した後、add_index インターフェースを通じてIndexParamを登録できます: |
ベクトルインデックスを作成する使用例を示しています:prepare_index_params 関数はMilvusLikeClientで使用し、ObVecClientでは使用しないことが推奨されます。ObVecClientモードでは create_index インターフェースを使用してベクトルインデックステーブルを定義しなければなりません。(詳細については、create_indexインターフェースを参照してください) |
|
オプションのindex_paramsを使用して、テーブルを作成すると同時にベクトルインデックスを作成します
|
MilvusLikeClientでの使用が推奨されます。ObVecClientでの使用は推奨されません |
|
通常インデックスとベクトルインデックスの2つのモードの作成をサポートします
type=hnsw および lib=vsag のみをサポートしています。これらの2つの設定は保持してください。distanceは l2 または inner_product に設定できます |
|
|
ベクトルインデックスパラメータを使用してベクトルインデックスを作成します
|
|
|
テーブルを削除します
|
|
|
インデックスを削除します
|
|
|
読み取りの性能を向上させるためにベクトルインデックステーブルをリフレッシュします。これは増分データの移行とみなすことができます
|
|
|
読み取りの性能を向上させるためにベクトルインデックステーブルを再構築します。これは増分データをベースラインインデックスデータにマージすることとみなすことができます
|
DML操作
| API | パラメータ記述 | 例または備考 |
|---|---|---|
|
テーブルにデータを入力します
|
|
|
テーブルのデータを更新します。主キーが重複した場合、それを交換します。
|
|
|
テーブルのデータを更新します。主キーが重複した場合、それを交換します。
|
|
|
テーブルのデータを削除します
|
|
|
指定した主キー ids のレコードを取得します:
MilvusLikeClientとは異なり、ObVecClientの戻り値はtuple listです。各tupleは1行のレコードを表します |
|
|
HNSWインデックスのefSearchパラメータを設定します。sessionレベル変数設定では、ef_searchが大きくなればリコール率も高くなるが、クエリの性能は若干下がります。
|
|
|
HNSWインデックスのefSearchパラメータを取得します | |
|
ベクトル近似近傍検索を実行します
MilvusLikeClientとは異なり、ObVecClientの戻り値はtuple listです。各tupleは1行のレコードを表します |
|
|
正確近傍検索アルゴリズムを実行します
MilvusLikeClientとは異なり、ObVecClientの戻り値はtuple listです。各tupleは1行のレコードを表します |
|
|
SQLステートメントを直接実行します
SQLAlchemyが提供する結果セットのイテレータを返します |
ObPartitionを使用したパーティションルールの定義
pyobvectorは、range/range columns、list/list columns、hash、keyおよびサブパーティションをサポートするために以下のタイプを提供します:
ObRangePartition:rangeパーティション。構築する際に
is_range_columns = Trueを設定してrange columnsパーティションを作成します。ObListPartition:listパーティション。構築する際に
is_list_columns = Trueを設定してlist columnsパーティションを作成します。ObHashPartition:hashパーティション。
ObKeyPartition:keyパーティション。
ObSubRangePartition:rangeサブパーティション。構築する際に
is_range_columns = Trueを設定してrange columnsサブパーティションを作成します。ObSubListPartition:listサブパーティション。構築する際に
is_list_columns = Trueを設定してlist columnsサブパーティションを作成します。ObSubHashPartition:hashサブパーティション。
ObSubKeyPartition:keyサブパーティション。
rangeパーティションの例
range_part = ObRangePartition(
False,
range_part_infos=[
RangeListPartInfo("p0", 100),
RangeListPartInfo("p1", "maxvalue"),
],
range_expr="id",
)
listパーティションの例
list_part = ObListPartition(
False,
list_part_infos=[
RangeListPartInfo("p0", [1, 2, 3]),
RangeListPartInfo("p1", [5, 6]),
RangeListPartInfo("p2", "DEFAULT"),
],
list_expr="col1",
)
hashパーティションの例
hash_part = ObHashPartition("col1", part_count=60)
マルチレベルパーティションの例
# rangeパーティション
range_columns_part = ObRangePartition(
True,
range_part_infos=[
RangeListPartInfo("p0", 100),
RangeListPartInfo("p1", 200),
RangeListPartInfo("p2", 300),
],
col_name_list=["col1"],
)
# rangeサブパーティション
range_sub_part = ObSubRangePartition(
False,
range_part_infos=[
RangeListPartInfo("mp0", 1000),
RangeListPartInfo("mp1", 2000),
RangeListPartInfo("mp2", 3000),
],
range_expr="col3",
)
range_columns_part.add_subpartition(range_sub_part)
純粋なSQLAlchemy APIモード
OceanBaseデータベースのベクトル検索機能で純粋なSQLAlchemy APIを使用したい場合には、以下の2つの方法を通じて同期データベースエンジンを取得できます:
- 方法1:ObVecClientを使用してデータベースエンジンの作成を補助する
from pyobvector import ObVecClient
client = ObVecClient(uri="127.0.0.1:2881", user="test@test")
engine = client.engine
# 次に、SQLAlchemyを通常通り使用してsessionを作成し、SQLAlchemyのAPIを使用します
- 方法2:ObVecClientの
create_engineインターフェースを使用してデータベースエンジンを作成する
import pyobvector
from sqlalchemy.dialects import registry
from sqlalchemy import create_engine
uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("mysql.oceanbase", "pyobvector.schema.dialect", "OceanBaseDialect")
connection_str = (
# mysql+aoceanbaseは、mysql標準を選択し、OceanBaseデータベースの同期ドライバーを使用することを意味します
f"mysql+oceanbase://{user}:{password}@{uri}/{db_name}?charset=utf8mb4"
)
engine = create_engine(connection_str, **kwargs)
# 次に、SQLAlchemyを通常通り使用してsessionを作成し、SQLAlchemyのAPIを使用します
SQLAlchemyの非同期インターフェースを使用したい場合は、OceanBaseデータベースの非同期ドライバーを使用できます。
import pyobvector
from sqlalchemy.dialects import registry
from sqlalchemy.ext.asyncio import create_async_engine
uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("mysql.aoceanbase", "pyobvector", "AsyncOceanBaseDialect")
connection_str = (
# mysql+aoceanbaseは、mysql標準を選択し、OceanBaseデータベースの非同期ドライバーを使用することを意味します
f"mysql+aoceanbase://{user}:{password}@{uri}/{db_name}?charset=utf8mb4"
)
engine = create_async_engine(connection_str)
# 次に、SQLAlchemyを通常通り使用してsessionを作成し、SQLAlchemyのAPIを使用します
さらなる例
pyobvectorコードリポジトリにアクセスして、より多くの例を取得します。