Compare commits

...

14 Commits

Author SHA1 Message Date
Andrey Antukh
a124812f21 WIP 2025-08-13 11:47:11 +02:00
Andrey Antukh
f75b7ea284 WIP 2025-08-13 11:38:12 +02:00
Andrey Antukh
3f99b1b626 WIP 2025-08-13 11:33:50 +02:00
Andrey Antukh
b2abd308ca WIP 2025-08-13 11:32:30 +02:00
Andrey Antukh
a4833f95b5 WIP 2025-08-13 11:26:41 +02:00
Andrey Antukh
7e9c8e8f01 WIP 2025-08-13 11:24:06 +02:00
Andrey Antukh
38066c73ee WIP 2025-08-13 09:49:06 +02:00
Andrey Antukh
2f0045e835 WIP 2025-08-13 09:49:06 +02:00
Andrey Antukh
3a0870690b WIP 2025-08-13 09:49:06 +02:00
Andrey Antukh
872b8fec85 WIP 2025-08-13 09:49:06 +02:00
Andrey Antukh
cb0d409ebf WIP 2025-08-13 09:49:06 +02:00
Andrey Antukh
a774387011 WIP 2025-08-13 09:49:06 +02:00
Andrey Antukh
07af88f33b WIP 2025-08-13 09:49:06 +02:00
Andrey Antukh
cc02a4732e 🚧 Refactor file storage
Make it more scallable and make it easily extensible
2025-08-13 09:49:06 +02:00
33 changed files with 1560 additions and 1166 deletions

View File

