Files
AdventureLog/backend/server/adventures/views/location_view.py
Sean Morley 97f4a47ffd feat(lodging): implement quick start feature for lodging creation
- Added LodgingQuickStart component to facilitate quick lodging entry.
- Integrated Google Maps support for lodging selection and details enrichment.
- Enhanced LodgingModal to include quick start step and handle prefill from Google Places.
- Introduced utility function to infer lodging type from Google Places data.
- Updated localization files to include new strings for quick start functionality.
2026-04-24 22:12:10 -04:00

784 lines
31 KiB
Python

import logging
from django.utils import timezone
from django.db import transaction
from django.core.exceptions import PermissionDenied
from django.core.files.base import ContentFile
from django.db.models import Q, Max, Prefetch
from django.db.models.functions import Lower
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
import requests
from adventures.models import Location, Category, Collection, CollectionItineraryItem, ContentImage, Visit
from django.contrib.contenttypes.models import ContentType
from adventures.permissions import IsOwnerOrSharedWithFullAccess
from adventures.serializers import LocationSerializer, MapPinSerializer, CalendarLocationSerializer
from adventures.utils import pagination
from adventures.geocoding import reverse_geocode
from worldtravel.models import City, Country, Region
from .location_image_view import import_remote_images_for_object
from .quick_add_utils import (
build_quick_add_description,
clean_url,
coerce_bool,
coerce_coordinate,
coerce_float,
coerce_int,
extract_google_place_details,
preferred_link,
resolve_quick_add_collection,
sanitize_photo_urls,
sanitize_tags,
)
# Module-level logger, named after this module's import path via __name__.
logger = logging.getLogger(__name__)
class LocationViewSet(viewsets.ModelViewSet):
"""
ViewSet for managing Location objects with support for filtering, sorting,
and sharing functionality.

Several method names, docstrings and error messages below still say
"adventure"; they all operate on Location instances.
"""
# Serializer used by get_serializer() for every action that does not build
# its own serializer explicitly.
serializer_class = LocationSerializer
# Permission class imported from adventures.permissions.
permission_classes = [IsOwnerOrSharedWithFullAccess]
# Paginator instantiated by paginate_and_respond().
pagination_class = pagination.StandardResultsSetPagination
# ==================== QUERYSET & PERMISSIONS ====================
def get_queryset(self):
    """
    Build the base queryset for the current request.

    Anonymous callers only receive results for the 'retrieve' and
    'additional_info' actions (public locations); for any other action
    they get an empty queryset. Authenticated callers always get owned and
    shared locations, plus public ones for those two actions.
    Results are ordered newest-updated first.
    """
    requester = self.request.user
    is_public_action = self.action in {'retrieve', 'additional_info'}

    if requester.is_authenticated:
        visible = Location.objects.retrieve_locations(
            requester,
            include_public=is_public_action,
            include_owned=True,
            include_shared=True
        )
        return visible.order_by('-updated_at')

    # Anonymous request: nothing is visible outside the public actions.
    if not is_public_action:
        return Location.objects.none()

    public_only = Location.objects.retrieve_locations(
        requester, include_public=True
    )
    return public_only.order_by('-updated_at')
# ==================== SORTING & FILTERING ====================
def apply_sorting(self, queryset):
    """
    Sort ``queryset`` according to the request's query parameters.

    Reads ``order_by`` (default 'updated_at'), ``order_direction``
    (default 'asc') and ``include_collections`` (default 'true').
    An unrecognised ``order_by`` falls back to 'name' and an unrecognised
    direction to 'asc'. When ``include_collections`` is exactly 'false',
    only locations that belong to no collection are kept.
    """
    params = self.request.query_params
    sort_field = params.get('order_by', 'updated_at')
    direction = params.get('order_direction', 'asc')
    collections_flag = params.get('include_collections', 'true')

    # Silently normalise anything we do not recognise.
    if sort_field not in ('name', 'type', 'date', 'rating', 'updated_at'):
        sort_field = 'name'
    if direction not in ('asc', 'desc'):
        direction = 'asc'

    ordered = self._apply_ordering(queryset, sort_field, direction)

    if collections_flag != 'false':
        return ordered
    # Caller asked for locations that are not part of any collection.
    return ordered.filter(collections__isnull=True)
