mirror of
https://github.com/khoj-ai/khoj.git
synced 2026-04-17 13:58:51 -04:00
234 lines
10 KiB
Python
234 lines
10 KiB
Python
import argparse
|
|
import json
|
|
import logging
|
|
import os
|
|
import tempfile
|
|
|
|
import pandas as pd
|
|
from datasets import Dataset, DatasetDict, load_dataset
|
|
from dotenv import load_dotenv
|
|
from huggingface_hub import HfApi
|
|
|
|
# Pull configuration (dataset paths, HF token, repo names) from a local .env
# file into the process environment before main() reads os.getenv below.
load_dotenv()

# Module-level logger; INFO level so the row-count progress messages show.
logger = logging.getLogger(__name__)

logging.basicConfig(level=logging.INFO)

# System prompt substituted into each row's "system" field when
# load_dataset_from_jsonl(..., replace_system_prompt=True) is used.
# NOTE: this text ends up inside the published training data, so it must stay
# byte-for-byte stable (including the escaped LaTeX delimiters).
SYSTEM_PROMPT = """
Your role as an assistant involves thoroughly exploring questions through a systematic long thinking process before providing the final precise and accurate solutions. This requires engaging in a comprehensive cycle of analysis, summarizing, exploration, reassessment, reflection, backtracing, and iteration to develop well-considered thinking process. Please structure your response into two main sections: Thought and Solution. In the Thought section, detail your reasoning process using the specified format: <|begin_of_thought|> {thought with steps separated with '\\n\\n'} <|end_of_thought|> Each step should include detailed considerations such as analisying questions, summarizing relevant findings, brainstorming new ideas, verifying the accuracy of the current steps, refining any errors, and revisiting previous steps.In the Solution section, based on various attempts, explorations, and reflections from the Thought section, systematically present the final solution that you deem correct. The solution should remain a logical, accurate, concise expression style and detail necessary step needed to reach the conclusion, formatted as follows: <|begin_of_solution|> {final formatted, precise, and clear solution} <|end_of_solution|>

Make sure to use the specific LaTeX math mode delimiters for your response. LaTex math mode specific delimiters as following
- inline math mode : \\( and \\)
- display math mode: insert linebreak after opening $$, \\[ and before closing $$, \\]

Now, try to solve the following question through the above guidelines"""
|
|
|
|
|
|
def load_dataset_from_jsonl(dataset_path: str, replace_system_prompt: bool = False) -> pd.DataFrame:
    """Load a data trace from a JSONL file into a pandas DataFrame.

    Malformed JSON lines are skipped (logged at DEBUG) instead of aborting the
    whole load.

    Args:
        dataset_path: Path to the JSONL file; each line is an object expected
            to carry 'system' and 'conversations' keys.
        replace_system_prompt: When True, overwrite each row's 'system' field
            with the module-level SYSTEM_PROMPT.

    Returns:
        DataFrame of loaded rows. An empty DataFrame with 'system' and
        'conversations' columns when the path is missing/None, the file is
        unreadable, or it contains no valid rows. (The original returned a
        datasets.Dataset on those paths and a DataFrame otherwise, so callers
        could not rely on a single type.)
    """
    empty = pd.DataFrame(columns=["system", "conversations"])

    # Guard against an unset env var (None) as well as a missing file;
    # os.path.exists(None) would raise TypeError.
    if not dataset_path or not os.path.exists(dataset_path):
        return empty

    try:
        rows = []
        # Read line by line so one bad record does not discard the rest.
        with open(dataset_path, "r") as f:
            for i, line in enumerate(f, 1):
                try:
                    record = json.loads(line.strip())
                except json.JSONDecodeError as e:
                    logger.debug(f"Error on line {i}: {e}")
                    logger.debug(f"Problematic line: {line.strip()}")
                    continue
                if replace_system_prompt and "system" in record:
                    record["system"] = SYSTEM_PROMPT
                rows.append(record)

        return pd.DataFrame(rows) if rows else empty

    except Exception as e:
        # Deliberate best-effort: log and hand back an empty frame so the
        # pipeline can decide what to do, matching the original contract.
        logger.error(f"Error processing dataset: {e}")
        return empty
