pyobvectorは、OceanBaseのベクトルストレージ機能を利用するためのPython SDKであり、以下の2つのモードを提供しています:
pymilvus互換モード:MilvusLikeClientオブジェクトを使用してデータベースを操作し、軽量級のMilvusClientと互換性のある一般的なインターフェースを提供します。
SQLAlchemy拡張モード:ObVecClientオブジェクトを使用してデータベースを操作し、リレーショナルデータベース用のPython SDK拡張機能を提供します。
本記事では、それぞれのモードにおけるインターフェースと使用例について説明します。
MilvusLikeClient
コンストラクタ
def __init__(
self,
uri: str = "127.0.0.1:2881",
user: str = "root@test",
password: str = "",
db_name: str = "test",
**kwargs,
)
collection関連インターフェース
API |
パラメータの説明 |
例 |
|---|---|---|
def create_schema(self, **kwargs) -> CollectionSchema: |
|
|
|
テーブルの作成:
|
|
|
テーブルのレコード数の取得。
|
|
|
テーブルが存在するかどうかを判断します。
|
|
|
テーブルの名前を変更します。
|
|
|
テーブルのメタデータをSQLAlchemyメタデータキャッシュに読み込みます。
|
CollectionSchema & FieldSchema
MilvusLikeClientは、CollectionSchemaを使用してテーブルのスキーマ定義を記述します。1つのCollectionSchemaには複数のFieldSchemaが含まれ、FieldSchemaはテーブルの列スキーマを記述します。
MilvusLikeClientのcreate_schemaを使用したCollectionSchemaの作成
def __init__(
self,
fields: Optional[List[FieldSchema]] = None,
partitions: Optional[ObPartition] = None,
description: str = "", # ignored in oceanbase
**kwargs,
)
パラメータの説明は以下のとおりです:
fields:オプションのFieldSchemaのセット。
partitions:パーティションルール(詳細はObPartitionを使用したパーティションルールの定義の章を参照)。
description:Milvusとの互換性のために使用され、OceanBaseでは現在実際の役割はありません。
FieldSchemaの作成とCollectionSchemaへの登録
def add_field(self, field_name: str, datatype: DataType, **kwargs)
field_name:列名。
datatype:列のデータ型(サポートされているデータ型については、互換性の説明を参照)。
kwargs:その他のパラメータは列の属性を設定するために使用されます。例:
def __init__( self, name: str, dtype: DataType, description: str = "", is_primary: bool = False, auto_id: bool = False, nullable: bool = False, **kwargs, )パラメータの説明は以下のとおりです:
is_primary:主キーかどうか。
auto_id:自動インクリメント列かどうか。
nullable:NULLを許容するかどうか。
使用例
schema = self.client.create_schema()
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="title", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(
field_name="title_vector", datatype=DataType.FLOAT_VECTOR, dim=768
)
schema.add_field(field_name="link", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="reading_time", datatype=DataType.INT64)
schema.add_field(
field_name="publication", datatype=DataType.VARCHAR, max_length=512
)
schema.add_field(field_name="claps", datatype=DataType.INT64)
schema.add_field(field_name="responses", datatype=DataType.INT64)
self.client.create_collection(
collection_name="medium_articles_2020", schema=schema
)
インデックス関連
API |
パラメータ説明 |
例または備考 |
|---|---|---|
|
既に構築されたIndexParamsに基づいてベクトルインデックステーブルを作成します(このAPIにおけるIndexParamsの使用方法の詳細は、prepare_index_paramsおよびadd_index APIを参照してください)。
|
|
|
インデックステーブルを削除します。
|
|
|
ベクトルインデックステーブルをリフレッシュして読み取り性能を向上させます。これは、増分データの移行と理解できます。
|
OceanBase独自の追加インターフェースです。 Milvusとは互換性ありません |
|
ベクトルインデックステーブルを再構築して読み取り性能を向上させます。これは、増分データをベースラインインデックスデータにマージすることと理解できます。
|
OceanBase独自の追加インターフェースです。 Milvusとは互換性ありません |
|
ベクトル近似近傍探索の実行
レコードのリスト。各レコードは、column_nameから列値へのマッピングを表す辞書です。 |
|
|
指定されたフィルター条件を使用してデータレコードを読み取る
レコードのリスト。各レコードは、column_nameから列値へのマッピングを表す辞書です。 |
|
|
指定された主キー ids のレコードを取得します:
レコードのリストで、各レコードはdictです column_nameから列値へのマッピングを表します。 |
|
|
コレクション内のデータを削除します。
|
|
|
テーブルにデータを挿入します。
|
|
|
テーブル内のデータを更新します。主キーが既に存在する場合は、該当するレコードを更新します。不存在の場合は、新規レコードを挿入します。
|
|
|
SQLステートメントを直接実行します。
SQLAlchemyが提供する結果セットイテレータを返します。 |
ObVecClient
コンストラクタ
def __init__(
self,
uri: str = "127.0.0.1:2881",
user: str = "root@test",
password: str = "",
db_name: str = "test",
**kwargs,
)
テーブルモード関連操作
API |
パラメータ説明 |
例または備考 |
|---|---|---|
|
テーブルの存在を確認する。
|
|
|
テーブルを作成する。
|
|
|
IndexParamsオブジェクトを作成し、ベクトルインデックステーブルのスキーマ定義を記録します。があり、(列名、インデックス名) のタプルをIndexParam構造へマッピングしています。 IndexParamクラスのコンストラクタは次のとおりです。
prepare_index_paramsでIndexParamsを取得した後、add_indexインターフェースを使用してIndexParamを登録できます: |
ベクトルインデックスの作成例を示します:prepare_index_params関数はMilvusLikeClientでの使用を推奨し、ObVecClientでの使用は推奨されません。ObVecClientモードでは、create_indexインターフェースを使用してベクトルインデックステーブルを定義する必要があります。(詳細はcreate_indexインターフェースを参照) |
|
オプションのindex_paramsを使用して、テーブル作成と同時にベクトルインデックスを作成します。
|
MilvusLikeClientでの使用を推奨します。ObVecClientでの使用は推奨されません。 |
|
通常インデックスとベクトルインデックスの2種類のモードをサポートしています。
type=hnswとlib=vsagのみをサポートしています。これら2つの設定は維持してください。distanceはl2またはinner_productに設定できます。 |
|
|
ベクトルインデックスパラメータを使用してベクトルインデックスを作成します。
|
|
|
テーブルを削除します。
|
|
|
インデックスを削除します。
|
|
|
ベクトルインデックステーブルをリフレッシュして読み取りパフォーマンスを向上させる。増分データの移動と理解できる。
|
|
|
ベクトルインデックステーブルを再構築して読み取りパフォーマンスを向上させる。増分データをベースラインインデックスデータにマージすると理解できる。
|
DML操作
API |
パラメータの説明 |
例または備考 |
|---|---|---|
|
テーブルにデータを挿入します。
|
|
|
テーブルのデータを挿入または更新します。主キーが既に存在する場合は、対応するレコードを更新します。不存在の場合は、新規レコードを挿入します。
|
|
|
テーブルのデータを更新します。主キーが重複する場合は、それを置き換えます。
|
|
|
テーブル内のデータを削除。
|
|
|
指定された主キーidsのレコードを取得します。
MilvusLikeClientとは異なり、ObVecClientの戻り値はtuple listです。各tupleは1行のレコードを表します。 |
|
|
HNSWインデックスのefSearchパラメータを設定します。セッションレベル変数の設定で、ef_searchが大きいほど再現率は高くなりますが、クエリのパフォーマンスは低下します。
|
|
|
HNSWインデックスのefSearchパラメータを取得します。 | |
|
ベクトル近似近傍探索の実行。
MilvusLikeClientとは異なり、ObVecClientの戻り値はtuple listであり、各tupleは1行のレコードを表す。 |
|
|
厳密近傍探索アルゴリズムの実行。
MilvusLikeClientとは異なり、ObVecClientの戻り値はtuple listであり、各tupleは1行のレコードを表す。 |
|
|
SQLステートメントを直接実行します。
SQLAlchemyが提供する結果セットイテレータを返します。 |
ObPartitionを使用したパーティションルールの定義
pyobvectorは、range/range columns、list/list columns、hash、key、およびサブパーティションをサポートするために以下のタイプを提供しています:
ObRangePartition:rangeパーティション。構築時に
is_range_columns = Trueを設定することで、range columnsパーティションを作成します。ObListPartition:listパーティション。構築時に
is_list_columns = Trueを設定することで、list columnsパーティションを作成します。ObHashPartition:hashパーティション。
ObKeyPartition:keyパーティション。
ObSubRangePartition:サブrangeパーティション。構築時に
is_range_columns = Trueを設定することで、range columnsサブパーティションを作成します。ObSubListPartition:listサブパーティション。構築時に
is_list_columns = Trueを設定することで、list columnsサブパーティションを作成します。ObSubHashPartition:hashサブパーティション。
ObSubKeyPartition:keyサブパーティション。
rangeパーティションの例
range_part = ObRangePartition(
False,
range_part_infos=[
RangeListPartInfo("p0", 100),
RangeListPartInfo("p1", "maxvalue"),
],
range_expr="id",
)
listパーティションの例
list_part = ObListPartition(
False,
list_part_infos=[
RangeListPartInfo("p0", [1, 2, 3]),
RangeListPartInfo("p1", [5, 6]),
RangeListPartInfo("p2", "DEFAULT"),
],
list_expr="col1",
)
hashパーティションの例
hash_part = ObHashPartition("col1", part_count=60)
複数レベルパーティションの例
# パーティションrange
range_columns_part = ObRangePartition(
True,
range_partInfos=[
RangeListPartInfo("p0", 100),
RangeListPartInfo("p1", 200),
RangeListPartInfo("p2", 300),
],
col_name_list=["col1"],
)
# サブパーティションrange
range_sub_part = ObSubRangePartition(
False,
range_partInfos=[
RangeListPartInfo("mp0", 1000),
RangeListPartInfo("mp1", 2000),
RangeListPartInfo("mp2", 3000),
],
range_expr="col3",
)
range_columns_part.add_subpartition(range_sub_part)
純粋なSQLAlchemy APIモード
OceanBaseデータベースのベクトル検索機能で純粋なSQLAlchemy APIを使用したい場合は、以下の2つの方法で同期されたデータベースエンジンを取得できます。
- 方法1:ObVecClientを使用してデータベースエンジンを作成する
from pyobvector import ObVecClient
client = ObVecClient(uri="127.0.0.1:2881", user="test@test")
engine = client.engine
# その後、通常どおりSQLAlchemyを使用してsessionを作成し、SQLAlchemyのAPIを使用できます。
- 方法2:ObVecClientの
create_engineインターフェースを使用してデータベースエンジンを作成する
import pyobvector
from sqlalchemy.dialects import registry
from sqlalchemy import create_engine
uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("mysql.oceanbase", "pyobvector.schema.dialect", "OceanBaseDialect")
connection_str = (
# mysql+oceanbase は、MySQL標準を選択し、OceanBaseデータベースの同期ドライバーを使用することを意味します。
f"mysql+oceanbase://{user}:{password}@{uri}/{db_name}?charset=utf8mb4"
)
engine = create_engine(connection_str, **kwargs)
# その後、通常どおりSQLAlchemyを使用してsessionを作成し、SQLAlchemyのAPIを使用できます。
SQLAlchemyの非同期インターフェースを使用したい場合は、OceanBaseデータベースの非同期ドライバーを使用できます。
import pyobvector
from sqlalchemy.dialects import registry
from sqlalchemy.ext.asyncio import create_async_engine
uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("mysql.aoceanbase", "pyobvector", "AsyncOceanBaseDialect")
connection_str = (
# mysql+aoceanbase は、MySQL標準を選択し、OceanBaseデータベースの非同期ドライバーを使用することを意味します。
f"mysql+aoceanbase://{user}:{password}@{uri}/{db_name}?charset=utf8mb4"
)
engine = create_async_engine(connection_str)
# その後、通常どおりSQLAlchemyを使用してsessionを作成し、SQLAlchemyのAPIを使用できます。
詳細な例
pyobvectorコードリポジトリにアクセスして、詳細な例を入手してください。