diff --git a/evals/ai.py b/evals/ai.py
index 4b2863e..0af06e8 100644
--- a/evals/ai.py
+++ b/evals/ai.py
@@ -163,6 +163,8 @@ def dimensions(self) -> int:
                 match self.model:
                     case "voyage-law-2":
                         return 1024
+                    case "voyage-3.5":
+                        return 1024
                     case _:
                         pass
             case "cohere":
@@ -188,8 +190,8 @@ def ratelimit_tpm(self) -> float:
             case "cohere":
                 return float("inf")
             case "voyageai":
-                # Should be 1M, Manual Adjustment
-                return 500_000
+                # Should be 16M, Manual Adjustment
+                return 6_000_000
             case "huggingface":
                 return 1_000_000
             case "modal":
@@ -208,8 +210,8 @@ def ratelimit_rpm(self) -> float:
             case "cohere":
                 return 1_000
             case "voyageai":
-                # Manual Adjustment
-                return 50
+                # Should be 4K, Manual Adjustment
+                return 1_500
             case "huggingface":
                 return 1_000
             case "modal":
@@ -228,7 +230,7 @@ def max_batch_num_vectors(self) -> int:
             case "cohere":
                 return 96
             case "voyageai":
-                return 128
+                return 1000
             case "huggingface":
                 return 1024
             case "modal":
@@ -248,6 +250,12 @@ def max_batch_num_tokens(self) -> int:
                 return 1_000_000
             case "modal":
                 return 1_000_000
+            case "voyageai":
+                match self.model:
+                    case "voyage-3.5":
+                        return 300_000
+                    case _:
+                        return 100_000
             case _:
                 return 100_000
 
@@ -275,6 +283,7 @@ class AIRerankModel(BaseModel):
     def ratelimit_tpm(self) -> float:
         match self.company:
             case "voyageai":
+                # Should be 4M
                 return 2_000_000
             case "zeroentropy":
                 return 20_000_000
@@ -294,8 +303,8 @@ def ratelimit_rpm(self) -> float:
             case "zeroentropy":
                 return 500
             case "voyageai":
-                # It says 100RPM but I can only get 60 out of it
-                return 60
+                # Should be 2000 at Tier 2
+                return 500
             case "together":
                 return 1000
             case "jina":
@@ -326,6 +335,7 @@ class AIConnection:
     google_client: AsyncOpenAI
     zeroentropy_client: AsyncZeroEntropy | None
     voyageai_client: voyageai.client_async.AsyncClient | None
+    voyageai_semaphore: asyncio.Semaphore
     cohere_client: cohere.AsyncClient | None
     together_client: AsyncOpenAI
     together_rerank_client: cohere.AsyncClient | None
@@ -360,6 +370,7 @@ def __init__(self) -> None:
             self.voyageai_client = voyageai.client_async.AsyncClient()
         except voyageai.error.AuthenticationError:
             self.voyageai_client = None
+        self.voyageai_semaphore = asyncio.Semaphore(50)
         try:
             self.cohere_client = cohere.AsyncClient()
         except cohere.core.api_error.ApiError:
@@ -467,7 +478,7 @@ def tiktoken_truncate_by_num_tokens(
     model: str = "cl100k_base",
 ) -> str:
     encoding = tiktoken.get_encoding(model)
-    tokens = encoding.encode(s)
+    tokens = encoding.encode(s, disallowed_special=())
     tokens = tokens[:max_tokens]
     return encoding.decode(tokens)
 
@@ -475,33 +486,19 @@ def tiktoken_truncate_by_num_tokens(
 def ai_num_tokens(model: AIModel | AIEmbeddingModel | AIRerankModel, s: str) -> int:
     if isinstance(model, AIModel):
         if model.company == "anthropic":
-            # Doesn't actually connect to the network
-            return (
-                get_ai_connection()
-                .sync_anthropic_client.messages.count_tokens(
-                    model=model.model,
-                    system="",
-                    messages=[
-                        {
-                            "role": "user",
-                            "content": s,
-                        }
-                    ],
-                )
-                .input_tokens
-            )
+            pass
         elif model.company == "openai":
             if model.model.startswith("gpt-4") or model.model.startswith("gpt-5"):
                 model_str = "gpt-4"
             else:
                 model_str = model.model
             encoding = tiktoken.encoding_for_model(model_str)
-            num_tokens = len(encoding.encode(s))
+            num_tokens = len(encoding.encode(s, disallowed_special=()))
             return num_tokens
     if isinstance(model, AIEmbeddingModel):
         if model.company == "openai":
             encoding = tiktoken.encoding_for_model(model.model)
-            num_tokens = len(encoding.encode(s))
+            num_tokens = len(encoding.encode(s, disallowed_special=()))
             return num_tokens
         elif model.company == "voyageai":
             voyageai_client = get_ai_connection().voyageai_client
@@ -511,7 +508,7 @@ def ai_num_tokens(model: AIModel | AIEmbeddingModel | AIRerankModel, s: str) ->
         logger.exception("VoyageAI Client is not available")
     # Otherwise, estimate
     # logger.warning("Estimating Tokens!")
-    return int(len(s) / 4)
+    return int(len(s.encode()) / 4)
 
 
 def get_call_cache_key(
@@ -814,12 +811,10 @@ async def ai_embedding(
     # Cache
     cache: dc.Cache | None = None,
     # Num Tokens (Internal: To prevent recalculating)
-    _texts_num_tokens: list[int] | None = None,
+    _num_tokens_input: int | None = None,
 ) -> list[AIEmbedding]:
     if cache is None:
         cache = g_cache
-    if _texts_num_tokens is None:
-        _texts_num_tokens = [ai_num_tokens(model, text) for text in texts]
 
     # Extract cache miss indices
     text_embeddings: list[AIEmbedding | None] = [None] * len(texts)
@@ -845,38 +840,63 @@ async def ai_embedding(
         i for i in range(len(text_embeddings)) if text_embeddings[i] is None
     ]
-    num_tokens_input: int = sum(
-        [_texts_num_tokens[index] for index in required_text_embeddings_indices]
-    )
+    if _num_tokens_input is None:
+        required_num_tokens = [
+            ai_num_tokens(model, texts[idx]) for idx in required_text_embeddings_indices
+        ]
+        num_tokens_input = sum(required_num_tokens)
+    else:
+        required_num_tokens = None
+        num_tokens_input = _num_tokens_input
 
     # Recursively Batch if necessary
     if len(required_text_embeddings_indices) > model.max_batch_num_vectors or (
         num_tokens_input > model.max_batch_num_tokens
         and len(required_text_embeddings_indices) > 1
     ):
-        # Calculate batch size
-        batch_size = model.max_batch_num_vectors
-        # If we wouldn't split on the basis of max batch num vectors, but we should based on tokens, then we'll lower the batch size
-        if (
-            len(required_text_embeddings_indices) <= model.max_batch_num_vectors
-            and num_tokens_input > model.max_batch_num_tokens
-        ):
-            batch_size = max(len(required_text_embeddings_indices) // 2, 1)
+        assert required_num_tokens is not None, "Recursion Error in ai_embedding"
 
         # Calculate embeddings in batches
+        accumulating_batch: list[str] = []
+        accumulating_batch_num_tokens = 0
         tasks: list[Coroutine[Any, Any, list[AIEmbedding]]] = []
-        for i in range(0, len(required_text_embeddings_indices), batch_size):
-            batch_indices = required_text_embeddings_indices[i : i + batch_size]
+        for idx, num_tokens in zip(
+            required_text_embeddings_indices, required_num_tokens, strict=True
+        ):
+            if (
+                len(accumulating_batch) + 1 > model.max_batch_num_vectors
+                or accumulating_batch_num_tokens + num_tokens
+                > model.max_batch_num_tokens
+            ):
+                tasks.append(
+                    ai_embedding(
+                        model,
+                        accumulating_batch,
+                        embedding_type,
+                        num_ratelimit_retries=num_ratelimit_retries,
+                        backoff_algo=backoff_algo,
+                        callback=callback,
+                        cache=cache,
+                        _num_tokens_input=accumulating_batch_num_tokens,
+                    )
+                )
+                accumulating_batch = []
+                accumulating_batch_num_tokens = 0
+
+            accumulating_batch.append(texts[idx])
+            accumulating_batch_num_tokens += num_tokens
+
+        if len(accumulating_batch) > 0:
             tasks.append(
                 ai_embedding(
                     model,
-                    [texts[i] for i in batch_indices],
+                    accumulating_batch,
                     embedding_type,
                     num_ratelimit_retries=num_ratelimit_retries,
                     backoff_algo=backoff_algo,
                     callback=callback,
                     cache=cache,
-                    _texts_num_tokens=[_texts_num_tokens[i] for i in batch_indices],
+                    _num_tokens_input=accumulating_batch_num_tokens,
                 )
             )
         preflattened_results = await asyncio.gather(*tasks)
@@ -1014,15 +1034,16 @@ async def ai_embedding(
                 for i, text in enumerate(prepared_input_texts):
                     if len(text.strip()) == 0:
                        prepared_input_texts[i] = " "
-                result = await voyageai_client.embed(
-                    prepared_input_texts,
-                    model=model.model,
-                    input_type=(
-                        "document"
-                        if embedding_type == AIEmbeddingType.DOCUMENT
-                        else "query"
-                    ),
-                )
+                async with get_ai_connection().voyageai_semaphore:
+                    result = await voyageai_client.embed(
+                        prepared_input_texts,
+                        model=model.model,
+                        input_type=(
+                            "document"
+                            if embedding_type == AIEmbeddingType.DOCUMENT
+                            else "query"
+                        ),
+                    )
                 assert isinstance(result.embeddings, list)
                 text_embeddings_response = [
                     np.array(embedding) for embedding in result.embeddings
diff --git a/evals/common.py b/evals/common.py
index e5256ea..d1b1145 100644
--- a/evals/common.py
+++ b/evals/common.py
@@ -57,21 +57,14 @@ def ze_results_path(
     def embeddings_cache_path(
         self,
         retrieval_method: str,
-        include_relevant_docs: bool,
     ) -> str:
-        return self.retrieval_method_path(
-            retrieval_method, include_relevant_docs, "embeddings_cache.db"
-        )
+        return self.file_path(f"embeddings_cache/{retrieval_method}.db")
 
     def reranker_cache_path(
         self,
-        retrieval_method: str,
-        include_relevant_docs: bool,
         reranker: str,
     ) -> str:
-        return self.retrieval_method_path(
-            retrieval_method, include_relevant_docs, f"{reranker}/reranker_cache.db"
-        )
+        return self.file_path(f"reranker_cache/{reranker}.db")
 
     def ze_scores_path(
         self,
diff --git a/evals/ingestors/common.py b/evals/ingestors/common.py
index b1736f6..c9b9c22 100644
--- a/evals/ingestors/common.py
+++ b/evals/ingestors/common.py
@@ -145,7 +145,7 @@ def split_by_tokens(s: str, max_tokens: int) -> list[str]:
     if len(s.encode("utf-8")) <= max_tokens:
         # Tokenization is always compressive
        return [s]
-    tokens = encoding.encode(s)
+    tokens = encoding.encode(s, disallowed_special=())
     total_tokens = len(tokens)
 
     # Determine minimum number of chunks needed
diff --git a/evals/ingestors/master_mair_ingestion.py b/evals/ingestors/master_mair_ingestion.py
new file mode 100644
index 0000000..9d98c60
--- /dev/null
+++ b/evals/ingestors/master_mair_ingestion.py
@@ -0,0 +1,217 @@
+import json
+from random import Random
+from typing import Any, override
+
+import requests
+from datasets import load_dataset  # pyright: ignore[reportMissingTypeStubs]
+
+from evals.common import Document, QRel, Query
+from evals.ingestors.common import (
+    BaseIngestor,
+    clean_dataset,
+)
+
+# pyright: reportUnknownArgumentType=false
+# pyright: reportUnknownVariableType=false
+# pyright: reportUnknownMemberType=false
+
+
+class MasterMairIngestor(BaseIngestor):
+    task_name: str
+    dataset_name: str
+    split: str
+    queries_split: str
+    docs_split: str
+
+    def __init__(
+        self,
+        task_name: str,
+        queries_split: str,
+        docs_split: str,
+    ) -> None:
+        self.task_name = task_name
+        self.queries_split = queries_split
+        self.docs_split = docs_split
+
+        self.dataset_name = "".join(filter(str.isalnum, task_name)).lower()
+        if queries_split != "queries":
+            self.dataset_name += f"_{queries_split.removesuffix('queries').lower()}"
+
+    @classmethod
+    def all_splits(cls) -> list["MasterMairIngestor"]:
+        queries_dataset_name = "MAIR-Bench/MAIR-Queries"
+        docs_dataset_name = "MAIR-Bench/MAIR-Docs"
+
+        def extract_subset_splits(dataset_name: str) -> dict[str, list[str]]:
+            API_URL = (
+                f"https://datasets-server.huggingface.co/splits?dataset={dataset_name}"
+            )
+            response = requests.get(API_URL)
+            response.raise_for_status()
+            data = response.json()
+
+            # Extract (config, split) pairs from the response
+            config_split_pairs = [
+                (item["config"], item["split"]) for item in data["splits"]
+            ]
+
+            splits_by_subset: dict[str, list[str]] = {}
+            for config, split in config_split_pairs:
+                if config not in splits_by_subset:
+                    splits_by_subset[config] = []
+                splits_by_subset[config].append(split)
+            return splits_by_subset
+
+        query_subset_splits = extract_subset_splits(queries_dataset_name)
+        doc_subset_splits = extract_subset_splits(docs_dataset_name)
+        assert set(query_subset_splits.keys()) == set(doc_subset_splits.keys()), (
+            "Query and Doc subsets do not match"
+        )
+
+        subset_splits = []
+        for subset in query_subset_splits.keys():
+            query_splits = set(
+                split.removesuffix("queries") for split in query_subset_splits[subset]
+            )
+            doc_splits = set(
+                split.removesuffix("docs") for split in doc_subset_splits[subset]
+            )
+            assert query_splits == doc_splits, (
+                f"Query and Doc splits do not match for subset {subset}"
+            )
+            subset_splits.extend(
+                [
+                    (subset, query_split, doc_split)
+                    for query_split, doc_split in zip(
+                        sorted(query_subset_splits[subset]),
+                        sorted(doc_subset_splits[subset]),
+                        strict=True,
+                    )
+                ]
+            )
+
+        # Return an ingestor for each subset/split pair in MAIR-Bench
+        ingestors: list[MasterMairIngestor] = []
+        for subset, queries_split, docs_split in subset_splits:
+            ingestors.append(
+                MasterMairIngestor(
+                    task_name=subset, queries_split=queries_split, docs_split=docs_split
+                )
+            )
+        return ingestors
+
+    @override
+    def dataset_id(self) -> str:
+        return f"evals/mair/{self.dataset_name}"
+
+    @override
+    def ingest(self) -> tuple[list[Query], list[Document], list[QRel]]:
+        """Load MAIR task and return queries, documents, qrels"""
+
+        # Load MAIR queries -- rows are {id, instruction, query, labels: {id, score}}
+        queries_data = load_dataset(
+            "MAIR-Bench/MAIR-Queries", self.task_name, split=self.queries_split
+        )
+
+        # Load MAIR documents -- rows are {id, doc}
+        corpus_data = load_dataset(
+            "MAIR-Bench/MAIR-Docs", self.task_name, split=self.docs_split
+        )
+
+        queries = self._load_queries(queries_data)
+        documents = self._load_documents(corpus_data)
+        qrels = self._load_qrels(queries_data)
+
+        return clean_dataset(queries, documents, qrels)
+
+    def _format_instruction_query(
+        self, instruction: str, query: str, qid: str
+    ) -> tuple[str, str]:
+        # Format instruction/query pair to one of several single-string templates
+        return Random(qid).choice(
+            [
+                ("newline", f"{instruction}\n\n{query}"),
+                (
+                    "xml",
+                    f"{instruction}\n{query}",
+                ),
+                ("markdown", f"# Instruction\n{instruction}\n\n# Query\n{query}"),
+                (
+                    "json",
+                    f'{{"instruction": "{json.dumps(instruction)}", "query": "{json.dumps(query)}"}}',
+                ),
+                ("caps", f"TASK DESCRIPTION: {instruction}\n\nINPUT: {query}"),
+                ("xml2", f"{instruction}\n{query}"),
+                (
+                    "naturallanguage",
+                    f"Instructions below:\n{instruction}\n\nNow please answer the following query:\n{query}",
+                ),
+            ]
+        )
+
+    def _load_queries(self, query_dataset: Any) -> list[Query]:
+        queries: list[Query] = []
+        seen_qids: set[str] = set()
+        for idx, row in enumerate(query_dataset):
+            # Handle potential duplicate query IDs
+            query_id = str(row["qid"])
+            if query_id in seen_qids:
+                query_id = f"{query_id}_{idx}"
+            seen_qids.add(query_id)
+
+            template, query_text = self._format_instruction_query(
+                instruction=row.get("instruction", ""),
+                query=row["query"],
+                qid=query_id,
+            )
+            queries.append(
+                Query(
+                    id=str(query_id),
+                    query=query_text,
+                    metadata={"dataset": self.dataset_name, "template": template},
+                )
+            )
+        return queries
+
+    def _load_documents(self, docs_dataset: Any) -> list[Document]:
+        documents: list[Document] = []
+        seen_doc_ids: set[str] = set()
+        for row in docs_dataset:
+            if str(row["id"]) in seen_doc_ids:
+                continue
+            seen_doc_ids.add(str(row["id"]))
+            documents.append(
+                Document(
+                    id=str(row["id"]),
+                    content=row["doc"],
+                    metadata={"dataset": self.dataset_name},
+                )
+            )
+        return documents
+
+    def _load_qrels(self, query_dataset: Any) -> list[QRel]:
+        qrels: list[QRel] = []
+        qrel_pkey_to_score: dict[tuple[str, str], int] = {}
+        seen_qids: set[str] = set()
+        for idx, row in enumerate(query_dataset):
+            # Handle potential duplicate query IDs
+            query_id = str(row["qid"])
+            if query_id in seen_qids:
+                query_id = f"{query_id}_{idx}"
+            seen_qids.add(query_id)
+
+            for label in row["labels"]:
+                doc_id = str(label["id"])
+                score = int(label["score"])
+
+                pkey = (query_id, doc_id)
+                qrel_pkey_to_score[pkey] = max(qrel_pkey_to_score.get(pkey, 0), score)
+        for (query_id, doc_id), score in qrel_pkey_to_score.items():
+            qrels.append(
+                QRel(
+                    query_id=query_id,
+                    document_id=doc_id,
+                    score=score,
+                )
+            )
+        return qrels
diff --git a/evals/ingestors/pandas.py b/evals/ingestors/pandas_documentation.py
similarity index 95%
rename from evals/ingestors/pandas.py
rename to evals/ingestors/pandas_documentation.py
index 4d0cc4d..d823ac7 100644
--- a/evals/ingestors/pandas.py
+++ b/evals/ingestors/pandas_documentation.py
@@ -9,7 +9,7 @@
 from evals.ingestors.common import BaseIngestor, clean_dataset
 
 
-class PandasIngestor(BaseIngestor):
+class PandasDocumentationIngestor(BaseIngestor):
     @override
     def dataset_id(self) -> str:
         return "evals/pandas"
diff --git a/evals/run_embeddings.py b/evals/run_embeddings.py
index b2c3edb..02d4238 100644
--- a/evals/run_embeddings.py
+++ b/evals/run_embeddings.py
@@ -255,7 +255,9 @@ async def generate_embeddings(
     # Calculate similarity scores using selected retrieval method
     embeddings_cache = (
         dc.Cache(
-            dataset.embeddings_cache_path(retrieval_method, include_relevant_docs),
+            dataset.embeddings_cache_path(
+                retrieval_method if retrieval_method != "hybrid" else "openai_small"
+            ),
             eviction_policy="none",
         )
         if USE_EMBEDDINGS_CACHE
@@ -291,7 +293,7 @@ async def generate_embeddings(
         top_sorted_indices, similarity_scores = await get_embeddings(
             AIEmbeddingModel(
                 company="voyageai",
-                model="voyage-3-large",
+                model="voyage-3.5",
             ),
             queries,
             documents,
@@ -319,9 +321,11 @@ async def generate_embeddings(
     )
 
     # Save all necessary data
-    with open(
-        dataset.ze_results_path(retrieval_method, include_relevant_docs), "w"
-    ) as f:
+    ze_results_path = Path(
+        dataset.ze_results_path(retrieval_method, include_relevant_docs)
+    )
+    ze_results_path.parent.mkdir(parents=True, exist_ok=True)
+    with open(ze_results_path, "w") as f:
         queries_processed = 0
         queries_skipped = 0
diff --git a/evals/run_rerankers.py b/evals/run_rerankers.py
index 5428435..c9fdf53 100644
--- a/evals/run_rerankers.py
+++ b/evals/run_rerankers.py
@@ -1,5 +1,6 @@
 import asyncio
 from contextlib import ExitStack
+from pathlib import Path
 from typing import TextIO
 
 import diskcache as dc  # pyright: ignore[reportMissingTypeStubs]
@@ -102,9 +103,7 @@ async def rerank_dataset(
     for reranker in rerankers:
         reranker_caches[reranker] = (
             dc.Cache(
-                directory=dataset.reranker_cache_path(
-                    retrieval_method, include_relevant_docs, reranker
-                ),
+                directory=dataset.reranker_cache_path(reranker),
                 eviction_policy="none",
             )
             if USE_RERANKER_CACHE
@@ -119,6 +118,11 @@ async def rerank_dataset(
     )
     pending_tasks: set[asyncio.Task[None]] = set()
 
+    for reranker in rerankers:
+        Path(
+            dataset.ze_scores_path(retrieval_method, include_relevant_docs, reranker)
+        ).parent.mkdir(parents=True, exist_ok=True)
+
     with open(ze_results_path) as f, ExitStack() as stack:
         f_write: dict[RerankerName, TextIO] = {
             reranker: stack.enter_context(
diff --git a/evals/types.py b/evals/types.py
index 59b1094..1c83147 100644
--- a/evals/types.py
+++ b/evals/types.py
@@ -13,12 +13,13 @@
 from evals.ingestors.ineqs import IneqsIngestor
 from evals.ingestors.leetcode_multi import LeetcodeMultiLanguageIngestor
 from evals.ingestors.master_legal_ingestion import MasterLegalIngestor
+from evals.ingestors.master_mair_ingestion import MasterMairIngestor
 from evals.ingestors.master_mteb_ingestion import MasterMtebIngestor
 from evals.ingestors.mbpp import MbppIngestor
 from evals.ingestors.meeting import MeetingIngestor
 from evals.ingestors.msmarco import MSMarcoIngestor
 from evals.ingestors.narrativeqa import NarrativeQAIngestor
-from evals.ingestors.pandas import PandasIngestor
+from evals.ingestors.pandas_documentation import PandasDocumentationIngestor
 from evals.ingestors.qmsum import QMSumIngestor
 from evals.ingestors.quora import QuoraIngestor
 from evals.ingestors.quora_swedish import QuoraSwedishIngestor
@@ -178,7 +179,7 @@
     QuoraSwedishIngestor(),
     MeetingIngestor(),
     NarrativeQAIngestor(),
-    PandasIngestor(),
+    PandasDocumentationIngestor(),
 ]
 
 MTEB_INGESTORS: list[BaseIngestor] = [
@@ -274,6 +275,9 @@
         split="test",
     ),
 ]
+
+MAIR_INGESTORS: list[BaseIngestor] = [*MasterMairIngestor.all_splits()]
+
 ALL_INGESTORS: list[BaseIngestor] = MTEB_INGESTORS + NEW_INGESTORS + ORIGINAL_INGESTORS
 
 # Defaults
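
Note on the ai_embedding hunks above: the fixed-size batching (batch_size = model.max_batch_num_vectors) is replaced by a greedy accumulator that starts a new request whenever adding one more text would exceed either max_batch_num_vectors or max_batch_num_tokens. The snippet below is a minimal standalone sketch of that idea, not code from the patch; the batch_texts helper, its arguments, and the example numbers are illustrative, and only the two limit names mirror the diff.

def batch_texts(
    texts: list[str],
    num_tokens: list[int],
    max_batch_num_vectors: int,
    max_batch_num_tokens: int,
) -> list[list[str]]:
    # Greedily pack texts into batches, flushing whenever adding one more
    # text would exceed either the vector-count or the token-count limit.
    batches: list[list[str]] = []
    batch: list[str] = []
    batch_tokens = 0
    for text, tokens in zip(texts, num_tokens, strict=True):
        if batch and (
            len(batch) + 1 > max_batch_num_vectors
            or batch_tokens + tokens > max_batch_num_tokens
        ):
            batches.append(batch)
            batch = []
            batch_tokens = 0
        batch.append(text)
        batch_tokens += tokens
    if batch:
        batches.append(batch)
    return batches

# Example: at most 2 texts or 10 tokens per request
print(batch_texts(["a", "bb", "ccc", "dddd"], [1, 2, 3, 4], 2, 10))
# [['a', 'bb'], ['ccc', 'dddd']]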