226 lines
11 KiB
Python
226 lines
11 KiB
Python
import sys
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
from typing import List, Tuple, Dict, Union
|
|
|
|
import pandas as pd
|
|
from langchain_chroma import Chroma
|
|
from langchain_openai import OpenAIEmbeddings
|
|
from rag.rag_utils import constraint_path, problem_descriptions_vector_db_path, constraint_vector_db_path, \
|
|
objective_descriptions_vector_db_path
|
|
|
|
# Imported here (rather than at the top of the file) so this block stays
# self-contained; it is only needed for the environment lookups below.
import os

# NOTE(review): Path("..").absolute() is anchored at the current working
# directory and .parent then drops the trailing ".." lexically, so root_path is
# the working directory of the process, not a location derived from this file.
root_path = Path("..").absolute().parent

# NOTE(review): the `from rag.rag_utils import ...` statement at the top of the
# file executes before this append, so this entry cannot help that import.
sys.path.append(str(root_path / "rag"))

# SECURITY: an API key is committed in source. Environment variables now take
# precedence; the literal is kept only as a legacy fallback so existing setups
# keep working. It should be rotated and removed from the repository.
openai_key = os.environ.get("OPENAI_API_KEY") or "sk-f01744b2801344b1a72f89ec7e290cad"
openai_org = os.environ.get("OPENAI_ORG_ID") or "###"

# NOTE(review): read_pickle unpickles arbitrary objects; constraint_path must
# only ever point at a trusted, project-generated file.
constraint_df = pd.read_pickle(constraint_path)
|
|
|
|
|
|
class RAGFormat(Enum):
    """Layout selector for the retrieved examples rendered into RAG text."""

    # Problem description followed by its objective description.
    PROBLEM_DESCRIPTION_OBJECTIVE = 1

    # Problem description followed by its constraint descriptions.
    PROBLEM_DESCRIPTION_CONSTRAINTS = 2

    # Constraint descriptions, each paired with its formulation.
    CONSTRAINT_FORMULATION = 3

    # Objective description paired with its formulation.
    OBJECTIVE_FORMULATION = 4
|
|
|
|
|
|
def load_vector_db(vector_db_path: Path, model_name: str = "text-embedding-3-large") -> Chroma:
    """
    Open the Chroma store persisted under ``vector_db_path``.

    Args:
        vector_db_path (Path): Directory that holds the persisted vector database.
        model_name (str): Embedding model name passed to ``OpenAIEmbeddings``.

    Returns:
        Chroma: Store bound to that directory and to the embedding function.
    """
    # The embedding client uses the module-level credentials.
    embedder = OpenAIEmbeddings(
        model=model_name,
        openai_api_key=openai_key,
        organization=openai_org,
    )
    persist_dir = str(vector_db_path)
    return Chroma(persist_directory=persist_dir, embedding_function=embedder)
|
|
|
|
|
|
# Module-level store handles, opened once at import time and shared by the
# retrieval helpers in this module.

# Store queried with constraint descriptions.
constraint_vector_db = load_vector_db(vector_db_path=constraint_vector_db_path)

# Store queried with problem descriptions (the name's spelling is kept as-is
# because other code in this module refers to it).
problem_desciption_vector_db = load_vector_db(vector_db_path=problem_descriptions_vector_db_path)

# Store queried with objective descriptions.
objective_descriptions_vector_db = load_vector_db(vector_db_path=objective_descriptions_vector_db_path)
|
|
|
|
|
|
def get_rag_from_problem_description(description: str, format_type: RAGFormat, top_k: int = 3) -> str:
    """
    Generates RAG (Retrieval-Augmented Generation) text based on a problem description.

    The problem-description vector store is searched for the nearest documents,
    any document whose text equals ``description`` is dropped, and the rows of
    ``constraint_df`` whose ``problem_name`` matches each document's ``key``
    metadata are rendered according to ``format_type``.

    Args:
        description (str): The problem description.
        format_type (RAGFormat): The format type for RAG text. An unrecognised
            value contributes no text.
        top_k (int, optional): The maximum number of similar documents to use. Defaults to 3.

    Returns:
        str: The generated RAG text. It covers fewer than ``top_k`` examples
        when the store returns fewer usable documents.

    Raises:
        IndexError: If an objective-based format is requested and a retrieved
            document's key has no rows in ``constraint_df``.
    """
    # Ask for one extra hit so that an exact copy of the query can be discarded
    # while still leaving up to top_k examples.
    similar_documents = problem_desciption_vector_db.similarity_search_with_score(description, k=top_k + 1)
    retained_documents = [
        document for document, _score in similar_documents
        if document.page_content != description
    ][:max(top_k, 0)]

    parts = []
    # Iterate over what was actually retrieved: indexing range(top_k) raised
    # IndexError whenever fewer than top_k documents survived the filter.
    for document in retained_documents:
        problem_rows = constraint_df[constraint_df.problem_name == document.metadata['key']]

        if format_type == RAGFormat.PROBLEM_DESCRIPTION_OBJECTIVE:
            first_row = problem_rows.iloc[0]
            parts.append(
                f"Problem Description:\n{document.page_content}\n\nObjective:\n{first_row.objective_description}\n\n"
            )
        elif format_type == RAGFormat.PROBLEM_DESCRIPTION_CONSTRAINTS:
            parts.append(f"Problem Description:\n{document.page_content}\n\nConstraints:\n")
            for row in problem_rows.itertuples():
                # Auxiliary constraints are left out of the description listing.
                if row.constraint_description == "auxiliary constraint":
                    continue
                parts.append(f"{row.constraint_description}\n\n")
        elif format_type == RAGFormat.CONSTRAINT_FORMULATION:
            for row in problem_rows.itertuples():
                parts.append(f"{row.constraint_description}\n{row.constraint_formulation}\n\n")
        elif format_type == RAGFormat.OBJECTIVE_FORMULATION:
            first_row = problem_rows.iloc[0]
            parts.append(
                f"Objective:\n{first_row.objective_description}\n{first_row.objective_formulation}\n\n"
            )

    return "".join(parts)