def _apply_ordering(self, queryset, order_by, order_direction):
"""Apply ordering to queryset based on field type."""
if order_by == 'date':
queryset = queryset.annotate(
latest_visit=Max('visits__start_date')
).filter(latest_visit__isnull=False)
ordering = 'latest_visit'
elif order_by == 'name':
queryset = queryset.annotate(lower_name=Lower('name'))
ordering = 'lower_name'
elif order_by == 'rating':
queryset = queryset.filter(rating__isnull=False)
ordering = 'rating'
elif order_by == 'updated_at':
# Special handling for updated_at (reverse default order)
ordering = '-updated_at' if order_direction == 'asc' else 'updated_at'
return queryset.order_by(ordering)
else:
ordering = order_by
# Apply direction
if order_direction == 'desc':
ordering = f'-{ordering}'
return queryset.order_by(ordering)
# ==================== CRUD OPERATIONS ====================
@transaction.atomic
def perform_create(self, serializer):
    """
    Save a new location owned by the requesting user.

    Before saving, every collection in the validated data is checked with
    ``_validate_collection_permissions``, which raises PermissionDenied if
    the user may not add to one of them. The whole operation runs inside a
    database transaction.
    """
    requested_collections = serializer.validated_data.get('collections', [])
    self._validate_collection_permissions(requested_collections)

    # The creator is the owner whether or not collections were supplied;
    # collection membership does not influence ownership.
    serializer.save(user=self.request.user)
def perform_update(self, serializer):
    """
    Persist an already-validated update.

    No extra work is done here. NOTE(review): the original comment says a
    model signal handles publicity updates; that signal is not visible in
    this file, so confirm before relying on it.
    """
    serializer.save()
def update(self, request, *args, **kwargs):
    """
    Handle full and partial updates with collection permission validation.

    Mirrors DRF's ``UpdateModelMixin.update`` and adds one step: when the
    payload changes ``collections``, the change is authorized through
    ``_validate_collection_update_permissions`` before anything is saved.

    Args:
        request: the incoming DRF request; ``request.data`` is validated
            by the viewset serializer.
        **kwargs: may contain ``partial=True`` (set by ``partial_update``).

    Returns:
        Response containing the serialized, updated location.

    Raises:
        ValidationError: if the payload does not validate.
        PermissionDenied: if the collection change is not allowed.
    """
    partial = kwargs.pop('partial', False)
    instance = self.get_object()
    serializer = self.get_serializer(instance, data=request.data, partial=partial)
    serializer.is_valid(raise_exception=True)

    # Only authorize collection changes when the client actually sent them;
    # a partial update without 'collections' leaves the links untouched.
    if 'collections' in serializer.validated_data:
        self._validate_collection_update_permissions(
            instance, serializer.validated_data['collections']
        )

    self.perform_update(serializer)

    # Same step DRF's stock update() performs: if the instance was loaded
    # with prefetch_related, the cached relations are now stale and must be
    # dropped so the response reflects what was just saved.
    if getattr(instance, '_prefetched_objects_cache', None):
        instance._prefetched_objects_cache = {}

    return Response(serializer.data)
def destroy(self, request, *args, **kwargs):
    """
    Delete a location, but only on behalf of its owner.

    Raises PermissionDenied when the requesting user is not the owner of
    the object being deleted; otherwise defers to the parent
    implementation.
    """
    target = self.get_object()
    if target.user == request.user:
        return super().destroy(request, *args, **kwargs)
    # Users who merely have shared access must not be able to delete.
    raise PermissionDenied("Only the owner can delete this location.")
