Q&Aのanswerをembeddingする例。
RecursiveCharacterTextSplitterでanswerを分割してからmetadataに入れる。
from langchain.text_splitter import RecursiveCharacterTextSplitter
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=200,
chunk_overlap=80,
length_function=len,
add_start_index=True,
)
texts = []
with open(file_path, encoding="utf-8", mode="r") as f:
reader = csv.DictReader(f)
for row in reader:
chuncked_texts = text_splitter.create_documents(
[row["answer"]]
)
for chuncked_text in chuncked_texts:
texts.append(
{
"content": row["question"],
"metadata": {
"answer": chuncked_text.page_content,
},
}
)
Chroma.from_texts(
[text["content"] for text in texts],
OpenAIEmbeddings(),
metadatas=[text["metadata"] for text in texts],
persist_directory=PERSIST_DIR,
)
len_safe_get_embedding
でembeddingされたデータが返ってくる。
ChromaなどのDBの場合は、embeddingの関数を引数に入れて使うため、この例は多分使えない(?)。
https://cookbook.openai.com/examples/embedding_long_inputs
import openai # OpenAIのAPIを利用するためにopenaiライブラリをインポート
from tenacity import retry, wait_random_exponential, stop_after_attempt, retry_if_not_exception_type
# tenacityライブラリから、リトライ制御のための関数とデコレータをインポート
EMBEDDING_MODEL = 'text-embedding-ada-002' # 使用する埋め込みモデルの名称を指定
EMBEDDING_CTX_LENGTH = 8191 # 埋め込みにおいて利用する最大コンテキスト長を指定
EMBEDDING_ENCODING = 'cl100k_base' # 埋め込みに用いるエンコーディング方式を指定
# リトライロジックを適用するためのデコレータを定義
# wait_random_exponentialはランダムな指数関数的な待ち時間を設定する
# stop_after_attemptは指定した回数後にリトライを停止する
# retry_if_not_exception_typeは指定した例外タイプ以外の時にリトライする
@retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(6), retry=retry_if_not_exception_type(openai.InvalidRequestError))
def get_embedding(text_or_tokens, model=EMBEDDING_MODEL):
# OpenAIのEmbedding APIを呼び出して埋め込みを取得する関数
return openai.Embedding.create(input=text_or_tokens, model=model)["data"][0]["embedding"]
long_text = 'AGI ' * 5000 # テスト用の長いテキストを生成('AGI 'が5000回繰り返される)
try:
get_embedding(long_text) # 長いテキストに対して埋め込みを取得しようと試みる
except openai.InvalidRequestError as e:
print(e) # InvalidRequestErrorが発生した場合、エラーメッセージを出力
import tiktoken # テキストをトークンに変換するためのtiktokenライブラリをインポート
# テキストをトークンに変換して指定したトークン数に切り詰める関数を定義
def truncate_text_tokens(text, encoding_name=EMBEDDING_ENCODING, max_tokens=EMBEDDING_CTX_LENGTH):
"""指定されたエンコーディングでテキストをトークン化し、最大トークン数に切り詰める"""
encoding = tiktoken.get_encoding(encoding_name) # 指定されたエンコーディングを取得
return encoding.encode(text)[:max_tokens] # テキストをエンコードし、最大トークン数で切り詰める
truncated = truncate_text_tokens(long_text) # 長いテキストをトークン化して切り詰める
len(get_embedding(truncated)) # 切り詰めたテキストの埋め込みを取得して、その長さを確認
from itertools import islice # イテラブルのスライスを取得するためのisliceをインポート
# イテラブルを指定されたサイズのバッチに分割する関数を定義
def batched(iterable, n):
"""イテラブルをn要素ごとに分割する。最後のバッチはnより少ない可能性がある"""
if n < 1:
raise ValueError('nは1以上でなければならない') # nが1未満の場合はエラーを発生させる
it = iter(iterable) # イテラブルからイテレータを生成
while (batch := tuple(islice(it, n))): # n要素ごとにイテラブルを切り出し、バッチを生成
yield batch # 生成したバッチをyieldで返す
# テキストを指定された長さのトークンチャンクに分割する関数を定義
def chunked_tokens(text, encoding_name, chunk_length):
encoding = tiktoken.get_encoding(encoding_name) # 指定されたエンコーディングを取得
tokens = encoding.encode(text) # テキストをトークン化
chunks_iterator = batched(tokens, chunk_length) # トークンを指定された長さでバッチ処理
yield from chunks_iterator # 各チャンクをyieldで返す
import numpy as np # 数値計算用のnumpyライブラリをインポート
# 長いテキストの埋め込みを安全に取得し、必要に応じて平均を取る関数を定義
def len_safe_get_embedding(text, model=EMBEDDING_MODEL, max_tokens=EMBEDDING_CTX_LENGTH, encoding_name=EMBEDDING_ENCODING, average=True):
chunk_embeddings = [] # 各チャンクの埋め込みを格納するリスト
chunk_lens = [] # 各チャンクの長さを格納するリスト
for chunk in chunked_tokens(text, encoding_name=encoding_name, chunk_length=max_tokens):
chunk_embeddings.append(get_embedding(chunk, model=model)) # 各チャンクの埋め込みを取得してリストに追加
chunk_lens.append(len(chunk)) # 各チャンクの長さをリストに追加
if average:
# 埋め込みの平均を計算
chunk_embeddings = np.average(chunk_embeddings, axis=0, weights=chunk_lens)
# ベクトルの長さを1に正規化
chunk_embeddings = chunk_embeddings / np.linalg.norm(chunk_embeddings)
chunk_embeddings = chunk_embeddings.tolist() # numpy配列をリストに変換
return chunk_embeddings # 最終的な埋め込みベクトルを返す
average_embedding_vector = len_safe_get_embedding(long_text, average=True) # 長いテキストの平均埋め込みベクトルを取得
chunks_embedding_vectors = len_safe_get_embedding(long_text, average=False) # 各チャンクの埋め込みベクトルを取得
# 結果の出力
print(f"Setting average=True gives us a single {len(average_embedding_vector)}-dimensional embedding vector for our long text.")
print(f"Setting average=False gives us {len(chunks_embedding_vectors)} embedding vectors, one for each of the chunks.")
average=True
の場合:
average=False
の場合: