mirror of
https://github.com/Dictionarry-Hub/profilarr.git
synced 2026-01-20 12:37:47 -05:00
339 lines
15 KiB
Python
339 lines
15 KiB
Python
import yaml
|
|
from git import GitCommandError
|
|
import logging
|
|
from typing import Dict, Any
|
|
import os
|
|
from copy import deepcopy
|
|
from ...data.utils import CATEGORY_MAP
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def determine_type(file_path):
|
|
if 'regex_patterns' in file_path:
|
|
return 'Regex Pattern'
|
|
elif 'custom_formats' in file_path:
|
|
return 'Custom Format'
|
|
elif 'profiles' in file_path:
|
|
return 'Quality Profile'
|
|
return 'Unknown'
|
|
|
|
|
|
def get_version_data(repo, ref, file_path):
|
|
"""Get YAML data from a specific version of a file."""
|
|
try:
|
|
content = repo.git.show(f'{ref}:{file_path}')
|
|
return yaml.safe_load(content) if content else None
|
|
except GitCommandError:
|
|
return None
|
|
|
|
|
|
def resolve_conflicts(
|
|
repo, resolutions: Dict[str, Dict[str, str]]) -> Dict[str, Any]:
|
|
"""
|
|
Resolve merge conflicts based on provided resolutions.
|
|
"""
|
|
logger.debug(f"Received resolutions for files: {list(resolutions.keys())}")
|
|
logger.debug(f"Full resolutions data: {resolutions}")
|
|
|
|
try:
|
|
status = repo.git.status('--porcelain', '-z').split('\0')
|
|
conflicts = []
|
|
for item in status:
|
|
if not item or len(item) < 4:
|
|
continue
|
|
x, y, file_path = item[0], item[1], item[3:]
|
|
# Include modify/delete conflicts
|
|
if 'U' in (x, y) or (x == 'D' and y == 'D') or (
|
|
x == 'D' and y == 'U') or (x == 'U' and y == 'D'):
|
|
conflicts.append((file_path, x, y))
|
|
|
|
# Track which files are modify/delete conflicts
|
|
modify_delete_conflicts = {
|
|
path: (x == 'D' and y == 'U') or (x == 'U' and y == 'D')
|
|
for path, x, y in conflicts
|
|
}
|
|
|
|
# Validate resolutions are for actual conflicting files
|
|
for file_path in resolutions:
|
|
if file_path not in {path for path, _, _ in conflicts}:
|
|
return {
|
|
'success': False,
|
|
'error': f"File not in conflict: {file_path}"
|
|
}
|
|
|
|
# Store initial states for rollback
|
|
initial_states = {}
|
|
for file_path in resolutions:
|
|
try:
|
|
full_path = os.path.join(repo.working_dir, file_path)
|
|
try:
|
|
with open(full_path, 'r') as f:
|
|
initial_states[file_path] = f.read()
|
|
except FileNotFoundError:
|
|
initial_states[file_path] = None
|
|
except Exception as e:
|
|
return {
|
|
'success': False,
|
|
'error': f"Couldn't read file {file_path}: {str(e)}"
|
|
}
|
|
|
|
results = {}
|
|
for file_path, field_resolutions in resolutions.items():
|
|
# Handle modify/delete conflicts differently
|
|
if modify_delete_conflicts[file_path]:
|
|
logger.debug(
|
|
f"Handling modify/delete conflict for {file_path}")
|
|
logger.debug(f"Field resolutions for modify/delete: {field_resolutions}")
|
|
|
|
# Get the existing version (either from HEAD or MERGE_HEAD)
|
|
head_data = get_version_data(repo, 'HEAD', file_path)
|
|
merge_head_data = get_version_data(repo, 'MERGE_HEAD',
|
|
file_path)
|
|
|
|
# Determine which version exists
|
|
is_deleted_in_head = head_data is None
|
|
existing_data = merge_head_data if is_deleted_in_head else head_data
|
|
logger.debug(f"Existing version data: {existing_data}")
|
|
logger.debug(f"is_deleted_in_head: {is_deleted_in_head}")
|
|
logger.debug(f"head_data: {head_data}")
|
|
logger.debug(f"merge_head_data: {merge_head_data}")
|
|
|
|
# Try both lowercase and capitalized versions of 'file'
|
|
choice = field_resolutions.get('file') or field_resolutions.get('File')
|
|
logger.debug(f"Resolution choice for file: {choice}")
|
|
|
|
if not choice:
|
|
logger.error("No 'file' or 'File' resolution found in field_resolutions!")
|
|
logger.error(f"Available keys: {list(field_resolutions.keys())}")
|
|
raise Exception(
|
|
"No resolution provided for modify/delete conflict")
|
|
|
|
full_path = os.path.join(repo.working_dir, file_path)
|
|
|
|
if choice == 'local':
|
|
if is_deleted_in_head:
|
|
logger.debug(f"Keeping file deleted: {file_path}")
|
|
# File should stay deleted
|
|
try:
|
|
os.remove(full_path)
|
|
except FileNotFoundError:
|
|
pass # File is already gone
|
|
repo.index.remove([file_path])
|
|
else:
|
|
logger.debug(f"Keeping local version: {file_path}")
|
|
# Keep our version
|
|
with open(full_path, 'w') as f:
|
|
yaml.safe_dump(head_data,
|
|
f,
|
|
default_flow_style=False)
|
|
repo.index.add([file_path])
|
|
|
|
elif choice == 'incoming':
|
|
if is_deleted_in_head:
|
|
logger.debug(
|
|
f"Restoring from incoming version: {file_path}")
|
|
# Restore the file from MERGE_HEAD
|
|
with open(full_path, 'w') as f:
|
|
yaml.safe_dump(merge_head_data,
|
|
f,
|
|
default_flow_style=False)
|
|
repo.index.add([file_path])
|
|
else:
|
|
logger.debug(f"Accepting deletion: {file_path}")
|
|
# Accept the deletion
|
|
try:
|
|
os.remove(full_path)
|
|
except FileNotFoundError:
|
|
pass # File is already gone
|
|
repo.index.remove([file_path])
|
|
|
|
results[file_path] = {
|
|
'resolution':
|
|
choice,
|
|
'action':
|
|
'delete' if (choice == 'local' and is_deleted_in_head) or
|
|
(choice == 'incoming' and not is_deleted_in_head) else
|
|
'keep'
|
|
}
|
|
|
|
else:
|
|
# Regular conflict resolution
|
|
# Get all three versions
|
|
base_data = get_version_data(repo, 'HEAD^', file_path)
|
|
ours_data = get_version_data(repo, 'HEAD', file_path)
|
|
theirs_data = get_version_data(repo, 'MERGE_HEAD', file_path)
|
|
|
|
# For files that were previously involved in modify/delete conflicts
|
|
# we may not be able to get all versions
|
|
if not base_data or not ours_data or not theirs_data:
|
|
logger.warning(f"Couldn't get all versions of {file_path} - may have been previously resolved as a modify/delete conflict")
|
|
logger.warning(f"base_data: {base_data}, ours_data: {ours_data}, theirs_data: {theirs_data}")
|
|
|
|
# If it was previously resolved as "incoming" but ours_data is missing, use theirs_data
|
|
if not ours_data and theirs_data:
|
|
logger.info(f"Using incoming version for {file_path} as base for resolution")
|
|
ours_data = theirs_data
|
|
# If it was previously resolved as "local" but theirs_data is missing, use ours_data
|
|
elif ours_data and not theirs_data:
|
|
logger.info(f"Using local version for {file_path} as base for resolution")
|
|
theirs_data = ours_data
|
|
# If we can't recover either version, we can't proceed
|
|
else:
|
|
raise Exception(f"Couldn't get required versions of {file_path}")
|
|
|
|
# Start with a deep copy of ours_data to preserve all fields
|
|
resolved_data = deepcopy(ours_data)
|
|
|
|
# Track changes
|
|
kept_values = {}
|
|
discarded_values = {}
|
|
|
|
# Handle each resolution field
|
|
for field, choice in field_resolutions.items():
|
|
if field.startswith('custom_format_'):
|
|
format_name = field[len('custom_format_'):]
|
|
|
|
ours_cf = next(
|
|
(item
|
|
for item in ours_data.get('custom_formats', [])
|
|
if item['name'] == format_name), None)
|
|
theirs_cf = next(
|
|
(item
|
|
for item in theirs_data.get('custom_formats', [])
|
|
if item['name'] == format_name), None)
|
|
|
|
if choice == 'local' and ours_cf:
|
|
resolved_cf = ours_cf
|
|
kept_values[field] = ours_cf
|
|
discarded_values[field] = theirs_cf
|
|
elif choice == 'incoming' and theirs_cf:
|
|
resolved_cf = theirs_cf
|
|
kept_values[field] = theirs_cf
|
|
discarded_values[field] = ours_cf
|
|
else:
|
|
raise Exception(
|
|
f"Invalid choice or missing custom format {format_name}"
|
|
)
|
|
|
|
resolved_cf_list = resolved_data.get(
|
|
'custom_formats', [])
|
|
for idx, item in enumerate(resolved_cf_list):
|
|
if item['name'] == format_name:
|
|
resolved_cf_list[idx] = resolved_cf
|
|
break
|
|
else:
|
|
resolved_cf_list.append(resolved_cf)
|
|
resolved_data['custom_formats'] = resolved_cf_list
|
|
|
|
elif field.startswith('tag_'):
|
|
tag_name = field[len('tag_'):]
|
|
current_tags = set(resolved_data.get('tags', []))
|
|
|
|
if choice == 'local':
|
|
if tag_name in ours_data.get('tags', []):
|
|
current_tags.add(tag_name)
|
|
kept_values[field] = 'local'
|
|
discarded_values[field] = 'incoming'
|
|
else:
|
|
current_tags.discard(tag_name)
|
|
kept_values[field] = 'none'
|
|
discarded_values[field] = 'incoming'
|
|
elif choice == 'incoming':
|
|
if tag_name in theirs_data.get('tags', []):
|
|
current_tags.add(tag_name)
|
|
kept_values[field] = 'incoming'
|
|
discarded_values[field] = 'local'
|
|
else:
|
|
current_tags.discard(tag_name)
|
|
kept_values[field] = 'none'
|
|
discarded_values[field] = 'local'
|
|
else:
|
|
raise Exception(
|
|
f"Invalid choice for tag field: {field}")
|
|
|
|
resolved_data['tags'] = sorted(current_tags)
|
|
|
|
else:
|
|
field_key = field
|
|
if choice == 'local':
|
|
resolved_data[field_key] = ours_data.get(field_key)
|
|
kept_values[field_key] = ours_data.get(field_key)
|
|
discarded_values[field_key] = theirs_data.get(
|
|
field_key)
|
|
elif choice == 'incoming':
|
|
resolved_data[field_key] = theirs_data.get(
|
|
field_key)
|
|
kept_values[field_key] = theirs_data.get(field_key)
|
|
discarded_values[field_key] = ours_data.get(
|
|
field_key)
|
|
else:
|
|
raise Exception(
|
|
f"Invalid choice for field: {field}")
|
|
|
|
# Get file type and apply appropriate field ordering
|
|
file_type = determine_type(file_path)
|
|
if file_type == 'Quality Profile':
|
|
_, fields = CATEGORY_MAP['profile']
|
|
elif file_type == 'Custom Format':
|
|
_, fields = CATEGORY_MAP['custom_format']
|
|
elif file_type == 'Regex Pattern':
|
|
_, fields = CATEGORY_MAP['regex_pattern']
|
|
|
|
# Order the fields according to the category's field order
|
|
ordered_data = {
|
|
field: resolved_data.get(field)
|
|
for field in fields if field in resolved_data
|
|
}
|
|
resolved_data = ordered_data
|
|
|
|
# Write resolved version
|
|
full_path = os.path.join(repo.working_dir, file_path)
|
|
with open(full_path, 'w') as f:
|
|
yaml.safe_dump(resolved_data, f, default_flow_style=False)
|
|
|
|
# Stage the resolved file
|
|
repo.index.add([file_path])
|
|
|
|
results[file_path] = {
|
|
'kept_values': kept_values,
|
|
'discarded_values': discarded_values
|
|
}
|
|
|
|
logger.debug(
|
|
f"Successfully resolved regular conflict for {file_path}")
|
|
|
|
logger.debug("==== Status after resolve_conflicts ====")
|
|
status_output = repo.git.status('--porcelain', '-z').split('\0')
|
|
for item in status_output:
|
|
if item:
|
|
logger.debug(f"File status: {item}")
|
|
logger.debug("=======================================")
|
|
|
|
# Reload cache after conflict resolution
|
|
from ...data.cache import data_cache
|
|
logger.info("Reloading data cache after conflict resolution")
|
|
data_cache.initialize(force_reload=True)
|
|
|
|
return {'success': True, 'results': results}
|
|
|
|
except Exception as e:
|
|
# Rollback on any error
|
|
for file_path, initial_state in initial_states.items():
|
|
try:
|
|
full_path = os.path.join(repo.working_dir, file_path)
|
|
if initial_state is None:
|
|
try:
|
|
os.remove(full_path)
|
|
except FileNotFoundError:
|
|
pass
|
|
else:
|
|
with open(full_path, 'w') as f:
|
|
f.write(initial_state)
|
|
except Exception as rollback_error:
|
|
logger.error(
|
|
f"Failed to rollback {file_path}: {str(rollback_error)}")
|
|
|
|
logger.error(f"Failed to resolve conflicts: {str(e)}")
|
|
return {'success': False, 'error': str(e)}
|