|
|
|
|
|
|
def load_eval_results_from_csv(eval_results_paths: str | list[str]) -> pd.DataFrame:
|
|
"""
|
|
Load evaluation results from one or more CSV files into a single Pandas DataFrame.
|
|
|
|
Args:
|
|
eval_results_paths: Single path string or list of paths to CSV files
|
|
|
|
Returns:
|
|
Combined DataFrame with all evaluation results
|
|
"""
|
|
# Convert single path to list
|
|
if isinstance(eval_results_paths, str):
|
|
eval_results_paths = [eval_results_paths]
|
|
|
|
# Initialize empty DataFrame
|
|
combined_df = pd.DataFrame()
|
|
|
|
# Load and concatenate each CSV file
|
|
for path in eval_results_paths:
|
|
if os.path.exists(path):
|
|
df = pd.read_csv(path)
|
|
combined_df = pd.concat([combined_df, df], ignore_index=True)
|
|
|
|
return combined_df
|
|
|
|
|
|
def get_good_dataset_rows(datatrace: pd.DataFrame, eval: pd.DataFrame) -> pd.DataFrame:
    """
    Filter data-trace rows to keep only those with successful evaluations.

    A trace row matches an evaluation when its user prompt — the value of the
    second-to-last turn in 'conversations' — equals the evaluation's 'prompt'.

    Args:
        datatrace: DataFrame with 'system' and 'conversations' columns.
        eval: DataFrame with 'prompt' and 'evaluation_decision' columns.

    Returns:
        The datatrace rows whose prompt has a passing evaluation
        (evaluation_decision == 1.0). Neither input frame is modified.
    """
    # Short-circuit empties; also avoids column-lookup errors on bare frames.
    if datatrace.empty or eval.empty:
        return datatrace.iloc[0:0].copy()

    # Prompts that passed evaluation. A set makes the membership test O(1)
    # and collapses duplicate eval rows, so a repeated prompt in the results
    # no longer multiplies matching trace rows (the original inner merge did).
    successful_prompts = set(eval.loc[eval["evaluation_decision"] == 1.0, "prompt"])

    # Derive each row's user prompt as a standalone Series instead of writing
    # a temporary 'prompt' column into the caller's frame (the original
    # mutated `datatrace` in place and the extra column leaked out).
    prompts = datatrace["conversations"].apply(
        lambda conv: conv[-2]["value"] if len(conv) >= 2 else None
    )

    return datatrace[prompts.isin(successful_prompts)].copy()
|
|
|
|
|
|
def deduplicate_rows(good_rows: pd.DataFrame, parent_dataset: DatasetDict) -> pd.DataFrame:
    """
    Combine good_rows with the parent dataset, dropping parent rows whose user
    prompt already appears in good_rows.

    The first item of each 'conversations' list is the user turn; two rows are
    duplicates when those turns have equal 'value' fields. On a collision the
    row from good_rows is kept.

    Args:
        good_rows: DataFrame with 'system' and 'conversations' columns.
        parent_dataset: DatasetDict whose 'train' split has the same columns.

    Returns:
        Combined, deduplicated DataFrame with a fresh index.
    """

    def first_user_turn(conversation):
        # User prompt lives in the first conversation entry.
        return conversation[0]["value"]

    # Prompts already covered by the freshly generated rows.
    seen_prompts = set(good_rows["conversations"].apply(first_user_turn))
    logger.info(f"Found {len(good_rows)} rows in good_rows")
    logger.info(f"Found {len(seen_prompts)} unique user prompts in good_rows")

    # Materialize the parent 'train' split as a DataFrame.
    train_df = parent_dataset.data["train"].to_pandas()
    logger.info(f"Found {len(train_df)} rows in parent dataset")

    # Keep only parent rows whose prompt is not superseded by good_rows.
    duplicated = train_df["conversations"].apply(first_user_turn).isin(seen_prompts)
    surviving_parent = train_df[~duplicated]
    logger.info(f"Found {len(surviving_parent)} rows in parent dataset after filtering")

    combined = pd.concat([surviving_parent, good_rows], ignore_index=True)
    logger.info(f"Found {len(combined)} rows in final dataset after combining")

    return combined
