From 5c561dfdf1f2c895cd4d526ede6c11d49d4e2e00 Mon Sep 17 00:00:00 2001
From: Pascal Bleser
Date: Mon, 4 Aug 2025 17:49:18 +0200
Subject: [PATCH] groupware: blob streaming (upload and download)
---
pkg/jmap/jmap.go | 35 +++++++-
pkg/jmap/jmap_api.go | 5 ++
pkg/jmap/jmap_http.go | 87 +++++++++++++++++++
pkg/jmap/jmap_model.go | 9 ++
pkg/jmap/jmap_test.go | 41 ++++++++-
.../pkg/groupware/groupware_api_blob.go | 77 ++++++++++++++++
.../pkg/groupware/groupware_error.go | 7 ++
.../pkg/groupware/groupware_framework.go | 49 ++++++++++-
.../pkg/groupware/groupware_route.go | 21 +++--
9 files changed, 322 insertions(+), 9 deletions(-)
diff --git a/pkg/jmap/jmap.go b/pkg/jmap/jmap.go
index 29ffde521..bbec7c962 100644
--- a/pkg/jmap/jmap.go
+++ b/pkg/jmap/jmap.go
@@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net/url"
+ "strings"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/rs/zerolog"
@@ -18,6 +19,7 @@ type SessionEventListener interface {
type Client struct {
wellKnown SessionClient
api ApiClient
+ blob BlobClient
sessionEventListeners *eventListeners[SessionEventListener]
io.Closer
}
@@ -26,10 +28,11 @@ func (j *Client) Close() error {
return j.api.Close()
}
-func NewClient(wellKnown SessionClient, api ApiClient) Client {
+func NewClient(wellKnown SessionClient, api ApiClient, blob BlobClient) Client {
return Client{
wellKnown: wellKnown,
api: api,
+ blob: blob,
sessionEventListeners: newEventListeners[SessionEventListener](),
}
}
@@ -63,6 +66,9 @@ type Session struct {
// The upload URL template
UploadUrlTemplate string
+ // The upload URL template
+ DownloadUrlTemplate string
+
// TODO
DefaultMailAccountId string
@@ -91,12 +97,17 @@ func newSession(sessionResponse SessionResponse) (Session, Error) {
if uploadUrl == "" {
return Session{}, SimpleError{code: JmapErrorInvalidSessionResponse, err: fmt.Errorf("JMAP session response does not provide an upload URL")}
}
+ downloadUrl := sessionResponse.DownloadUrl
+ if downloadUrl == "" {
+ return Session{}, SimpleError{code: JmapErrorInvalidSessionResponse, err: fmt.Errorf("JMAP session response does not provide an download URL")}
+ }
return Session{
Username: username,
DefaultMailAccountId: mailAccountId,
JmapUrl: *apiUrl,
UploadUrlTemplate: uploadUrl,
+ DownloadUrlTemplate: downloadUrl,
SessionResponse: sessionResponse,
}, nil
}
@@ -126,6 +137,9 @@ const (
logOffset = "offset"
logLimit = "limit"
logApiUrl = "apiurl"
+ logDownloadUrl = "downloadurl"
+ logBlobId = "blobId"
+ logUploadUrl = "downloadurl"
logSessionState = "session-state"
logSince = "since"
@@ -540,6 +554,25 @@ type UploadedBlob struct {
Sha512 string `json:"sha:512"`
}
+func (j *Client) UploadBlobStream(accountId string, session *Session, ctx context.Context, logger *log.Logger, contentType string, body io.Reader) (UploadedBlob, Error) {
+ aid := session.BlobAccountId(accountId)
+ // TODO(pbleser-oc) use a library for proper URL template parsing
+ uploadUrl := strings.ReplaceAll(session.UploadUrlTemplate, "{accountId}", aid)
+ return j.blob.UploadBinary(ctx, logger, session, uploadUrl, contentType, body)
+}
+
+func (j *Client) DownloadBlobStream(accountId string, blobId string, name string, typ string, session *Session, ctx context.Context, logger *log.Logger) (*BlobDownload, Error) {
+ aid := session.BlobAccountId(accountId)
+ // TODO(pbleser-oc) use a library for proper URL template parsing
+ downloadUrl := session.DownloadUrlTemplate
+ downloadUrl = strings.ReplaceAll(downloadUrl, "{accountId}", aid)
+ downloadUrl = strings.ReplaceAll(downloadUrl, "{blobId}", blobId)
+ downloadUrl = strings.ReplaceAll(downloadUrl, "{name}", name)
+ downloadUrl = strings.ReplaceAll(downloadUrl, "{type}", typ)
+ logger = &log.Logger{Logger: logger.With().Str(logDownloadUrl, downloadUrl).Str(logBlobId, blobId).Str(logAccountId, aid).Logger()}
+ return j.blob.DownloadBinary(ctx, logger, session, downloadUrl)
+}
+
func (j *Client) UploadBlob(accountId string, session *Session, ctx context.Context, logger *log.Logger, data []byte, contentType string) (UploadedBlob, Error) {
aid := session.MailAccountId(accountId)
diff --git a/pkg/jmap/jmap_api.go b/pkg/jmap/jmap_api.go
index 2a48b1024..e9d93618d 100644
--- a/pkg/jmap/jmap_api.go
+++ b/pkg/jmap/jmap_api.go
@@ -15,3 +15,8 @@ type ApiClient interface {
type SessionClient interface {
GetSession(username string, logger *log.Logger) (SessionResponse, Error)
}
+
+type BlobClient interface {
+ UploadBinary(ctx context.Context, logger *log.Logger, session *Session, uploadUrl string, contentType string, content io.Reader) (UploadedBlob, Error)
+ DownloadBinary(ctx context.Context, logger *log.Logger, session *Session, downloadUrl string) (*BlobDownload, Error)
+}
diff --git a/pkg/jmap/jmap_http.go b/pkg/jmap/jmap_http.go
index ba2fb23ee..13851d2f3 100644
--- a/pkg/jmap/jmap_http.go
+++ b/pkg/jmap/jmap_http.go
@@ -8,6 +8,7 @@ import (
"io"
"net/http"
"net/url"
+ "strconv"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/pkg/version"
@@ -24,6 +25,7 @@ type HttpJmapApiClient struct {
var (
_ ApiClient = &HttpJmapApiClient{}
_ SessionClient = &HttpJmapApiClient{}
+ _ BlobClient = &HttpJmapApiClient{}
)
/*
@@ -153,3 +155,88 @@ func (h *HttpJmapApiClient) Command(ctx context.Context, logger *log.Logger, ses
return body, nil
}
+
+func (h *HttpJmapApiClient) UploadBinary(ctx context.Context, logger *log.Logger, session *Session, uploadUrl string, contentType string, body io.Reader) (UploadedBlob, Error) {
+ req, err := http.NewRequestWithContext(ctx, http.MethodPost, uploadUrl, body)
+ if err != nil {
+ logger.Error().Err(err).Msgf("failed to create POST request for %v", uploadUrl)
+ return UploadedBlob{}, SimpleError{code: JmapErrorCreatingRequest, err: err}
+ }
+ req.Header.Add("Content-Type", contentType)
+ req.Header.Add("User-Agent", h.userAgent)
+ h.auth(session.Username, logger, req)
+
+ res, err := h.client.Do(req)
+ if err != nil {
+ logger.Error().Err(err).Msgf("failed to perform POST %v", uploadUrl)
+ return UploadedBlob{}, SimpleError{code: JmapErrorSendingRequest, err: err}
+ }
+ if res.StatusCode < 200 || res.StatusCode > 299 {
+ logger.Error().Str("status", res.Status).Msg("HTTP response status code is not 2xx")
+ return UploadedBlob{}, SimpleError{code: JmapErrorServerResponse, err: err}
+ }
+ if res.Body != nil {
+ defer func(Body io.ReadCloser) {
+ err := Body.Close()
+ if err != nil {
+ logger.Error().Err(err).Msg("failed to close response body")
+ }
+ }(res.Body)
+ }
+
+ responseBody, err := io.ReadAll(res.Body)
+ if err != nil {
+ logger.Error().Err(err).Msg("failed to read response body")
+ return UploadedBlob{}, SimpleError{code: JmapErrorServerResponse, err: err}
+ }
+
+ var result UploadedBlob
+ err = json.Unmarshal(responseBody, &result)
+ if err != nil {
+ logger.Error().Str("url", uploadUrl).Err(err).Msg("failed to decode JSON payload from the upload response")
+ return UploadedBlob{}, SimpleError{code: JmapErrorDecodingResponseBody, err: err}
+ }
+
+ return result, nil
+}
+
+func (h *HttpJmapApiClient) DownloadBinary(ctx context.Context, logger *log.Logger, session *Session, downloadUrl string) (*BlobDownload, Error) {
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, downloadUrl, nil)
+ if err != nil {
+ logger.Error().Err(err).Msgf("failed to create GET request for %v", downloadUrl)
+ return nil, SimpleError{code: JmapErrorCreatingRequest, err: err}
+ }
+ req.Header.Add("User-Agent", h.userAgent)
+ h.auth(session.Username, logger, req)
+
+ res, err := h.client.Do(req)
+ if err != nil {
+ logger.Error().Err(err).Msgf("failed to perform GET %v", downloadUrl)
+ return nil, SimpleError{code: JmapErrorSendingRequest, err: err}
+ }
+ if res.StatusCode == http.StatusNotFound {
+ return nil, nil
+ }
+ if res.StatusCode < 200 || res.StatusCode > 299 {
+ logger.Error().Str("status", res.Status).Msg("HTTP response status code is not 2xx")
+ return nil, SimpleError{code: JmapErrorServerResponse, err: err}
+ }
+
+ sizeStr := res.Header.Get("Content-Length")
+ size := -1
+ if sizeStr != "" {
+ size, err = strconv.Atoi(sizeStr)
+ if err != nil {
+ logger.Warn().Err(err).Msgf("failed to parse Content-Length blob download response header value '%v'", sizeStr)
+ size = -1
+ }
+ }
+
+ return &BlobDownload{
+ Body: res.Body,
+ Size: size,
+ Type: res.Header.Get("Content-Type"),
+ ContentDisposition: res.Header.Get("Content-Disposition"),
+ CacheControl: res.Header.Get("Cache-Control"),
+ }, nil
+}
diff --git a/pkg/jmap/jmap_model.go b/pkg/jmap/jmap_model.go
index 851be0f62..3f90b3e3d 100644
--- a/pkg/jmap/jmap_model.go
+++ b/pkg/jmap/jmap_model.go
@@ -1,6 +1,7 @@
package jmap
import (
+ "io"
"time"
)
@@ -1297,6 +1298,14 @@ type BlobGetResponse struct {
NotFound []any `json:"notFound,omitempty"`
}
+type BlobDownload struct {
+ Body io.ReadCloser
+ Size int
+ Type string
+ ContentDisposition string
+ CacheControl string
+}
+
const (
BlobGet Command = "Blob/get"
BlobUpload Command = "Blob/upload"
diff --git a/pkg/jmap/jmap_test.go b/pkg/jmap/jmap_test.go
index 147a17a16..a0c2b7f29 100644
--- a/pkg/jmap/jmap_test.go
+++ b/pkg/jmap/jmap_test.go
@@ -2,15 +2,20 @@ package jmap
import (
"context"
+ "crypto/sha512"
+ "encoding/base64"
"encoding/json"
"fmt"
+ "io"
"math/rand"
"net/url"
"os"
"path/filepath"
+ "strings"
"testing"
"time"
+ "github.com/google/uuid"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/stretchr/testify/require"
)
@@ -57,6 +62,39 @@ func (t TestJmapApiClient) Close() error {
return nil
}
+type TestJmapBlobClient struct {
+ t *testing.T
+}
+
+func NewTestJmapBlobClient(t *testing.T) BlobClient {
+ return &TestJmapBlobClient{t: t}
+}
+
+func (t TestJmapBlobClient) UploadBinary(ctx context.Context, logger *log.Logger, session *Session, uploadUrl string, contentType string, body io.Reader) (UploadedBlob, Error) {
+ bytes, err := io.ReadAll(body)
+ if err != nil {
+ return UploadedBlob{}, SimpleError{code: 0, err: err}
+ }
+ hasher := sha512.New()
+ hasher.Write(bytes)
+ return UploadedBlob{
+ Id: uuid.NewString(),
+ Size: len(bytes),
+ Type: contentType,
+ Sha512: base64.StdEncoding.EncodeToString(hasher.Sum(nil)),
+ }, nil
+}
+
+func (h *TestJmapBlobClient) DownloadBinary(ctx context.Context, logger *log.Logger, session *Session, downloadUrl string) (*BlobDownload, Error) {
+ return &BlobDownload{
+ Body: io.NopCloser(strings.NewReader("")),
+ Size: -1,
+ Type: "text/plain",
+ ContentDisposition: "attachment; filename=\"file.txt\"",
+ CacheControl: "",
+ }, nil
+}
+
func serveTestFile(t *testing.T, name string) ([]byte, Error) {
cwd, _ := os.Getwd()
p := filepath.Join(cwd, "testdata", name)
@@ -102,9 +140,10 @@ func TestRequests(t *testing.T) {
require := require.New(t)
apiClient := NewTestJmapApiClient(t)
wkClient := NewTestJmapWellKnownClient(t)
+ blobClient := NewTestJmapBlobClient(t)
logger := log.NopLogger()
ctx := context.Background()
- client := NewClient(wkClient, apiClient)
+ client := NewClient(wkClient, apiClient, blobClient)
jmapUrl, err := url.Parse("http://localhost/jmap")
require.NoError(err)
diff --git a/services/groupware/pkg/groupware/groupware_api_blob.go b/services/groupware/pkg/groupware/groupware_api_blob.go
index 33e5ae23b..b205b82e8 100644
--- a/services/groupware/pkg/groupware/groupware_api_blob.go
+++ b/services/groupware/pkg/groupware/groupware_api_blob.go
@@ -2,11 +2,17 @@ package groupware
import (
"fmt"
+ "io"
"net/http"
+ "strconv"
"github.com/go-chi/chi/v5"
)
+const (
+ DefaultBlobDownloadType = "application/octet-stream"
+)
+
func (g Groupware) GetBlob(w http.ResponseWriter, r *http.Request) {
g.respond(w, r, func(req Request) (any, string, *Error) {
blobId := chi.URLParam(req.r, UriParamBlobId)
@@ -23,3 +29,74 @@ func (g Groupware) GetBlob(w http.ResponseWriter, r *http.Request) {
return res, res.Digest(), req.apiErrorFromJmap(err)
})
}
+
+func (g Groupware) UploadBlob(w http.ResponseWriter, r *http.Request) {
+ g.respond(w, r, func(req Request) (any, string, *Error) {
+ contentType := r.Header.Get("Content-Type")
+ body := r.Body
+ if body != nil {
+ defer func(Body io.ReadCloser) {
+ err := Body.Close()
+ if err != nil {
+ req.logger.Error().Err(err).Msg("failed to close response body")
+ }
+ }(body)
+ }
+
+ resp, err := g.jmap.UploadBlobStream(req.GetAccountId(), req.session, req.ctx, req.logger, contentType, body)
+ if err != nil {
+ return resp, "", req.apiErrorFromJmap(err)
+ }
+
+ return resp, resp.Sha512, nil
+ })
+}
+
+func (g Groupware) DownloadBlob(w http.ResponseWriter, r *http.Request) {
+ g.stream(w, r, func(req Request, w http.ResponseWriter) *Error {
+ blobId := chi.URLParam(req.r, UriParamBlobId)
+ name := chi.URLParam(req.r, UriParamBlobName)
+ q := req.r.URL.Query()
+ tipe := q.Get(QueryParamBlobType)
+ if tipe == "" {
+ tipe = DefaultBlobDownloadType
+ }
+
+ blob, jerr := g.jmap.DownloadBlobStream(req.GetAccountId(), blobId, name, tipe, req.session, req.ctx, req.logger)
+ if blob != nil && blob.Body != nil {
+ defer func(Body io.ReadCloser) {
+ err := Body.Close()
+ if err != nil {
+ req.logger.Error().Err(err).Msg("failed to close response body")
+ }
+ }(blob.Body)
+ }
+ if jerr != nil {
+ return req.apiErrorFromJmap(jerr)
+ }
+ if blob == nil {
+ w.WriteHeader(http.StatusNotFound)
+ return nil
+ }
+
+ if blob.Type != "" {
+ w.Header().Add("Content-Type", blob.Type)
+ }
+ if blob.CacheControl != "" {
+ w.Header().Add("Cache-Control", blob.CacheControl)
+ }
+ if blob.ContentDisposition != "" {
+ w.Header().Add("Content-Disposition", blob.ContentDisposition)
+ }
+ if blob.Size >= 0 {
+ w.Header().Add("Content-Size", strconv.Itoa(blob.Size))
+ }
+
+ _, err := io.Copy(w, blob.Body)
+ if err != nil {
+ return apiError(req.errorId(), ErrorStreamingResponse)
+ }
+
+ return nil
+ })
+}
diff --git a/services/groupware/pkg/groupware/groupware_error.go b/services/groupware/pkg/groupware/groupware_error.go
index f4fd01225..c3ca52f55 100644
--- a/services/groupware/pkg/groupware/groupware_error.go
+++ b/services/groupware/pkg/groupware/groupware_error.go
@@ -150,6 +150,7 @@ const (
ErrorCodeForbiddenGeneric = "AUTFOR"
ErrorCodeInvalidRequest = "INVREQ"
ErrorCodeServerResponse = "SRVRSP"
+ ErrorCodeStreamingResponse = "SRVRST"
ErrorCodeServerReadingResponse = "SRVRRE"
ErrorCodeServerDecodingResponseBody = "SRVDRB"
ErrorCodeEncodingRequestBody = "ENCREQ"
@@ -205,6 +206,12 @@ var (
Title: "Server Response Body could not be decoded",
Detail: "The mail server response body could not be decoded.",
}
+ ErrorStreamingResponse = GroupwareError{
+ Status: http.StatusInternalServerError,
+ Code: ErrorCodeStreamingResponse,
+ Title: "Server Response Body could not be streamed",
+ Detail: "The mail server response body could not be streamed.",
+ }
ErrorProcessingResponse = GroupwareError{
Status: http.StatusInternalServerError,
Code: ErrorCodeServerResponse,
diff --git a/services/groupware/pkg/groupware/groupware_framework.go b/services/groupware/pkg/groupware/groupware_framework.go
index a81153b46..cb1912367 100644
--- a/services/groupware/pkg/groupware/groupware_framework.go
+++ b/services/groupware/pkg/groupware/groupware_framework.go
@@ -116,7 +116,7 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
masterPassword,
)
- jmapClient := jmap.NewClient(api, api)
+ jmapClient := jmap.NewClient(api, api, api)
var sessionCache *ttlcache.Cache[string, cachedSession]
{
@@ -337,6 +337,53 @@ func (g Groupware) respond(w http.ResponseWriter, r *http.Request, handler func(
}
}
+func (g Groupware) stream(w http.ResponseWriter, r *http.Request, handler func(r Request, w http.ResponseWriter) *Error) {
+ ctx := r.Context()
+ logger := g.logger.SubloggerWithRequestID(ctx)
+
+ username, ok, err := g.usernameProvider.GetUsername(r, ctx, &logger)
+ if err != nil {
+ g.serveError(w, r, apiError(errorId(r, ctx), ErrorInvalidAuthentication))
+ return
+ }
+ if !ok {
+ g.serveError(w, r, apiError(errorId(r, ctx), ErrorMissingAuthentication))
+ return
+ }
+
+ logger = log.Logger{Logger: logger.With().Str(logUsername, logstr(username)).Logger()}
+
+ session, ok, err := g.session(username, r, ctx, &logger)
+ if err != nil {
+ logger.Error().Err(err).Interface(logQuery, r.URL.Query()).Msg("failed to determine JMAP session")
+ render.Status(r, http.StatusInternalServerError)
+ return
+ }
+ if !ok {
+ // no session = authentication failed
+ logger.Warn().Err(err).Interface(logQuery, r.URL.Query()).Msg("could not authenticate")
+ render.Status(r, http.StatusForbidden)
+ return
+ }
+ logger = session.DecorateLogger(logger)
+
+ req := Request{
+ r: r,
+ ctx: ctx,
+ logger: &logger,
+ session: &session,
+ }
+
+ apierr := handler(req, w)
+ if apierr != nil {
+ g.log(apierr)
+ w.Header().Add("Content-Type", ContentTypeJsonApi)
+ render.Status(r, apierr.NumStatus)
+ w.WriteHeader(apierr.NumStatus)
+ render.Render(w, r, errorResponses(*apierr))
+ }
+}
+
/*
func (g Groupware) withSession(w http.ResponseWriter, r *http.Request, handler func(r Request) (any, string, error)) (any, string, error) {
ctx := r.Context()
diff --git a/services/groupware/pkg/groupware/groupware_route.go b/services/groupware/pkg/groupware/groupware_route.go
index a1f24d6d4..144ffdbd7 100644
--- a/services/groupware/pkg/groupware/groupware_route.go
+++ b/services/groupware/pkg/groupware/groupware_route.go
@@ -11,6 +11,8 @@ const (
QueryParamSize = "size"
UriParamMessagesId = "id"
UriParamBlobId = "blobid"
+ UriParamBlobName = "blobname"
+ QueryParamBlobType = "type"
QueryParamSince = "since"
HeaderSince = "if-none-match"
)
@@ -20,14 +22,21 @@ func (g Groupware) Route(r chi.Router) {
r.Get("/accounts", g.GetAccounts)
r.Route("/accounts/{account}", func(r chi.Router) {
r.Get("/", g.GetAccount)
- r.Get("/mailboxes", g.GetMailboxes) // ?name=&role=&subcribed=
- r.Get("/mailboxes/{mailbox}", g.GetMailbox)
- r.Get("/mailboxes/{mailbox}/messages", g.GetAllMessages)
- r.Get("/messages/{id}", g.GetMessagesById)
- r.Get("/messages", g.GetMessageUpdates)
r.Get("/identity", g.GetIdentity)
r.Get("/vacation", g.GetVacation)
- r.Get("/blobs/{blobid}", g.GetBlob)
+ r.Route("/mailboxes", func(r chi.Router) {
+ r.Get("/", g.GetMailboxes) // ?name=&role=&subcribed=
+ r.Get("/{mailbox}", g.GetMailbox)
+ r.Get("/{mailbox}/messages", g.GetAllMessages)
+ })
+ r.Route("/messages", func(r chi.Router) {
+ r.Get("/", g.GetMessageUpdates)
+ r.Get("/{id}", g.GetMessagesById)
+ })
+ r.Route("/blobs", func(r chi.Router) {
+ r.Get("/{blobid}", g.GetBlob)
+ r.Get("/{blobid}/{blobname}", g.DownloadBlob) // ?type=
+ })
})
r.NotFound(g.NotFound)
}