@@ -141,13 +141,11 @@
([index coll attr]
(reduce #(index-object %1 %2 attr) index coll)))
(defn decode-row
[{:keys [data changes features] :as row}]
(defn- decode-row-features
[{:keys [features] :as row}]
(when row
(cond-> row
features (assoc :features (db/decode-pgarray features #{}))
changes (assoc :changes (blob/decode changes))
data (assoc :data (blob/decode data)))))
(db/pgarray? features) (assoc :features (db/decode-pgarray features #{})))))
(def sql:get-minimal-file
"SELECT f.id,
@@ -161,6 +159,7 @@
[cfg id & {:as opts}]
(db/get-with-sql cfg [sql:get-minimal-file id] opts))
;; DEPRECATED
(defn decode-file
"A general purpose file decoding function that resolves all external
pointers, run migrations and return plain vanilla file map"
@@ -168,7 +167,8 @@
(binding [pmap/*load-fn* (partial fdata/load-pointer cfg id)]
(let [file (->> file
(fmigr/resolve-applied-migrations cfg)
(fdata/resolve-file-data cfg))
(fdata/resolve-file-data cfg)
(fdata/decode-file-data cfg))
libs (delay (get-resolved-file-libraries cfg file))]
(-> file
@@ -179,6 +179,119 @@
(update :data assoc :id id)
(cond-> migrate? (fmg/migrate-file libs))))))
(def sql:get-file
"SELECT f.id,
f.project_id,
f.created_at,
f.modified_at,
f.deleted_at,
f.name,
f.is_shared,
f.has_media_trimmed,
f.revn,
f.data AS legacy_data,
f.ignore_sync_until,
f.comment_thread_seqn,
f.features,
f.version,
f.vern,
p.team_id,
coalesce(fd.backend, 'db') AS backend,
fd.metadata AS metadata,
fd.data AS data
FROM file AS f
LEFT JOIN file_data AS fd ON (fd.file_id = f.id AND fd.id = f.id)
INNER JOIN project AS p ON (p.id = f.project_id)
WHERE f.id = ?")
(defn- migrate-file
[{:keys [::db/conn] :as cfg} {:keys [read-only?]} {:keys [id] :as file}]
(binding [pmap/*load-fn* (partial fdata/load-pointer cfg id)
pmap/*tracked* (pmap/create-tracked)]
(let [libs (delay (get-resolved-file-libraries cfg file))
;; For avoid unnecesary overhead of creating multiple
;; pointers and handly internally with objects map in their
;; worst case (when probably all shapes and all pointers
;; will be readed in any case), we just realize/resolve them
;; before applying the migration to the file.
file (-> (fdata/realize cfg file)
(fmg/migrate-file libs))]
(if (or read-only? (db/read-only? conn))
file
(do ;; When file is migrated, we break the rule of no
;; perform mutations on get operations and update the
;; file with all migrations applied
(update-file! cfg file)
(fmigr/resolve-applied-migrations cfg file))))))
;; FIXME: filter by project-id
(defn- get-file*
[{:keys [::db/conn] :as cfg} id
{:keys [#_project-id
migrate?
realize?
decode?
skip-locked?
include-deleted?
throw-if-not-exists?
lock-for-update?]
:or {lock-for-update? false
migrate? true
decode? true
include-deleted? false
throw-if-not-exists? true
realize? false}
:as options}]
(assert (db/connection? conn) "expected cfg with valid connection")
(let [sql
(if lock-for-update?
(str sql:get-file " FOR UPDATE of f")
sql:get-file)
sql
(if skip-locked?
(str sql " SKIP LOCKED")
sql)
file
(db/get-with-sql conn [sql id]
{::db/throw-if-not-exists false
::db/remove-deleted (not include-deleted?)})
file
(-> file
(d/update-when :features db/decode-pgarray #{})
(d/update-when :metadata fdata/decode-metadata))]
(if file
(let [file
(->> file
(fmigr/resolve-applied-migrations cfg)
(fdata/resolve-file-data cfg))
will-migrate?
(and migrate? (fmg/need-migration? file))]
(if decode?
(cond->> (fdata/decode-file-data cfg file)
(and realize? (not will-migrate?))
(fdata/realize cfg)
will-migrate?
(migrate-file cfg options))
file))
(when-not (or skip-locked? (not throw-if-not-exists?))
(ex/raise :type :not-found
:code :object-not-found
:hint "database object not found"
:table :file
:file-id id)))))
(defn get-file
"Get file, resolve all features and apply migrations.
@@ -186,10 +299,7 @@
operations on file, because it removes the ovehead of lazy fetching
and decoding."
[cfg file-id & {:as opts}]
(db/run! cfg (fn [{:keys [::db/conn] :as cfg}]
(some->> (db/get* conn :file {:id file-id}
(assoc opts ::db/remove-deleted false))
(decode-file cfg)))))
(db/run! cfg get-file* file-id opts))
(defn clean-file-features
[file]
@@ -213,12 +323,12 @@
(let [conn (db/get-connection cfg)
ids (db/create-array conn "uuid" ids)]
(->> (db/exec! conn [sql:get-teams ids])
(map decode-row))))
(map decode-row-features))))
(defn get-team
[cfg team-id]
(-> (db/get cfg :team {:id team-id})
(decode-row)))
(decode-row-features)))
(defn get-fonts
[cfg team-id]
@@ -497,21 +607,43 @@
(defn- file->params
[file]
(-> (select-keys file file-attrs)
(assoc :data nil)
(dissoc :team-id)
(dissoc :migrations)))
(defn file->file-data-params
[{:keys [id backend] :as file} & {:as opts}]
(let [created-at (or (:created-at file) (ct/now))
modified-at (or (:modified-at file) created-at)
backend (if (and (::overwrite-storage-backend opts) backend)
backend
(cf/get :file-storage-backend))]
(d/without-nils
{:id id
:type "main"
:file-id id
:data (:data file)
:metadata (:metadata file)
:backend backend
:created-at created-at
:modified-at modified-at})))
(defn insert-file!
"Insert a new file into the database table. Expectes a not-encoded file.
Returns nil."
[{:keys [::db/conn] :as cfg} file & {:as opts}]
(when (:migrations file)
(fmigr/upsert-migrations! conn file))
(let [file (encode-file cfg file)]
(db/insert! conn :file
(file->params file)
{::db/return-keys false})
(assoc opts ::db/return-keys false))
(->> (file->file-data-params file)
(fdata/update! cfg))
nil))
(defn update-file!
@@ -525,21 +657,25 @@
(let [file
(encode-file cfg file)
params
(file->params (dissoc file :id))]
file-params
(file->params (dissoc file :id))
(db/update! conn :file params
file-data-params
(file->file-data-params file)]
(db/update! conn :file file-params
{:id id}
{::db/return-keys false})
(fdata/update! cfg file-data-params)
nil))
(defn save-file!
"Applies all the final validations and perist the file, binfile
specific, should not be used outside of binfile domain.
Returns nil"
[{:keys [::timestamp] :as cfg} file & {:as opts}]
(assert (ct/inst? timestamp) "expected valid timestamp")
(let [file (-> file
@@ -603,7 +739,7 @@
;; FIXME: :is-indirect set to false to all rows looks
;; completly useless
(map #(assoc % :is-indirect false))
(map decode-row))
(map decode-row-features))
(db/exec! conn [sql:get-file-libraries file-id])))
;; FIXME: this will use a lot of memory if file uses too many big

View File

@@ -346,7 +346,7 @@
thumbnails (->> (bfc/get-file-object-thumbnails cfg file-id)
(mapv #(dissoc % :file-id)))
file (cond-> (bfc/get-file cfg file-id)
file (cond-> (bfc/get-file cfg file-id :realize? true)
detach?
(-> (ctf/detach-external-references file-id)
(dissoc :libraries))

View File

@@ -153,7 +153,7 @@
(defn- write-file!
[cfg file-id]
(let [file (bfc/get-file cfg file-id)
(let [file (bfc/get-file cfg file-id :realize? true)
thumbs (bfc/get-file-object-thumbnails cfg file-id)
media (bfc/get-file-media cfg file)
rels (bfc/get-files-rels cfg #{file-id})]

View File

@@ -222,9 +222,11 @@
(throw (IllegalArgumentException.
"the `include-libraries` and `embed-assets` are mutally excluding options")))
(let [detach? (and (not embed-assets) (not include-libraries))]
(let [detach? (and (not embed-assets) (not include-libraries))]
(db/tx-run! cfg (fn [cfg]
(cond-> (bfc/get-file cfg file-id {::sql/for-update true})
(cond-> (bfc/get-file cfg file-id
{:realize? true
:lock-for-update? true})
detach?
(-> (ctf/detach-external-references file-id)
(dissoc :libraries))

View File

@@ -52,6 +52,8 @@
:redis-uri "redis://redis/0"
:file-storage-backend "db"
:objects-storage-backend "fs"
:objects-storage-fs-directory "assets"
@@ -105,7 +107,8 @@
[:auto-file-snapshot-timeout {:optional true} ::ct/duration]
[:media-max-file-size {:optional true} ::sm/int]
[:deletion-delay {:optional true} ::ct/duration] ;; REVIEW
[:deletion-delay {:optional true} ::ct/duration]
[:file-clean-delay {:optional true} ::ct/duration]
[:telemetry-enabled {:optional true} ::sm/boolean]
[:default-blob-version {:optional true} ::sm/int]
[:allow-demo-users {:optional true} ::sm/boolean]
@@ -210,6 +213,8 @@
[:prepl-host {:optional true} :string]
[:prepl-port {:optional true} ::sm/int]
[:file-storage-backend :string]
[:media-directory {:optional true} :string] ;; REVIEW
[:media-uri {:optional true} :string]
[:assets-path {:optional true} :string]
@@ -300,6 +305,11 @@
(or (c/get config :deletion-delay)
(ct/duration {:days 7})))
(defn get-file-clean-delay
[]
(or (c/get config :file-clean-delay)
(ct/duration {:days 2})))
(defn get
"A configuration getter. Helps code be more testable."
([key]

View File

@@ -12,7 +12,10 @@
[app.common.files.helpers :as cfh]
[app.common.files.migrations :as fmg]
[app.common.logging :as l]
[app.common.schema :as sm]
[app.common.time :as ct]
[app.common.types.path :as path]
[app.config :as cf]
[app.db :as db]
[app.db.sql :as-alias sql]
[app.storage :as sto]
@@ -22,14 +25,6 @@
[app.worker :as wrk]
[promesa.exec :as px]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; OFFLOAD
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn offloaded?
[file]
(= "objects-storage" (:data-backend file)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; OBJECTS-MAP
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@@ -65,36 +60,25 @@
objects)))))
fdata))
(defn realize-objects
"Process a file and remove all instances of objects mao realizing them
to a plain data. Used in operation where is more efficient have the
whole file loaded in memory or we going to persist it in an
alterantive storage."
[_cfg file]
(update file :data process-objects (partial into {})))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; POINTER-MAP
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn get-file-data
"Get file data given a file instance."
[system file]
(if (offloaded? file)
(let [storage (sto/resolve system ::db/reuse-conn true)]
(->> (sto/get-object storage (:data-ref-id file))
(sto/get-object-bytes storage)))
(:data file)))
(defn resolve-file-data
[system file]
(let [data (get-file-data system file)]
(assoc file :data data)))
(defn decode-file-data
[{:keys [::wrk/executor]} {:keys [data] :as file}]
(cond-> file
(bytes? data)
(assoc :data (px/invoke! executor #(blob/decode data)))))
(defn load-pointer
"A database loader pointer helper"
[system file-id id]
(let [fragment (db/get* system :file-data-fragment
{:id id :file-id file-id}
{::sql/columns [:data :data-backend :data-ref-id :id]})]
[cfg file-id id]
(let [fragment (db/get* cfg :file-data
{:id id :file-id file-id :type "fragment"}
{::sql/columns [:content :backend :id]})]
(l/trc :hint "load pointer"
:file-id (str file-id)
@@ -108,22 +92,22 @@
:file-id file-id
:fragment-id id))
(let [data (get-file-data system fragment)]
;; FIXME: conditional thread scheduling for decoding big objects
(blob/decode data))))
;; FIXME: conditional thread scheduling for decoding big objects
(blob/decode (:data fragment))))
(defn persist-pointers!
"Persist all currently tracked pointer objects"
[system file-id]
(let [conn (db/get-connection system)]
[cfg file-id]
(let [conn (db/get-connection cfg)]
(doseq [[id item] @pmap/*tracked*]
(when (pmap/modified? item)
(l/trc :hint "persist pointer" :file-id (str file-id) :id (str id))
(let [content (-> item deref blob/encode)]
(db/insert! conn :file-data-fragment
(db/insert! conn :file-data
{:id id
:file-id file-id
:data content}))))))
:type "fragment"
:content content}))))))
(defn process-pointers
"Apply a function to all pointers on the file. Usuly used for
@@ -137,6 +121,14 @@
(d/update-vals update-fn')
(update :pages-index d/update-vals update-fn'))))
(defn realize-pointers
"Process a file and remove all instances of pointers realizing them to
a plain data. Used in operation where is more efficient have the
whole file loaded in memory."
[cfg {:keys [id] :as file}]
(binding [pmap/*load-fn* (partial load-pointer cfg id)]
(update file :data process-pointers deref)))
(defn get-used-pointer-ids
"Given a file, return all pointer ids used in the data."
[fdata]
@@ -200,3 +192,314 @@
(update :features disj "fdata/path-data")
(update :migrations disj "0003-convert-path-content")
(vary-meta update ::fmg/migrated disj "0003-convert-path-content"))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; GENERAL PURPOSE HELPERS
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn realize
"A helper that combines realize-pointers and realize-objects"
[cfg file]
(->> file
(realize-pointers cfg)
(realize-objects cfg)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; STORAGE
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defmulti resolve-file-data
(fn [_cfg file] (or (get file :backend) "db")))
(defmethod resolve-file-data "db"
[_cfg {:keys [legacy-data data] :as file}]
(if (and (some? legacy-data) (not data))
(-> file
(assoc :data legacy-data)
(dissoc :legacy-data))
(dissoc file :legacy-data)))
(defmethod resolve-file-data "storage"
[cfg object]
(let [storage (sto/resolve cfg ::db/reuse-conn true)
ref-id (-> object :metadata :storage-ref-id)
data (->> (sto/get-object storage ref-id)
(sto/get-object-bytes storage))]
(-> object
(assoc :data data)
(dissoc :legacy-data))))
(defn decode-file-data
[{:keys [::wrk/executor]} {:keys [data] :as file}]
(cond-> file
(bytes? data)
(assoc :data (px/invoke! executor #(blob/decode data)))))
(def ^:private sql:insert-file-data
"INSERT INTO file_data (file_id, id, created_at, modified_at,
type, backend, metadata, data)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)")
(def ^:private sql:upsert-file-data
(str sql:insert-file-data
" ON CONFLICT (file_id, id)
DO UPDATE SET modified_at=?,
backend=?,
metadata=?,
data=?;"))
(defn- create-in-database
[cfg {:keys [id file-id created-at modified-at type backend data metadata]}]
(let [metadata (some-> metadata db/json)
created-at (or created-at (ct/now))
modified-at (or modified-at created-at)]
(db/exec-one! cfg [sql:insert-file-data
file-id id
created-at
modified-at
type
backend
metadata
data])))
(defn- upsert-in-database
[cfg {:keys [id file-id created-at modified-at type backend data metadata]}]
(let [metadata (some-> metadata db/json)
created-at (or created-at (ct/now))
modified-at (or modified-at created-at)]
(db/exec-one! cfg [sql:upsert-file-data
file-id id
created-at
modified-at
type
backend
metadata
data
modified-at
backend
metadata
data])))
(defmulti ^:private handle-persistence
(fn [_cfg params] (:backend params)))
(defmethod handle-persistence "db"
[_ params]
(dissoc params :metadata))
(defmethod handle-persistence "storage"
[{:keys [::sto/storage] :as cfg}
{:keys [id file-id data] :as params}]
(let [content (sto/content data)
sobject (sto/put-object! storage
{::sto/content content
::sto/touch true
:bucket "file-data"
:content-type "application/octet-stream"
:file-id file-id
:id id})
metadata {:storage-ref-id (:id sobject)}]
(-> params
(assoc :metadata metadata)
(assoc :data nil))))
(defn- process-metadata
[cfg metadata]
(when-let [storage-id (:storage-ref-id metadata)]
(let [storage (sto/resolve cfg ::db/reuse-conn true)]
(sto/touch-object! storage storage-id))))
(defn- default-backend
[backend]
(or backend (cf/get :file-storage-backend "db")))
(def ^:private schema:metadata
[:map {:title "Metadata"}
[:storage-ref-id {:optional true} ::sm/uuid]])
(def decode-metadata-with-schema
(sm/decoder schema:metadata sm/json-transformer))
(defn decode-metadata
[metadata]
(some-> metadata
(db/decode-json-pgobject)
(decode-metadata-with-schema)))
(def ^:private schema:update-params
[:map {:closed true}
[:id ::sm/uuid]
[:type [:enum "main" "snapshot"]]
[:file-id ::sm/uuid]
[:backend {:optional true} [:enum "db" "storage"]]
[:metadata {:optional true} [:maybe schema:metadata]]
[:data {:optional true} bytes?]
[:created-at {:optional true} ::ct/inst]
[:modified-at {:optional true} ::ct/inst]])
(def ^:private check-update-params
(sm/check-fn schema:update-params :hint "invalid params received for update"))
(defn update!
[cfg params & {:keys [throw-if-not-exists?]}]
(let [params (-> (check-update-params params)
(update :backend default-backend))]
(some->> (:metadata params) (process-metadata cfg))
(let [result (handle-persistence cfg params)
result (if throw-if-not-exists?
(create-in-database cfg result)
(upsert-in-database cfg result))]
(-> result db/get-update-count pos?))))
(defn create!
[cfg params]
(update! cfg params :throw-on-conflict? true))
(def ^:private schema:delete-params
[:map {:closed true}
[:id ::sm/uuid]
[:type [:enum "main" "snapshot"]]
[:file-id ::sm/uuid]])
(def check-delete-params
(sm/check-fn schema:delete-params :hint "invalid params received for delete"))
(defn delete!
[cfg params]
(when-let [fdata (db/get* cfg :file-data
(check-delete-params params))]
(some->> (get fdata :metadata)
(decode-metadata)
(process-metadata cfg))
(-> (db/delete! cfg :file-data params)
(db/get-update-count)
(pos?))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; SCRIPTS
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(def ^:private sql:get-unmigrated-files
"SELECT f.id, f.data, f.created_at, f.modified_at
FROM file AS f
WHERE f.data IS NOT NULL
ORDER BY f.modified_at ASC
LIMIT ?
FOR UPDATE
SKIP LOCKED")
(defn migrate-files-to-storage
"Migrate the current existing files to store data in new storage
tables."
[system & {:keys [chunk-size] :or {chunk-size 100}}]
(db/tx-run! system
(fn [{:keys [::db/conn]}]
(reduce (fn [total {:keys [id data index created-at modified-at]}]
(l/dbg :hint "migrating file" :file-id (str id))
(db/update! conn :file {:data nil} {:id id} ::db/return-keys false)
(db/insert! conn :file-data
{:backend "db"
:metadata nil
:type "main"
:data data
:created-at created-at
:modified-at modified-at
:file-id id
:id id}
{::db/return-keys false})
(inc total))
0
(db/plan conn [sql:get-unmigrated-files chunk-size]
{:fetch-size 1})))))
(def ^:private sql:get-migrated-files
"SELECT f.id, f.data
FROM file_data AS f
WHERE f.data IS NOT NULL
AND f.id = f.file_id
ORDER BY f.id ASC
LIMIT ?
FOR UPDATE
SKIP LOCKED")
(defn rollback-files-from-storage
"Migrate back to the file table storage."
[system & {:keys [chunk-size] :or {chunk-size 100}}]
(db/tx-run! system
(fn [{:keys [::db/conn]}]
(reduce (fn [total {:keys [id data]}]
(l/dbg :hint "rollback file" :file-id (str id))
(db/update! conn :file {:data data} {:id id} ::db/return-keys false)
(db/delete! conn :file-data {:id id} ::db/return-keys false)
(inc total))
0
(db/plan conn [sql:get-migrated-files chunk-size]
{:fetch-size 1})))))
(def ^:private sql:get-unmigrated-snapshots
"SELECT fc.id, fc.data, fc.file_id, fc.created_at, fc.updated_at AS modified_at
FROM file_change AS fc
WHERE fc.data IS NOT NULL
AND f.label IS NOT NULL
ORDER BY f.id ASC
LIMIT ?
FOR UPDATE
SKIP LOCKED")
(defn migrate-snapshots-to-storage
"Migrate the current existing files to store data in new storage
tables."
[system & {:keys [chunk-size] :or {chunk-size 100}}]
(db/tx-run! system
(fn [{:keys [::db/conn]}]
(reduce (fn [total {:keys [id file-id data created-at modified-at]}]
(l/dbg :hint "migrating snapshot" :file-id (str file-id) :id (str id))
(db/update! conn :file-change {:data nil} {:id id :file-id file-id} ::db/return-keys false)
(db/insert! conn :file-data
{:backend "db"
:metadata nil
:type "snapshot"
:data data
:created-at created-at
:modified-at modified-at
:file-id file-id
:id id}
{::db/return-keys false})
(inc total))
0
(db/plan conn [sql:get-unmigrated-snapshots chunk-size]
{:fetch-size 1})))))
(def ^:private sql:get-migrated-snapshots
"SELECT f.id, f.data, f.file_id
FROM file_data AS f
WHERE f.data IS NOT NULL
AND f.type = 'snapshot'
AND f.id != f.file_id
ORDER BY f.id ASC
LIMIT ?
FOR UPDATE
SKIP LOCKED")
(defn rollback-snapshots-from-storage
"Migrate back to the file table storage."
[system & {:keys [chunk-size] :or {chunk-size 100}}]
(db/tx-run! system
(fn [{:keys [::db/conn]}]
(db/exec! conn ["SET statement_timeout = 0"])
(db/exec! conn ["SET idle_in_transaction_session_timeout = 0"])
(reduce (fn [total {:keys [id file-id data]}]
(l/dbg :hint "rollback snapshot" :file-id (str id) :id (str id))
(db/update! conn :file-change {:data data} {:id id :file-id file-id} ::db/return-keys false)
(db/delete! conn :file-data {:id id :file-id file-id} ::db/return-keys false)
(inc total))
0
(db/plan conn [sql:get-migrated-snapshots chunk-size]
{:fetch-size 1})))))

View File

@@ -0,0 +1,373 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) KALEIDOS INC
(ns app.features.file-snapshots
(:require
[app.binfile.common :as bfc]
[app.common.data :as d]
[app.common.exceptions :as ex]
[app.common.features :as-alias cfeat]
[app.common.files.migrations :as fmg]
[app.common.logging :as l]
[app.common.schema :as sm]
[app.common.time :as ct]
[app.common.uuid :as uuid]
[app.config :as cf]
[app.db :as db]
[app.db.sql :as-alias sql]
[app.features.fdata :as fdata]
[app.storage :as sto]
[app.util.blob :as blob]
[app.worker :as wrk]
[cuerdas.core :as str]
[promesa.exec :as px]))
(def sql:snapshots
"SELECT c.id,
c.label,
c.created_at,
c.updated_at AS modified_at,
c.deleted_at,
c.profile_id,
c.created_by,
c.locked_by,
c.revn,
c.features,
c.migrations,
c.version,
c.file_id,
c.data AS legacy_data,
fd.data AS data,
coalesce(fd.backend, 'db') AS backend,
fd.metadata AS metadata
FROM file_change AS c
LEFT JOIN file_data AS fd ON (fd.file_id = c.file_id
AND fd.id = c.id
AND fd.type = 'snapshot')
WHERE c.label IS NOT NULL")
(def ^:private sql:get-snapshot
(str sql:snapshots " AND c.file_id = ? AND c.id = ?"))
(def ^:private sql:get-snapshots
(str sql:snapshots " AND c.file_id = ?"))
(def ^:private sql:get-snapshot-without-data
(str "WITH snapshots AS (" sql:snapshots ")"
"SELECT c.id,
c.label,
c.revn,
c.created_at,
c.modified_at,
c.deleted_at,
c.profile_id,
c.created_by,
c.features,
c.metadata,
c.migrations,
c.version,
c.file_id
FROM snapshots AS c
WHERE c.id = ?"))
(defn- decode-snapshot
[snapshot]
(some-> snapshot (-> (d/update-when :metadata fdata/decode-metadata)
(d/update-when :migrations db/decode-pgarray [])
(d/update-when :features db/decode-pgarray #{}))))
(def sql:get-minimal-file
"SELECT f.id,
f.revn,
f.modified_at,
f.deleted_at,
fd.backend AS backend,
fd.metadata AS metadata
FROM file AS f
LEFT JOIN file_data AS fd ON (fd.file_id = f.id AND fd.id = f.id)
WHERE f.id = ?")
(defn get-minimal-file
[cfg id & {:as opts}]
(-> (db/get-with-sql cfg [sql:get-minimal-file id] opts)
(d/update-when :metadata fdata/decode-metadata)))
(defn get-minimal-snapshot
[cfg snapshot-id]
(-> (db/get-with-sql cfg [sql:get-snapshot-without-data snapshot-id])
(decode-snapshot)))
(defn get-snapshot
"Get snapshot with decoded data"
[cfg file-id snapshot-id]
(->> (db/get-with-sql cfg [sql:get-snapshot file-id snapshot-id])
(decode-snapshot)
(fdata/resolve-file-data cfg)
(fdata/decode-file-data cfg)))
(def ^:private sql:get-visible-snapshots
(str "WITH "
"snapshots1 AS ( " sql:snapshots "),"
"snapshots2 AS (
SELECT c.id,
c.label,
c.version,
c.created_at,
c.modified_at,
c.created_by,
c.locked_by,
c.profile_id
FROM snapshots1 AS c
WHERE c.file_id = ?
AND (c.deleted_at IS NULL OR deleted_at > now())
), snapshots3 AS (
(SELECT * FROM snapshots2 WHERE created_by = 'system' LIMIT 1000)
UNION ALL
(SELECT * FROM snapshots2 WHERE created_by != 'system' LIMIT 1000)
)
SELECT * FROM snapshots3
ORDER BY created_at DESC;"))
(defn get-visible-snapshots
"Return a list of snapshots fecheable from the API, it has a limited
set of fields and applies big but safe limits over all available
snapshots. It return a ordered vector by the snapshot date of
creation."
[cfg file-id]
(->> (db/exec! cfg [sql:get-visible-snapshots file-id])
(mapv decode-snapshot)))
(def ^:private schema:decoded-file
[:map {:title "DecodedFile"}
[:id ::sm/uuid]
[:revn :int]
[:vern :int]
[:data :map]
[:version :int]
[:features ::cfeat/features]
[:migrations [::sm/set :string]]])
(def ^:private schema:snapshot
[:map {:title "Snapshot"}
[:id ::sm/uuid]
[:revn [::sm/int {:min 0}]]
[:version [::sm/int {:min 0}]]
[:features ::cfeat/features]
[:migrations [::sm/set ::sm/text]]
[:profile-id {:optional true} ::sm/uuid]
[:label ::sm/text]
[:file-id ::sm/uuid]
[:created-by [:enum "system" "user" "admin"]]
[:deleted-at {:optional true} ::ct/inst]
[:modified-at ::ct/inst]
[:created-at ::ct/inst]])
(def ^:private schema:snapshot-params
[:map {:title "SnapshotParams"}
[:id ::sm/uuid]
[:file-id ::sm/uuid]
[:label ::sm/text]
[:modified-at {:optional true} ::ct/inst]])
(def ^:private check-snapshot
(sm/check-fn schema:snapshot))
(def ^:private check-snapshot-params
(sm/check-fn schema:snapshot-params))
(def ^:private check-decoded-file
(sm/check-fn schema:decoded-file))
(defn- generate-snapshot-label
[]
(let [ts (-> (ct/now)
(ct/format-inst)
(str/replace #"[T:\.]" "-")
(str/rtrim "Z"))]
(str "snapshot-" ts)))
(defn create!
"Create a file snapshot; expects a non-encoded file."
[cfg file & {:keys [label created-by deleted-at profile-id session-id]
:or {deleted-at :default
created-by "system"}}]
(let [file (check-decoded-file file)
snapshot-id (uuid/next)
created-at (ct/now)
deleted-at (cond
(= deleted-at :default)
(ct/plus (ct/now) (cf/get-deletion-delay))
(ct/inst? deleted-at)
deleted-at
:else
nil)
label (or label (generate-snapshot-label))
data (px/invoke! (::wrk/executor cfg) #(blob/encode (:data file)))
features (:features file)
migrations (:migrations file)
snapshot {:id snapshot-id
:revn (:revn file)
:version (:version file)
:file-id (:id file)
:features features
:migrations migrations
:label label
:created-at created-at
:modified-at created-at
:created-by created-by}
snapshot (cond-> snapshot
deleted-at
(assoc :deleted-at deleted-at)
:always
(check-snapshot))]
(db/insert! cfg :file-change
(-> snapshot
(update :features into-array)
(update :migrations into-array)
(assoc :updated-at created-at)
(assoc :profile-id profile-id)
(assoc :session-id session-id)
(dissoc :modified-at))
{::db/return-keys false})
(fdata/create! cfg
{:id snapshot-id
:file-id (:id file)
:type "snapshot"
:data data
:created-at created-at
:modified-at created-at})
snapshot))
(defn update!
[cfg params]
(let [{:keys [id file-id label modified-at]}
(check-snapshot-params params)
modified-at
(or modified-at (ct/now))]
(-> (db/update! cfg :file-change
{:label label
:created-by "user"
:updated-at modified-at
:deleted-at nil}
{:file-id file-id
:id id}
{::db/return-keys false})
(db/get-update-count)
(pos?))))
(defn restore!
[{:keys [::db/conn] :as cfg} file-id snapshot-id]
(let [file (get-minimal-file conn file-id {::db/for-update true})
vern (rand-int Integer/MAX_VALUE)
storage
(sto/resolve cfg {::db/reuse-conn true})
snapshot
(get-snapshot cfg file-id snapshot-id)]
(when-not snapshot
(ex/raise :type :not-found
:code :snapshot-not-found
:hint "unable to find snapshot with the provided label"
:snapshot-id snapshot-id
:file-id file-id))
(when-not (:data snapshot)
(ex/raise :type :internal
:code :snapshot-without-data
:hint "snapshot has no data"
:label (:label snapshot)
:file-id file-id))
(let [;; If the snapshot has applied migrations stored, we reuse
;; them, if not, we take a safest set of migrations as
;; starting point. This is because, at the time of
;; implementing snapshots, migrations were not taken into
;; account so we need to make this backward compatible in
;; some way.
migrations
(or (:migrations snapshot)
(fmg/generate-migrations-from-version 67))
file
(-> file
(update :revn inc)
(assoc :migrations migrations)
(assoc :data (:data snapshot))
(assoc :vern vern)
(assoc :version (:version snapshot))
(assoc :has-media-trimmed false)
(assoc :modified-at (:modified-at snapshot))
(assoc :features (:features snapshot)))]
(l/dbg :hint "restoring snapshot"
:file-id (str file-id)
:label (:label snapshot)
:snapshot-id (str (:id snapshot)))
;; In the same way, on reseting the file data, we need to restore
;; the applied migrations on the moment of taking the snapshot
(bfc/update-file! cfg file ::bfc/reset-migrations true)
;; FIXME: this should be separated functions, we should not have
;; inline sql here.
;; clean object thumbnails
(let [sql (str "update file_tagged_object_thumbnail "
" set deleted_at = now() "
" where file_id=? returning media_id")
res (db/exec! conn [sql file-id])]
(doseq [media-id (into #{} (keep :media-id) res)]
(sto/touch-object! storage media-id)))
;; clean file thumbnails
(let [sql (str "update file_thumbnail "
" set deleted_at = now() "
" where file_id=? returning media_id")
res (db/exec! conn [sql file-id])]
(doseq [media-id (into #{} (keep :media-id) res)]
(sto/touch-object! storage media-id)))
vern)))
(defn delete!
[cfg {:keys [id file-id]}]
(let [deleted-at (ct/now)]
(db/update! cfg :file-change
{:deleted-at deleted-at}
{:id id :file-id file-id}
{::db/return-keys false})
true))
(defn reduce-snapshots
"Process the file snapshots using efficient reduction."
[cfg file-id xform f init]
(let [conn (db/get-connection cfg)
xform (comp
(map (partial fdata/resolve-file-data cfg))
(map (partial fdata/decode-file-data cfg))
xform)]
(->> (db/plan conn [sql:get-snapshots file-id] {:fetch-size 1})
(transduce xform f init))))

View File

@@ -444,7 +444,10 @@
:fn (mg/resource "app/migrations/sql/0140-mod-file-change-table.sql")}
{:name "0140-add-locked-by-column-to-file-change-table"
:fn (mg/resource "app/migrations/sql/0140-add-locked-by-column-to-file-change-table.sql")}])
:fn (mg/resource "app/migrations/sql/0140-add-locked-by-column-to-file-change-table.sql")}
{:name "0141-add-file-data-table.sql"
:fn (mg/resource "app/migrations/sql/0141-add-file-data-table.sql")}])
(defn apply-migrations!
[pool name migrations]

View File

@@ -0,0 +1,33 @@
CREATE TABLE file_data (
file_id uuid NOT NULL REFERENCES file(id) DEFERRABLE,
id uuid NOT NULL,
created_at timestamptz NOT NULL DEFAULT now(),
modified_at timestamptz NOT NULL DEFAULT now(),
type text NULL,
backend text NULL,
metadata jsonb NULL,
data bytea NULL,
PRIMARY KEY (file_id, id)
) PARTITION BY HASH (file_id, id);
CREATE TABLE file_data_00 PARTITION OF file_data FOR VALUES WITH (MODULUS 16, REMAINDER 0);
CREATE TABLE file_data_01 PARTITION OF file_data FOR VALUES WITH (MODULUS 16, REMAINDER 1);
CREATE TABLE file_data_02 PARTITION OF file_data FOR VALUES WITH (MODULUS 16, REMAINDER 2);
CREATE TABLE file_data_03 PARTITION OF file_data FOR VALUES WITH (MODULUS 16, REMAINDER 3);
CREATE TABLE file_data_04 PARTITION OF file_data FOR VALUES WITH (MODULUS 16, REMAINDER 4);
CREATE TABLE file_data_05 PARTITION OF file_data FOR VALUES WITH (MODULUS 16, REMAINDER 5);
CREATE TABLE file_data_06 PARTITION OF file_data FOR VALUES WITH (MODULUS 16, REMAINDER 6);
CREATE TABLE file_data_07 PARTITION OF file_data FOR VALUES WITH (MODULUS 16, REMAINDER 7);
CREATE TABLE file_data_08 PARTITION OF file_data FOR VALUES WITH (MODULUS 16, REMAINDER 8);
CREATE TABLE file_data_09 PARTITION OF file_data FOR VALUES WITH (MODULUS 16, REMAINDER 9);
CREATE TABLE file_data_10 PARTITION OF file_data FOR VALUES WITH (MODULUS 16, REMAINDER 10);
CREATE TABLE file_data_11 PARTITION OF file_data FOR VALUES WITH (MODULUS 16, REMAINDER 11);
CREATE TABLE file_data_12 PARTITION OF file_data FOR VALUES WITH (MODULUS 16, REMAINDER 12);
CREATE TABLE file_data_13 PARTITION OF file_data FOR VALUES WITH (MODULUS 16, REMAINDER 13);
CREATE TABLE file_data_14 PARTITION OF file_data FOR VALUES WITH (MODULUS 16, REMAINDER 14);
CREATE TABLE file_data_15 PARTITION OF file_data FOR VALUES WITH (MODULUS 16, REMAINDER 15);

View File

@@ -239,7 +239,6 @@
'app.rpc.commands.files
'app.rpc.commands.files-create
'app.rpc.commands.files-share
'app.rpc.commands.files-temp
'app.rpc.commands.files-update
'app.rpc.commands.files-snapshot
'app.rpc.commands.files-thumbnails

View File

@@ -184,8 +184,8 @@
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer cfg file-id)]
(let [file (->> file
(files/decode-row)
(feat.fdata/resolve-file-data cfg))
(feat.fdata/resolve-file-data cfg)
(feat.fdata/decode-file-data cfg))
data (get file :data)]
(-> file
(assoc :page-name (dm/get-in data [:pages-index page-id :name]))

View File

@@ -24,7 +24,6 @@
[app.db :as db]
[app.db.sql :as-alias sql]
[app.features.fdata :as feat.fdata]
[app.features.file-migrations :as feat.fmigr]
[app.features.logical-deletion :as ldel]
[app.loggers.audit :as-alias audit]
[app.loggers.webhooks :as-alias webhooks]
@@ -39,8 +38,7 @@
[app.util.pointer-map :as pmap]
[app.util.services :as sv]
[app.worker :as wrk]
[cuerdas.core :as str]
[promesa.exec :as px]))
[cuerdas.core :as str]))
;; --- FEATURES
@@ -55,12 +53,10 @@
(ct/duration {:days 7}))
(defn decode-row
[{:keys [data changes features] :as row}]
[{:keys [features] :as row}]
(when row
(cond-> row
features (assoc :features (db/decode-pgarray features #{}))
changes (assoc :changes (blob/decode changes))
data (assoc :data (blob/decode data)))))
(db/pgarray? features) (assoc :features (db/decode-pgarray features #{})))))
(defn check-version!
[file]
@@ -209,90 +205,9 @@
[:id ::sm/uuid]
[:project-id {:optional true} ::sm/uuid]])
(defn- migrate-file
[{:keys [::db/conn] :as cfg} {:keys [id] :as file} {:keys [read-only?]}]
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer cfg id)
pmap/*tracked* (pmap/create-tracked)]
(let [libs (delay (bfc/get-resolved-file-libraries cfg file))
;; For avoid unnecesary overhead of creating multiple pointers and
;; handly internally with objects map in their worst case (when
;; probably all shapes and all pointers will be readed in any
;; case), we just realize/resolve them before applying the
;; migration to the file
file (-> file
(update :data feat.fdata/process-pointers deref)
(update :data feat.fdata/process-objects (partial into {}))
(fmg/migrate-file libs))]
(if (or read-only? (db/read-only? conn))
file
(let [;; When file is migrated, we break the rule of no perform
;; mutations on get operations and update the file with all
;; migrations applied
file (if (contains? (:features file) "fdata/objects-map")
(feat.fdata/enable-objects-map file)
file)
file (if (contains? (:features file) "fdata/pointer-map")
(feat.fdata/enable-pointer-map file)
file)]
(db/update! conn :file
{:data (blob/encode (:data file))
:version (:version file)
:features (db/create-array conn "text" (:features file))}
{:id id}
{::db/return-keys false})
(when (contains? (:features file) "fdata/pointer-map")
(feat.fdata/persist-pointers! cfg id))
(feat.fmigr/upsert-migrations! conn file)
(feat.fmigr/resolve-applied-migrations cfg file))))))
(defn get-file
[{:keys [::db/conn ::wrk/executor] :as cfg} id
& {:keys [project-id
migrate?
include-deleted?
lock-for-update?
preload-pointers?]
:or {include-deleted? false
lock-for-update? false
migrate? true
preload-pointers? false}
:as options}]
(assert (db/connection? conn) "expected cfg with valid connection")
(let [params (merge {:id id}
(when (some? project-id)
{:project-id project-id}))
file (->> (db/get conn :file params
{::db/check-deleted (not include-deleted?)
::db/remove-deleted (not include-deleted?)
::sql/for-update lock-for-update?})
(feat.fmigr/resolve-applied-migrations cfg)
(feat.fdata/resolve-file-data cfg))
;; NOTE: we perform the file decoding in a separate thread
;; because it has heavy and synchronous operations for
;; decoding file body that are not very friendly with virtual
;; threads.
file (px/invoke! executor #(decode-row file))
file (if (and migrate? (fmg/need-migration? file))
(migrate-file cfg file options)
file)]
(if preload-pointers?
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer cfg id)]
(update file :data feat.fdata/process-pointers deref))
file)))
(defn get-minimal-file
[cfg id & {:as opts}]
(let [opts (assoc opts ::sql/columns [:id :modified-at :deleted-at :revn :vern :data-ref-id :data-backend])]
(let [opts (assoc opts ::sql/columns [:id :modified-at :deleted-at :revn :vern])]
(db/get cfg :file {:id id} opts)))
(defn- get-minimal-file-with-perms
@@ -332,9 +247,9 @@
:project-id project-id
:file-id id)
file (-> (get-file cfg id :project-id project-id)
file (-> (bfc/get-file cfg id
:project-id project-id)
(assoc :permissions perms)
(assoc :team-id (:id team))
(check-version!))]
(-> (cfeat/get-team-enabled-features cf/flags team)
@@ -346,8 +261,7 @@
;; pointers on backend and return a complete file.
(if (and (contains? (:features file) "fdata/pointer-map")
(not (contains? (:features params) "fdata/pointer-map")))
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer cfg id)]
(update file :data feat.fdata/process-pointers deref))
(feat.fdata/realize-pointers cfg file)
file))))
;; --- COMMAND QUERY: get-file-fragment (by id)
@@ -357,7 +271,7 @@
[:id ::sm/uuid]
[:file-id ::sm/uuid]
[:created-at ::ct/inst]
[:content any?]])
[:data any?]])
(def schema:get-file-fragment
[:map {:title "get-file-fragment"}
@@ -367,10 +281,8 @@
(defn- get-file-fragment
[cfg file-id fragment-id]
(let [resolve-file-data (partial feat.fdata/resolve-file-data cfg)]
(some-> (db/get cfg :file-data-fragment {:file-id file-id :id fragment-id})
(resolve-file-data)
(update :data blob/decode))))
(some-> (db/get cfg :file-data {:file-id file-id :id fragment-id :type "fragment"})
(update :data blob/decode)))
(sv/defmethod ::get-file-fragment
"Retrieve a file fragment by its ID. Only authenticated users."
@@ -495,7 +407,7 @@
(let [perms (get-permissions conn profile-id file-id share-id)
file (get-file cfg file-id :read-only? true)
file (bfc/get-file cfg file-id :read-only? true)
proj (db/get conn :project {:id (:project-id file)})
@@ -721,9 +633,9 @@
:project-id project-id
:file-id id)
file (get-file cfg id
:project-id project-id
:read-only? true)]
file (bfc/get-file cfg id
:project-id project-id
:read-only? true)]
(-> (cfeat/get-team-enabled-features cf/flags team)
(cfeat/check-client-features! (:features params))
@@ -810,7 +722,7 @@
;; --- MUTATION COMMAND: set-file-shared
(def sql:get-referenced-files
(def ^:private sql:get-referenced-files
"SELECT f.id
FROM file_library_rel AS flr
INNER JOIN file AS f ON (f.id = flr.file_id)
@@ -821,56 +733,51 @@
(defn- absorb-library-by-file!
[cfg ldata file-id]
(dm/assert!
"expected cfg with valid connection"
(db/connection-map? cfg))
(assert (db/connection-map? cfg)
"expected cfg with valid connection")
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer cfg file-id)
pmap/*tracked* (pmap/create-tracked)]
(let [file (-> (get-file cfg file-id
:include-deleted? true
:lock-for-update? true)
(let [file (-> (bfc/get-file cfg file-id
:include-deleted? true
:lock-for-update? true)
(update :data ctf/absorb-assets ldata))]
(l/trc :hint "library absorbed"
:library-id (str (:id ldata))
:file-id (str file-id))
(db/update! cfg :file
{:revn (inc (:revn file))
:data (blob/encode (:data file))
:modified-at (ct/now)
:has-media-trimmed false}
{:id file-id})
(feat.fdata/persist-pointers! cfg file-id))))
(bfc/update-file! cfg {:id file-id
:migrations (:migrations file)
:revn (inc (:revn file))
:data (:data file)
:modified-at (ct/now)
:has-media-trimmed false}))))
(defn- absorb-library
"Find all files using a shared library, and absorb all library assets
into the file local libraries"
[cfg {:keys [id] :as library}]
[cfg {:keys [id data] :as library}]
(dm/assert!
"expected cfg with valid connection"
(db/connection-map? cfg))
(assert (db/connection-map? cfg)
"expected cfg with valid connection")
(let [ldata (binding [pmap/*load-fn* (partial feat.fdata/load-pointer cfg id)]
(-> library :data (feat.fdata/process-pointers deref)))
ids (->> (db/exec! cfg [sql:get-referenced-files id])
(map :id))]
(let [ids (->> (db/exec! cfg [sql:get-referenced-files id])
(sequence bfc/xf-map-id))]
(l/trc :hint "absorbing library"
:library-id (str id)
:files (str/join "," (map str ids)))
(run! (partial absorb-library-by-file! cfg ldata) ids)
(run! (partial absorb-library-by-file! cfg data) ids)
library))
(defn absorb-library!
[{:keys [::db/conn] :as cfg} id]
(let [file (-> (get-file cfg id
:lock-for-update? true
:include-deleted? true)
(let [file (-> (bfc/get-file cfg id
:realize? true
:lock-for-update? true
:include-deleted? true)
(check-version!))
proj (db/get* conn :project {:id (:project-id file)}

View File

@@ -8,6 +8,7 @@
(:require
[app.binfile.common :as bfc]
[app.common.features :as cfeat]
[app.common.files.migrations :as fmg]
[app.common.schema :as sm]
[app.common.time :as ct]
[app.common.types.file :as ctf]
@@ -51,6 +52,7 @@
:revn revn
:is-shared is-shared
:features features
:migrations fmg/available-migrations
:ignore-sync-until ignore-sync-until
:created-at modified-at
:deleted-at deleted-at}
@@ -66,7 +68,7 @@
{:modified-at (ct/now)}
{:id project-id})
file)))
(bfc/get-file cfg (:id file)))))
(def ^:private schema:create-file
[:map {:title "create-file"}

View File

@@ -8,52 +8,17 @@
(:require
[app.binfile.common :as bfc]
[app.common.exceptions :as ex]
[app.common.files.migrations :as fmg]
[app.common.logging :as l]
[app.common.schema :as sm]
[app.common.time :as ct]
[app.common.uuid :as uuid]
[app.config :as cf]
[app.db :as db]
[app.db.sql :as-alias sql]
[app.features.fdata :as feat.fdata]
[app.features.file-migrations :refer [reset-migrations!]]
[app.features.file-snapshots :as fsnap]
[app.main :as-alias main]
[app.msgbus :as mbus]
[app.rpc :as-alias rpc]
[app.rpc.commands.files :as files]
[app.rpc.doc :as-alias doc]
[app.rpc.quotes :as quotes]
[app.storage :as sto]
[app.util.blob :as blob]
[app.util.services :as sv]
[cuerdas.core :as str]))
(defn decode-row
[{:keys [migrations] :as row}]
(when row
(cond-> row
(some? migrations)
(assoc :migrations (db/decode-pgarray migrations)))))
(def sql:get-file-snapshots
"WITH changes AS (
SELECT id, label, revn, created_at, created_by, profile_id, locked_by
FROM file_change
WHERE file_id = ?
AND data IS NOT NULL
AND (deleted_at IS NULL OR deleted_at > now())
), versions AS (
(SELECT * FROM changes WHERE created_by = 'system' LIMIT 1000)
UNION ALL
(SELECT * FROM changes WHERE created_by != 'system' LIMIT 1000)
)
SELECT * FROM versions
ORDER BY created_at DESC;")
(defn get-file-snapshots
[conn file-id]
(db/exec! conn [sql:get-file-snapshots file-id]))
[app.util.services :as sv]))
(def ^:private schema:get-file-snapshots
[:map {:title "get-file-snapshots"}
@@ -65,73 +30,7 @@
[cfg {:keys [::rpc/profile-id file-id] :as params}]
(db/run! cfg (fn [{:keys [::db/conn]}]
(files/check-read-permissions! conn profile-id file-id)
(get-file-snapshots conn file-id))))
(defn- generate-snapshot-label
[]
(let [ts (-> (ct/now)
(ct/format-inst)
(str/replace #"[T:\.]" "-")
(str/rtrim "Z"))]
(str "snapshot-" ts)))
(defn create-file-snapshot!
[cfg file & {:keys [label created-by deleted-at profile-id]
:or {deleted-at :default
created-by :system}}]
(assert (#{:system :user :admin} created-by)
"expected valid keyword for created-by")
(let [created-by
(name created-by)
deleted-at
(cond
(= deleted-at :default)
(ct/plus (ct/now) (cf/get-deletion-delay))
(ct/inst? deleted-at)
deleted-at
:else
nil)
label
(or label (generate-snapshot-label))
snapshot-id
(uuid/next)
data
(blob/encode (:data file))
features
(into-array (:features file))
migrations
(into-array (:migrations file))]
(l/dbg :hint "creating file snapshot"
:file-id (str (:id file))
:id (str snapshot-id)
:label label)
(db/insert! cfg :file-change
{:id snapshot-id
:revn (:revn file)
:data data
:version (:version file)
:features features
:migrations migrations
:profile-id profile-id
:file-id (:id file)
:label label
:deleted-at deleted-at
:created-by created-by}
{::db/return-keys false})
{:id snapshot-id :label label}))
(fsnap/get-visible-snapshots conn file-id))))
(def ^:private schema:create-file-snapshot
[:map
@@ -144,7 +43,7 @@
::db/transaction true}
[{:keys [::db/conn] :as cfg} {:keys [::rpc/profile-id file-id label]}]
(files/check-edition-permissions! conn profile-id file-id)
(let [file (bfc/get-file cfg file-id)
(let [file (bfc/get-file cfg file-id :realize? true)
project (db/get-by-id cfg :project (:project-id file))]
(-> cfg
@@ -155,96 +54,10 @@
(quotes/check! {::quotes/id ::quotes/snapshots-per-file}
{::quotes/id ::quotes/snapshots-per-team}))
(create-file-snapshot! cfg file
{:label label
:profile-id profile-id
:created-by :user})))
(defn restore-file-snapshot!
[{:keys [::db/conn ::mbus/msgbus] :as cfg} file-id snapshot-id]
(let [storage (sto/resolve cfg {::db/reuse-conn true})
file (files/get-minimal-file conn file-id {::db/for-update true})
vern (rand-int Integer/MAX_VALUE)
snapshot (some->> (db/get* conn :file-change
{:file-id file-id
:id snapshot-id}
{::db/for-share true})
(feat.fdata/resolve-file-data cfg)
(decode-row))
;; If snapshot has tracked applied migrations, we reuse them,
;; if not we take a safest set of migrations as starting
;; point. This is because, at the time of implementing
;; snapshots, migrations were not taken into account so we
;; need to make this backward compatible in some way.
file (assoc file :migrations
(or (:migrations snapshot)
(fmg/generate-migrations-from-version 67)))]
(when-not snapshot
(ex/raise :type :not-found
:code :snapshot-not-found
:hint "unable to find snapshot with the provided label"
:snapshot-id snapshot-id
:file-id file-id))
(when-not (:data snapshot)
(ex/raise :type :validation
:code :snapshot-without-data
:hint "snapshot has no data"
:label (:label snapshot)
:file-id file-id))
(l/dbg :hint "restoring snapshot"
:file-id (str file-id)
:label (:label snapshot)
:snapshot-id (str (:id snapshot)))
;; If the file was already offloaded, on restoring the snapshot we
;; are going to replace the file data, so we need to touch the old
;; referenced storage object and avoid possible leaks
(when (feat.fdata/offloaded? file)
(sto/touch-object! storage (:data-ref-id file)))
;; In the same way, on reseting the file data, we need to restore
;; the applied migrations on the moment of taking the snapshot
(reset-migrations! conn file)
(db/update! conn :file
{:data (:data snapshot)
:revn (inc (:revn file))
:vern vern
:version (:version snapshot)
:data-backend nil
:data-ref-id nil
:has-media-trimmed false
:features (:features snapshot)}
{:id file-id})
;; clean object thumbnails
(let [sql (str "update file_tagged_object_thumbnail "
" set deleted_at = now() "
" where file_id=? returning media_id")
res (db/exec! conn [sql file-id])]
(doseq [media-id (into #{} (keep :media-id) res)]
(sto/touch-object! storage media-id)))
;; clean file thumbnails
(let [sql (str "update file_thumbnail "
" set deleted_at = now() "
" where file_id=? returning media_id")
res (db/exec! conn [sql file-id])]
(doseq [media-id (into #{} (keep :media-id) res)]
(sto/touch-object! storage media-id)))
;; Send to the clients a notification to reload the file
(mbus/pub! msgbus
:topic (:id file)
:message {:type :file-restore
:file-id (:id file)
:vern vern})
{:id (:id snapshot)
:label (:label snapshot)}))
(fsnap/create! cfg file
{:label label
:profile-id profile-id
:created-by "user"})))
(def ^:private schema:restore-file-snapshot
[:map {:title "restore-file-snapshot"}
@@ -253,88 +66,56 @@
(sv/defmethod ::restore-file-snapshot
{::doc/added "1.20"
::sm/params schema:restore-file-snapshot}
[cfg {:keys [::rpc/profile-id file-id id] :as params}]
(db/tx-run! cfg
(fn [{:keys [::db/conn] :as cfg}]
(files/check-edition-permissions! conn profile-id file-id)
(let [file (bfc/get-file cfg file-id)]
(create-file-snapshot! cfg file
{:profile-id profile-id
:created-by :system})
(restore-file-snapshot! cfg file-id id)))))
::sm/params schema:restore-file-snapshot
::db/transaction true}
[{:keys [::db/conn ::mbus/msgbus] :as cfg} {:keys [::rpc/profile-id file-id id] :as params}]
(files/check-edition-permissions! conn profile-id file-id)
(let [file (bfc/get-file cfg file-id)]
(fsnap/create! cfg file
{:profile-id profile-id
:created-by "system"})
(let [vern (fsnap/restore! cfg file-id id)]
;; Send to the clients a notification to reload the file
(mbus/pub! msgbus
:topic (:id file)
:message {:type :file-restore
:file-id (:id file)
:vern vern})
nil)))
(def ^:private schema:update-file-snapshot
[:map {:title "update-file-snapshot"}
[:id ::sm/uuid]
[:label ::sm/text]])
(defn- update-file-snapshot!
[conn snapshot-id label]
(-> (db/update! conn :file-change
{:label label
:created-by "user"
:deleted-at nil}
{:id snapshot-id}
{::db/return-keys true})
(dissoc :data :features :migrations)))
(defn- get-snapshot
"Get a minimal snapshot from database and lock for update"
[conn id]
(db/get conn :file-change
{:id id}
{::sql/columns [:id :file-id :created-by :deleted-at :profile-id :locked-by]
::db/for-update true}))
(sv/defmethod ::update-file-snapshot
{::doc/added "1.20"
::sm/params schema:update-file-snapshot}
[cfg {:keys [::rpc/profile-id id label]}]
(db/tx-run! cfg
(fn [{:keys [::db/conn]}]
(let [snapshot (get-snapshot conn id)]
(files/check-edition-permissions! conn profile-id (:file-id snapshot))
(update-file-snapshot! conn id label)))))
::sm/params schema:update-file-snapshot
::db/transaction true}
[{:keys [::db/conn]} {:keys [::rpc/profile-id id label]}]
(let [snapshot (fsnap/get-minimal-snapshot conn id)]
(files/check-edition-permissions! conn profile-id (:file-id snapshot))
(fsnap/update! conn (assoc snapshot :label label))))
(def ^:private schema:remove-file-snapshot
[:map {:title "remove-file-snapshot"}
[:id ::sm/uuid]])
(defn- delete-file-snapshot!
[conn snapshot-id]
(db/update! conn :file-change
{:deleted-at (ct/now)}
{:id snapshot-id}
{::db/return-keys false})
nil)
(sv/defmethod ::delete-file-snapshot
{::doc/added "1.20"
::sm/params schema:remove-file-snapshot}
[cfg {:keys [::rpc/profile-id id]}]
(db/tx-run! cfg
(fn [{:keys [::db/conn]}]
(let [snapshot (get-snapshot conn id)]
(files/check-edition-permissions! conn profile-id (:file-id snapshot))
::sm/params schema:remove-file-snapshot
::db/transaction true}
[{:keys [::db/conn]} {:keys [::rpc/profile-id id]}]
(let [snapshot (fsnap/get-minimal-snapshot conn id)]
(files/check-edition-permissions! conn profile-id (:file-id snapshot))
(when (not= (:created-by snapshot) "user")
(ex/raise :type :validation
:code :system-snapshots-cant-be-deleted
:snapshot-id id
:profile-id profile-id))
;; Check if version is locked by someone else
(when (and (:locked-by snapshot)
(not= (:locked-by snapshot) profile-id))
(ex/raise :type :validation
:code :snapshot-is-locked
:hint "Cannot delete a locked version"
:snapshot-id id
:profile-id profile-id
:locked-by (:locked-by snapshot)))
(delete-file-snapshot! conn id)))))
(when (not= (:created-by snapshot) "user")
(ex/raise :type :validation
:code :system-snapshots-cant-be-deleted
:file-id (:file-id snapshot)
:snapshot-id id
:profile-id profile-id))
(fsnap/delete! conn snapshot)))
;;; Lock/unlock version endpoints
@@ -342,6 +123,7 @@
[:map {:title "lock-file-snapshot"}
[:id ::sm/uuid]])
;; MOVE to fsnap
(defn- lock-file-snapshot!
[conn snapshot-id profile-id]
(db/update! conn :file-change
@@ -352,44 +134,45 @@
(sv/defmethod ::lock-file-snapshot
{::doc/added "1.20"
::sm/params schema:lock-file-snapshot}
[cfg {:keys [::rpc/profile-id id]}]
(db/tx-run! cfg
(fn [{:keys [::db/conn]}]
(let [snapshot (get-snapshot conn id)]
(files/check-edition-permissions! conn profile-id (:file-id snapshot))
::sm/params schema:lock-file-snapshot
::db/transaction true}
[{:keys [::db/conn]} {:keys [::rpc/profile-id id]}]
(let [snapshot (fsnap/get-minimal-snapshot conn id)]
(files/check-edition-permissions! conn profile-id (:file-id snapshot))
(when (not= (:created-by snapshot) "user")
(ex/raise :type :validation
:code :system-snapshots-cant-be-locked
:hint "Only user-created versions can be locked"
:snapshot-id id
:profile-id profile-id))
(when (not= (:created-by snapshot) "user")
(ex/raise :type :validation
:code :system-snapshots-cant-be-locked
:hint "Only user-created versions can be locked"
:snapshot-id id
:profile-id profile-id))
;; Only the creator can lock their own version
(when (not= (:profile-id snapshot) profile-id)
(ex/raise :type :validation
:code :only-creator-can-lock
:hint "Only the version creator can lock it"
:snapshot-id id
:profile-id profile-id
:creator-id (:profile-id snapshot)))
;; Only the creator can lock their own version
(when (not= (:profile-id snapshot) profile-id)
(ex/raise :type :validation
:code :only-creator-can-lock
:hint "Only the version creator can lock it"
:snapshot-id id
:profile-id profile-id
:creator-id (:profile-id snapshot)))
;; Check if already locked
(when (:locked-by snapshot)
(ex/raise :type :validation
:code :snapshot-already-locked
:hint "Version is already locked"
:snapshot-id id
:profile-id profile-id
:locked-by (:locked-by snapshot)))
;; Check if already locked
(when (:locked-by snapshot)
(ex/raise :type :validation
:code :snapshot-already-locked
:hint "Version is already locked"
:snapshot-id id
:profile-id profile-id
:locked-by (:locked-by snapshot)))
(lock-file-snapshot! conn id profile-id)))))
(lock-file-snapshot! conn id profile-id)))
(def ^:private schema:unlock-file-snapshot
[:map {:title "unlock-file-snapshot"}
[:id ::sm/uuid]])
;; MOVE to fsnap
(defn- unlock-file-snapshot!
[conn snapshot-id]
(db/update! conn :file-change
@@ -400,35 +183,34 @@
(sv/defmethod ::unlock-file-snapshot
{::doc/added "1.20"
::sm/params schema:unlock-file-snapshot}
[cfg {:keys [::rpc/profile-id id]}]
(db/tx-run! cfg
(fn [{:keys [::db/conn]}]
(let [snapshot (get-snapshot conn id)]
(files/check-edition-permissions! conn profile-id (:file-id snapshot))
::sm/params schema:unlock-file-snapshot
::db/transaction true}
[{:keys [::db/conn]} {:keys [::rpc/profile-id id]}]
(let [snapshot (fsnap/get-minimal-snapshot conn id)]
(files/check-edition-permissions! conn profile-id (:file-id snapshot))
(when (not= (:created-by snapshot) "user")
(ex/raise :type :validation
:code :system-snapshots-cant-be-unlocked
:hint "Only user-created versions can be unlocked"
:snapshot-id id
:profile-id profile-id))
(when (not= (:created-by snapshot) "user")
(ex/raise :type :validation
:code :system-snapshots-cant-be-unlocked
:hint "Only user-created versions can be unlocked"
:snapshot-id id
:profile-id profile-id))
;; Only the creator can unlock their own version
(when (not= (:profile-id snapshot) profile-id)
(ex/raise :type :validation
:code :only-creator-can-unlock
:hint "Only the version creator can unlock it"
:snapshot-id id
:profile-id profile-id
:creator-id (:profile-id snapshot)))
;; Only the creator can unlock their own version
(when (not= (:profile-id snapshot) profile-id)
(ex/raise :type :validation
:code :only-creator-can-unlock
:hint "Only the version creator can unlock it"
:snapshot-id id
:profile-id profile-id
:creator-id (:profile-id snapshot)))
;; Check if not locked
(when (not (:locked-by snapshot))
(ex/raise :type :validation
:code :snapshot-not-locked
:hint "Version is not locked"
:snapshot-id id
:profile-id profile-id))
;; Check if not locked
(when (not (:locked-by snapshot))
(ex/raise :type :validation
:code :snapshot-not-locked
:hint "Version is not locked"
:snapshot-id id
:profile-id profile-id))
(unlock-file-snapshot! conn id)))))
(unlock-file-snapshot! conn id)))

View File

@@ -1,161 +0,0 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) KALEIDOS INC
(ns app.rpc.commands.files-temp
(:require
[app.common.exceptions :as ex]
[app.common.features :as cfeat]
[app.common.files.changes :as cpc]
[app.common.schema :as sm]
[app.common.time :as ct]
[app.common.uuid :as uuid]
[app.config :as cf]
[app.db :as db]
[app.db.sql :as sql]
[app.features.fdata :as fdata]
[app.loggers.audit :as audit]
[app.rpc :as-alias rpc]
[app.rpc.commands.files :as files]
[app.rpc.commands.files-create :as files.create]
[app.rpc.commands.files-update :as-alias files.update]
[app.rpc.commands.projects :as projects]
[app.rpc.commands.teams :as teams]
[app.rpc.doc :as-alias doc]
[app.rpc.helpers :as rph]
[app.util.blob :as blob]
[app.util.pointer-map :as pmap]
[app.util.services :as sv]
[clojure.set :as set]))
;; --- MUTATION COMMAND: create-temp-file
(def ^:private schema:create-temp-file
[:map {:title "create-temp-file"}
[:name [:string {:max 250}]]
[:project-id ::sm/uuid]
[:id {:optional true} ::sm/uuid]
[:is-shared ::sm/boolean]
[:features ::cfeat/features]
[:create-page ::sm/boolean]])
(sv/defmethod ::create-temp-file
{::doc/added "1.17"
::doc/module :files
::sm/params schema:create-temp-file
::db/transaction true}
[{:keys [::db/conn] :as cfg} {:keys [::rpc/profile-id project-id] :as params}]
(projects/check-edition-permissions! conn profile-id project-id)
(let [team (teams/get-team conn :profile-id profile-id :project-id project-id)
;; When we create files, we only need to respect the team
;; features, because some features can be enabled
;; globally, but the team is still not migrated properly.
input-features
(:features params #{})
;; If the imported project doesn't contain v2 we need to remove it
team-features
(cond-> (cfeat/get-team-enabled-features cf/flags team)
(not (contains? input-features "components/v2"))
(disj "components/v2"))
;; We also include all no migration features declared by
;; client; that enables the ability to enable a runtime
;; feature on frontend and make it permanent on file
features
(-> input-features
(set/intersection cfeat/no-migration-features)
(set/union team-features))
params
(-> params
(assoc :profile-id profile-id)
(assoc :deleted-at (ct/in-future {:days 1}))
(assoc :features features))]
(files.create/create-file cfg params)))
;; --- MUTATION COMMAND: update-temp-file
(def ^:private schema:update-temp-file
[:map {:title "update-temp-file"}
[:changes [:vector ::cpc/change]]
[:revn [::sm/int {:min 0}]]
[:session-id ::sm/uuid]
[:id ::sm/uuid]])
(sv/defmethod ::update-temp-file
{::doc/added "1.17"
::doc/module :files
::sm/params schema:update-temp-file}
[cfg {:keys [::rpc/profile-id session-id id revn changes] :as params}]
(db/tx-run! cfg (fn [{:keys [::db/conn]}]
(db/insert! conn :file-change
{:id (uuid/next)
:session-id session-id
:profile-id profile-id
:created-at (ct/now)
:file-id id
:revn revn
:data nil
:changes (blob/encode changes)})
(rph/with-meta (rph/wrap nil)
{::audit/replace-props {:file-id id
:revn revn}}))))
;; --- MUTATION COMMAND: persist-temp-file
(defn persist-temp-file
[{:keys [::db/conn] :as cfg} {:keys [id] :as params}]
(let [file (files/get-file cfg id
:migrate? false
:lock-for-update? true)]
(when (nil? (:deleted-at file))
(ex/raise :type :validation
:code :cant-persist-already-persisted-file))
(let [changes (->> (db/cursor conn
(sql/select :file-change {:file-id id}
{:order-by [[:revn :asc]]})
{:chunk-size 10})
(sequence (mapcat (comp blob/decode :changes))))
file (update file :data cpc/process-changes changes)
file (if (contains? (:features file) "fdata/objects-map")
(fdata/enable-objects-map file)
file)
file (if (contains? (:features file) "fdata/pointer-map")
(binding [pmap/*tracked* (pmap/create-tracked)]
(let [file (fdata/enable-pointer-map file)]
(fdata/persist-pointers! cfg id)
file))
file)]
;; Delete changes from the changes history
(db/delete! conn :file-change {:file-id id})
(db/update! conn :file
{:deleted-at nil
:revn 1
:data (blob/encode (:data file))}
{:id id})
nil)))
(def ^:private schema:persist-temp-file
[:map {:title "persist-temp-file"}
[:id ::sm/uuid]])
(sv/defmethod ::persist-temp-file
{::doc/added "1.17"
::doc/module :files
::sm/params schema:persist-temp-file}
[cfg {:keys [::rpc/profile-id id] :as params}]
(db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}]
(files/check-edition-permissions! conn profile-id id)
(persist-temp-file cfg params))))

View File

@@ -6,6 +6,7 @@
(ns app.rpc.commands.files-thumbnails
(:require
[app.binfile.common :as bfc]
[app.common.data :as d]
[app.common.data.macros :as dm]
[app.common.features :as cfeat]
@@ -202,9 +203,9 @@
:profile-id profile-id
:file-id file-id)
file (files/get-file cfg file-id
:preload-pointers? true
:read-only? true)]
file (bfc/get-file cfg file-id
:realize? true
:read-only? true)]
(-> (cfeat/get-team-enabled-features cf/flags team)
(cfeat/check-file-features! (:features file)))
@@ -339,6 +340,7 @@
data (-> (sto/content path)
(sto/wrap-with-hash hash))
tnow (ct/now)
media (sto/put-object! storage
{::sto/content data
::sto/deduplicate? true

View File

@@ -19,8 +19,9 @@
[app.common.uuid :as uuid]
[app.config :as cf]
[app.db :as db]
[app.features.fdata :as feat.fdata]
[app.features.fdata :as fdata]
[app.features.file-migrations :as feat.fmigr]
[app.features.file-snapshots :as fsnap]
[app.features.logical-deletion :as ldel]
[app.http.errors :as errors]
[app.loggers.audit :as audit]
@@ -33,7 +34,6 @@
[app.rpc.commands.teams :as teams]
[app.rpc.doc :as-alias doc]
[app.rpc.helpers :as rph]
[app.storage :as sto]
[app.util.blob :as blob]
[app.util.pointer-map :as pmap]
[app.util.services :as sv]
@@ -129,77 +129,78 @@
::sm/params schema:update-file
::sm/result schema:update-file-result
::doc/module :files
::doc/added "1.17"}
[{:keys [::mtx/metrics] :as cfg}
::doc/added "1.17"
::db/transaction true}
[{:keys [::mtx/metrics ::db/conn] :as cfg}
{:keys [::rpc/profile-id id changes changes-with-metadata] :as params}]
(db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}]
(files/check-edition-permissions! conn profile-id id)
(db/xact-lock! conn id)
(let [file (get-file conn id)
team (teams/get-team conn
:profile-id profile-id
:team-id (:team-id file))
(files/check-edition-permissions! conn profile-id id)
(db/xact-lock! conn id)
features (-> (cfeat/get-team-enabled-features cf/flags team)
(cfeat/check-client-features! (:features params))
(cfeat/check-file-features! (:features file)))
(let [file (get-file cfg id)
team (teams/get-team conn
:profile-id profile-id
:team-id (:team-id file))
changes (if changes-with-metadata
(->> changes-with-metadata (mapcat :changes) vec)
(vec changes))
features (-> (cfeat/get-team-enabled-features cf/flags team)
(cfeat/check-client-features! (:features params))
(cfeat/check-file-features! (:features file)))
params (-> params
(assoc :profile-id profile-id)
(assoc :features (set/difference features cfeat/frontend-only-features))
(assoc :team team)
(assoc :file file)
(assoc :changes changes))
changes (if changes-with-metadata
(->> changes-with-metadata (mapcat :changes) vec)
(vec changes))
cfg (assoc cfg ::timestamp (ct/now))
params (-> params
(assoc :profile-id profile-id)
(assoc :features (set/difference features cfeat/frontend-only-features))
(assoc :team team)
(assoc :file file)
(assoc :changes changes))
tpoint (ct/tpoint)]
cfg (assoc cfg ::timestamp (ct/now))
tpoint (ct/tpoint)]
(when (not= (:vern params)
(:vern file))
(ex/raise :type :validation
:code :vern-conflict
:hint "A different version has been restored for the file."
:context {:incoming-revn (:revn params)
:stored-revn (:revn file)}))
(when (not= (:vern params)
(:vern file))
(ex/raise :type :validation
:code :vern-conflict
:hint "A different version has been restored for the file."
:context {:incoming-revn (:revn params)
:stored-revn (:revn file)}))
(when (> (:revn params)
(:revn file))
(ex/raise :type :validation
:code :revn-conflict
:hint "The incoming revision number is greater that stored version."
:context {:incoming-revn (:revn params)
:stored-revn (:revn file)}))
(when (> (:revn params)
(:revn file))
(ex/raise :type :validation
:code :revn-conflict
:hint "The incoming revision number is greater that stored version."
:context {:incoming-revn (:revn params)
:stored-revn (:revn file)}))
;; When newly computed features does not match exactly with
;; the features defined on team row, we update it
(when-let [features (-> features
(set/difference (:features team))
(set/difference cfeat/no-team-inheritable-features)
(not-empty))]
(let [features (->> features
(set/union (:features team))
(db/create-array conn "text"))]
(db/update! conn :team
{:features features}
{:id (:id team)}
{::db/return-keys false})))
;; When newly computed features does not match exactly with
;; the features defined on team row, we update it
(when-let [features (-> features
(set/difference (:features team))
(set/difference cfeat/no-team-inheritable-features)
(not-empty))]
(let [features (->> features
(set/union (:features team))
(db/create-array conn "text"))]
(db/update! conn :team
{:features features}
{:id (:id team)}
{::db/return-keys false})))
(mtx/run! metrics {:id :update-file-changes :inc (count changes)})
(mtx/run! metrics {:id :update-file-changes :inc (count changes)})
(binding [l/*context* (some-> (meta params)
(get :app.http/request)
(errors/request->context))]
(-> (update-file* cfg params)
(rph/with-defer #(let [elapsed (tpoint)]
(l/trace :hint "update-file" :time (ct/format-duration elapsed))))))))))
(binding [l/*context* (some-> (meta params)
(get :app.http/request)
(errors/request->context))]
(-> (update-file* cfg params)
(rph/with-defer #(let [elapsed (tpoint)]
(l/trace :hint "update-file" :time (ct/format-duration elapsed))))))))
(defn- update-file*
"Internal function, part of the update-file process, that encapsulates
@@ -212,28 +213,44 @@
[{:keys [::db/conn ::wrk/executor ::timestamp] :as cfg}
{:keys [profile-id file team features changes session-id skip-validate] :as params}]
(let [;; Retrieve the file data
file (feat.fmigr/resolve-applied-migrations cfg file)
file (feat.fdata/resolve-file-data cfg file)
file (assoc file :features
(-> features
(set/difference cfeat/frontend-only-features)
(set/union (:features file))))]
(binding [pmap/*tracked* (pmap/create-tracked)
pmap/*load-fn* (partial fdata/load-pointer cfg (:id file))]
;; We create a new lexycal scope for clearly delimit the result of
;; executing this update file operation and all its side effects
(let [file (px/invoke! executor
(fn []
;; Process the file data on separated thread for avoid to do
;; the CPU intensive operation on vthread.
(binding [cfeat/*current* features
cfeat/*previous* (:features file)]
(update-file-data! cfg file
process-changes-and-validate
changes skip-validate))))]
(let [file (assoc file :features
(-> features
(set/difference cfeat/frontend-only-features)
(set/union (:features file))))
(feat.fmigr/upsert-migrations! conn file)
(persist-file! cfg file)
;; We need to preserve the original revn for the response
revn
(get file :revn)
;; We create a new lexical scope for clearly delimit the result of
;; executing this update file operation and all its side effects
file
(px/invoke! executor
(fn []
;; Process the file data on separated thread
;; for avoid to do the CPU intensive operation
;; on vthread.
(binding [cfeat/*current* features
cfeat/*previous* (:features file)]
(update-file-data! cfg file
process-changes-and-validate
changes skip-validate))))
deleted-at
(ct/plus timestamp (ct/duration {:hours 1}))]
(when-let [file (::snapshot file)]
(let [deleted-at (ct/plus timestamp (ldel/get-deletion-delay team))
label (str "internal/snapshot/" revn)]
(fsnap/create! cfg file
{:label label
:deleted-at deleted-at
:profile-id profile-id
:session-id session-id})))
;; Insert change (xlog) with deleted_at in a future data for
;; make them automatically eleggible for GC once they expires
@@ -243,34 +260,28 @@
:profile-id profile-id
:created-at timestamp
:updated-at timestamp
:deleted-at (if (::snapshot-data file)
(ct/plus timestamp (ldel/get-deletion-delay team))
(ct/plus timestamp (ct/duration {:hours 1})))
:deleted-at deleted-at
:file-id (:id file)
:revn (:revn file)
:version (:version file)
:features (:features file)
:label (::snapshot-label file)
:data (::snapshot-data file)
:features (into-array (:features file))
:changes (blob/encode changes)}
{::db/return-keys false})
(persist-file! cfg file)
;; Send asynchronous notifications
(send-notifications! cfg params file))
(send-notifications! cfg params file)
(when (feat.fdata/offloaded? file)
(let [storage (sto/resolve cfg ::db/reuse-conn true)]
(some->> (:data-ref-id file) (sto/touch-object! storage))))
(let [response {:revn (:revn file)
:lagged (get-lagged-changes conn params)}]
(vary-meta response assoc ::audit/replace-props
{:id (:id file)
:name (:name file)
:features (:features file)
:project-id (:project-id file)
:team-id (:team-id file)}))))
(with-meta {:revn revn :lagged (get-lagged-changes conn params)}
{::audit/replace-props
{:id (:id file)
:name (:name file)
:features (:features file)
:project-id (:project-id file)
:team-id (:team-id file)}}))))
;: FIXME: DEPRECATED
(defn update-file!
"A public api that allows apply a transformation to a file with all context setup."
[{:keys [::db/conn] :as cfg} file-id update-fn & args]
@@ -279,51 +290,42 @@
(feat.fmigr/upsert-migrations! conn file)
(persist-file! cfg file)))
(def ^:private sql:get-file
"SELECT f.*, p.team_id
FROM file AS f
JOIN project AS p ON (p.id = f.project_id)
WHERE f.id = ?
AND (f.deleted_at IS NULL OR
f.deleted_at > now())
FOR KEY SHARE")
(defn get-file
"Get not-decoded file, only decodes the features set."
[conn id]
(let [file (db/exec-one! conn [sql:get-file id])]
(when-not file
(ex/raise :type :not-found
:code :object-not-found
:hint (format "file with id '%s' does not exists" id)))
(update file :features db/decode-pgarray #{})))
[cfg id]
;; FIXME: lock for share
(bfc/get-file cfg id :decode? false :lock-for-update? true))
(defn persist-file!
"Function responsible of persisting already encoded file. Should be
used together with `get-file` and `update-file-data!`.
It also updates the project modified-at attr."
[{:keys [::db/conn ::timestamp]} file]
[{:keys [::db/conn ::timestamp] :as cfg} file]
(let [;; The timestamp can be nil because this function is also
;; intended to be used outside of this module
modified-at (or timestamp (ct/now))]
modified-at
(or timestamp (ct/now))
file
(-> file
(dissoc ::snapshot)
(assoc :modified-at modified-at)
(assoc :has-media-trimmed false))]
(db/update! conn :project
{:modified-at modified-at}
{:id (:project-id file)}
{::db/return-keys false})
(db/update! conn :file
{:revn (:revn file)
:data (:data file)
:version (:version file)
:features (:features file)
:data-backend nil
:data-ref-id nil
:modified-at modified-at
:has-media-trimmed false}
{:id (:id file)}
{::db/return-keys false})))
(bfc/update-file! cfg file)))
(defn- attach-snapshot
"Attach snapshot data to the file. This should be called before the
upcoming file operations are applied to the file."
[file migrated? cfg]
(let [snapshot (if migrated? file (update file :data (partial fdata/realize cfg)))]
(assoc file ::snapshot snapshot)))
(defn- update-file-data!
"Perform a file data transformation in with all update context setup.
@@ -335,52 +337,35 @@
fdata/pointer-map modified fragments."
[cfg {:keys [id] :as file} update-fn & args]
(binding [pmap/*tracked* (pmap/create-tracked)
pmap/*load-fn* (partial feat.fdata/load-pointer cfg id)]
(let [file (update file :data (fn [data]
(-> data
(blob/decode)
(assoc :id (:id file)))))
libs (delay (bfc/get-resolved-file-libraries cfg file))
(let [file (update file :data (fn [data]
(-> data
(blob/decode)
(assoc :id id))))
libs (delay (bfc/get-resolved-file-libraries cfg file))
;; For avoid unnecesary overhead of creating multiple pointers
;; and handly internally with objects map in their worst
;; case (when probably all shapes and all pointers will be
;; readed in any case), we just realize/resolve them before
;; applying the migration to the file
file (if (fmg/need-migration? file)
(-> file
(update :data feat.fdata/process-pointers deref)
(update :data feat.fdata/process-objects (partial into {}))
(fmg/migrate-file libs))
file)
need-migration?
(fmg/need-migration? file)
file (apply update-fn cfg file args)
take-snapshot?
(take-snapshot? file)
;; TODO: reuse operations if file is migrated
;; TODO: move encoding to a separated thread
file (if (take-snapshot? file)
(let [tpoint (ct/tpoint)
snapshot (-> (:data file)
(feat.fdata/process-pointers deref)
(feat.fdata/process-objects (partial into {}))
(blob/encode))
elapsed (tpoint)
label (str "internal/snapshot/" (:revn file))]
;; For avoid unnecesary overhead of creating multiple
;; pointers and handly internally with objects map in their
;; worst case (when probably all shapes and all pointers
;; will be readed in any case), we just realize/resolve them
;; before applying the migration to the file
file
(cond-> file
need-migration?
(->> (fdata/realize cfg))
(l/trc :hint "take snapshot"
:file-id (str (:id file))
:revn (:revn file)
:label label
:elapsed (ct/format-duration elapsed))
need-migration?
(fmg/migrate-file libs)
(-> file
(assoc ::snapshot-data snapshot)
(assoc ::snapshot-label label)))
file)]
(bfc/encode-file cfg file))))
take-snapshot?
(attach-snapshot need-migration? cfg))]
(apply update-fn cfg file args)))
(defn- soft-validate-file-schema!
[file]
@@ -470,8 +455,9 @@
(defn- get-lagged-changes
[conn {:keys [id revn] :as params}]
(->> (db/exec! conn [sql:lagged-changes id revn])
(map files/decode-row)
(vec)))
(filter :changes)
(mapv (fn [row]
(update row :changes blob/decode)))))
(defn- send-notifications!
[cfg {:keys [team changes session-id] :as params} file]

View File

@@ -6,6 +6,7 @@
(ns app.rpc.commands.teams-invitations
(:require
[app.binfile.common :as bfc]
[app.common.data :as d]
[app.common.data.macros :as dm]
[app.common.exceptions :as ex]
@@ -21,7 +22,6 @@
[app.loggers.audit :as audit]
[app.main :as-alias main]
[app.rpc :as-alias rpc]
[app.rpc.commands.files :as files]
[app.rpc.commands.profile :as profile]
[app.rpc.commands.teams :as teams]
[app.rpc.doc :as-alias doc]
@@ -499,7 +499,7 @@
"A specific method for obtain a file with name and page-id used for
team request access procediment"
[cfg file-id]
(let [file (files/get-file cfg file-id :migrate? false)]
(let [file (bfc/get-file cfg file-id :migrate? false)]
(-> file
(dissoc :data)
(dissoc :deleted-at)

View File

@@ -51,7 +51,7 @@
(defn- get-view-only-bundle
[{:keys [::db/conn] :as cfg} {:keys [profile-id file-id ::perms] :as params}]
(let [file (files/get-file cfg file-id)
(let [file (bfc/get-file cfg file-id)
project (db/get conn :project
{:id (:project-id file)}
@@ -81,7 +81,7 @@
libs (->> (bfc/get-file-libraries conn file-id)
(mapv (fn [{:keys [id] :as lib}]
(merge lib (files/get-file cfg id)))))
(merge lib (bfc/get-file cfg id)))))
links (->> (db/query conn :share-link {:file-id file-id})
(mapv (fn [row]

View File

@@ -14,9 +14,8 @@
[app.common.files.validate :as cfv]
[app.common.time :as ct]
[app.db :as db]
[app.main :as main]
[app.rpc.commands.files :as files]
[app.rpc.commands.files-snapshot :as fsnap]))
[app.features.file-snapshots :as fsnap]
[app.main :as main]))
(def ^:dynamic *system* nil)
@@ -48,7 +47,7 @@
([system id]
(db/run! system
(fn [system]
(files/get-file system id :migrate? false)))))
(bfc/get-file system id :decode? false)))))
(defn update-team!
[system {:keys [id] :as team}]
@@ -118,10 +117,10 @@
(let [conn (db/get-connection system)]
(->> (get-and-lock-team-files conn team-id)
(reduce (fn [result file-id]
(let [file (fsnap/get-file-snapshots system file-id)]
(fsnap/create-file-snapshot! system file
{:label label
:created-by :admin})
(let [file (bfc/get-file system file-id :realize? true :lock-for-update? true)]
(fsnap/create! system file
{:label label
:created-by "admin"})
(inc result)))
0))))
@@ -132,21 +131,23 @@
(into #{}))
snap (search-file-snapshots conn ids label)
ids' (into #{} (map :file-id) snap)]
(when (not= ids ids')
(throw (RuntimeException. "no uniform snapshot available")))
(reduce (fn [result {:keys [file-id id]}]
(fsnap/restore-file-snapshot! system file-id id)
(fsnap/restore! system file-id id)
(inc result))
0
snap)))
(defn process-file!
[system file-id update-fn & {:keys [label validate? with-libraries?] :or {validate? true} :as opts}]
(let [file (bfc/get-file system file-id ::db/for-update true)
(let [file (bfc/get-file system file-id
:lock-for-update? true
:realize? true)
libs (when with-libraries?
(bfc/get-resolved-file-libraries system file))
@@ -163,10 +164,10 @@
(cfv/validate-file-schema! file'))
(when (string? label)
(fsnap/create-file-snapshot! system file
{:label label
:deleted-at (ct/in-future {:days 30})
:created-by :admin}))
(fsnap/create! system file
{:label label
:deleted-at (ct/in-future {:days 30})
:created-by "admin"}))
(let [file' (update file' :revn inc)]
(bfc/update-file! system file')

View File

@@ -24,13 +24,13 @@
[app.config :as cf]
[app.db :as db]
[app.db.sql :as-alias sql]
[app.features.fdata :as feat.fdata]
[app.features.fdata :as fdata]
[app.features.file-snapshots :as fsnap]
[app.loggers.audit :as audit]
[app.main :as main]
[app.msgbus :as mbus]
[app.rpc.commands.auth :as auth]
[app.rpc.commands.files :as files]
[app.rpc.commands.files-snapshot :as fsnap]
[app.rpc.commands.management :as mgmt]
[app.rpc.commands.profile :as profile]
[app.rpc.commands.projects :as projects]
@@ -150,15 +150,15 @@
(defn enable-objects-map-feature-on-file!
[file-id & {:as opts}]
(process-file! file-id feat.fdata/enable-objects-map opts))
(process-file! file-id fdata/enable-objects-map opts))
(defn enable-pointer-map-feature-on-file!
[file-id & {:as opts}]
(process-file! file-id feat.fdata/enable-pointer-map opts))
(process-file! file-id fdata/enable-pointer-map opts))
(defn enable-path-data-feature-on-file!
[file-id & {:as opts}]
(process-file! file-id feat.fdata/enable-path-data opts))
(process-file! file-id fdata/enable-path-data opts))
(defn enable-storage-features-on-file!
[file-id & {:as opts}]
@@ -338,7 +338,10 @@
collectable file-changes entry."
[& {:keys [file-id label]}]
(let [file-id (h/parse-uuid file-id)]
(db/tx-run! main/system fsnap/create-file-snapshot! {:file-id file-id :label label})))
(db/tx-run! main/system
(fn [cfg]
(let [file (bfc/get-file cfg file-id :realize? true)]
(fsnap/create! cfg file {:label label :created-by "admin"}))))))
(defn restore-file-snapshot!
[file-id & {:keys [label id]}]
@@ -348,13 +351,13 @@
(fn [{:keys [::db/conn] :as system}]
(cond
(uuid? snapshot-id)
(fsnap/restore-file-snapshot! system file-id snapshot-id)
(fsnap/restore! system file-id snapshot-id)
(string? label)
(->> (h/search-file-snapshots conn #{file-id} label)
(map :id)
(first)
(fsnap/restore-file-snapshot! system file-id))
(fsnap/restore! system file-id))
:else
(throw (ex-info "snapshot id or label should be provided" {})))))))
@@ -363,9 +366,9 @@
[file-id & {:as _}]
(let [file-id (h/parse-uuid file-id)]
(db/tx-run! main/system
(fn [{:keys [::db/conn]}]
(->> (fsnap/get-file-snapshots conn file-id)
(print-table [:label :id :revn :created-at]))))))
(fn [cfg]
(->> (fsnap/get-visible-snapshots cfg file-id)
(print-table [:label :id :revn :created-at :created-by]))))))
(defn take-team-snapshot!
[team-id & {:keys [label rollback?] :or {rollback? true}}]
@@ -547,6 +550,68 @@
:rollback rollback?
:elapsed elapsed))))))
(defn process!
"Apply a function to all files in the database"
[& {:keys [max-jobs
rollback?
max-items
chunk-size
proc-fn]
:or {max-items Long/MAX_VALUE
chunk-size 100
rollback? true}
:as opts}]
(let [tpoint (ct/tpoint)
max-jobs (or max-jobs (px/get-available-processors))
processed (atom 0)
opts (-> opts
(assoc :chunk-size chunk-size)
(dissoc :rollback?)
(dissoc :proc-fn)
(dissoc :max-jobs)
(dissoc :max-items))
start-job
(fn [jid]
(l/dbg :hint "start job thread" :jid jid)
(px/sleep 1000)
(loop []
(let [result (-> main/system
(assoc ::db/rollback rollback?)
(proc-fn opts))]
(let [total (swap! processed + result)]
(l/dbg :hint "chunk processed" :jid jid :total total :chunk result ::l/sync? true)
(when (and (pos? result)
(< total max-items))
(recur))))))]
(l/dbg :hint "process:start"
:rollback rollback?
:max-jobs max-jobs
:max-items max-items)
(try
(let [jobs (->> (range max-jobs)
(map (fn [jid] (px/fn->thread (partial start-job jid))))
(doall))]
(doseq [job jobs]
(.join ^java.lang.Thread job)))
(catch Throwable cause
(l/dbg :hint "process:error" :cause cause))
(finally
(let [elapsed (ct/format-duration (tpoint))]
(l/dbg :hint "process:end"
:processed @processed
:rollback rollback?
:elapsed elapsed))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; DELETE/RESTORE OBJECTS (WITH CASCADE, SOFT)
@@ -606,11 +671,10 @@
(let [file-id (h/parse-uuid file-id)]
(db/tx-run! main/system
(fn [system]
(when-let [file (some-> (db/get* system :file
{:id file-id}
{::db/remove-deleted false
::sql/columns [:id :name]})
(files/decode-row))]
(when-let [file (db/get* system :file
{:id file-id}
{::db/remove-deleted false
::sql/columns [:id :name]})]
(audit/insert! system
{::audit/name "restore-file"
::audit/type "action"
@@ -831,6 +895,19 @@
(with-open [reader (io/reader path)]
(process-data! system deleted-at (line-seq reader))))))))
(defn process-chunks
"A generic function that executes the specified proc iterativelly
until 0 results is returned"
[cfg proc-fn & params]
(loop [total 0]
(let [result (apply proc-fn cfg params)]
(if (pos? result)
(do
(l/trc :hint "chunk processed" :size result :total total)
(recur (+ total result)))
total))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; CASCADE FIXING
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

View File

@@ -113,13 +113,10 @@
(defn- create-database-object
[{:keys [::backend ::db/connectable]} {:keys [::content ::expired-at ::touched-at ::touch] :as params}]
(let [id (or (:id params) (uuid/random))
(let [id (or (::id params) (uuid/random))
mdata (cond-> (get-metadata params)
(satisfies? impl/IContentHash content)
(assoc :hash (impl/get-hash content))
:always
(dissoc :id))
(assoc :hash (impl/get-hash content)))
touched-at (if touch
(or touched-at (ct/now))

View File

@@ -34,7 +34,7 @@
(SELECT EXISTS (SELECT 1 FROM team_font_variant WHERE ttf_file_id = ?))) AS has_refs")
(defn- has-team-font-variant-refs?
[conn id]
[conn {:keys [id]}]
(-> (db/exec-one! conn [sql:has-team-font-variant-refs id id id id])
(get :has-refs)))
@@ -44,7 +44,7 @@
(SELECT EXISTS (SELECT 1 FROM file_media_object WHERE thumbnail_id = ?))) AS has_refs")
(defn- has-file-media-object-refs?
[conn id]
[conn {:keys [id]}]
(-> (db/exec-one! conn [sql:has-file-media-object-refs id id])
(get :has-refs)))
@@ -53,7 +53,7 @@
(SELECT EXISTS (SELECT 1 FROM team WHERE photo_id = ?))) AS has_refs")
(defn- has-profile-refs?
[conn id]
[conn {:keys [id]}]
(-> (db/exec-one! conn [sql:has-profile-refs id id])
(get :has-refs)))
@@ -62,7 +62,7 @@
"SELECT EXISTS (SELECT 1 FROM file_tagged_object_thumbnail WHERE media_id = ?) AS has_refs")
(defn- has-file-object-thumbnails-refs?
[conn id]
[conn {:keys [id]}]
(-> (db/exec-one! conn [sql:has-file-object-thumbnail-refs id])
(get :has-refs)))
@@ -71,36 +71,23 @@
"SELECT EXISTS (SELECT 1 FROM file_thumbnail WHERE media_id = ?) AS has_refs")
(defn- has-file-thumbnails-refs?
[conn id]
[conn {:keys [id]}]
(-> (db/exec-one! conn [sql:has-file-thumbnail-refs id])
(get :has-refs)))
(def ^:private
sql:has-file-data-refs
"SELECT EXISTS (SELECT 1 FROM file WHERE data_ref_id = ?) AS has_refs")
(def sql:exists-file-data-refs
"SELECT EXISTS (
SELECT 1 FROM file_data
WHERE file_id = ?
AND id = ?
AND metadata->>'storage-ref-id' = ?::text
) AS has_refs")
(defn- has-file-data-refs?
[conn id]
(-> (db/exec-one! conn [sql:has-file-data-refs id])
(get :has-refs)))
(def ^:private
sql:has-file-data-fragment-refs
"SELECT EXISTS (SELECT 1 FROM file_data_fragment WHERE data_ref_id = ?) AS has_refs")
(defn- has-file-data-fragment-refs?
[conn id]
(-> (db/exec-one! conn [sql:has-file-data-fragment-refs id])
(get :has-refs)))
(def ^:private
sql:has-file-change-refs
"SELECT EXISTS (SELECT 1 FROM file_change WHERE data_ref_id = ?) AS has_refs")
(defn- has-file-change-refs?
[conn id]
(-> (db/exec-one! conn [sql:has-file-change-refs id])
(get :has-refs)))
[conn sobject]
(let [{:keys [file-id id]} (:metadata sobject)]
(-> (db/exec-one! conn [sql:exists-file-data-refs file-id id (:id sobject)])
(get :has-refs))))
(def ^:private sql:mark-freeze-in-bulk
"UPDATE storage_object
@@ -143,52 +130,50 @@
"file-media-object"))
(defn- process-objects!
[conn has-refs? ids bucket]
[conn has-refs? bucket objects]
(loop [to-freeze #{}
to-delete #{}
ids (seq ids)]
(if-let [id (first ids)]
(if (has-refs? conn id)
objects (seq objects)]
(if-let [{:keys [id] :as object} (first objects)]
(if (has-refs? conn object)
(do
(l/debug :hint "processing object"
:id (str id)
:status "freeze"
:bucket bucket)
(recur (conj to-freeze id) to-delete (rest ids)))
(recur (conj to-freeze id) to-delete (rest objects)))
(do
(l/debug :hint "processing object"
:id (str id)
:status "delete"
:bucket bucket)
(recur to-freeze (conj to-delete id) (rest ids))))
(recur to-freeze (conj to-delete id) (rest objects))))
(do
(some->> (seq to-freeze) (mark-freeze-in-bulk! conn))
(some->> (seq to-delete) (mark-delete-in-bulk! conn))
[(count to-freeze) (count to-delete)]))))
(defn- process-bucket!
[conn bucket ids]
[conn bucket objects]
(case bucket
"file-media-object" (process-objects! conn has-file-media-object-refs? ids bucket)
"team-font-variant" (process-objects! conn has-team-font-variant-refs? ids bucket)
"file-object-thumbnail" (process-objects! conn has-file-object-thumbnails-refs? ids bucket)
"file-thumbnail" (process-objects! conn has-file-thumbnails-refs? ids bucket)
"profile" (process-objects! conn has-profile-refs? ids bucket)
"file-data" (process-objects! conn has-file-data-refs? ids bucket)
"file-data-fragment" (process-objects! conn has-file-data-fragment-refs? ids bucket)
"file-change" (process-objects! conn has-file-change-refs? ids bucket)
"file-media-object" (process-objects! conn has-file-media-object-refs? bucket objects)
"team-font-variant" (process-objects! conn has-team-font-variant-refs? bucket objects)
"file-object-thumbnail" (process-objects! conn has-file-object-thumbnails-refs? bucket objects)
"file-thumbnail" (process-objects! conn has-file-thumbnails-refs? bucket objects)
"profile" (process-objects! conn has-profile-refs? bucket objects)
"file-data" (process-objects! conn has-file-data-refs? bucket objects)
(ex/raise :type :internal
:code :unexpected-unknown-reference
:hint (dm/fmt "unknown reference '%'" bucket))))
(defn process-chunk!
[{:keys [::db/conn]} chunk]
(reduce-kv (fn [[nfo ndo] bucket ids]
(let [[nfo' ndo'] (process-bucket! conn bucket ids)]
(reduce-kv (fn [[nfo ndo] bucket objects]
(let [[nfo' ndo'] (process-bucket! conn bucket objects)]
[(+ nfo nfo')
(+ ndo ndo')]))
[0 0]
(d/group-by lookup-bucket :id #{} chunk)))
(d/group-by lookup-bucket identity #{} chunk)))
(def ^:private
sql:get-touched-storage-objects

View File

@@ -45,6 +45,11 @@
{:deleted-at deleted-at}
{:file-id id})
;; Mark file data fragment to be deleted
(db/update! conn :file-data-fragment
{:deleted-at deleted-at}
{:file-id id})
;; Mark file media objects to be deleted
(db/update! conn :file-media-object
{:deleted-at deleted-at}

View File

@@ -23,26 +23,13 @@
[app.config :as cf]
[app.db :as db]
[app.features.fdata :as feat.fdata]
[app.features.file-snapshots :as fsnap]
[app.storage :as sto]
[app.worker :as wrk]
[integrant.core :as ig]))
(declare get-file)
(def sql:get-snapshots
"SELECT fc.file_id AS id,
fc.id AS snapshot_id,
fc.data,
fc.revn,
fc.version,
fc.features,
fc.data_backend,
fc.data_ref_id
FROM file_change AS fc
WHERE fc.file_id = ?
AND fc.data IS NOT NULL
ORDER BY fc.created_at ASC")
(def ^:private sql:mark-file-media-object-deleted
"UPDATE file_media_object
SET deleted_at = now()
@@ -57,21 +44,22 @@
(defn- clean-file-media!
"Performs the garbage collection of file media objects."
[{:keys [::db/conn] :as cfg} {:keys [id] :as file}]
(let [xform (comp
(map (partial bfc/decode-file cfg))
xf:collect-used-media)
(let [used-media
(fsnap/reduce-snapshots cfg id xf:collect-used-media conj #{})
used (->> (db/plan conn [sql:get-snapshots id] {:fetch-size 1})
(transduce xform conj #{}))
used (into used xf:collect-used-media [file])
used-media
(into used-media xf:collect-used-media [file])
ids (db/create-array conn "uuid" used)
unused (->> (db/exec! conn [sql:mark-file-media-object-deleted id ids])
(into #{} (map :id)))]
used-media
(db/create-array conn "uuid" used-media)
(l/dbg :hint "clean" :rel "file-media-object" :file-id (str id) :total (count unused))
unused-media
(->> (db/exec! conn [sql:mark-file-media-object-deleted id used-media])
(into #{} (map :id)))]
(doseq [id unused]
(l/dbg :hint "clean" :rel "file-media-object" :file-id (str id) :total (count unused-media))
(doseq [id unused-media]
(l/trc :hint "mark deleted"
:rel "file-media-object"
:id (str id)
@@ -98,7 +86,7 @@
(thc/fmt-object-id file-id page-id id "frame")
(thc/fmt-object-id file-id page-id id "component")))))))
ids (db/create-array conn "text" using)
ids (db/create-array conn "uuid" using)
unused (->> (db/exec! conn [sql:mark-file-object-thumbnails-deleted file-id ids])
(into #{} (map :object-id)))]
@@ -134,13 +122,7 @@
file))
(def ^:private sql:get-files-for-library
"SELECT f.id,
f.data,
f.modified_at,
f.features,
f.version,
f.data_backend,
f.data_ref_id
"SELECT f.id
FROM file AS f
LEFT JOIN file_library_rel AS fl ON (fl.file_id = f.id)
WHERE fl.library_file_id = ?
@@ -161,15 +143,21 @@
deleted-components
(ctkl/deleted-components-seq data)
xform
file-xform
(mapcat (partial get-used-components deleted-components file-id))
library-xform
(comp
(map :id)
(map #(bfc/get-file cfg % :realize? true :read-only? true))
file-xform)
used-remote
(->> (db/plan conn [sql:get-files-for-library file-id] {:fetch-size 1})
(transduce (comp (map (partial bfc/decode-file cfg)) xform) conj #{}))
(transduce library-xform conj #{}))
used-local
(into #{} xform [file])
(into #{} file-xform [file])
unused
(transduce bfc/xf-map-id disj
@@ -229,34 +217,22 @@
(cfv/validate-file-schema! file)
file))
(def ^:private sql:get-file
"SELECT f.id,
f.data,
f.revn,
f.version,
f.features,
f.modified_at,
f.data_backend,
f.data_ref_id
FROM file AS f
WHERE f.has_media_trimmed IS false
AND f.modified_at < now() - ?::interval
AND f.deleted_at IS NULL
AND f.id = ?
FOR UPDATE
SKIP LOCKED")
(defn get-file
[{:keys [::db/conn ::min-age]} file-id]
(let [min-age (if min-age
(db/interval min-age)
(db/interval 0))]
(->> (db/exec! conn [sql:get-file min-age file-id])
(first))))
[cfg {:keys [file-id revn]}]
(let [file (bfc/get-file cfg file-id
:realize? true
:skip-locked? true
:lock-for-update? true)]
;; We should ensure that the scheduled file and the procesing file
;; has not changed since schedule, for this reason we check the
;; revn from props with the revn from retrieved file from database
(when (= revn (:revn file))
file)))
(defn- process-file!
[cfg file-id]
(if-let [file (get-file cfg file-id)]
[cfg {:keys [file-id] :as props}]
(if-let [file (get-file cfg props)]
(let [file (->> file
(bfc/decode-file cfg)
(bfl/clean-file)
@@ -267,7 +243,7 @@
true)
(do
(l/dbg :hint "skip" :file-id (str file-id))
(l/dbg :hint "skip cleaning, criteria does not match" :file-id (str file-id))
false)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@@ -282,26 +258,20 @@
(defmethod ig/init-key ::handler
[_ cfg]
(fn [{:keys [props] :as task}]
(let [min-age (ct/duration (or (:min-age props)
(cf/get-deletion-delay)))
file-id (get props :file-id)
cfg (-> cfg
(assoc ::db/rollback (:rollback? props))
(assoc ::min-age min-age))]
(try
(db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}]
(let [cfg (update cfg ::sto/storage sto/configure conn)
processed? (process-file! cfg file-id)]
(when (and processed? (contains? cf/flags :tiered-file-data-storage))
(wrk/submit! (-> cfg
(assoc ::wrk/task :offload-file-data)
(assoc ::wrk/params props)
(assoc ::wrk/priority 10)
(assoc ::wrk/delay 1000))))
processed?)))
(catch Throwable cause
(l/err :hint "error on cleaning file"
:file-id (str (:file-id props))
:cause cause))))))
(try
(-> cfg
(assoc ::db/rollback (:rollback? props))
(db/tx-run! (fn [{:keys [::db/conn] :as cfg}]
(let [cfg (update cfg ::sto/storage sto/configure conn)
processed? (process-file! cfg props)]
(when (and processed? (contains? cf/flags :tiered-file-data-storage))
(wrk/submit! (-> cfg
(assoc ::wrk/task :offload-file-data)
(assoc ::wrk/params props)
(assoc ::wrk/priority 10)
(assoc ::wrk/delay 1000))))
processed?))))
(catch Throwable cause
(l/err :hint "error on cleaning file"
:file-id (str (:file-id props))
:cause cause)))))

View File

@@ -17,29 +17,29 @@
(def ^:private
sql:get-candidates
"SELECT f.id,
f.revn,
f.modified_at
FROM file AS f
WHERE f.has_media_trimmed IS false
AND f.modified_at < now() - ?::interval
AND f.deleted_at IS NULL
ORDER BY f.modified_at DESC
FOR UPDATE
FOR UPDATE OF f
SKIP LOCKED")
(defn- get-candidates
[{:keys [::db/conn ::min-age] :as cfg}]
(let [min-age (db/interval min-age)]
(db/cursor conn [sql:get-candidates min-age] {:chunk-size 10})))
(db/plan conn [sql:get-candidates min-age] {:fetch-size 10})))
(defn- schedule!
[{:keys [::min-age] :as cfg}]
(let [total (reduce (fn [total {:keys [id]}]
(let [params {:file-id id :min-age min-age}]
[cfg]
(let [total (reduce (fn [total {:keys [id modified-at revn]}]
(let [params {:file-id id :modified-at modified-at :revn revn}]
(wrk/submit! (assoc cfg ::wrk/params params))
(inc total)))
0
(get-candidates cfg))]
{:processed total}))
(defmethod ig/assert-key ::handler
@@ -48,7 +48,7 @@
(defmethod ig/expand-key ::handler
[k v]
{k (assoc v ::min-age (cf/get-deletion-delay))})
{k (assoc v ::min-age (cf/get-file-clean-delay))})
(defmethod ig/init-key ::handler
[_ cfg]

View File

@@ -11,6 +11,7 @@
[app.common.logging :as l]
[app.common.time :as ct]
[app.db :as db]
[app.features.fdata :as fdata]
[app.storage :as sto]
[integrant.core :as ig]))
@@ -123,17 +124,19 @@
0)))
(def ^:private sql:get-files
"SELECT id, deleted_at, project_id, data_backend, data_ref_id
FROM file
WHERE deleted_at IS NOT NULL
AND deleted_at < now() + ?::interval
ORDER BY deleted_at ASC
"SELECT f.id,
f.deleted_at,
f.project_id
FROM file AS f
WHERE f.deleted_at IS NOT NULL
AND f.deleted_at < now() + ?::interval
ORDER BY f.deleted_at ASC
LIMIT ?
FOR UPDATE
SKIP LOCKED")
(defn- delete-files!
[{:keys [::db/conn ::sto/storage ::deletion-threshold ::chunk-size] :as cfg}]
[{:keys [::db/conn ::deletion-threshold ::chunk-size] :as cfg}]
(->> (db/plan conn [sql:get-files deletion-threshold chunk-size] {:fetch-size 5})
(reduce (fn [total {:keys [id deleted-at project-id] :as file}]
(l/trc :hint "permanently delete"
@@ -142,8 +145,8 @@
:project-id (str project-id)
:deleted-at (ct/format-inst deleted-at))
(when (= "objects-storage" (:data-backend file))
(sto/touch-object! storage (:data-ref-id file)))
;; Delete associated file data
(fdata/delete! cfg {:file-id id :id id :type "main"})
;; And finally, permanently delete the file.
(db/delete! conn :file {:id id})
@@ -209,32 +212,6 @@
(inc total))
0)))
(def ^:private sql:get-file-data-fragments
"SELECT file_id, id, deleted_at, data_ref_id
FROM file_data_fragment
WHERE deleted_at IS NOT NULL
AND deleted_at < now() + ?::interval
ORDER BY deleted_at ASC
LIMIT ?
FOR UPDATE
SKIP LOCKED")
(defn- delete-file-data-fragments!
[{:keys [::db/conn ::sto/storage ::deletion-threshold ::chunk-size] :as cfg}]
(->> (db/plan conn [sql:get-file-data-fragments deletion-threshold chunk-size] {:fetch-size 5})
(reduce (fn [total {:keys [file-id id deleted-at data-ref-id]}]
(l/trc :hint "permanently delete"
:rel "file-data-fragment"
:id (str id)
:file-id (str file-id)
:deleted-at (ct/format-inst deleted-at))
(some->> data-ref-id (sto/touch-object! storage))
(db/delete! conn :file-data-fragment {:file-id file-id :id id})
(inc total))
0)))
(def ^:private sql:get-file-media-objects
"SELECT id, file_id, media_id, thumbnail_id, deleted_at
FROM file_media_object
@@ -264,8 +241,35 @@
(inc total))
0)))
(def ^:private sql:get-file-data-fragments
"SELECT file_id, id, deleted_at
FROM file_data_fragment
WHERE deleted_at IS NOT NULL
AND deleted_at < now() + ?::interval
ORDER BY deleted_at ASC
LIMIT ?
FOR UPDATE
SKIP LOCKED")
(defn- delete-file-data-fragments!
[{:keys [::db/conn ::deletion-threshold ::chunk-size] :as cfg}]
(->> (db/plan conn [sql:get-file-data-fragments deletion-threshold chunk-size] {:fetch-size 5})
(reduce (fn [total {:keys [file-id id deleted-at]}]
(l/trc :hint "permanently delete"
:rel "file-data-fragment"
:id (str id)
:file-id (str file-id)
:deleted-at (ct/format-inst deleted-at))
;; Delete associated file data
(fdata/delete! cfg {:file-id file-id :id id :type "fragment"})
(db/delete! conn :file-data-fragment {:file-id file-id :id id})
(inc total))
0)))
(def ^:private sql:get-file-change
"SELECT id, file_id, deleted_at, data_backend, data_ref_id
"SELECT id, file_id, deleted_at
FROM file_change
WHERE deleted_at IS NOT NULL
AND deleted_at < now() + ?::interval
@@ -275,7 +279,7 @@
SKIP LOCKED")
(defn- delete-file-changes!
[{:keys [::db/conn ::deletion-threshold ::chunk-size ::sto/storage] :as cfg}]
[{:keys [::db/conn ::deletion-threshold ::chunk-size] :as cfg}]
(->> (db/plan conn [sql:get-file-change deletion-threshold chunk-size] {:fetch-size 5})
(reduce (fn [total {:keys [id file-id deleted-at] :as xlog}]
(l/trc :hint "permanently delete"
@@ -284,8 +288,8 @@
:file-id (str file-id)
:deleted-at (ct/format-inst deleted-at))
(when (= "objects-storage" (:data-backend xlog))
(sto/touch-object! storage (:data-ref-id xlog)))
;; Delete associated file data, if it exists
(fdata/delete! cfg {:file-id file-id :id id :type "snapshot"})
(db/delete! conn :file-change {:id id})
@@ -295,10 +299,10 @@
(def ^:private deletion-proc-vars
[#'delete-profiles!
#'delete-file-media-objects!
#'delete-file-data-fragments!
#'delete-file-object-thumbnails!
#'delete-file-thumbnails!
#'delete-file-changes!
#'delete-file-data-fragments!
#'delete-files!
#'delete-projects!
#'delete-fonts!

View File

@@ -8,101 +8,73 @@
"A maintenance task responsible of moving file data from hot
storage (the database row) to a cold storage (fs or s3)."
(:require
[app.common.exceptions :as ex]
[app.binfile.common :as bfc]
[app.common.logging :as l]
[app.db :as db]
[app.db.sql :as-alias sql]
[app.features.fdata :as fdata]
[app.features.file-snapshots :as fsnap]
[app.storage :as sto]
[app.util.blob :as blob]
[integrant.core :as ig]))
(defn- offload-file-data!
[{:keys [::db/conn ::sto/storage ::file-id] :as cfg}]
(let [file (db/get conn :file {:id file-id}
{::sql/for-update true})]
(when (nil? (:data file))
(ex/raise :hint "file already offloaded"
:type :internal
:code :file-already-offloaded
:file-id file-id))
(defn- offload-file-data
[{:keys [::db/conn ::file-id] :as cfg}]
(let [file (bfc/get-file cfg file-id :realize? true :lock-for-update? true)]
(cond
(not= "db" (:backend file))
(l/wrn :hint (str "skiping file offload (file offloaded or incompatible with offloading) for " file-id)
:file-id (str file-id))
(let [data (sto/content (:data file))
sobj (sto/put-object! storage
{::sto/content data
::sto/touch true
:bucket "file-data"
:content-type "application/octet-stream"
:file-id file-id})]
(nil? (:data file))
(l/err :hint (str "skiping file offload (missing data) for " file-id)
:file-id (str file-id))
(l/trc :hint "offload file data"
:file-id (str file-id)
:storage-id (str (:id sobj)))
:else
(do
(fdata/update! cfg {:id file-id
:file-id file-id
:type "main"
:backend "storage"
:data (blob/encode (:data file))})
(db/update! conn :file
{:data-backend "objects-storage"
:data-ref-id (:id sobj)
:data nil}
{:id file-id}
{::db/return-keys false}))))
(db/update! conn :file
{:data nil}
{:id file-id}
{::db/return-keys false})
(defn- offload-file-data-fragments!
[{:keys [::db/conn ::sto/storage ::file-id] :as cfg}]
(doseq [fragment (db/query conn :file-data-fragment
{:file-id file-id
:deleted-at nil
:data-backend nil}
{::db/for-update true})]
(let [data (sto/content (:data fragment))
sobj (sto/put-object! storage
{::sto/content data
::sto/touch true
:bucket "file-data-fragment"
:content-type "application/octet-stream"
:file-id file-id
:file-fragment-id (:id fragment)})]
(l/trc :hint "offload file data fragment"
:file-id (str file-id)
:file-fragment-id (str (:id fragment))
:storage-id (str (:id sobj)))
(db/update! conn :file-data-fragment
{:data-backend "objects-storage"
:data-ref-id (:id sobj)
:data nil}
{:id (:id fragment)}
{::db/return-keys false}))))
(l/trc :hint "offload file data"
:file-id (str file-id))))))
(def sql:get-snapshots
"SELECT fc.*
FROM file_change AS fc
WHERE fc.file_id = ?
AND fc.label IS NOT NULL
AND fc.data IS NOT NULL
AND fc.data_backend IS NULL")
(str "WITH snapshots AS (" fsnap/sql:snapshots ")"
"SELECT s.*
FROM snapshots AS s
WHERE s.backend = 'db'
AND s.file_id = ?
ORDER BY s.created_at"))
(defn- offload-file-snapshots!
[{:keys [::db/conn ::sto/storage ::file-id] :as cfg}]
(doseq [snapshot (db/exec! conn [sql:get-snapshots file-id])]
(let [data (sto/content (:data snapshot))
sobj (sto/put-object! storage
{::sto/content data
::sto/touch true
:bucket "file-change"
:content-type "application/octet-stream"
:file-id file-id
:file-change-id (:id snapshot)})]
(l/trc :hint "offload file change"
(defn- offload-snapshot-data
[{:keys [::db/conn ::file-id] :as cfg} snapshot]
(let [{:keys [id data] :as snapshot} (fdata/resolve-file-data cfg snapshot)]
(if (nil? (:data snapshot))
(l/err :hint (str "skiping snapshot offload (missing data) for " file-id)
:file-id (str file-id)
:file-change-id (str (:id snapshot))
:storage-id (str (:id sobj)))
:snapshot-id id)
(do
(fsnap/create! cfg {:id id
:file-id file-id
:type "snapshot"
:backend "storage"
:data data})
(db/update! conn :file-change
{:data-backend "objects-storage"
:data-ref-id (:id sobj)
:data nil}
{:id (:id snapshot)}
{::db/return-keys false}))))
(l/trc :hint "offload snapshot data"
:file-id (str file-id)
:snapshot-id (str id))
(db/update! conn :file-change
{:data nil}
{:id id :file-id file-id}
{::db/return-keys false})))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; HANDLER
@@ -116,10 +88,12 @@
(defmethod ig/init-key ::handler
[_ cfg]
(fn [{:keys [props] :as task}]
(-> cfg
(assoc ::db/rollback (:rollback? props))
(assoc ::file-id (:file-id props))
(db/tx-run! (fn [cfg]
(offload-file-data! cfg)
(offload-file-data-fragments! cfg)
(offload-file-snapshots! cfg))))))
(let [file-id (:file-id props)]
(-> cfg
(assoc ::db/rollback (:rollback? props))
(assoc ::file-id (:file-id props))
(db/tx-run! (fn [{:keys [::db/conn] :as cfg}]
(offload-file-data cfg)
(run! (partial offload-snapshot-data cfg)
(db/plan conn [sql:get-snapshots file-id]))))))))

View File

@@ -99,13 +99,14 @@
(into frontend-only-features)
(into backend-only-features)))
(sm/register!
^{::sm/type ::features}
[:schema
{:title "FileFeatures"
::smdj/inline true
:gen/gen (smg/subseq supported-features)}
[::sm/set :string]])
(def schema:features
(sm/register!
^{::sm/type ::features}
[:schema
{:title "FileFeatures"
::smdj/inline true
:gen/gen (smg/subseq supported-features)}
[::sm/set :string]]))
(defn- flag->feature
"Translate a flag to a feature name"

View File

@@ -943,6 +943,8 @@
:gen/gen (sg/uri)
:decode/string decode-uri
:decode/json decode-uri
:encode/json str
:encode/string str
::oapi/type "string"
::oapi/format "uri"}})

View File

@@ -156,7 +156,7 @@
(defn make-file
[{:keys [id project-id name revn is-shared features migrations
ignore-sync-until created-at modified-at deleted-at]
metadata backend ignore-sync-until created-at modified-at deleted-at]
:as params}
& {:keys [create-page with-data page-id]
@@ -187,8 +187,9 @@
:data data
:features features
:migrations migrations
:metadata metadata
:backend backend
:ignore-sync-until ignore-sync-until
:has-media-trimmed false
:created-at created-at
:modified-at modified-at
:deleted-at deleted-at})]

View File

@@ -16,7 +16,7 @@
(letfn [(resolve-pointer [[key val :as kv]]
(if (t/pointer? val)
(->> (rp/cmd! :get-file-fragment {:file-id id :fragment-id @val})
(rx/map #(get % :data))
(rx/map #(get % :content))
(rx/map #(vector key %)))
(rx/of kv)))

View File

@@ -62,10 +62,10 @@
[data]
(->> (concat
(->> data
(filterv #(= "user" (:created-by %)))
(filter #(= "user" (:created-by %)))
(map #(assoc % :type :version)))
(->> data
(filterv #(= "system" (:created-by %)))
(filter #(= "system" (:created-by %)))
(group-by #(ct/format-inst (:created-at %) :iso-date))
(map (fn [[day entries]]
{:type :snapshot