LangGraphはグラフベースのAIワークフローフレームワークであり、複数ステップのオーケストレーション、状態管理、永続化をサポートします。 本記事では、langgraph-checkpoint-oceanbaseパッケージを使用して、OceanBaseデータベースのMySQLモードをLangGraphのバックエンドとして利用し、短期記憶(チェックポイント)および長期記憶(ストア)機能を実現する方法について説明します。
バージョン互換性
- OceanBaseデータベースのバージョン:V4.3.5 BP2以上。
前提条件
LangGraphとOceanBaseを統合する前に、次のことを確認してください:
- OceanBaseデータベースのデプロイが完了し、MySQLモードのユーザーテナントが作成されていること。テナント作成の詳細については、テナントの作成を参照してください。
- Python 3.10+ および pip がインストール済みであること。
手順
ステップ1:データベース接続文字列を取得する
OceanBaseデータベースのデプロイ担当者から接続文字列を取得します。例:
obclient -h$host -P$port -u$user_name -p$password -D$database_name
パラメータ説明:
$host:接続IPアドレス。ODP接続はODPアドレスを使用し、直接接続はOBServer IPを使用します。$port:接続ポート。ODPのデフォルトは2883、直接接続のデフォルトは2881です。$database_name:データベース名。注意
テナントに接続するユーザーには、データベースに対する
CREATE、INSERT、DROPおよびSELECT権限が付与されていなければなりません。ユーザー権限の詳細については、MySQLモードの権限分類を参照してください。$user_name:接続アカウント。ODP形式:ユーザー@テナント#クラスタまたはクラスタ:テナント:ユーザー。直接接続形式:ユーザー@テナント。$password:アカウントのパスワード。
接続文字列の詳細については、OBClientを使用してOceanBaseテナントに接続するを参照してください。
例:
obclient -hxxx.xxx.xxx.xxx -P2881 -utest_user001@mysql001 -p****** -Dtest
ステップ2:LangGraph OceanBase統合パッケージのインストール
pipを使用してlanggraph-checkpoint-oceanbaseと関連依存パッケージをインストールします:
pip install langgraph-checkpoint-oceanbase pymysql langgraph langchain[openai] aiomysql asyncmy
ステップ3:Checkpointer(短期的な記憶)の使用
Checkpointerは、対話履歴とグラフの状態を格納し、短期的な記憶と中断からの復旧を実現します。
Checkpointerの初期化
from langgraph.checkpoint.oceanbase.pyoceanbase import PyOceanBaseSaver
DB_URI = "mysql://$user_name:$password@$host:$port/$database_name"
with PyOceanBaseSaver.from_conn_string(DB_URI) as checkpointer:
checkpointer.setup()
上記コードを実行すると、OceanBaseデータベースにはcheckpoint_blobs、checkpoint_migrations、checkpoint_writes、checkpointsの4つのテーブルが作成されます。
レコードのコンテキスト
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, MessagesState, START
from langgraph.checkpoint.oceanbase.pyoceanbase import PyOceanBaseSaver
from langchain_core.runnables.config import RunnableConfig
from langchain_core.messages import HumanMessage
model = init_chat_model(model="qwen-max-latest", api_key="$your_api_key",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", model_provider="openai",
temperature=0)
def call_model(state: MessagesState):
response = model.invoke(state["messages"])
return {"messages": response}
DB_URI = "mysql://$user_name:$password@$host:$port/$database_name"
with PyOceanBaseSaver.from_conn_string(DB_URI) as checkpointer:
checkpointer.setup()
builder = StateGraph(MessagesState)
builder.add_node(call_model)
builder.add_edge(START, "call_model")
graph = builder.compile(checkpointer=checkpointer)
config: RunnableConfig = {
"configurable": {
"thread_id": "1"
}
}
# First conversation
print("First conversation:")
for chunk in graph.stream(
{"messages": [HumanMessage(content="hi! I'm bob")]},
config,
stream_mode="values"
):
chunk["messages"][-1].pretty_print()
# Second conversation (testing memory function)
print("\nSecond conversation:")
for chunk in graph.stream(
{"messages":[HumanMessage(content="what's my name?")]},
config,
stream_mode="values"
):
chunk["messages"][-1].pretty_print()
実行結果:
First conversation:
================================ Human Message =================================
hi! I'm bob
================================== Ai Message ==================================
Hi Bob! Great to meet you again. How can I assist you today? 😊
Second conversation:
================================ Human Message =================================
what's my name?
================================== Ai Message ==================================
Your name is Bob! 😊
ステップ4:Store(長期記憶)の使用
Storeは、ユーザー情報など、AIアプリケーションの長期記憶を格納するために使用されます。
Storeの初期化
from langgraph.store.oceanbase import PyOceanBaseStore
DB_URI = "mysql://$user_name:$password@$host:$port/$database_name"
with PyOceanBaseStore.from_conn_string(DB_URI) as store:
store.setup()
上記コードを実行すると、OceanBaseデータベースにstoreとstore_migrationsという2つのテーブルが追加で作成されます。
Storeに保存された情報の読み取り
from langchain_core.runnables import RunnableConfig
from langgraph.config import get_store
from langgraph.prebuilt import create_react_agent
from langgraph.store.oceanbase import PyOceanBaseStore
from langchain.chat_models import init_chat_model
from langchain_core.messages import HumanMessage
DB_URI = "mysql://$user_name:$password@$host:$port/$database_name"
model = init_chat_model(model="qwen-max-latest", api_key="$your_api_key",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", model_provider="openai", temperature=0)
def get_user_info(config: RunnableConfig) -> str:
"""Look up user info"""
store = get_store()
user_id = config.get("configurable", {}).get("user_id")
if user_id is None:
raise ValueError("user_id must be provided in config['configurable']")
user_info = store.get(("users",), user_id)
return str(user_info.value) if user_info else "Unknown user"
with PyOceanBaseStore.from_conn_string(DB_URI) as store:
store.put(
("users",),
"user_123",
{
"name": "John Smith",
"language": "English"
}
)
agent = create_react_agent(
model=model,
tools=[get_user_info],
store=store
)
# Run the agent
result = agent.invoke(
{"messages": HumanMessage(content="look up user information")},
config={"configurable": {"user_id": "user_123"}}
)
print(result["messages"][-1].content)
実行結果:
The user's name is John Smith and his preferred language is English.
ストアへの情報書き込み
from langchain_core.runnables import RunnableConfig
from langgraph.config import get_store
from langgraph.prebuilt import create_react_agent
from langgraph.store.oceanbase import PyOceanBaseStore
from langchain.chat_models import init_chat_model
from langchain_core.messages import HumanMessage
from typing_extensions import TypedDict
DB_URI = "mysql://$user_name:$password@$host:$port/$database_name"
model = init_chat_model(model="qwen-max-latest", api_key="$your_api_key",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", model_provider="openai", temperature=0)
class UserInfo(TypedDict):
name: str
def save_user_info(user_info: UserInfo, config: RunnableConfig) -> str:
"""Save user info"""
store = get_store()
user_id = config.get("configurable", {}).get("user_id")
if user_id is None:
raise ValueError("user_id must be provided in config['configurable']")
store.put(("users",), user_id, dict(user_info))
return "Successfully saved user info."
with PyOceanBaseStore.from_conn_string(DB_URI) as store:
agent = create_react_agent(
model=model,
tools=[save_user_info],
store=store
)
# Run the agent
_ = agent.invoke(
{"messages": [HumanMessage(content="My name is Tom and save my information")]},
config={"configurable": {"user_id": "user_1"}}
)
# You can access the store directly to get the value
result = store.get(("users",), "user_1")
if result is not None:
print(result.value)
else:
print("No value found for user_1")
実行結果:
{'name': 'Tom'}