参考1. RecursiveCharacterTextSplitterを使う

Q&Aのanswerをembeddingする例。

RecursiveCharacterTextSplitterでanswerを分割してからmetadataに入れる。

from langchain.text_splitter import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=200,
        chunk_overlap=80,
        length_function=len,
        add_start_index=True,
    )

texts = []
with open(file_path, encoding="utf-8", mode="r") as f:
    reader = csv.DictReader(f) 
		for row in reader:
        chuncked_texts = text_splitter.create_documents(
            [row["answer"]]
        )
        for chuncked_text in chuncked_texts:
            texts.append(
                {
                    "content": row["question"],
                    "metadata": {
                        "answer": chuncked_text.page_content,
                    },
                }
            )
		Chroma.from_texts(
        [text["content"] for text in texts],
        OpenAIEmbeddings(),
        metadatas=[text["metadata"] for text in texts],
        persist_directory=PERSIST_DIR,
    )

参考2. OpenAI CookBook

len_safe_get_embedding でembeddingされたデータが返ってくる。

ChromaなどのDBの場合は、embeddingの関数を引数に入れて使うため、この例は多分使えない(?)。

https://cookbook.openai.com/examples/embedding_long_inputs

import openai  # OpenAIのAPIを利用するためにopenaiライブラリをインポート
from tenacity import retry, wait_random_exponential, stop_after_attempt, retry_if_not_exception_type  
# tenacityライブラリから、リトライ制御のための関数とデコレータをインポート

EMBEDDING_MODEL = 'text-embedding-ada-002'  # 使用する埋め込みモデルの名称を指定
EMBEDDING_CTX_LENGTH = 8191  # 埋め込みにおいて利用する最大コンテキスト長を指定
EMBEDDING_ENCODING = 'cl100k_base'  # 埋め込みに用いるエンコーディング方式を指定

# リトライロジックを適用するためのデコレータを定義
# wait_random_exponentialはランダムな指数関数的な待ち時間を設定する
# stop_after_attemptは指定した回数後にリトライを停止する
# retry_if_not_exception_typeは指定した例外タイプ以外の時にリトライする
@retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(6), retry=retry_if_not_exception_type(openai.InvalidRequestError))
def get_embedding(text_or_tokens, model=EMBEDDING_MODEL):
    # OpenAIのEmbedding APIを呼び出して埋め込みを取得する関数
    return openai.Embedding.create(input=text_or_tokens, model=model)["data"][0]["embedding"]

long_text = 'AGI ' * 5000  # テスト用の長いテキストを生成('AGI 'が5000回繰り返される)
try:
    get_embedding(long_text)  # 長いテキストに対して埋め込みを取得しようと試みる
except openai.InvalidRequestError as e:
    print(e)  # InvalidRequestErrorが発生した場合、エラーメッセージを出力

import tiktoken  # テキストをトークンに変換するためのtiktokenライブラリをインポート

# テキストをトークンに変換して指定したトークン数に切り詰める関数を定義
def truncate_text_tokens(text, encoding_name=EMBEDDING_ENCODING, max_tokens=EMBEDDING_CTX_LENGTH):
    """指定されたエンコーディングでテキストをトークン化し、最大トークン数に切り詰める"""
    encoding = tiktoken.get_encoding(encoding_name)  # 指定されたエンコーディングを取得
    return encoding.encode(text)[:max_tokens]  # テキストをエンコードし、最大トークン数で切り詰める

truncated = truncate_text_tokens(long_text)  # 長いテキストをトークン化して切り詰める
len(get_embedding(truncated))  # 切り詰めたテキストの埋め込みを取得して、その長さを確認

from itertools import islice  # イテラブルのスライスを取得するためのisliceをインポート

# イテラブルを指定されたサイズのバッチに分割する関数を定義
def batched(iterable, n):
    """イテラブルをn要素ごとに分割する。最後のバッチはnより少ない可能性がある"""
    if n < 1:
        raise ValueError('nは1以上でなければならない')  # nが1未満の場合はエラーを発生させる
    it = iter(iterable)  # イテラブルからイテレータを生成
    while (batch := tuple(islice(it, n))):  # n要素ごとにイテラブルを切り出し、バッチを生成
        yield batch  # 生成したバッチをyieldで返す

# テキストを指定された長さのトークンチャンクに分割する関数を定義
def chunked_tokens(text, encoding_name, chunk_length):
    encoding = tiktoken.get_encoding(encoding_name)  # 指定されたエンコーディングを取得
    tokens = encoding.encode(text)  # テキストをトークン化
    chunks_iterator = batched(tokens, chunk_length)  # トークンを指定された長さでバッチ処理
    yield from chunks_iterator  # 各チャンクをyieldで返す

import numpy as np  # 数値計算用のnumpyライブラリをインポート

# 長いテキストの埋め込みを安全に取得し、必要に応じて平均を取る関数を定義
def len_safe_get_embedding(text, model=EMBEDDING_MODEL, max_tokens=EMBEDDING_CTX_LENGTH, encoding_name=EMBEDDING_ENCODING, average=True):
    chunk_embeddings = []  # 各チャンクの埋め込みを格納するリスト
    chunk_lens = []  # 各チャンクの長さを格納するリスト
    for chunk in chunked_tokens(text, encoding_name=encoding_name, chunk_length=max_tokens):
        chunk_embeddings.append(get_embedding(chunk, model=model))  # 各チャンクの埋め込みを取得してリストに追加
        chunk_lens.append(len(chunk))  # 各チャンクの長さをリストに追加

    if average:
        # 埋め込みの平均を計算
        chunk_embeddings = np.average(chunk_embeddings, axis=0, weights=chunk_lens)
        # ベクトルの長さを1に正規化
        chunk_embeddings = chunk_embeddings / np.linalg.norm(chunk_embeddings)
        chunk_embeddings = chunk_embeddings.tolist()  # numpy配列をリストに変換
    return chunk_embeddings  # 最終的な埋め込みベクトルを返す

average_embedding_vector = len_safe_get_embedding(long_text, average=True)  # 長いテキストの平均埋め込みベクトルを取得
chunks_embedding_vectors = len_safe_get_embedding(long_text, average=False)  # 各チャンクの埋め込みベクトルを取得

# 結果の出力
print(f"Setting average=True gives us a single {len(average_embedding_vector)}-dimensional embedding vector for our long text.")
print(f"Setting average=False gives us {len(chunks_embedding_vectors)} embedding vectors, one for each of the chunks.")