groupware: blob streaming (upload and download)

This commit is contained in:
Pascal Bleser
2025-08-04 17:49:18 +02:00
parent 5d14c966d5
commit 5c561dfdf1
9 changed files with 322 additions and 9 deletions

View File

@@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net/url"
"strings"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/rs/zerolog"
@@ -18,6 +19,7 @@ type SessionEventListener interface {
type Client struct {
wellKnown SessionClient
api ApiClient
blob BlobClient
sessionEventListeners *eventListeners[SessionEventListener]
io.Closer
}
@@ -26,10 +28,11 @@ func (j *Client) Close() error {
return j.api.Close()
}
func NewClient(wellKnown SessionClient, api ApiClient) Client {
func NewClient(wellKnown SessionClient, api ApiClient, blob BlobClient) Client {
return Client{
wellKnown: wellKnown,
api: api,
blob: blob,
sessionEventListeners: newEventListeners[SessionEventListener](),
}
}
@@ -63,6 +66,9 @@ type Session struct {
// The upload URL template
UploadUrlTemplate string
// The upload URL template
DownloadUrlTemplate string
// TODO
DefaultMailAccountId string
@@ -91,12 +97,17 @@ func newSession(sessionResponse SessionResponse) (Session, Error) {
if uploadUrl == "" {
return Session{}, SimpleError{code: JmapErrorInvalidSessionResponse, err: fmt.Errorf("JMAP session response does not provide an upload URL")}
}
downloadUrl := sessionResponse.DownloadUrl
if downloadUrl == "" {
return Session{}, SimpleError{code: JmapErrorInvalidSessionResponse, err: fmt.Errorf("JMAP session response does not provide an download URL")}
}
return Session{
Username: username,
DefaultMailAccountId: mailAccountId,
JmapUrl: *apiUrl,
UploadUrlTemplate: uploadUrl,
DownloadUrlTemplate: downloadUrl,
SessionResponse: sessionResponse,
}, nil
}
@@ -126,6 +137,9 @@ const (
logOffset = "offset"
logLimit = "limit"
logApiUrl = "apiurl"
logDownloadUrl = "downloadurl"
logBlobId = "blobId"
logUploadUrl = "downloadurl"
logSessionState = "session-state"
logSince = "since"
@@ -540,6 +554,25 @@ type UploadedBlob struct {
Sha512 string `json:"sha:512"`
}
func (j *Client) UploadBlobStream(accountId string, session *Session, ctx context.Context, logger *log.Logger, contentType string, body io.Reader) (UploadedBlob, Error) {
aid := session.BlobAccountId(accountId)
// TODO(pbleser-oc) use a library for proper URL template parsing
uploadUrl := strings.ReplaceAll(session.UploadUrlTemplate, "{accountId}", aid)
return j.blob.UploadBinary(ctx, logger, session, uploadUrl, contentType, body)
}
func (j *Client) DownloadBlobStream(accountId string, blobId string, name string, typ string, session *Session, ctx context.Context, logger *log.Logger) (*BlobDownload, Error) {
aid := session.BlobAccountId(accountId)
// TODO(pbleser-oc) use a library for proper URL template parsing
downloadUrl := session.DownloadUrlTemplate
downloadUrl = strings.ReplaceAll(downloadUrl, "{accountId}", aid)
downloadUrl = strings.ReplaceAll(downloadUrl, "{blobId}", blobId)
downloadUrl = strings.ReplaceAll(downloadUrl, "{name}", name)
downloadUrl = strings.ReplaceAll(downloadUrl, "{type}", typ)
logger = &log.Logger{Logger: logger.With().Str(logDownloadUrl, downloadUrl).Str(logBlobId, blobId).Str(logAccountId, aid).Logger()}
return j.blob.DownloadBinary(ctx, logger, session, downloadUrl)
}
func (j *Client) UploadBlob(accountId string, session *Session, ctx context.Context, logger *log.Logger, data []byte, contentType string) (UploadedBlob, Error) {
aid := session.MailAccountId(accountId)

View File

@@ -15,3 +15,8 @@ type ApiClient interface {
type SessionClient interface {
GetSession(username string, logger *log.Logger) (SessionResponse, Error)
}
type BlobClient interface {
UploadBinary(ctx context.Context, logger *log.Logger, session *Session, uploadUrl string, contentType string, content io.Reader) (UploadedBlob, Error)
DownloadBinary(ctx context.Context, logger *log.Logger, session *Session, downloadUrl string) (*BlobDownload, Error)
}

View File

@@ -8,6 +8,7 @@ import (
"io"
"net/http"
"net/url"
"strconv"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/pkg/version"
@@ -24,6 +25,7 @@ type HttpJmapApiClient struct {
var (
_ ApiClient = &HttpJmapApiClient{}
_ SessionClient = &HttpJmapApiClient{}
_ BlobClient = &HttpJmapApiClient{}
)
/*
@@ -153,3 +155,88 @@ func (h *HttpJmapApiClient) Command(ctx context.Context, logger *log.Logger, ses
return body, nil
}
func (h *HttpJmapApiClient) UploadBinary(ctx context.Context, logger *log.Logger, session *Session, uploadUrl string, contentType string, body io.Reader) (UploadedBlob, Error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, uploadUrl, body)
if err != nil {
logger.Error().Err(err).Msgf("failed to create POST request for %v", uploadUrl)
return UploadedBlob{}, SimpleError{code: JmapErrorCreatingRequest, err: err}
}
req.Header.Add("Content-Type", contentType)
req.Header.Add("User-Agent", h.userAgent)
h.auth(session.Username, logger, req)
res, err := h.client.Do(req)
if err != nil {
logger.Error().Err(err).Msgf("failed to perform POST %v", uploadUrl)
return UploadedBlob{}, SimpleError{code: JmapErrorSendingRequest, err: err}
}
if res.StatusCode < 200 || res.StatusCode > 299 {
logger.Error().Str("status", res.Status).Msg("HTTP response status code is not 2xx")
return UploadedBlob{}, SimpleError{code: JmapErrorServerResponse, err: err}
}
if res.Body != nil {
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
logger.Error().Err(err).Msg("failed to close response body")
}
}(res.Body)
}
responseBody, err := io.ReadAll(res.Body)
if err != nil {
logger.Error().Err(err).Msg("failed to read response body")
return UploadedBlob{}, SimpleError{code: JmapErrorServerResponse, err: err}
}
var result UploadedBlob
err = json.Unmarshal(responseBody, &result)
if err != nil {
logger.Error().Str("url", uploadUrl).Err(err).Msg("failed to decode JSON payload from the upload response")
return UploadedBlob{}, SimpleError{code: JmapErrorDecodingResponseBody, err: err}
}
return result, nil
}
func (h *HttpJmapApiClient) DownloadBinary(ctx context.Context, logger *log.Logger, session *Session, downloadUrl string) (*BlobDownload, Error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, downloadUrl, nil)
if err != nil {
logger.Error().Err(err).Msgf("failed to create GET request for %v", downloadUrl)
return nil, SimpleError{code: JmapErrorCreatingRequest, err: err}
}
req.Header.Add("User-Agent", h.userAgent)
h.auth(session.Username, logger, req)
res, err := h.client.Do(req)
if err != nil {
logger.Error().Err(err).Msgf("failed to perform GET %v", downloadUrl)
return nil, SimpleError{code: JmapErrorSendingRequest, err: err}
}
if res.StatusCode == http.StatusNotFound {
return nil, nil
}
if res.StatusCode < 200 || res.StatusCode > 299 {
logger.Error().Str("status", res.Status).Msg("HTTP response status code is not 2xx")
return nil, SimpleError{code: JmapErrorServerResponse, err: err}
}
sizeStr := res.Header.Get("Content-Length")
size := -1
if sizeStr != "" {
size, err = strconv.Atoi(sizeStr)
if err != nil {
logger.Warn().Err(err).Msgf("failed to parse Content-Length blob download response header value '%v'", sizeStr)
size = -1
}
}
return &BlobDownload{
Body: res.Body,
Size: size,
Type: res.Header.Get("Content-Type"),
ContentDisposition: res.Header.Get("Content-Disposition"),
CacheControl: res.Header.Get("Cache-Control"),
}, nil
}

View File

@@ -1,6 +1,7 @@
package jmap
import (
"io"
"time"
)
@@ -1297,6 +1298,14 @@ type BlobGetResponse struct {
NotFound []any `json:"notFound,omitempty"`
}
type BlobDownload struct {
Body io.ReadCloser
Size int
Type string
ContentDisposition string
CacheControl string
}
const (
BlobGet Command = "Blob/get"
BlobUpload Command = "Blob/upload"

View File

@@ -2,15 +2,20 @@ package jmap
import (
"context"
"crypto/sha512"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"math/rand"
"net/url"
"os"
"path/filepath"
"strings"
"testing"
"time"
"github.com/google/uuid"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/stretchr/testify/require"
)
@@ -57,6 +62,39 @@ func (t TestJmapApiClient) Close() error {
return nil
}
type TestJmapBlobClient struct {
t *testing.T
}
func NewTestJmapBlobClient(t *testing.T) BlobClient {
return &TestJmapBlobClient{t: t}
}
func (t TestJmapBlobClient) UploadBinary(ctx context.Context, logger *log.Logger, session *Session, uploadUrl string, contentType string, body io.Reader) (UploadedBlob, Error) {
bytes, err := io.ReadAll(body)
if err != nil {
return UploadedBlob{}, SimpleError{code: 0, err: err}
}
hasher := sha512.New()
hasher.Write(bytes)
return UploadedBlob{
Id: uuid.NewString(),
Size: len(bytes),
Type: contentType,
Sha512: base64.StdEncoding.EncodeToString(hasher.Sum(nil)),
}, nil
}
func (h *TestJmapBlobClient) DownloadBinary(ctx context.Context, logger *log.Logger, session *Session, downloadUrl string) (*BlobDownload, Error) {
return &BlobDownload{
Body: io.NopCloser(strings.NewReader("")),
Size: -1,
Type: "text/plain",
ContentDisposition: "attachment; filename=\"file.txt\"",
CacheControl: "",
}, nil
}
func serveTestFile(t *testing.T, name string) ([]byte, Error) {
cwd, _ := os.Getwd()
p := filepath.Join(cwd, "testdata", name)
@@ -102,9 +140,10 @@ func TestRequests(t *testing.T) {
require := require.New(t)
apiClient := NewTestJmapApiClient(t)
wkClient := NewTestJmapWellKnownClient(t)
blobClient := NewTestJmapBlobClient(t)
logger := log.NopLogger()
ctx := context.Background()
client := NewClient(wkClient, apiClient)
client := NewClient(wkClient, apiClient, blobClient)
jmapUrl, err := url.Parse("http://localhost/jmap")
require.NoError(err)

View File

@@ -2,11 +2,17 @@ package groupware
import (
"fmt"
"io"
"net/http"
"strconv"
"github.com/go-chi/chi/v5"
)
const (
DefaultBlobDownloadType = "application/octet-stream"
)
func (g Groupware) GetBlob(w http.ResponseWriter, r *http.Request) {
g.respond(w, r, func(req Request) (any, string, *Error) {
blobId := chi.URLParam(req.r, UriParamBlobId)
@@ -23,3 +29,74 @@ func (g Groupware) GetBlob(w http.ResponseWriter, r *http.Request) {
return res, res.Digest(), req.apiErrorFromJmap(err)
})
}
func (g Groupware) UploadBlob(w http.ResponseWriter, r *http.Request) {
g.respond(w, r, func(req Request) (any, string, *Error) {
contentType := r.Header.Get("Content-Type")
body := r.Body
if body != nil {
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
req.logger.Error().Err(err).Msg("failed to close response body")
}
}(body)
}
resp, err := g.jmap.UploadBlobStream(req.GetAccountId(), req.session, req.ctx, req.logger, contentType, body)
if err != nil {
return resp, "", req.apiErrorFromJmap(err)
}
return resp, resp.Sha512, nil
})
}
func (g Groupware) DownloadBlob(w http.ResponseWriter, r *http.Request) {
g.stream(w, r, func(req Request, w http.ResponseWriter) *Error {
blobId := chi.URLParam(req.r, UriParamBlobId)
name := chi.URLParam(req.r, UriParamBlobName)
q := req.r.URL.Query()
tipe := q.Get(QueryParamBlobType)
if tipe == "" {
tipe = DefaultBlobDownloadType
}
blob, jerr := g.jmap.DownloadBlobStream(req.GetAccountId(), blobId, name, tipe, req.session, req.ctx, req.logger)
if blob != nil && blob.Body != nil {
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
req.logger.Error().Err(err).Msg("failed to close response body")
}
}(blob.Body)
}
if jerr != nil {
return req.apiErrorFromJmap(jerr)
}
if blob == nil {
w.WriteHeader(http.StatusNotFound)
return nil
}
if blob.Type != "" {
w.Header().Add("Content-Type", blob.Type)
}
if blob.CacheControl != "" {
w.Header().Add("Cache-Control", blob.CacheControl)
}
if blob.ContentDisposition != "" {
w.Header().Add("Content-Disposition", blob.ContentDisposition)
}
if blob.Size >= 0 {
w.Header().Add("Content-Size", strconv.Itoa(blob.Size))
}
_, err := io.Copy(w, blob.Body)
if err != nil {
return apiError(req.errorId(), ErrorStreamingResponse)
}
return nil
})
}

View File

@@ -150,6 +150,7 @@ const (
ErrorCodeForbiddenGeneric = "AUTFOR"
ErrorCodeInvalidRequest = "INVREQ"
ErrorCodeServerResponse = "SRVRSP"
ErrorCodeStreamingResponse = "SRVRST"
ErrorCodeServerReadingResponse = "SRVRRE"
ErrorCodeServerDecodingResponseBody = "SRVDRB"
ErrorCodeEncodingRequestBody = "ENCREQ"
@@ -205,6 +206,12 @@ var (
Title: "Server Response Body could not be decoded",
Detail: "The mail server response body could not be decoded.",
}
ErrorStreamingResponse = GroupwareError{
Status: http.StatusInternalServerError,
Code: ErrorCodeStreamingResponse,
Title: "Server Response Body could not be streamed",
Detail: "The mail server response body could not be streamed.",
}
ErrorProcessingResponse = GroupwareError{
Status: http.StatusInternalServerError,
Code: ErrorCodeServerResponse,

View File

@@ -116,7 +116,7 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
masterPassword,
)
jmapClient := jmap.NewClient(api, api)
jmapClient := jmap.NewClient(api, api, api)
var sessionCache *ttlcache.Cache[string, cachedSession]
{
@@ -337,6 +337,53 @@ func (g Groupware) respond(w http.ResponseWriter, r *http.Request, handler func(
}
}
func (g Groupware) stream(w http.ResponseWriter, r *http.Request, handler func(r Request, w http.ResponseWriter) *Error) {
ctx := r.Context()
logger := g.logger.SubloggerWithRequestID(ctx)
username, ok, err := g.usernameProvider.GetUsername(r, ctx, &logger)
if err != nil {
g.serveError(w, r, apiError(errorId(r, ctx), ErrorInvalidAuthentication))
return
}
if !ok {
g.serveError(w, r, apiError(errorId(r, ctx), ErrorMissingAuthentication))
return
}
logger = log.Logger{Logger: logger.With().Str(logUsername, logstr(username)).Logger()}
session, ok, err := g.session(username, r, ctx, &logger)
if err != nil {
logger.Error().Err(err).Interface(logQuery, r.URL.Query()).Msg("failed to determine JMAP session")
render.Status(r, http.StatusInternalServerError)
return
}
if !ok {
// no session = authentication failed
logger.Warn().Err(err).Interface(logQuery, r.URL.Query()).Msg("could not authenticate")
render.Status(r, http.StatusForbidden)
return
}
logger = session.DecorateLogger(logger)
req := Request{
r: r,
ctx: ctx,
logger: &logger,
session: &session,
}
apierr := handler(req, w)
if apierr != nil {
g.log(apierr)
w.Header().Add("Content-Type", ContentTypeJsonApi)
render.Status(r, apierr.NumStatus)
w.WriteHeader(apierr.NumStatus)
render.Render(w, r, errorResponses(*apierr))
}
}
/*
func (g Groupware) withSession(w http.ResponseWriter, r *http.Request, handler func(r Request) (any, string, error)) (any, string, error) {
ctx := r.Context()

View File

@@ -11,6 +11,8 @@ const (
QueryParamSize = "size"
UriParamMessagesId = "id"
UriParamBlobId = "blobid"
UriParamBlobName = "blobname"
QueryParamBlobType = "type"
QueryParamSince = "since"
HeaderSince = "if-none-match"
)
@@ -20,14 +22,21 @@ func (g Groupware) Route(r chi.Router) {
r.Get("/accounts", g.GetAccounts)
r.Route("/accounts/{account}", func(r chi.Router) {
r.Get("/", g.GetAccount)
r.Get("/mailboxes", g.GetMailboxes) // ?name=&role=&subcribed=
r.Get("/mailboxes/{mailbox}", g.GetMailbox)
r.Get("/mailboxes/{mailbox}/messages", g.GetAllMessages)
r.Get("/messages/{id}", g.GetMessagesById)
r.Get("/messages", g.GetMessageUpdates)
r.Get("/identity", g.GetIdentity)
r.Get("/vacation", g.GetVacation)
r.Get("/blobs/{blobid}", g.GetBlob)
r.Route("/mailboxes", func(r chi.Router) {
r.Get("/", g.GetMailboxes) // ?name=&role=&subcribed=
r.Get("/{mailbox}", g.GetMailbox)
r.Get("/{mailbox}/messages", g.GetAllMessages)
})
r.Route("/messages", func(r chi.Router) {
r.Get("/", g.GetMessageUpdates)
r.Get("/{id}", g.GetMessagesById)
})
r.Route("/blobs", func(r chi.Router) {
r.Get("/{blobid}", g.GetBlob)
r.Get("/{blobid}/{blobname}", g.DownloadBlob) // ?type=
})
})
r.NotFound(g.NotFound)
}