mirror of
https://github.com/mealie-recipes/mealie.git
synced 2025-12-23 22:48:20 -05:00
fix: Improve recipe bulk deletion (#6772)
This commit is contained in:
@@ -5,6 +5,7 @@ from pydantic import UUID4
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from mealie.db.models._model_utils.guid import GUID
|
||||
from mealie.db.models.group import Group, ReportEntryModel, ReportModel
|
||||
from mealie.db.models.group.exports import GroupDataExportsModel
|
||||
from mealie.db.models.group.preferences import GroupPreferencesModel
|
||||
@@ -117,6 +118,12 @@ class AllRepositories:
|
||||
self.group_id = group_id
|
||||
self.household_id = household_id
|
||||
|
||||
def uuid_to_str(self, val: UUID4) -> str:
|
||||
"""Helper method to convert a UUID into a database-safe string"""
|
||||
|
||||
dialect = self.session.bind.dialect
|
||||
return GUID.convert_value_to_guid(val, dialect) or ""
|
||||
|
||||
# ================================================================
|
||||
# Recipe
|
||||
|
||||
|
||||
@@ -130,7 +130,7 @@ class RepositoryRecipes(HouseholdRepositoryGeneric[Recipe, RecipeModel]):
|
||||
return self._delete_recipe(recipe_in_db)
|
||||
|
||||
def delete_many(self, values: Iterable) -> list[Recipe]:
|
||||
query = self._query().filter(self.model.id.in_(values))
|
||||
query = self._query().filter(self.model.slug.in_(values)).filter_by(**self._filter_builder())
|
||||
recipes_in_db = self.session.execute(query).unique().scalars().all()
|
||||
results: list[Recipe] = []
|
||||
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
from functools import cached_property
|
||||
from pathlib import Path
|
||||
|
||||
from fastapi import APIRouter, HTTPException
|
||||
from fastapi import APIRouter, HTTPException, status
|
||||
from pydantic import UUID4
|
||||
|
||||
from mealie.core.dependencies.dependencies import get_temporary_zip_path
|
||||
from mealie.core.exceptions import PermissionDenied
|
||||
from mealie.core.security import create_file_token
|
||||
from mealie.routes._base import BaseUserController, controller
|
||||
from mealie.schema.group.group_exports import GroupDataExport
|
||||
@@ -15,8 +16,9 @@ from mealie.schema.recipe.recipe_bulk_actions import (
|
||||
DeleteRecipes,
|
||||
ExportRecipes,
|
||||
)
|
||||
from mealie.schema.response.responses import SuccessResponse
|
||||
from mealie.schema.response.responses import ErrorResponse, SuccessResponse
|
||||
from mealie.services.recipe.recipe_bulk_service import RecipeBulkActionsService
|
||||
from mealie.services.recipe.recipe_service import RecipeService
|
||||
|
||||
router = APIRouter(prefix="/bulk-actions")
|
||||
|
||||
@@ -27,6 +29,10 @@ class RecipeBulkActionsController(BaseUserController):
|
||||
def service(self) -> RecipeBulkActionsService:
|
||||
return RecipeBulkActionsService(self.repos, self.user, self.group)
|
||||
|
||||
@cached_property
|
||||
def recipe_service(self) -> RecipeService:
|
||||
return RecipeService(self.repos, self.user, self.household, self.translator)
|
||||
|
||||
# TODO Should these actions return some success response?
|
||||
@router.post("/tag")
|
||||
def bulk_tag_recipes(self, tag_data: AssignTags):
|
||||
@@ -42,7 +48,14 @@ class RecipeBulkActionsController(BaseUserController):
|
||||
|
||||
@router.post("/delete")
|
||||
def bulk_delete_recipes(self, delete_recipes: DeleteRecipes):
|
||||
self.service.delete_recipes(delete_recipes.recipes)
|
||||
# TODO: this route should be migrated to the standard recipe controller
|
||||
try:
|
||||
self.recipe_service.delete_many(delete_recipes.recipes)
|
||||
except PermissionDenied as e:
|
||||
self.logger.error("Permission Denied on recipe controller action")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN, detail=ErrorResponse.respond(message="Permission Denied")
|
||||
) from e
|
||||
|
||||
@router.post("/export", status_code=202)
|
||||
def bulk_export_recipes(self, export_recipes: ExportRecipes):
|
||||
|
||||
@@ -110,11 +110,3 @@ class RecipeBulkActionsService(BaseService):
|
||||
except Exception as e:
|
||||
self.logger.error(f"Failed to categorize recipe {slug}")
|
||||
self.logger.error(e)
|
||||
|
||||
def delete_recipes(self, recipes: list[str]) -> None:
|
||||
for slug in recipes:
|
||||
try:
|
||||
self.repos.recipes.delete(slug)
|
||||
except Exception as e:
|
||||
self.logger.error(f"Failed to delete recipe {slug}")
|
||||
self.logger.error(e)
|
||||
|
||||
@@ -4,10 +4,12 @@ import shutil
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
from shutil import copytree, rmtree
|
||||
from textwrap import dedent
|
||||
from typing import Any
|
||||
from uuid import UUID, uuid4
|
||||
from zipfile import ZipFile
|
||||
|
||||
import sqlalchemy as sa
|
||||
from fastapi import UploadFile
|
||||
|
||||
from mealie.core import exceptions
|
||||
@@ -64,31 +66,57 @@ class RecipeService(RecipeServiceBase):
|
||||
raise exceptions.NoEntryFound("Recipe not found.")
|
||||
return recipe
|
||||
|
||||
def can_delete(self, recipe: Recipe) -> bool:
|
||||
def can_delete(self, recipe_slugs: list[str]) -> bool:
|
||||
if self.user.admin:
|
||||
return True
|
||||
else:
|
||||
return self.can_update(recipe)
|
||||
return self.can_update(recipe_slugs)
|
||||
|
||||
def can_update(self, recipe: Recipe) -> bool:
|
||||
if recipe.settings is None:
|
||||
raise exceptions.UnexpectedNone("Recipe Settings is None")
|
||||
def can_update(self, recipe_slugs: list[str]) -> bool:
|
||||
sql = dedent(
|
||||
"""
|
||||
SELECT
|
||||
CASE
|
||||
WHEN COUNT(*) = SUM(
|
||||
CASE
|
||||
-- User owns the recipe
|
||||
WHEN r.user_id = :user_id THEN 1
|
||||
|
||||
# Check if this user owns the recipe
|
||||
if self.user.id == recipe.user_id:
|
||||
return True
|
||||
-- Not owner: check if recipe is locked
|
||||
WHEN COALESCE(rs.locked, TRUE) = TRUE THEN 0
|
||||
|
||||
# Check if this user has permission to edit this recipe
|
||||
if self.household.id != recipe.household_id:
|
||||
other_household = self.repos.households.get_one(recipe.household_id)
|
||||
if not (other_household and other_household.preferences):
|
||||
return False
|
||||
if other_household.preferences.lock_recipe_edits_from_other_households:
|
||||
return False
|
||||
if recipe.settings.locked:
|
||||
return False
|
||||
-- Different household: check household policy
|
||||
WHEN
|
||||
u.household_id != :household_id
|
||||
AND COALESCE(hp.lock_recipe_edits_from_other_households, TRUE) = TRUE
|
||||
THEN 0
|
||||
|
||||
return True
|
||||
-- All other cases: can update
|
||||
ELSE 1
|
||||
END
|
||||
) THEN 1
|
||||
ELSE 0
|
||||
END AS all_can_update
|
||||
FROM recipes r
|
||||
LEFT JOIN recipe_settings rs ON rs.recipe_id = r.id
|
||||
LEFT JOIN users u ON u.id = r.user_id
|
||||
LEFT JOIN households h ON h.id = u.household_id
|
||||
LEFT JOIN household_preferences hp ON hp.household_id = h.id
|
||||
WHERE r.slug IN :recipe_slugs AND r.group_id = :group_id;
|
||||
"""
|
||||
)
|
||||
|
||||
result = self.repos.session.execute(
|
||||
sa.text(sql).bindparams(sa.bindparam("recipe_slugs", expanding=True)),
|
||||
params={
|
||||
"user_id": self.repos.uuid_to_str(self.user.id),
|
||||
"household_id": self.repos.uuid_to_str(self.household.id),
|
||||
"group_id": self.repos.uuid_to_str(self.user.group_id),
|
||||
"recipe_slugs": recipe_slugs,
|
||||
},
|
||||
).scalar()
|
||||
|
||||
return bool(result)
|
||||
|
||||
def can_lock_unlock(self, recipe: Recipe) -> bool:
|
||||
return recipe.user_id == self.user.id
|
||||
@@ -423,7 +451,7 @@ class RecipeService(RecipeServiceBase):
|
||||
if recipe is None or recipe.settings is None:
|
||||
raise exceptions.NoEntryFound("Recipe not found.")
|
||||
|
||||
if not self.can_update(recipe):
|
||||
if not self.can_update([recipe.slug]):
|
||||
raise exceptions.PermissionDenied("You do not have permission to edit this recipe.")
|
||||
|
||||
setting_lock = new_data.settings is not None and recipe.settings.locked != new_data.settings.locked
|
||||
@@ -444,7 +472,7 @@ class RecipeService(RecipeServiceBase):
|
||||
|
||||
def update_recipe_image(self, slug: str, image: bytes, extension: str):
|
||||
recipe = self.get_one(slug)
|
||||
if not self.can_update(recipe):
|
||||
if not self.can_update([recipe.slug]):
|
||||
raise exceptions.PermissionDenied("You do not have permission to edit this recipe.")
|
||||
|
||||
data_service = RecipeDataService(recipe.id)
|
||||
@@ -454,7 +482,7 @@ class RecipeService(RecipeServiceBase):
|
||||
|
||||
def delete_recipe_image(self, slug: str) -> None:
|
||||
recipe = self.get_one(slug)
|
||||
if not self.can_update(recipe):
|
||||
if not self.can_update([recipe.slug]):
|
||||
raise exceptions.PermissionDenied("You do not have permission to edit this recipe.")
|
||||
|
||||
data_service = RecipeDataService(recipe.id)
|
||||
@@ -482,12 +510,24 @@ class RecipeService(RecipeServiceBase):
|
||||
|
||||
def delete_one(self, slug_or_id: str | UUID) -> Recipe:
|
||||
recipe = self.get_one(slug_or_id)
|
||||
resp = self.delete_many([recipe.slug])
|
||||
return resp[0]
|
||||
|
||||
if not self.can_delete(recipe):
|
||||
raise exceptions.PermissionDenied("You do not have permission to delete this recipe.")
|
||||
def delete_many(self, recipe_slugs: list[str]) -> list[Recipe]:
|
||||
if not self.can_delete(recipe_slugs):
|
||||
if len(recipe_slugs) == 1:
|
||||
msg = "You do not have permission to delete this recipe."
|
||||
else:
|
||||
msg = "You do not have permission to delete all of these recipes."
|
||||
raise exceptions.PermissionDenied(msg)
|
||||
|
||||
data = self.group_recipes.delete_many(recipe_slugs)
|
||||
for r in data:
|
||||
try:
|
||||
self.delete_assets(r)
|
||||
except Exception:
|
||||
self.logger.exception(f"Failed to delete recipe assets for {r.slug}")
|
||||
|
||||
data = self.group_recipes.delete(recipe.id, "id")
|
||||
self.delete_assets(data)
|
||||
return data
|
||||
|
||||
# =================================================================
|
||||
|
||||
@@ -160,6 +160,36 @@ def test_other_user_cant_delete_recipe(api_client: TestClient, user_tuple: list[
|
||||
assert response.status_code == 403
|
||||
|
||||
|
||||
def test_other_user_bulk_delete(api_client: TestClient, user_tuple: list[TestUser]):
|
||||
slug_locked = random_string(10)
|
||||
slug_unlocked = random_string(10)
|
||||
unique_user, other_user = user_tuple
|
||||
|
||||
unique_user.repos.recipes.create(
|
||||
Recipe(
|
||||
user_id=unique_user.user_id,
|
||||
group_id=unique_user.group_id,
|
||||
name=slug_locked,
|
||||
settings=RecipeSettings(locked=True),
|
||||
)
|
||||
)
|
||||
unique_user.repos.recipes.create(
|
||||
Recipe(
|
||||
user_id=unique_user.user_id,
|
||||
group_id=unique_user.group_id,
|
||||
name=slug_unlocked,
|
||||
settings=RecipeSettings(locked=False),
|
||||
)
|
||||
)
|
||||
|
||||
response = api_client.post(
|
||||
api_routes.recipes_bulk_actions_delete,
|
||||
json={"recipes": [slug_locked, slug_unlocked]},
|
||||
headers=other_user.token,
|
||||
)
|
||||
assert response.status_code == 403
|
||||
|
||||
|
||||
def test_admin_can_delete_locked_recipe_owned_by_another_user(
|
||||
api_client: TestClient, unfiltered_database: AllRepositories, unique_user: TestUser, admin_user: TestUser
|
||||
):
|
||||
|
||||
Reference in New Issue
Block a user