|
|
|
|
|
|
def get_rag_from_constraint(constraint_description: str, format_type: RAGFormat,
                            current_problem_name: str | None = None, top_k: int = 10) -> str:
    """
    Generates RAG text based on a constraint description.

    The constraint vector store is searched for similar constraints. Hits whose
    text equals ``constraint_description`` and hits that belong to
    ``current_problem_name`` are skipped. Each remaining hit's ``key`` metadata
    is used as a positional row index into ``constraint_df``.

    Args:
        constraint_description (str): The constraint description.
        format_type (RAGFormat): The format type for RAG text. Must be
            ``RAGFormat.CONSTRAINT_FORMULATION``.
        current_problem_name (str | None, optional): The name of the current problem,
            whose own constraints are excluded. Defaults to None.
        top_k (int, optional): The maximum number of similar constraints to use. Defaults to 10.

    Returns:
        str: One "description / formulation" entry per retained constraint.
    """
    assert format_type in [RAGFormat.CONSTRAINT_FORMULATION]

    # Over-fetch so that enough candidates remain after filtering.
    similar_documents = constraint_vector_db.similarity_search_with_score(constraint_description, k=top_k + 20)

    candidate_rows = []
    for document, _score in similar_documents:
        if document.page_content == constraint_description:
            continue
        # Cast the key like get_rag_from_objective does, so a key stored as a
        # string still works as a positional index; integers are unaffected.
        row = constraint_df.iloc[int(document.metadata['key'])]
        if row.problem_name != current_problem_name:
            candidate_rows.append(row)

    # max(...) keeps the previous behaviour of returning "" for top_k <= 0.
    return "".join(
        f"{row.constraint_description}\n{row.constraint_formulation}\n\n"
        for row in candidate_rows[:max(top_k, 0)]
    )
|
|
|
|
|
|
def get_rag_from_objective(objective_description: str, format_type: RAGFormat, current_problem_name: str | None = None,
                           top_k: int = 10) -> str:
    """
    Build RAG text from objectives similar to ``objective_description``.

    Args:
        objective_description (str): The objective description used as the query.
        format_type (RAGFormat): Must be ``RAGFormat.OBJECTIVE_FORMULATION``.
        current_problem_name (str | None, optional): Problem whose own objective
            is excluded from the result. Defaults to None.
        top_k (int, optional): Maximum number of similar objectives to include.
            Defaults to 10.

    Returns:
        str: One "Objective / description / formulation" entry per retained hit.
    """
    assert format_type in [RAGFormat.OBJECTIVE_FORMULATION]

    # Over-fetch so that enough candidates remain after filtering.
    hits = objective_descriptions_vector_db.similarity_search_with_score(objective_description, k=top_k + 20)

    # Each hit's "key" metadata is a positional row index into constraint_df.
    matching_rows = []
    for hit in hits:
        found = hit[0]
        if found.page_content == objective_description:
            continue
        source_row = constraint_df.iloc[int(found.metadata['key'])]
        if source_row.problem_name == current_problem_name:
            continue
        matching_rows.append(source_row)

    # A non-positive top_k yields no entries at all.
    limit = top_k if top_k > 0 else 0
    entries = [
        f"Objective:\n{source_row.objective_description}\n{source_row.objective_formulation}\n\n"
        for source_row in matching_rows[:limit]
    ]
    return "".join(entries)
|
|
|
|
|
|
def _as_comparable_set(value) -> set:
    """Normalise one jaccard_similarity argument into a set of hashable items."""
    if isinstance(value, str):
        # Descriptions are compared on their whitespace-separated tokens.
        return set(value.split())
    if isinstance(value, (set, frozenset)):
        return set(value)
    if isinstance(value, (list, tuple)):
        return set(value)
    raise TypeError(
        f"jaccard_similarity expects a set, a list of pairs or a string, got {type(value).__name__}"
    )


