127 changes: 74 additions & 53 deletions evals/ai.py
@@ -163,6 +163,8 @@ def dimensions(self) -> int:
                 match self.model:
                     case "voyage-law-2":
                         return 1024
+                    case "voyage-3.5":
+                        return 1024
                     case _:
                         pass
             case "cohere":
@@ -188,8 +190,8 @@ def ratelimit_tpm(self) -> float:
             case "cohere":
                 return float("inf")
             case "voyageai":
-                # Should be 1M, Manual Adjustment
-                return 500_000
+                # Should be 16M, Manual Adjustment
+                return 6_000_000
             case "huggingface":
                 return 1_000_000
             case "modal":
@@ -208,8 +210,8 @@ def ratelimit_rpm(self) -> float:
             case "cohere":
                 return 1_000
             case "voyageai":
-                # Manual Adjustment
-                return 50
+                # Should be 4K, Manual Adjustment
+                return 1_500
             case "huggingface":
                 return 1_000
             case "modal":
@@ -228,7 +230,7 @@ def max_batch_num_vectors(self) -> int:
             case "cohere":
                 return 96
             case "voyageai":
-                return 128
+                return 1000
             case "huggingface":
                 return 1024
             case "modal":
@@ -248,6 +250,12 @@ def max_batch_num_tokens(self) -> int:
                 return 1_000_000
             case "modal":
                 return 1_000_000
+            case "voyageai":
+                match self.model:
+                    case "voyage-3.5":
+                        return 300_000
+                    case _:
+                        return 100_000
             case _:
                 return 100_000

@@ -275,6 +283,7 @@ class AIRerankModel(BaseModel):
     def ratelimit_tpm(self) -> float:
         match self.company:
             case "voyageai":
+                # Should be 4M
                 return 2_000_000
             case "zeroentropy":
                 return 20_000_000
@@ -294,8 +303,8 @@ def ratelimit_rpm(self) -> float:
             case "zeroentropy":
                 return 500
             case "voyageai":
-                # It says 100RPM but I can only get 60 out of it
-                return 60
+                # Should be 2000 at Tier 2
+                return 500
             case "together":
                 return 1000
             case "jina":
@@ -326,6 +335,7 @@ class AIConnection:
     google_client: AsyncOpenAI
     zeroentropy_client: AsyncZeroEntropy | None
     voyageai_client: voyageai.client_async.AsyncClient | None
+    voyageai_semaphore: asyncio.Semaphore
    cohere_client: cohere.AsyncClient | None
     together_client: AsyncOpenAI
     together_rerank_client: cohere.AsyncClient | None
@@ -360,6 +370,7 @@ def __init__(self) -> None:
             self.voyageai_client = voyageai.client_async.AsyncClient()
         except voyageai.error.AuthenticationError:
             self.voyageai_client = None
+        self.voyageai_semaphore = asyncio.Semaphore(50)
         try:
             self.cohere_client = cohere.AsyncClient()
         except cohere.core.api_error.ApiError:
@@ -467,41 +478,27 @@ def tiktoken_truncate_by_num_tokens(
     model: str = "cl100k_base",
 ) -> str:
     encoding = tiktoken.get_encoding(model)
-    tokens = encoding.encode(s)
+    tokens = encoding.encode(s, disallowed_special=())
     tokens = tokens[:max_tokens]
     return encoding.decode(tokens)


 def ai_num_tokens(model: AIModel | AIEmbeddingModel | AIRerankModel, s: str) -> int:
     if isinstance(model, AIModel):
         if model.company == "anthropic":
-            # Doesn't actually connect to the network
-            return (
-                get_ai_connection()
-                .sync_anthropic_client.messages.count_tokens(
-                    model=model.model,
-                    system="",
-                    messages=[
-                        {
-                            "role": "user",
-                            "content": s,
-                        }
-                    ],
-                )
-                .input_tokens
-            )
+            pass
         elif model.company == "openai":
             if model.model.startswith("gpt-4") or model.model.startswith("gpt-5"):
                 model_str = "gpt-4"
             else:
                 model_str = model.model
             encoding = tiktoken.encoding_for_model(model_str)
-            num_tokens = len(encoding.encode(s))
+            num_tokens = len(encoding.encode(s, disallowed_special=()))
             return num_tokens
     if isinstance(model, AIEmbeddingModel):
         if model.company == "openai":
             encoding = tiktoken.encoding_for_model(model.model)
-            num_tokens = len(encoding.encode(s))
+            num_tokens = len(encoding.encode(s, disallowed_special=()))
             return num_tokens
         elif model.company == "voyageai":
             voyageai_client = get_ai_connection().voyageai_client
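
Passing `disallowed_special=()` matters for scraped corpora: by default, `tiktoken.Encoding.encode` raises `ValueError` when the input contains text that renders as a special token (e.g. `<|endoftext|>`), which would crash truncation and token counting on machine-generated documents. A minimal demonstration:

```python
import tiktoken

encoding = tiktoken.get_encoding("cl100k_base")
s = "stray marker: <|endoftext|> in scraped text"

# Default behavior: special-token text in the input raises ValueError.
try:
    encoding.encode(s)
except ValueError:
    pass  # "Encountered text corresponding to disallowed special token ..."

# With disallowed_special=(), the marker is tokenized as ordinary text.
tokens = encoding.encode(s, disallowed_special=())
assert encoding.decode(tokens) == s
```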
@@ -511,7 +508,7 @@ def ai_num_tokens(model: AIModel | AIEmbeddingModel | AIRerankModel, s: str) -> int:
         logger.exception("VoyageAI Client is not available")
     # Otherwise, estimate
     # logger.warning("Estimating Tokens!")
-    return int(len(s) / 4)
+    return int(len(s.encode()) / 4)


 def get_call_cache_key(
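
Counting bytes instead of characters makes the fallback estimate account for multi-byte UTF-8 text, which the old per-character heuristic undercounted. A small illustration:

```python
s = "日本語のテキスト"       # 8 characters, 24 UTF-8 bytes
int(len(s) / 4)           # 2 -- the old estimate, far too low
int(len(s.encode()) / 4)  # 6 -- the new estimate, much closer to the real count
```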
@@ -814,12 +811,10 @@ async def ai_embedding(
     # Cache
     cache: dc.Cache | None = None,
     # Num Tokens (Internal: To prevent recalculating)
-    _texts_num_tokens: list[int] | None = None,
+    _num_tokens_input: int | None = None,
 ) -> list[AIEmbedding]:
     if cache is None:
         cache = g_cache
-    if _texts_num_tokens is None:
-        _texts_num_tokens = [ai_num_tokens(model, text) for text in texts]

     # Extract cache miss indices
     text_embeddings: list[AIEmbedding | None] = [None] * len(texts)
@@ -845,38 +840,63 @@ async def ai_embedding(
         i for i in range(len(text_embeddings)) if text_embeddings[i] is None
     ]

-    num_tokens_input: int = sum(
-        [_texts_num_tokens[index] for index in required_text_embeddings_indices]
-    )
+    if _num_tokens_input is None:
+        required_num_tokens = [
+            ai_num_tokens(model, texts[idx]) for idx in required_text_embeddings_indices
+        ]
+        num_tokens_input = sum(required_num_tokens)
+    else:
+        required_num_tokens = None
+        num_tokens_input = _num_tokens_input

     # Recursively Batch if necessary
     if len(required_text_embeddings_indices) > model.max_batch_num_vectors or (
         num_tokens_input > model.max_batch_num_tokens
         and len(required_text_embeddings_indices) > 1
     ):
-        # Calculate batch size
-        batch_size = model.max_batch_num_vectors
-        # If we wouldn't split on the basis of max batch num vectors, but we should based on tokens, then we'll lower the batch size
-        if (
-            len(required_text_embeddings_indices) <= model.max_batch_num_vectors
-            and num_tokens_input > model.max_batch_num_tokens
-        ):
-            batch_size = max(len(required_text_embeddings_indices) // 2, 1)
+        assert required_num_tokens is not None, "Recursion Error in ai_embedding"

         # Calculate embeddings in batches
+        accumulating_batch: list[str] = []
+        accumulating_batch_num_tokens = 0
         tasks: list[Coroutine[Any, Any, list[AIEmbedding]]] = []
-        for i in range(0, len(required_text_embeddings_indices), batch_size):
-            batch_indices = required_text_embeddings_indices[i : i + batch_size]
+        for idx, num_tokens in zip(
+            required_text_embeddings_indices, required_num_tokens, strict=True
+        ):
+            if (
+                len(accumulating_batch) + 1 > model.max_batch_num_vectors
+                or accumulating_batch_num_tokens + num_tokens
+                > model.max_batch_num_tokens
+            ):
+                tasks.append(
+                    ai_embedding(
+                        model,
+                        accumulating_batch,
+                        embedding_type,
+                        num_ratelimit_retries=num_ratelimit_retries,
+                        backoff_algo=backoff_algo,
+                        callback=callback,
+                        cache=cache,
+                        _num_tokens_input=accumulating_batch_num_tokens,
+                    )
+                )
+                accumulating_batch = []
+                accumulating_batch_num_tokens = 0
+
+            accumulating_batch.append(texts[idx])
+            accumulating_batch_num_tokens += num_tokens
+
+        if len(accumulating_batch) > 0:
             tasks.append(
                 ai_embedding(
                     model,
-                    [texts[i] for i in batch_indices],
+                    accumulating_batch,
                     embedding_type,
                     num_ratelimit_retries=num_ratelimit_retries,
                     backoff_algo=backoff_algo,
                     callback=callback,
                     cache=cache,
-                    _texts_num_tokens=[_texts_num_tokens[i] for i in batch_indices],
+                    _num_tokens_input=accumulating_batch_num_tokens,
                 )
             )
         preflattened_results = await asyncio.gather(*tasks)
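
The fixed-size batching (`batch_size = model.max_batch_num_vectors`, halved when over the token cap) is replaced by greedy first-fit packing: texts accumulate until adding one more would breach either the vector cap or the token cap, the batch is flushed as a recursive call carrying its precomputed token total, and accumulation restarts. A standalone sketch of the same packing logic (names are illustrative, not from the repo):

```python
def pack_batches(
    items: list[str],
    item_tokens: list[int],
    max_vectors: int,
    max_tokens: int,
) -> list[list[str]]:
    """Greedy first-fit packing: flush when adding one more item would
    exceed either the vector-count cap or the token cap."""
    batches: list[list[str]] = []
    batch: list[str] = []
    batch_tokens = 0
    for item, tokens in zip(items, item_tokens, strict=True):
        # Guarding on a non-empty batch means an item larger than
        # max_tokens still gets a batch of its own.
        if batch and (
            len(batch) + 1 > max_vectors or batch_tokens + tokens > max_tokens
        ):
            batches.append(batch)
            batch, batch_tokens = [], 0
        batch.append(item)
        batch_tokens += tokens
    if batch:
        batches.append(batch)
    return batches


# pack_batches(["a", "bb", "ccc"], [1, 2, 3], max_vectors=2, max_tokens=3)
# -> [["a", "bb"], ["ccc"]]
```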
@@ -1014,15 +1034,16 @@ async def ai_embedding(
             for i, text in enumerate(prepared_input_texts):
                 if len(text.strip()) == 0:
                     prepared_input_texts[i] = " "
-            result = await voyageai_client.embed(
-                prepared_input_texts,
-                model=model.model,
-                input_type=(
-                    "document"
-                    if embedding_type == AIEmbeddingType.DOCUMENT
-                    else "query"
-                ),
-            )
+            async with get_ai_connection().voyageai_semaphore:
+                result = await voyageai_client.embed(
+                    prepared_input_texts,
+                    model=model.model,
+                    input_type=(
+                        "document"
+                        if embedding_type == AIEmbeddingType.DOCUMENT
+                        else "query"
+                    ),
+                )
             assert isinstance(result.embeddings, list)
             text_embeddings_response = [
                 np.array(embedding) for embedding in result.embeddings
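
Wrapping the `embed` call in the new `voyageai_semaphore` caps in-flight VoyageAI requests at 50 process-wide, complementing the RPM/TPM budgets, since `asyncio.gather` may otherwise schedule every batch at once. The pattern in isolation, with a sleep standing in for the network call:

```python
import asyncio

semaphore = asyncio.Semaphore(50)  # at most 50 requests in flight


async def bounded_call(i: int) -> int:
    async with semaphore:        # the 51st caller waits until a slot frees
        await asyncio.sleep(0.1)  # stand-in for the real network call
        return i


async def main() -> None:
    # Even with 10,000 tasks scheduled at once, concurrency stays at 50.
    results = await asyncio.gather(*(bounded_call(i) for i in range(10_000)))
    assert results == list(range(10_000))


asyncio.run(main())
```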
11 changes: 2 additions & 9 deletions evals/common.py
@@ -57,21 +57,14 @@ def ze_results_path(
     def embeddings_cache_path(
         self,
         retrieval_method: str,
-        include_relevant_docs: bool,
     ) -> str:
-        return self.retrieval_method_path(
-            retrieval_method, include_relevant_docs, "embeddings_cache.db"
-        )
+        return self.file_path(f"embeddings_cache/{retrieval_method}.db")

     def reranker_cache_path(
         self,
-        retrieval_method: str,
-        include_relevant_docs: bool,
         reranker: str,
     ) -> str:
-        return self.retrieval_method_path(
-            retrieval_method, include_relevant_docs, f"{reranker}/reranker_cache.db"
-        )
+        return self.file_path(f"reranker_cache/{reranker}.db")

     def ze_scores_path(
         self,
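
The caches move out of the per-run directory tree: embeddings are now keyed only by retrieval method and reranker scores only by reranker name, so eval variants that previously each carried their own copy (e.g. with and without relevant documents) share one database. An illustrative before/after, assuming `file_path` resolves under the dataset root (the exact old layout depends on `retrieval_method_path`, which is not shown in this diff):

```python
# Before (per-run, duplicated):
#   <root>/.../<retrieval_method>/<include_relevant_docs>/embeddings_cache.db
# After (shared):
#   <root>/embeddings_cache/<retrieval_method>.db
#   <root>/reranker_cache/<reranker>.db
```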
2 changes: 1 addition & 1 deletion evals/ingestors/common.py
@@ -145,7 +145,7 @@ def split_by_tokens(s: str, max_tokens: int) -> list[str]:
     if len(s.encode("utf-8")) <= max_tokens:  # Tokenization is always compressive
         return [s]

-    tokens = encoding.encode(s)
+    tokens = encoding.encode(s, disallowed_special=())
     total_tokens = len(tokens)

     # Determine minimum number of chunks needed
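
The byte-length guard works because each BPE token consumes at least one byte of input, so the token count can never exceed `len(s.encode("utf-8"))`; strings short enough in bytes skip tokenization entirely. A quick check of the invariant:

```python
import tiktoken

encoding = tiktoken.get_encoding("cl100k_base")
s = "hello world"
# Every token covers at least one byte, so token count <= byte count,
# making the cheap byte-length comparison a safe early exit.
assert len(encoding.encode(s, disallowed_special=())) <= len(s.encode("utf-8"))
```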