# ==================== CUSTOM ACTIONS ====================
@action(detail=False, methods=['post'], url_path='quick-add')
@transaction.atomic
def quick_add(self, request):
    """Create a location from lightweight map/place input in one server-side call.

    Payload keys read here: ``name`` (required), ``latitude`` and
    ``longitude`` (required, range-checked), ``collection_id``, ``rating``,
    ``location``, ``description``, ``category``, ``types``/``tags``,
    ``is_public`` and ``photos``. The payload is also handed to
    ``extract_google_place_details`` and ``preferred_link``, which may read
    further keys.

    Reverse geocoding is best-effort: if it fails or reports an error the
    location is still created, just without region/city/country metadata.

    Returns:
        201 with the serialized location. When some photo imports failed,
        the body additionally carries a ``quick_add_image_import`` summary.
        400 when ``name`` or the coordinates are missing/invalid, or when
        the collection/category helpers return an error response.
    """
    payload = request.data if isinstance(request.data, dict) else {}

    name = str(payload.get('name') or '').strip()
    if not name:
        return Response({"error": "name is required"}, status=status.HTTP_400_BAD_REQUEST)

    latitude = coerce_coordinate(payload.get('latitude'), -90, 90)
    longitude = coerce_coordinate(payload.get('longitude'), -180, 180)
    if latitude is None or longitude is None:
        return Response(
            {"error": "Valid latitude and longitude are required"},
            status=status.HTTP_400_BAD_REQUEST,
        )

    # The helper returns either a collection (or nothing) or a ready-made
    # error Response, which is passed straight back to the client.
    collection = self._resolve_quick_add_collection(payload.get('collection_id'))
    if isinstance(collection, Response):
        return collection

    _, details = extract_google_place_details(payload, fallback_query=name)

    reverse_data = {}
    try:
        reverse_result = reverse_geocode(latitude, longitude, request.user)
    except Exception:
        # Deliberately non-fatal, but leave a trace so outages are visible.
        logger.warning(
            "Reverse geocoding failed during quick add; continuing without it",
            exc_info=True,
        )
    else:
        if isinstance(reverse_result, dict) and 'error' not in reverse_result:
            reverse_data = reverse_result

    # An explicit rating in the payload wins over the enriched one.
    rating = coerce_float(payload.get('rating'))
    if rating is None:
        rating = coerce_float(details.get('rating'))

    link = preferred_link(payload, details)

    # First non-empty value wins: client label, reverse geocode, place details.
    location_label = (
        str(payload.get('location') or '').strip()
        or str(reverse_data.get('display_name') or '').strip()
        or str(details.get('formatted_address') or '').strip()
        or None
    )
    description = build_quick_add_description(
        base_description=payload.get('description'),
        detailed_description=details.get('description'),
    )

    category_payload = self._normalize_quick_add_category(payload.get('category'))
    if isinstance(category_payload, Response):
        return category_payload

    serializer_payload = {
        'name': name,
        'location': location_label,
        'latitude': latitude,
        'longitude': longitude,
        'rating': rating,
        'description': description,
        'link': link,
        'tags': sanitize_tags(payload.get('types') or payload.get('tags')),
        'is_public': coerce_bool(payload.get('is_public'), default=False),
    }
    if category_payload:
        serializer_payload['category'] = category_payload
    if collection:
        serializer_payload['collections'] = [str(collection.id)]

    serializer = self.get_serializer(data=serializer_payload)
    serializer.is_valid(raise_exception=True)
    self.perform_create(serializer)
    location = serializer.instance

    self._apply_reverse_geocode_metadata(location, reverse_data, location_label)

    photo_urls = sanitize_photo_urls(payload.get('photos'))
    image_import_summary = None
    if photo_urls:
        image_import_summary = import_remote_images_for_object(
            location,
            photo_urls,
            owner=location.user,
            max_workers=min(5, len(photo_urls)),
        )

    # Serialize again so the response reflects the geocode metadata and
    # any images attached after the initial save.
    response_data = self.get_serializer(location).data
    if image_import_summary and image_import_summary.get('failed'):
        response_data['quick_add_image_import'] = {
            'created_count': image_import_summary['created_count'],
            'failed_count': image_import_summary['failed_count'],
            'failed': image_import_summary['failed'],
        }
    return Response(response_data, status=status.HTTP_201_CREATED)
@action(detail=False, methods=['get'])
def filtered(self, request):
    """Filter the user's own locations by category names and visit status.

    Query parameters:
        types: comma-separated category names owned by the user, or a list
            containing ``all`` to select every category of the user.
            Surrounding whitespace and empty entries are ignored.
        is_visited: optional, handled by ``_apply_visit_filtering``.
        order_by / order_direction / include_collections: handled by
            ``apply_sorting``.

    Returns:
        A paginated list of serialized locations, or 400 when the caller is
        not authenticated, no type is given, or any requested type is not
        one of the user's categories.
    """
    # Same guard and status code the sibling list actions use; without it
    # an anonymous user would reach the ORM filters below.
    if not request.user.is_authenticated:
        return Response({"error": "User is not authenticated"}, status=400)

    raw_types = request.query_params.get('types', '').split(',')
    requested = [entry.strip() for entry in raw_types if entry.strip()]
    if not requested:
        return Response(
            {"error": "Invalid category or no types provided"},
            status=400
        )

    user_categories = Category.objects.filter(user=request.user)
    if 'all' in requested:
        categories = user_categories
    else:
        # One query for validation instead of one exists() per name.
        known_names = set(
            user_categories.filter(name__in=requested).values_list('name', flat=True)
        )
        if not set(requested) <= known_names:
            return Response(
                {"error": "Invalid category or no types provided"},
                status=400
            )
        categories = user_categories.filter(name__in=requested)

    queryset = Location.objects.filter(
        category__in=categories,
        user=request.user.id
    )
    queryset = self._apply_visit_filtering(queryset, request)
    queryset = self.apply_sorting(queryset)
    return self.paginate_and_respond(queryset, request)