def jaccard_similarity(set1: Union[set, List[Tuple[str, str]], str],
                       set2: Union[set, List[Tuple[str, str]], str]) -> float:
    """
    Calculates the Jaccard similarity between two sets, lists of pairs, or descriptions.

    Each argument is normalised on its own, so the two arguments do not have to
    be of the same kind (for example a list may be compared against a set).
    Strings are split on whitespace and compared as sets of tokens.

    Args:
        set1 (Union[set, List[Tuple[str, str]], str]): The first set, list of pairs, or description.
        set2 (Union[set, List[Tuple[str, str]], str]): The second set, list of pairs, or description.

    Returns:
        float: The Jaccard similarity coefficient, ``0.0`` when both inputs are empty.

    Raises:
        TypeError: If an argument is not a set, list, tuple or string.
    """
    left = _as_comparable_set(set1)
    right = _as_comparable_set(set2)

    union_size = len(left | right)
    if union_size == 0:
        # Two empty inputs: define the similarity as 0.0 instead of dividing by zero.
        return 0.0
    return len(left & right) / union_size
|
|
|
|
|
|
def get_rag_from_problem_categories(description: str, labels: Dict[str, Dict], format_type: RAGFormat,
                                    current_problem_name: str | None = None, top_k: int = 3) -> str:
    """
    Generates RAG text based on problem categories and similarity criteria.

    Every other problem in ``constraint_df`` is scored against the given labels
    and description. Problems are ranked by, in order: Jaccard similarity of
    their (type, domain) pairs, the sum of the type and domain similarities,
    and the token-level similarity of the descriptions. Ties keep the order in
    which problems first appear in ``constraint_df``, so the result is
    reproducible between runs.

    Args:
        description (str): The problem description.
        labels (Dict[str, Dict]): The labels for problem categories; the
            ``'types'`` and ``'domains'`` entries are read as collections of
            hashable labels.
        format_type (RAGFormat): The format type for RAG text. An unrecognised
            value contributes no text.
        current_problem_name (str | None, optional): The name of the current problem,
            which is excluded from the candidates. Defaults to None.
        top_k (int, optional): The number of top similar problems to consider. Defaults to 3.

    Returns:
        str: The generated RAG text.
    """

    def _label_pairs(label_set):
        # Every (type, domain) combination of one problem's labels.
        return [(type_label, domain_label)
                for type_label in label_set['types']
                for domain_label in label_set['domains']]

    # Generate the pairs for the specified problem
    pairs_x = _label_pairs(labels)
    types_x = set(labels['types'])
    domains_x = set(labels['domains'])

    # Store similarities, and the rows of each candidate for reuse when rendering
    similarities = []
    problem_frames = {}

    # Iterate over other problems in DataFrame order. Iterating a set here made
    # the order (and so the outcome of ties in the stable sort below) depend on
    # string hashing, which differs between interpreter runs.
    for other_problem_name in constraint_df.problem_name.unique():
        if other_problem_name == current_problem_name:
            continue

        other_df = constraint_df[constraint_df.problem_name == other_problem_name]
        problem_frames[other_problem_name] = other_df
        y_row = other_df.iloc[0]
        y_labels = y_row['labels']
        description_y = y_row['description']

        # Calculate Jaccard similarities
        pairs_similarity = jaccard_similarity(pairs_x, _label_pairs(y_labels))
        types_similarity = jaccard_similarity(types_x, set(y_labels['types']))
        domains_similarity = jaccard_similarity(domains_x, set(y_labels['domains']))
        description_similarity = jaccard_similarity(description, description_y)

        # Combine similarities (sum of individual categories)
        combined_similarity = types_similarity + domains_similarity

        similarities.append((other_problem_name, pairs_similarity, combined_similarity, description_similarity))

    # Rank the problems based on the criteria
    ranked_problems = sorted(similarities, key=lambda x: (x[1], x[2], x[3]), reverse=True)

    # Create the RAG text
    parts = []
    for problem, _, _, _ in ranked_problems[:top_k]:
        problem_df = problem_frames[problem]
        first_row = problem_df.iloc[0]

        if format_type == RAGFormat.PROBLEM_DESCRIPTION_OBJECTIVE:
            parts.append(
                f"Problem: {first_row['description']}\n\nObjective:\n{first_row['objective_description']}\n\n"
            )
        elif format_type == RAGFormat.PROBLEM_DESCRIPTION_CONSTRAINTS:
            parts.append(f"Problem: {first_row['description']}\n\nConstraints:\n")
            for row in problem_df.itertuples():
                # Auxiliary constraints are left out of the description listing.
                if row.constraint_description == "auxiliary constraint":
                    continue
                parts.append(f"{row.constraint_description}\n\n")
        elif format_type == RAGFormat.CONSTRAINT_FORMULATION:
            for row in problem_df.itertuples():
                parts.append(f"{row.constraint_description}\n{row.constraint_formulation}\n\n")
        elif format_type == RAGFormat.OBJECTIVE_FORMULATION:
            parts.append(
                f"Objective:\n{first_row['objective_description']}\n{first_row['objective_formulation']}\n\n"
            )

    return "".join(parts)
|