|
|
|
|
|
|
def _publish_dataset(df: pd.DataFrame, repo_id: str | None, hf_token: str | None,
                     output_path: str | None, filename: str = "data.json") -> None:
    """Publish a DataFrame as JSON: upload to a HuggingFace dataset repo when
    both `repo_id` and `hf_token` are set, otherwise save under `output_path`.

    Args:
        df: Rows to publish.
        repo_id: Target HF repo in user/dataset form, or None.
        hf_token: HF API token, or None.
        output_path: Local directory for the fallback save, or None.
        filename: Local file name, so multiple datasets written to the same
            output_path do not overwrite each other.
    """
    if repo_id and hf_token:
        with tempfile.TemporaryDirectory() as tmp_dir:
            json_path = os.path.join(tmp_dir, "data.json")
            df.to_json(json_path, orient="records", indent=2)
            api = HfApi(token=hf_token)
            api.upload_file(
                path_or_fileobj=json_path,
                path_in_repo="data.json",
                repo_id=repo_id,
                repo_type="dataset",
                commit_message="Upload filtered dataset as JSON",
            )
            logger.info(f"Pushed JSON dataset with {len(df)} rows to {repo_id}")
    elif output_path:
        output_json = os.path.join(output_path, filename)
        df.to_json(output_json, orient="records", indent=2)
        logger.info(f"Saved JSON dataset with {len(df)} rows to {output_json}")


def main():
    """Build and publish filtered fine-tuning datasets.

    Pipeline: load the data trace (JSONL) and evaluation results (CSV), keep
    trace rows with passing evaluations, merge them into the
    Bespoke-Stratos-17k parent dataset (deduplicating by user prompt), then
    push to HuggingFace or save locally. Optionally repeats the same flow for
    a "full thoughts" trace. All configuration comes from environment
    variables (loaded from .env at import time): DATATRACE_PATH,
    FULLTHOUGHTS_PATH, EVAL_PATH (comma-separated), OUTPUT_PATH, REPO_NAME,
    THOUGHTS_REPO_NAME, HF_TOKEN.
    """
    datatrace_path = os.getenv("DATATRACE_PATH")
    fullthoughts_path = os.getenv("FULLTHOUGHTS_PATH")
    eval_paths = os.getenv("EVAL_PATH").split(",") if os.getenv("EVAL_PATH") else None
    output_path = os.getenv("OUTPUT_PATH")
    repo_name = os.getenv("REPO_NAME")
    thoughts_repo_name = os.getenv("THOUGHTS_REPO_NAME")
    hf_token = os.getenv("HF_TOKEN")

    try:
        # Load the data trace and evaluation results, then keep only the
        # trace rows whose prompts passed evaluation.
        datatrace_df = load_dataset_from_jsonl(datatrace_path)
        eval_df = load_eval_results_from_csv(eval_paths)
        good_rows = get_good_dataset_rows(datatrace_df, eval_df)

        # Parent dataset the filtered rows get merged into.
        parent_dataset = load_dataset("bespokelabs/Bespoke-Stratos-17k", split=None)
        logger.info(f"Loaded parent dataset with {len(parent_dataset)} rows")

        # Publish the deduplicated rows. (The original's local-save branch
        # wrote the pre-dedup `good_rows`, diverging from what was uploaded.)
        deduplicated_rows = deduplicate_rows(good_rows, parent_dataset)
        _publish_dataset(deduplicated_rows, repo_name, hf_token, output_path)

        # Repeat for the full-thoughts trace when configured. Guarding here
        # also avoids loading from a None path when FULLTHOUGHTS_PATH is unset.
        if fullthoughts_path:
            fullthoughts_df = load_dataset_from_jsonl(fullthoughts_path, replace_system_prompt=True)
            logger.info(f"Loaded fullthoughts dataset with {len(fullthoughts_df)} rows")

            good_thought_rows = get_good_dataset_rows(fullthoughts_df, eval_df)
            deduplicated_thoughts = deduplicate_rows(good_thought_rows, parent_dataset)
            # Distinct local filename so this save cannot clobber the first
            # dataset's data.json in the same output directory.
            _publish_dataset(
                deduplicated_thoughts,
                thoughts_repo_name,
                hf_token,
                output_path,
                filename="thoughts_data.json",
            )

    except Exception as e:
        logger.error(f"Error processing dataset: {str(e)}")
        raise


if __name__ == "__main__":
    main()
|