@action(detail=False, methods=['get'])
def all(self, request):
    """Return every location owned by the requesting user, unpaginated.

    Only the caller's own locations are returned (the queryset is filtered
    by ``user``); public or shared locations of other users are not.

    Query parameters:
        include_collections: 'true' to include locations that belong to a
            collection; any other value (default) keeps only locations
            without a collection.
        nested: 'true' sets ``nested`` in the serializer context.
        allowed_nested_fields: comma-separated list passed through in the
            serializer context.
        order_by / order_direction: handled by ``apply_sorting``.

    Returns:
        Response with the serialized list, or 400 when unauthenticated.
    """
    if not request.user.is_authenticated:
        return Response({"error": "User is not authenticated"}, status=400)

    params = request.query_params
    include_collections = params.get('include_collections', 'false') == 'true'
    nested = params.get('nested', 'false') == 'true'
    allowed_nested_fields = params.get('allowed_nested_fields', '').split(',')

    filters = {'user': request.user.id}
    if not include_collections:
        filters['collections__isnull'] = True
    queryset = self.apply_sorting(Location.objects.filter(**filters))

    # get_serializer() only fills in the default context when none is
    # passed, so start from the default (request, view, format) and add the
    # extra keys instead of replacing it.
    context = self.get_serializer_context()
    context.update({
        'nested': nested,
        'allowed_nested_fields': allowed_nested_fields,
    })
    serializer = self.get_serializer(queryset, many=True, context=context)
    return Response(serializer.data)
@action(detail=False, methods=['get'])
def calendar(self, request):
    """
    Return a trimmed-down list of visited locations for calendar rendering.

    Starts from ``get_queryset()``, keeps locations that have at least one
    visit, and loads only the columns named below together with a reduced
    set of visit columns. Unauthenticated callers receive a 400 response.
    """
    if not request.user.is_authenticated:
        return Response({"error": "User is not authenticated"}, status=400)

    # Visits are fetched separately with just the listed columns.
    visit_prefetch = Prefetch(
        'visits',
        queryset=Visit.objects.only('id', 'start_date', 'end_date', 'timezone')
    )

    with_visits = self.get_queryset().filter(visits__isnull=False)
    slim_locations = (
        with_visits
        .select_related('category')
        .prefetch_related(visit_prefetch)
        .only('id', 'name', 'location', 'category__name', 'category__icon')
        .distinct()  # the visits join can repeat a location once per visit
    )

    payload = CalendarLocationSerializer(slim_locations, many=True).data
    return Response(payload)
@action(detail=True, methods=['get'], url_path='additional-info')
def additional_info(self, request, pk=None):
    """
    Return a serialized location extended with a ``sun_times`` list.

    Responds with 403 when ``_has_adventure_access`` rejects the caller.
    Otherwise the regular serializer output is returned with an extra
    ``sun_times`` key built by ``_get_sun_times`` from the serialized
    ``visits``.
    """
    location = self.get_object()

    if self._has_adventure_access(location, request.user):
        payload = self.get_serializer(location).data
        payload['sun_times'] = self._get_sun_times(location, payload.get('visits', []))
        return Response(payload)

    return Response(
        {"error": "User does not have permission to access this adventure"},
        status=status.HTTP_403_FORBIDDEN
    )
