diff --git a/backend/resources/app/templates/api-doc-entry.tmpl b/backend/resources/app/templates/api-doc-entry.tmpl index c61157ec24..31c48deebf 100644 --- a/backend/resources/app/templates/api-doc-entry.tmpl +++ b/backend/resources/app/templates/api-doc-entry.tmpl @@ -25,6 +25,12 @@ SCHEMA {% endif %} + + {% if item.sse %} + + SSE + + {% endif %}
The methods marked with SSE returns + a SSE + formatted stream on the response body, always with status 200. The events are + always encoded using `application/transit+json` encoding (for now no content + negotiation is possible on methods that return SSE streams).
+On the javascript side you can use + the eventsoure-parser + library for propertly parsing the response body using the + standard Fetch + API
The rate limit work per user basis (this means that different api keys share diff --git a/backend/src/app/features/components_v2.clj b/backend/src/app/features/components_v2.clj index 3a41d818a0..d1c8282327 100644 --- a/backend/src/app/features/components_v2.clj +++ b/backend/src/app/features/components_v2.clj @@ -32,6 +32,7 @@ [app.common.types.shape-tree :as ctst] [app.common.uuid :as uuid] [app.db :as db] + [app.http.sse :as sse] [app.media :as media] [app.rpc.commands.files :as files] [app.rpc.commands.files-snapshot :as fsnap] @@ -362,6 +363,8 @@ shapes from library components. Mark the file with the :components-v2 option." [file-data libraries] + (sse/tap {:type :migration-progress + :section :components}) (let [components (ctkl/components-seq file-data)] (if (empty? components) (assoc-in file-data [:options :components-v2] true) @@ -435,6 +438,9 @@ add-instance-grid (fn [fdata frame-id grid assets] (reduce (fn [result [component position]] + (sse/tap {:type :migration-progress + :section :components + :name (:name component)}) (add-main-instance result component frame-id (gpt/add position (gpt/point grid-gap grid-gap)))) fdata @@ -701,6 +707,9 @@ (->> (d/zip media-group grid) (map (fn [[mobj position]] (l/trc :hint "submit graphic processing" :file-id (str (:id fdata)) :id (str (:id mobj))) + (sse/tap {:type :migration-progress + :section :graphics + :name (:name mobj)}) (px/submit! executor (partial process mobj position)))) (reduce (fn [fdata promise] (if-let [changes (deref promise)] @@ -713,6 +722,8 @@ (defn- migrate-graphics [fdata] + (sse/tap {:type :migration-progress + :section :graphics}) (if (empty? (:media fdata)) fdata (let [[fdata page-id start-pos] @@ -812,7 +823,6 @@ (defn migrate-file! [system file-id & {:keys [validate? throw-on-validate?]}] - (let [tpoint (dt/tpoint) file-id (if (string? file-id) (parse-uuid file-id) diff --git a/backend/src/app/http/errors.clj b/backend/src/app/http/errors.clj index 93c67845b9..0e8e065ad2 100644 --- a/backend/src/app/http/errors.clj +++ b/backend/src/app/http/errors.clj @@ -232,3 +232,7 @@ (if (ex/error? cause) (handle-error cause request nil) (handle-exception cause request nil))) + +(defn handle' + [cause request] + (::rres/body (handle cause request))) diff --git a/backend/src/app/http/sse.clj b/backend/src/app/http/sse.clj new file mode 100644 index 0000000000..c89c91b618 --- /dev/null +++ b/backend/src/app/http/sse.clj @@ -0,0 +1,86 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns app.http.sse + "SSE (server sent events) helpers" + (:refer-clojure :exclude [tap]) + (:require + [app.common.data :as d] + [app.common.exceptions :as ex] + [app.common.logging :as l] + [app.common.transit :as t] + [app.http.errors :as errors] + [promesa.core :as p] + [promesa.exec :as px] + [promesa.exec.csp :as sp] + [promesa.util :as pu] + [ring.response :as rres]) + (:import + java.io.OutputStream)) + +(def ^:dynamic *channel* nil) + +(defn- write! + [^OutputStream output ^bytes data] + (l/trc :hint "writting data" :data data :length (alength data)) + (.write output data) + (.flush output)) + +(defn- create-writer-loop + [^OutputStream output] + (try + (loop [] + (when-let [event (sp/take! *channel*)] + (let [result (ex/try! (write! output event))] + (if (ex/exception? result) + (l/wrn :hint "unexpected exception on sse writer" :cause result) + (recur))))) + (finally + (pu/close! output)))) + +(defn- encode + [[name data]] + (try + (let [data (with-out-str + (println "event:" (d/name name)) + (println "data:" (t/encode-str data {:type :json-verbose})) + (println))] + (.getBytes data "UTF-8")) + (catch Throwable cause + (l/err :hint "unexpected error on encoding value on sse stream" + :cause cause) + nil))) + +;; ---- PUBLIC API + +(def default-headers + {"Content-Type" "text/event-stream;charset=UTF-8" + "Cache-Control" "no-cache, no-store, max-age=0, must-revalidate" + "Pragma" "no-cache"}) + +(defn tap + ([data] (tap "event" data)) + ([name data] + (when-let [channel *channel*] + (sp/put! channel [name data]) + nil))) + +(defn response + [handler & {:keys [buf] :or {buf 32} :as opts}] + (fn [request] + {::rres/headers default-headers + ::rres/status 200 + ::rres/body (reify rres/StreamableResponseBody + (-write-body-to-stream [_ _ output] + (binding [*channel* (sp/chan :buf buf :xf (keep encode))] + (let [writer (px/run! :virtual (partial create-writer-loop output))] + (try + (tap "end" (handler)) + (catch Throwable cause + (tap "error" (errors/handle' cause request))) + (finally + (sp/close! *channel*) + (p/await! writer)))))))})) diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index d4b3a4baaf..69f9d84fbc 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -57,14 +57,16 @@ (defn- handle-response [request result] - (if (fn? result) - (result request) - (let [mdata (meta result)] - (-> {::rres/status (::http/status mdata 200) - ::rres/headers (::http/headers mdata {}) - ::rres/body (rph/unwrap result)} - (handle-response-transformation request mdata) - (handle-before-comple-hook mdata))))) + (let [mdata (meta result) + response (if (fn? result) + (result request) + (let [result (rph/unwrap result)] + {::rres/status (::http/status mdata 200) + ::rres/headers (::http/headers mdata {}) + ::rres/body result}))] + (-> response + (handle-response-transformation request mdata) + (handle-before-comple-hook mdata)))) (defn- rpc-handler "Ring handler that dispatches cmd requests and convert between diff --git a/backend/src/app/rpc/commands/binfile.clj b/backend/src/app/rpc/commands/binfile.clj index ba00b26fa0..a347c893fd 100644 --- a/backend/src/app/rpc/commands/binfile.clj +++ b/backend/src/app/rpc/commands/binfile.clj @@ -15,6 +15,7 @@ [app.common.files.validate :as fval] [app.common.fressian :as fres] [app.common.logging :as l] + [app.common.schema :as sm] [app.common.spec :as us] [app.common.types.file :as ctf] [app.common.uuid :as uuid] @@ -22,6 +23,7 @@ [app.db :as db] [app.features.components-v2 :as features.components-v2] [app.features.fdata :as features.fdata] + [app.http.sse :as sse] [app.loggers.audit :as-alias audit] [app.loggers.webhooks :as-alias webhooks] [app.media :as media] @@ -30,7 +32,6 @@ [app.rpc.commands.projects :as projects] [app.rpc.commands.teams :as teams] [app.rpc.doc :as-alias doc] - [app.rpc.helpers :as rph] [app.storage :as sto] [app.storage.tmp :as tmp] [app.tasks.file-gc] @@ -38,11 +39,13 @@ [app.util.pointer-map :as pmap] [app.util.services :as sv] [app.util.time :as dt] + [app.worker :as-alias wrk] [clojure.set :as set] [clojure.spec.alpha :as s] [clojure.walk :as walk] [cuerdas.core :as str] [datoteka.io :as io] + [promesa.core :as p] [promesa.util :as pu] [ring.response :as rres] [yetti.adapter :as yt]) @@ -642,6 +645,9 @@ validate? (contains? cf/flags :file-validation) features (cfeat/get-team-enabled-features cf/flags team)] + (sse/tap {:type :import-progress + :section :read-import}) + ;; Process all sections (run! (fn [section] (l/dbg :hint "reading section" :section section ::l/sync? true) @@ -651,6 +657,8 @@ (assoc ::section section) (assoc ::input input))] (binding [*options* options] + (sse/tap {:type :import-progress + :section section}) (read-section options)))) [:v1/metadata :v1/files :v1/rels :v1/sobjects]) @@ -1056,54 +1064,71 @@ ;; --- Command: export-binfile -(s/def ::file-id ::us/uuid) -(s/def ::include-libraries? ::us/boolean) -(s/def ::embed-assets? ::us/boolean) - -(s/def ::export-binfile - (s/keys :req [::rpc/profile-id] - :req-un [::file-id ::include-libraries? ::embed-assets?])) +(def ^:private + schema:export-binfile + (sm/define + [:map {:title "export-binfile"} + [:file-id ::sm/uuid] + [:include-libraries? :boolean] + [:embed-assets? :boolean]])) (sv/defmethod ::export-binfile "Export a penpot file in a binary format." {::doc/added "1.15" - ::webhooks/event? true} + ::webhooks/event? true + ::sm/result schema:export-binfile} [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id include-libraries? embed-assets?] :as params}] (files/check-read-permissions! pool profile-id file-id) - (let [body (reify rres/StreamableResponseBody - (-write-body-to-stream [_ _ output-stream] - (-> cfg - (assoc ::file-ids [file-id]) - (assoc ::embed-assets? embed-assets?) - (assoc ::include-libraries? include-libraries?) - (export! output-stream))))] + (fn [_] + {::rres/status 200 + ::rres/headers {"content-type" "application/octet-stream"} + ::rres/body (reify rres/StreamableResponseBody + (-write-body-to-stream [_ _ output-stream] + (-> cfg + (assoc ::file-ids [file-id]) + (assoc ::embed-assets? embed-assets?) + (assoc ::include-libraries? include-libraries?) + (export! output-stream))))})) - (fn [_] - {::rres/status 200 - ::rres/body body - ::rres/headers {"content-type" "application/octet-stream"}}))) -(s/def ::file ::media/upload) -(s/def ::import-binfile - (s/keys :req [::rpc/profile-id] - :req-un [::project-id ::file])) +;; --- Command: import-binfile + +(def ^:private + schema:import-binfile + (sm/define + [:map {:title "import-binfile"} + [:project-id ::sm/uuid] + [:file ::media/upload]])) + +(declare ^:private import-binfile) (sv/defmethod ::import-binfile "Import a penpot file in a binary format." {::doc/added "1.15" - ::webhooks/event? true} + ::webhooks/event? true + ::sse/stream? true + ::sm/params schema:import-binfile} [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id project-id file] :as params}] - (db/with-atomic [conn pool] - (projects/check-read-permissions! conn profile-id project-id) - (let [ids (import! (assoc cfg - ::input (:path file) - ::project-id project-id - ::profile-id profile-id - ::ignore-index-errors? true))] + (projects/check-read-permissions! pool profile-id project-id) + (let [params (-> cfg + (assoc ::input (:path file)) + (assoc ::project-id project-id) + (assoc ::profile-id profile-id) + (assoc ::ignore-index-errors? true))] + (with-meta + (sse/response #(import-binfile params)) + {::audit/props {:file nil}}))) - (db/update! conn :project - {:modified-at (dt/now)} - {:id project-id}) - - (rph/with-meta ids - {::audit/props {:file nil :file-ids ids}})))) +(defn- import-binfile + [{:keys [::wrk/executor ::project-id] :as params}] + (db/tx-run! params + (fn [{:keys [::db/conn] :as params}] + ;; NOTE: the importation process performs some operations that + ;; are not very friendly with virtual threads, and for avoid + ;; unexpected blocking of other concurrent operations we + ;; dispatch that operation to a dedicated executor. + (let [result (p/thread-call executor (partial import! params))] + (db/update! conn :project + {:modified-at (dt/now)} + {:id project-id}) + (deref result))))) diff --git a/backend/src/app/rpc/commands/management.clj b/backend/src/app/rpc/commands/management.clj index 1c0090e69d..00195d17ed 100644 --- a/backend/src/app/rpc/commands/management.clj +++ b/backend/src/app/rpc/commands/management.clj @@ -14,6 +14,7 @@ [app.common.schema :as sm] [app.common.uuid :as uuid] [app.db :as db] + [app.http.sse :as sse] [app.loggers.webhooks :as-alias webhooks] [app.rpc :as-alias rpc] [app.rpc.commands.binfile :as binfile] @@ -27,7 +28,9 @@ [app.util.pointer-map :as pmap] [app.util.services :as sv] [app.util.time :as dt] + [app.worker :as-alias wrk] [clojure.walk :as walk] + [promesa.core :as p] [promesa.exec :as px])) ;; --- COMMAND: Duplicate File @@ -405,29 +408,6 @@ ;; --- COMMAND: Clone Template -(defn- clone-template! - [{:keys [::db/conn] :as cfg} {:keys [profile-id template-id project-id]}] - (let [template (tmpl/get-template-stream cfg template-id) - project (db/get-by-id conn :project project-id {:columns [:id :team-id]})] - - (when-not template - (ex/raise :type :not-found - :code :template-not-found - :hint "template not found")) - - (teams/check-edition-permissions! conn profile-id (:team-id project)) - - (-> cfg - ;; FIXME: maybe reuse the conn instead of creating more - ;; connections in the import process? - (dissoc ::db/conn) - (assoc ::binfile/input template) - (assoc ::binfile/project-id (:id project)) - (assoc ::binfile/profile-id profile-id) - (assoc ::binfile/ignore-index-errors? true) - (assoc ::binfile/migrate? true) - (binfile/import!)))) - (def ^:private schema:clone-template (sm/define @@ -435,15 +415,46 @@ [:project-id ::sm/uuid] [:template-id ::sm/word-string]])) +(declare ^:private clone-template) + (sv/defmethod ::clone-template "Clone into the specified project the template by its id." {::doc/added "1.16" + ::sse/stream? true ::webhooks/event? true ::sm/params schema:clone-template} - [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id] :as params}] - (db/with-atomic [conn pool] - (-> (assoc cfg ::db/conn conn) - (clone-template! (assoc params :profile-id profile-id))))) + [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id project-id template-id] :as params}] + (let [project (db/get-by-id pool :project project-id {:columns [:id :team-id]}) + _ (teams/check-edition-permissions! pool profile-id (:team-id project)) + template (tmpl/get-template-stream cfg template-id) + params (-> cfg + (assoc ::binfile/input template) + (assoc ::binfile/project-id (:id project)) + (assoc ::binfile/profile-id profile-id) + (assoc ::binfile/ignore-index-errors? true) + (assoc ::binfile/migrate? true))] + + (when-not template + (ex/raise :type :not-found + :code :template-not-found + :hint "template not found")) + + (sse/response #(clone-template params)))) + +(defn- clone-template + [{:keys [::wrk/executor ::binfile/project-id] :as params}] + (db/tx-run! params + (fn [{:keys [::db/conn] :as params}] + ;; NOTE: the importation process performs some operations that + ;; are not very friendly with virtual threads, and for avoid + ;; unexpected blocking of other concurrent operations we + ;; dispatch that operation to a dedicated executor. + (let [result (p/thread-call executor (partial binfile/import! params))] + (db/update! conn :project + {:modified-at (dt/now)} + {:id project-id}) + + (deref result))))) ;; --- COMMAND: Get list of builtin templates diff --git a/backend/src/app/rpc/doc.clj b/backend/src/app/rpc/doc.clj index bcfcd96c80..185f3fc4c2 100644 --- a/backend/src/app/rpc/doc.clj +++ b/backend/src/app/rpc/doc.clj @@ -16,6 +16,7 @@ [app.common.schema.openapi :as oapi] [app.common.schema.registry :as sr] [app.config :as cf] + [app.http.sse :as-alias sse] [app.loggers.webhooks :as-alias webhooks] [app.rpc :as-alias rpc] [app.util.json :as json] @@ -55,6 +56,7 @@ :module (or (some-> (::module mdata) d/name) (-> (:ns mdata) (str/split ".") last)) :auth (::rpc/auth mdata true) + :sse (::sse/stream? mdata false) :webhook (::webhooks/event? mdata false) :docs (::sv/docstring mdata) :deprecated (::deprecated mdata) diff --git a/backend/test/backend_tests/helpers.clj b/backend/test/backend_tests/helpers.clj index 6b8ef788a4..3fad161142 100644 --- a/backend/test/backend_tests/helpers.clj +++ b/backend/test/backend_tests/helpers.clj @@ -15,12 +15,13 @@ [app.common.pprint :as pp] [app.common.schema :as sm] [app.common.spec :as us] + [app.common.transit :as tr] [app.common.uuid :as uuid] [app.config :as cf] [app.db :as db] [app.main :as main] - [app.media] [app.media :as-alias mtx] + [app.media] [app.migrations] [app.msgbus :as-alias mbus] [app.rpc :as-alias rpc] @@ -43,8 +44,12 @@ [integrant.core :as ig] [mockery.core :as mk] [promesa.core :as p] + [promesa.exec :as px] + [ring.response :as rres] [yetti.request :as yrq]) (:import + java.io.PipedInputStream + java.io.PipedOutputStream java.util.UUID org.postgresql.ds.PGSimpleDataSource)) @@ -553,3 +558,28 @@ (assoc :return-list []) (assoc :call-args nil) (assoc :call-args-list []))))) + +(defn- slurp' + [input & opts] + (let [sw (java.io.StringWriter.)] + (with-open [^java.io.Reader r (java.io.InputStreamReader. input "UTF-8")] + (io/copy r sw) + (.toString sw)))) + +(defn consume-sse + [callback] + (let [{:keys [::rres/status ::rres/body ::rres/headers] :as response} (callback {}) + output (PipedOutputStream.) + input (PipedInputStream. output)] + + (try + (px/exec! :virtual #(rres/-write-body-to-stream body nil output)) + (into [] + (map (fn [event] + (let [[item1 item2] (re-seq #"(.*): (.*)\n?" event)] + [(keyword (nth item1 2)) + (tr/decode-str (nth item2 2))]))) + (-> (slurp' input) + (str/split "\n\n"))) + (finally + (.close input))))) diff --git a/backend/test/backend_tests/rpc_management_test.clj b/backend/test/backend_tests/rpc_management_test.clj index e56109d334..2b5f182e88 100644 --- a/backend/test/backend_tests/rpc_management_test.clj +++ b/backend/test/backend_tests/rpc_management_test.clj @@ -6,6 +6,7 @@ (ns backend-tests.rpc-management-test (:require + [app.common.pprint :as pp] [app.common.uuid :as uuid] [app.db :as db] [app.http :as http] @@ -604,9 +605,11 @@ (t/is (nil? (:error out))) (let [result (:result out)] - (t/is (set? result)) - (t/is (uuid? (first result))) - (t/is (= 1 (count result)))))) + (t/is (fn? result)) + + (let [events (th/consume-sse result)] + (t/is (= 8 (count events))) + (t/is (= :end (first (last events)))))))) (t/deftest get-list-of-buitin-templates (let [prof (th/create-profile* 1 {:is-active true}) diff --git a/common/deps.edn b/common/deps.edn index 4bb93a5a8e..4dcd1987db 100644 --- a/common/deps.edn +++ b/common/deps.edn @@ -32,7 +32,7 @@ funcool/tubax {:mvn/version "2021.05.20-0"} funcool/cuerdas {:mvn/version "2022.06.16-403"} - funcool/promesa {:git/sha "658c429c56c11c33da7594fa2ef53f4e6afedac4" + funcool/promesa {:git/sha "484b7f5c0d08d817746caa685ed9ac5583eb37fa" :git/url "https://github.com/funcool/promesa"} funcool/datoteka {:mvn/version "3.0.66" diff --git a/docker/devenv/files/nginx.conf b/docker/devenv/files/nginx.conf index 175f7f2cf6..ab2c5ea6e9 100644 --- a/docker/devenv/files/nginx.conf +++ b/docker/devenv/files/nginx.conf @@ -105,6 +105,8 @@ http { location /api { proxy_pass http://127.0.0.1:6060/api; + proxy_buffering off; + proxy_http_version 1.1; } location /admin { diff --git a/frontend/package.json b/frontend/package.json index d2ac50f09a..e33d2a6924 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -78,6 +78,7 @@ "dependencies": { "date-fns": "^2.30.0", "draft-js": "^0.11.7", + "eventsource-parser": "^1.1.1", "highlight.js": "^11.8.0", "js-beautify": "^1.14.9", "jszip": "^3.10.1", diff --git a/frontend/src/app/main/data/dashboard.cljs b/frontend/src/app/main/data/dashboard.cljs index fbe417428a..9e723ca20b 100644 --- a/frontend/src/app/main/data/dashboard.cljs +++ b/frontend/src/app/main/data/dashboard.cljs @@ -10,6 +10,7 @@ [app.common.data.macros :as dm] [app.common.features :as cfeat] [app.common.files.helpers :as cfh] + [app.common.logging :as log] [app.common.schema :as sm] [app.common.uri :as u] [app.common.uuid :as uuid] @@ -25,6 +26,7 @@ [app.util.dom :as dom] [app.util.i18n :as i18n :refer [tr]] [app.util.router :as rt] + [app.util.sse :as sse] [app.util.time :as dt] [app.util.timers :as tm] [app.util.webapi :as wapi] @@ -32,6 +34,8 @@ [clojure.set :as set] [potok.core :as ptk])) +(log/set-level! :warn) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Initialization ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -949,7 +953,17 @@ (let [{:keys [on-success on-error] :or {on-success identity on-error rx/throw}} (meta params)] - (->> (rp/cmd! :clone-template {:project-id project-id :template-id template-id}) + (->> (rp/cmd! ::sse/clone-template {:project-id project-id + :template-id template-id}) + (rx/tap (fn [event] + (let [payload (sse/get-payload event) + type (sse/get-type event)] + (if (= type "event") + (log/dbg :hint "clone-template: progress" :section (:section payload) :name (:name payload)) + (log/dbg :hint "clone-template: end"))))) + + (rx/filter sse/end-of-stream?) + (rx/map sse/get-payload) (rx/tap on-success) (rx/catch on-error)))))) diff --git a/frontend/src/app/main/repo.cljs b/frontend/src/app/main/repo.cljs index e8b5f8f018..e99f9a4612 100644 --- a/frontend/src/app/main/repo.cljs +++ b/frontend/src/app/main/repo.cljs @@ -7,9 +7,11 @@ (ns app.main.repo (:require [app.common.data :as d] + [app.common.transit :as t] [app.common.uri :as u] [app.config :as cf] [app.util.http :as http] + [app.util.sse :as sse] [beicon.core :as rx] [cuerdas.core :as str])) @@ -56,8 +58,14 @@ {:query-params [:file-id :revn] :form-data? true} + ::sse/clone-template + {:response-type ::sse/stream} + + ::sse/import-binfile + {:response-type ::sse/stream + :form-data? true} + :export-binfile {:response-type :blob} - :import-binfile {:form-data? true} :retrieve-list-of-builtin-templates {:query-params :all} }) @@ -85,9 +93,9 @@ :else :post) request {:method method - :uri (u/join cf/public-uri "api/rpc/command/" (name id)) + :uri (u/join cf/public-uri "api/rpc/command/" nid) :credentials "include" - :headers {"accept" "application/transit+json"} + :headers {"accept" "application/transit+json,text/event-stream,*/*"} :body (when (= method :post) (if form-data? (http/form-data params) @@ -97,11 +105,21 @@ (if query-params (select-keys params query-params) nil)) - :response-type (or response-type :text)}] - (->> (http/send! request) - (rx/map decode-fn) - (rx/mapcat handle-response)))) + :response-type + (if (= response-type ::sse/stream) + :stream + (or response-type :text))} + + result (->> (http/send! request) + (rx/map decode-fn) + (rx/mapcat handle-response))] + + (cond->> result + (= ::sse/stream response-type) + (rx/mapcat (fn [body] + (-> (sse/create-stream body) + (sse/read-stream t/decode-str))))))) (defmulti cmd! (fn [id _] id)) diff --git a/frontend/src/app/main/ui/dashboard/projects.cljs b/frontend/src/app/main/ui/dashboard/projects.cljs index b017c20204..545d9c0aa9 100644 --- a/frontend/src/app/main/ui/dashboard/projects.cljs +++ b/frontend/src/app/main/ui/dashboard/projects.cljs @@ -147,8 +147,10 @@ (mf/use-fn (mf/deps template default-project-id) (fn [] - (let [mdata {:on-success on-template-cloned-success :on-error on-template-cloned-error} - params {:project-id default-project-id :template-id (:id template)}] + (let [mdata {:on-success on-template-cloned-success + :on-error on-template-cloned-error} + params {:project-id default-project-id + :template-id (:id template)}] (swap! state #(assoc % :status :importing)) (st/emit! (with-meta (dd/clone-template (with-meta params mdata)) {::ev/origin "get-started-hero-block"})))))] diff --git a/frontend/src/app/util/http.cljs b/frontend/src/app/util/http.cljs index 60fc675f6e..35c8ae485d 100644 --- a/frontend/src/app/util/http.cljs +++ b/frontend/src/app/util/http.cljs @@ -105,17 +105,22 @@ (defn send! [{:keys [response-type] :or {response-type :text} :as params}] - (letfn [(on-response [response] - (let [body (case response-type - :json (.json ^js response) - :text (.text ^js response) - :blob (.blob ^js response))] - (->> (rx/from body) - (rx/map (fn [body] - {::response response - :status (.-status ^js response) - :headers (parse-headers (.-headers ^js response)) - :body body})))))] + (letfn [(on-response [^js response] + (if (= :stream response-type) + (rx/of {:status (.-status response) + :headers (parse-headers (.-headers response)) + :body (.-body response) + ::response response}) + (let [body (case response-type + :json (.json ^js response) + :text (.text ^js response) + :blob (.blob ^js response))] + (->> (rx/from body) + (rx/map (fn [body] + {::response response + :status (.-status ^js response) + :headers (parse-headers (.-headers ^js response)) + :body body}))))))] (->> (fetch params) (rx/mapcat on-response)))) diff --git a/frontend/src/app/util/sse.cljs b/frontend/src/app/util/sse.cljs new file mode 100644 index 0000000000..222e1bda7f --- /dev/null +++ b/frontend/src/app/util/sse.cljs @@ -0,0 +1,54 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns app.util.sse + (:require + ["eventsource-parser/stream" :as sse] + [beicon.core :as rx])) + +(defn create-stream + [^js/ReadableStream stream] + (.. stream + (pipeThrough (js/TextDecoderStream.)) + (pipeThrough (sse/EventSourceParserStream.)))) + +(defn read-stream + [^js/ReadableStream stream decode-fn] + (letfn [(read-items [^js reader] + (->> (rx/from (.read reader)) + (rx/mapcat (fn [result] + (if (.-done result) + (rx/empty) + (rx/concat + (rx/of (.-value result)) + (read-items reader)))))))] + (->> (read-items (.getReader stream)) + (rx/mapcat (fn [^js event] + (let [type (.-event event) + data (.-data event) + data (decode-fn data)] + (if (= "error" type) + (rx/throw (ex-info "stream exception" data)) + (rx/of #js {:type type :data data})))))))) + +(defn get-type + [event] + (unchecked-get event "type")) + +(defn get-payload + [event] + (unchecked-get event "data")) + +(defn end-of-stream? + [event] + (= "end" (get-type event))) + +(defn event? + [event] + (= "event" (get-type event))) + + + diff --git a/frontend/src/app/worker/import.cljs b/frontend/src/app/worker/import.cljs index 9895aaf47d..bccb975ea0 100644 --- a/frontend/src/app/worker/import.cljs +++ b/frontend/src/app/worker/import.cljs @@ -22,6 +22,7 @@ [app.util.i18n :as i18n :refer [tr]] [app.util.import.parser :as cip] [app.util.json :as json] + [app.util.sse :as sse] [app.util.webapi :as wapi] [app.util.zip :as uz] [app.worker.impl :as impl] @@ -329,7 +330,7 @@ (map #(assoc % :type :fill))) stroke-images-data (->> (cip/get-stroke-images-data node) (map #(assoc % :type :stroke))) - + images-data (concat fill-images-data stroke-images-data @@ -709,15 +710,22 @@ :response-type :blob :method :get}) (rx/map :body) - (rx/mapcat #(rp/cmd! :import-binfile {:file % :project-id project-id})) - (rx/map (fn [_] - {:status :import-finish - :file-id (:file-id data)})) + (rx/mapcat (fn [file] + (->> (rp/cmd! ::sse/import-binfile {:file file :project-id project-id}) + (rx/tap (fn [event] + (let [payload (sse/get-payload event) + type (sse/get-type event)] + (if (= type "event") + (log/dbg :hint "import-binfile: progress" :section (:section payload) :name (:name payload)) + (log/dbg :hint "import-binfile: end"))))) + (rx/filter sse/end-of-stream?) + (rx/map (fn [_] + {:status :import-finish + :file-id (:file-id data)}))))) (rx/catch (fn [cause] (log/error :hint "unexpected error on import process" :project-id project-id ::log/sync? true) - ;; TODO: consider do thi son logging directly ? (when (map? cause) (println "Error data:") diff --git a/frontend/yarn.lock b/frontend/yarn.lock index fc9af1bd1a..0cd007f2ff 100644 --- a/frontend/yarn.lock +++ b/frontend/yarn.lock @@ -7028,6 +7028,13 @@ __metadata: languageName: node linkType: hard +"eventsource-parser@npm:^1.1.1": + version: 1.1.1 + resolution: "eventsource-parser@npm:1.1.1" + checksum: 6eae5e8300dc5d4dcd29d09d037a43b9954077fbcc936904065d1abf5ec503f1fa56a942ad4f796b7dcc3bf64224440917ea635ef4a75f5522e9951df4b9aadf + languageName: node + linkType: hard + "evp_bytestokey@npm:^1.0.0, evp_bytestokey@npm:^1.0.3": version: 1.0.3 resolution: "evp_bytestokey@npm:1.0.3" @@ -7578,6 +7585,7 @@ __metadata: concurrently: "npm:^8.2.2" date-fns: "npm:^2.30.0" draft-js: "npm:^0.11.7" + eventsource-parser: "npm:^1.1.1" gettext-parser: "npm:^7.0.1" gulp: "npm:4.0.2" gulp-cached: "npm:^1.1.1"