@action(detail=True, methods=['post'])
def duplicate(self, request, pk=None):
    """Create a private copy of an existing location for the requesting user.

    Copies the scalar fields listed in the constructor call below, but not
    collections or visits. Images backed by a file are re-read and stored
    as new files; images without a file keep only their ``immich_id``
    reference. The copy is named "Copy of <name>", belongs to the caller
    and always starts with ``is_public=False``.

    Request body:
        collection_id (optional): when given, the copy is linked to that
            collection only. The caller must own the collection or be in
            its ``shared_with`` set.

    Returns:
        201 with the serialized copy; 403 when the caller may not access
        the source location or the target collection; 404 when the
        collection id is unknown or malformed; 500 for any other failure
        (the exception is logged).
    """
    # Function-scope import: the module only imports PermissionDenied from
    # django.core.exceptions.
    from django.core.exceptions import ValidationError

    original = self.get_object()

    # Verify the requesting user owns the location or has access
    if not self._has_adventure_access(original, request.user):
        return Response(
            {"error": "You do not have permission to duplicate this location."},
            status=status.HTTP_403_FORBIDDEN,
        )

    try:
        with transaction.atomic():
            target_collection = None
            target_collection_id = request.data.get('collection_id')

            if target_collection_id:
                try:
                    target_collection = Collection.objects.get(id=target_collection_id)
                except (Collection.DoesNotExist, ValueError, ValidationError):
                    # A malformed id is rejected by the field before any
                    # query runs; it can never match a row, so report it
                    # like an unknown id instead of a server error.
                    return Response(
                        {"error": "Collection not found."},
                        status=status.HTTP_404_NOT_FOUND,
                    )

                user_can_link_to_collection = (
                    target_collection.user == request.user
                    or target_collection.shared_with.filter(uuid=request.user.uuid).exists()
                )
                if not user_can_link_to_collection:
                    return Response(
                        {"error": "You do not have permission to add locations to this collection."},
                        status=status.HTTP_403_FORBIDDEN,
                    )

            # Snapshot original images before creating the copy
            original_images = list(original.images.all())

            # Build the new location
            new_location = Location(
                user=request.user,
                name=f"Copy of {original.name}",
                description=original.description,
                rating=original.rating,
                link=original.link,
                location=original.location,
                tags=list(original.tags) if original.tags else None,
                is_public=False,
                longitude=original.longitude,
                latitude=original.latitude,
                city=original.city,
                region=original.region,
                country=original.country,
                price=original.price,
                price_currency=original.price_currency,
            )

            # Handle category: reuse the user's own matching category or
            # create one if necessary.
            if original.category:
                category, _ = Category.objects.get_or_create(
                    user=request.user,
                    name=original.category.name,
                    defaults={
                        'display_name': original.category.display_name,
                        'icon': original.category.icon,
                    },
                )
                new_location.category = category

            new_location.save()

            # If requested, link the duplicate only to the current collection.
            # This avoids accidentally inheriting all source collections.
            if target_collection:
                new_location.collections.set([target_collection])

            # Duplicate images as independent files/new records
            location_ct = ContentType.objects.get_for_model(Location)
            for img in original_images:
                if img.image:
                    try:
                        img.image.open('rb')
                        image_bytes = img.image.read()
                    finally:
                        # Closing is best-effort; a failure here must not
                        # mask a read error or abort the copy.
                        try:
                            img.image.close()
                        except Exception:
                            pass

                    file_name = (img.image.name or '').split('/')[-1] or 'image.webp'

                    ContentImage.objects.create(
                        content_type=location_ct,
                        object_id=str(new_location.id),
                        image=ContentFile(image_bytes, name=file_name),
                        immich_id=None,
                        is_primary=img.is_primary,
                        user=request.user,
                    )
                else:
                    # No stored file: carry over the immich reference only.
                    ContentImage.objects.create(
                        content_type=location_ct,
                        object_id=str(new_location.id),
                        immich_id=img.immich_id,
                        is_primary=img.is_primary,
                        user=request.user,
                    )

        serializer = self.get_serializer(new_location)
        return Response(serializer.data, status=status.HTTP_201_CREATED)

    except Exception:
        # The exception has already left the atomic block, so the database
        # changes made above were rolled back.
        logger.exception("Failed to duplicate location %s", pk)
        return Response(
            {"error": "An error occurred while duplicating the location."},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
# View that returns the name and lat/lon of every location the user owns, for the global map.
@action(detail=False, methods=['get'], url_path='pins')
def map_locations(self, request):
    """
    Return map pins for every location owned by the requesting user.

    The list is unpaginated and serialized with ``MapPinSerializer``.
    Unauthenticated callers receive a 400 response.
    """
    requester = request.user
    if requester.is_authenticated:
        owned = Location.objects.filter(user=requester)
        return Response(MapPinSerializer(owned, many=True).data)
    return Response({"error": "User is not authenticated"}, status=400)
# ==================== HELPER METHODS ====================
def _validate_collection_update_permissions(self, instance, new_collections):
    """Authorize a change to a location's collection links, then tidy itineraries.

    Args:
        instance: the location being updated.
        new_collections: the collections the location should be linked to
            after the update.

    Rules:
        - Adding a collection follows the same rule as creation (see
          ``_validate_collection_permissions``).
        - Removing a collection is allowed when the user owns the location,
          owns the collection, or the collection is shared with the user.

    Side effect:
        Once every change has been authorized, itinerary items in each
        removed collection that reference this location are deleted. This
        cleanup is best-effort: failures are logged and never block the
        update.

    Raises:
        PermissionDenied: if any addition or removal is not allowed. No
            itinerary item is deleted in that case.
    """
    user = self.request.user
    current_collections = set(instance.collections.all())
    requested_collections = set(new_collections)

    collections_to_add = requested_collections - current_collections
    collections_to_remove = current_collections - requested_collections

    # Additions use exactly the creation rule and message, so share it.
    self._validate_collection_permissions(collections_to_add)

    # Authorize every removal before deleting anything, so a denied
    # request cannot leave some collections already cleaned up.
    user_owns_location = instance.user == user
    for collection in collections_to_remove:
        if user_owns_location or collection.user == user:
            continue
        if not collection.shared_with.filter(uuid=user.uuid).exists():
            raise PermissionDenied(
                "You don't have permission to remove this location from one of the collections it's linked to."
            )

    # Every removal is permitted: drop itinerary items that point at this
    # location in the collections it is leaving.
    for collection in collections_to_remove:
        try:
            ct = ContentType.objects.get_for_model(instance.__class__)
            # Try deleting by native PK type first, then by string.
            qs = CollectionItineraryItem.objects.filter(
                collection=collection, content_type=ct, object_id=instance.pk
            )
            if qs.exists():
                qs.delete()
            else:
                CollectionItineraryItem.objects.filter(
                    collection=collection, content_type=ct, object_id=str(instance.pk)
                ).delete()
        except Exception:
            # Best-effort cleanup: record the failure but let the update go on.
            logger.warning(
                "Could not remove itinerary items for location %s from collection %s",
                instance.pk,
                getattr(collection, 'pk', None),
                exc_info=True,
            )
def _validate_collection_permissions(self, collections):
"""Validate permissions for all collections (used in create)."""
for collection in collections:
if collection.user != self.request.user:
# Check if user has shared access to the collection
if not collection.shared_with.filter(uuid=self.request.user.uuid).exists():
raise PermissionDenied(
f"You don't have permission to add location to collection '{collection.name}'"
)
def _resolve_quick_add_collection(self, collection_id):
    """
    Resolve the optional target collection of a quick-add request.

    Delegates to the module-level ``resolve_quick_add_collection`` helper,
    supplying this viewset's permission check and the message to use when
    that check fails. ``quick_add`` treats a ``Response`` return value as
    an error to send back to the client.
    """
    denied_message = (
        "You do not have permission to add this location to the selected collection."
    )
    return resolve_quick_add_collection(
        collection_id,
        validate_permissions=self._validate_collection_permissions,
        permission_error_message=denied_message,
    )
def _coerce_coordinate(self, value, min_value, max_value):
    """Method form of ``coerce_coordinate``; forwards all arguments unchanged."""
    return coerce_coordinate(value, min_value, max_value)


def _coerce_float(self, value):
    """Method form of ``coerce_float``; forwards ``value`` unchanged."""
    return coerce_float(value)


def _coerce_int(self, value):
    """Method form of ``coerce_int``; forwards ``value`` unchanged."""
    return coerce_int(value)


def _coerce_bool(self, value, default=False):
    """Method form of ``coerce_bool``; ``default`` is passed by keyword."""
    return coerce_bool(value, default=default)


def _clean_url(self, value):
    """Method form of ``clean_url``; forwards ``value`` unchanged."""
    return clean_url(value)


def _sanitize_tags(self, raw_tags):
    """Method form of ``sanitize_tags``; forwards ``raw_tags`` unchanged."""
    return sanitize_tags(raw_tags)


def _sanitize_photo_urls(self, raw_urls):
    """Method form of ``sanitize_photo_urls``; forwards ``raw_urls`` unchanged."""
    return sanitize_photo_urls(raw_urls)
def _normalize_quick_add_category(self, raw_category):
if not raw_category:
return None
if isinstance(raw_category, dict):
category_id = raw_category.get('id')
name = str(raw_category.get('name') or '').strip().lower()
display_name = str(raw_category.get('display_name') or '').strip()
icon = str(raw_category.get('icon') or '').strip() or '🌍'
elif isinstance(raw_category, str):
category_id = raw_category.strip()
name = ''
display_name = ''
icon = '🌍'
else:
return Response(
{"error": "category must be an object or string"},
status=status.HTTP_400_BAD_REQUEST,
)
category = None
if category_id:
category = Category.objects.filter(id=category_id, user=self.request.user).first()
if not category:
return Response(
{"error": "Category not found or inaccessible"},
status=status.HTTP_400_BAD_REQUEST,
)
if category:
return {
'name': category.name,
'display_name': category.display_name,
'icon': category.icon,
}
if not name:
return None
return {
'name': name,
'display_name': display_name or name,
'icon': icon,
}
def _build_quick_add_description(self, base_description, detailed_description):
    """Method form of ``build_quick_add_description``; forwards both texts positionally."""
    return build_quick_add_description(base_description, detailed_description)
def _apply_reverse_geocode_metadata(self, location, reverse_data, fallback_location):
if not isinstance(reverse_data, dict):
reverse_data = {}
updated_fields = []
region_id = reverse_data.get('region_id')
if region_id:
region = Region.objects.filter(id=region_id).first()
if region and location.region_id != region.id:
location.region = region
updated_fields.append('region')
city_id = reverse_data.get('city_id')
if city_id:
city = City.objects.filter(id=city_id).first()
if city and location.city_id != city.id:
location.city = city
updated_fields.append('city')
country_id = reverse_data.get('country_id')
if country_id:
country = Country.objects.filter(country_code=country_id).first()
if country and location.country_id != country.id:
location.country = country
updated_fields.append('country')
if fallback_location and not location.location:
location.location = fallback_location
updated_fields.append('location')
if updated_fields:
location.save(update_fields=updated_fields, _skip_geocode=True)
def _apply_visit_filtering(self, queryset, request):
"""Apply visit status filtering to queryset."""
is_visited_param = request.query_params.get('is_visited')
if is_visited_param is None:
return queryset
# Convert parameter to boolean
if is_visited_param.lower() == 'true':
is_visited_bool = True
elif is_visited_param.lower() == 'false':
is_visited_bool = False
else:
return queryset
# Apply visit filtering
now = timezone.now().date()
if is_visited_bool:
queryset = queryset.filter(visits__start_date__lte=now).distinct()
else:
queryset = queryset.exclude(visits__start_date__lte=now).distinct()
return queryset
def _has_adventure_access(self, adventure, user):
"""Check if user has access to adventure."""
# Allow if public
if adventure.is_public:
return True
# Check ownership
if user.is_authenticated and adventure.user == user:
return True
# Check shared collection access
if user.is_authenticated:
for collection in adventure.collections.all():
if collection.shared_with.filter(uuid=user.uuid).exists() or collection.user == user:
return True
return False
def _get_sun_times(self, adventure, visits):
"""Get sunrise/sunset times for adventure visits."""
sun_times = []
for visit in visits:
date = visit.get('start_date')
if not (date and adventure.longitude and adventure.latitude):
continue
api_url = (
f'https://api.sunrisesunset.io/json?'
f'lat={adventure.latitude}&lng={adventure.longitude}&date={date}'
)
try:
response = requests.get(api_url)
if response.status_code == 200:
data = response.json()
results = data.get('results', {})
if results.get('sunrise') and results.get('sunset'):
sun_times.append({
"date": date,
"visit_id": visit.get('id'),
"sunrise": results.get('sunrise'),
"sunset": results.get('sunset')
})
except requests.RequestException:
# Skip this visit if API call fails
continue
return sun_times
def paginate_and_respond(self, queryset, request):
    """
    Serialize ``queryset`` as a paginated response.

    A fresh ``pagination_class`` instance paginates the queryset. When it
    yields a page, the paginator's own response wrapper is returned; when
    it yields ``None``, the complete queryset is serialized into a plain
    Response instead.
    """
    paginator = self.pagination_class()
    page = paginator.paginate_queryset(queryset, request)

    if page is None:
        full_data = self.get_serializer(queryset, many=True).data
        return Response(full_data)

    page_data = self.get_serializer(page, many=True).data
    return paginator.get_paginated_response(page_data)