server: pre-read request body to fix HTTP/2 deadlock

Fixes #538 (hopefully)
This commit is contained in:
Jarek Kowalski
2020-08-15 19:22:37 -07:00
parent 923c91b5a4
commit 2b029a418e
7 changed files with 51 additions and 45 deletions

View File

@@ -3,7 +3,6 @@
import (
"context"
"errors"
"io/ioutil"
"net/http"
"github.com/gorilla/mux"
@@ -13,7 +12,7 @@
"github.com/kopia/kopia/repo/content"
)
func (s *Server) handleContentGet(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handleContentGet(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
dr, ok := s.rep.(*repo.DirectRepository)
if !ok {
return nil, notFoundError("content not found")
@@ -29,7 +28,7 @@ func (s *Server) handleContentGet(ctx context.Context, r *http.Request) (interfa
return data, nil
}
func (s *Server) handleContentInfo(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handleContentInfo(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
dr, ok := s.rep.(*repo.DirectRepository)
if !ok {
return nil, notFoundError("content not found")
@@ -50,7 +49,7 @@ func (s *Server) handleContentInfo(ctx context.Context, r *http.Request) (interf
}
}
func (s *Server) handleContentPut(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handleContentPut(ctx context.Context, r *http.Request, data []byte) (interface{}, *apiError) {
dr, ok := s.rep.(*repo.DirectRepository)
if !ok {
return nil, notFoundError("content not found")
@@ -59,11 +58,6 @@ func (s *Server) handleContentPut(ctx context.Context, r *http.Request) (interfa
cid := content.ID(mux.Vars(r)["contentID"])
prefix := cid.Prefix()
data, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request body")
}
actualCID, err := dr.Content.WriteContent(ctx, data, prefix)
if err != nil {
return nil, internalServerError(err)

View File

@@ -13,7 +13,7 @@
"github.com/kopia/kopia/repo/manifest"
)
func (s *Server) handleManifestGet(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handleManifestGet(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
// password already validated by a wrapper, no need to check here.
userAtHost, _, _ := r.BasicAuth()
@@ -40,7 +40,7 @@ func (s *Server) handleManifestGet(ctx context.Context, r *http.Request) (interf
}, nil
}
func (s *Server) handleManifestDelete(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handleManifestDelete(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
mid := manifest.ID(mux.Vars(r)["manifestID"])
err := s.rep.DeleteManifest(ctx, mid)
@@ -55,7 +55,7 @@ func (s *Server) handleManifestDelete(ctx context.Context, r *http.Request) (int
return &serverapi.Empty{}, nil
}
func (s *Server) handleManifestList(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handleManifestList(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
// password already validated by a wrapper, no need to check here.
userAtHost, _, _ := r.BasicAuth()
@@ -95,10 +95,10 @@ func filterManifests(manifests []*manifest.EntryMetadata, userAtHost string) []*
return result
}
func (s *Server) handleManifestCreate(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handleManifestCreate(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
var req remoterepoapi.ManifestWithMetadata
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
if err := json.Unmarshal(body, &req); err != nil {
return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request")
}

View File

@@ -12,7 +12,7 @@
"github.com/kopia/kopia/snapshot/policy"
)
func (s *Server) handlePolicyList(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handlePolicyList(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
policies, err := policy.ListPolicies(ctx, s.rep)
if err != nil {
return nil, internalServerError(err)
@@ -50,7 +50,7 @@ func getPolicyTargetFromURL(u *url.URL) snapshot.SourceInfo {
}
}
func (s *Server) handlePolicyGet(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handlePolicyGet(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
pol, err := policy.GetDefinedPolicy(ctx, s.rep, getPolicyTargetFromURL(r.URL))
if errors.Is(err, policy.ErrPolicyNotFound) {
return nil, requestError(serverapi.ErrorNotFound, "policy not found")
@@ -59,7 +59,7 @@ func (s *Server) handlePolicyGet(ctx context.Context, r *http.Request) (interfac
return pol, nil
}
func (s *Server) handlePolicyDelete(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handlePolicyDelete(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
if err := policy.RemovePolicy(ctx, s.rep, getPolicyTargetFromURL(r.URL)); err != nil {
return nil, internalServerError(err)
}
@@ -71,9 +71,9 @@ func (s *Server) handlePolicyDelete(ctx context.Context, r *http.Request) (inter
return &serverapi.Empty{}, nil
}
func (s *Server) handlePolicyPut(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handlePolicyPut(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
newPolicy := &policy.Policy{}
if err := json.NewDecoder(r.Body).Decode(newPolicy); err != nil {
if err := json.Unmarshal(body, newPolicy); err != nil {
return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request body")
}

View File

@@ -20,7 +20,7 @@
"github.com/kopia/kopia/snapshot/policy"
)
func (s *Server) handleRepoParameters(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handleRepoParameters(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
dr, ok := s.rep.(*repo.DirectRepository)
if !ok {
return &serverapi.StatusResponse{
@@ -37,7 +37,7 @@ func (s *Server) handleRepoParameters(ctx context.Context, r *http.Request) (int
return rp, nil
}
func (s *Server) handleRepoStatus(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handleRepoStatus(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
if s.rep == nil {
return &serverapi.StatusResponse{
Connected: false,
@@ -79,14 +79,14 @@ func maybeDecodeToken(req *serverapi.ConnectRepositoryRequest) *apiError {
return nil
}
func (s *Server) handleRepoCreate(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handleRepoCreate(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
if s.rep != nil {
return nil, requestError(serverapi.ErrorAlreadyConnected, "already connected")
}
var req serverapi.CreateRepositoryRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
if err := json.Unmarshal(body, &req); err != nil {
return nil, requestError(serverapi.ErrorMalformedRequest, "unable to decode request: "+err.Error())
}
@@ -125,17 +125,17 @@ func (s *Server) handleRepoCreate(ctx context.Context, r *http.Request) (interfa
return nil, internalServerError(errors.Wrap(err, "flush"))
}
return s.handleRepoStatus(ctx, r)
return s.handleRepoStatus(ctx, r, nil)
}
func (s *Server) handleRepoConnect(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handleRepoConnect(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
if s.rep != nil {
return nil, requestError(serverapi.ErrorAlreadyConnected, "already connected")
}
var req serverapi.ConnectRepositoryRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
if err := json.Unmarshal(body, &req); err != nil {
return nil, requestError(serverapi.ErrorMalformedRequest, "unable to decode request: "+err.Error())
}
@@ -147,10 +147,10 @@ func (s *Server) handleRepoConnect(ctx context.Context, r *http.Request) (interf
return nil, err
}
return s.handleRepoStatus(ctx, r)
return s.handleRepoStatus(ctx, r, nil)
}
func (s *Server) handleRepoSupportedAlgorithms(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handleRepoSupportedAlgorithms(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
res := &serverapi.SupportedAlgorithmsResponse{
DefaultHashAlgorithm: hashing.DefaultAlgorithm,
HashAlgorithms: hashing.SupportedAlgorithms(),
@@ -200,7 +200,7 @@ func (s *Server) connectAndOpen(ctx context.Context, conn blob.ConnectionInfo, p
return nil
}
func (s *Server) handleRepoDisconnect(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handleRepoDisconnect(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
// release shared lock so that SetRepository can acquire exclusive lock
s.mu.RUnlock()
err := s.SetRepository(ctx, nil)
@@ -217,7 +217,7 @@ func (s *Server) handleRepoDisconnect(ctx context.Context, r *http.Request) (int
return &serverapi.Empty{}, nil
}
func (s *Server) handleRepoSync(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handleRepoSync(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
if err := s.rep.Refresh(ctx); err != nil {
return nil, internalServerError(errors.Wrap(err, "unable to refresh repository"))
}

View File

@@ -10,7 +10,7 @@
"github.com/kopia/kopia/snapshot/policy"
)
func (s *Server) handleSnapshotList(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handleSnapshotList(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
manifestIDs, err := snapshot.ListSnapshotManifests(ctx, s.rep, nil)
if err != nil {
return nil, internalServerError(err)

View File

@@ -15,7 +15,7 @@
"github.com/kopia/kopia/snapshot/policy"
)
func (s *Server) handleSourcesList(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handleSourcesList(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
resp := &serverapi.SourcesResponse{
Sources: []*serverapi.SourceStatus{},
LocalHost: s.rep.Hostname(),
@@ -37,10 +37,10 @@ func (s *Server) handleSourcesList(ctx context.Context, r *http.Request) (interf
return resp, nil
}
func (s *Server) handleSourcesCreate(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handleSourcesCreate(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
var req serverapi.CreateSnapshotSourceRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
if err := json.Unmarshal(body, &req); err != nil {
return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request body")
}

View File

@@ -4,6 +4,7 @@
import (
"context"
"encoding/json"
"io/ioutil"
"net/http"
"net/url"
"sync"
@@ -25,6 +26,8 @@
const maintenanceAttemptFrequency = 10 * time.Minute
type apiRequestFunc func(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError)
// Server exposes simple HTTP API for programmatically accessing Kopia features.
type Server struct {
OnShutdown func(ctx context.Context) error
@@ -85,30 +88,39 @@ func (s *Server) APIHandlers() http.Handler {
return m
}
func (s *Server) handleAPI(f func(ctx context.Context, r *http.Request) (interface{}, *apiError)) http.HandlerFunc {
return s.handleAPIPossiblyNotConnected(func(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handleAPI(f apiRequestFunc) http.HandlerFunc {
return s.handleAPIPossiblyNotConnected(func(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
if s.rep == nil {
return nil, requestError(serverapi.ErrorNotConnected, "not connected")
}
return f(ctx, r)
return f(ctx, r, body)
})
}
func (s *Server) handleAPIPossiblyNotConnected(f func(ctx context.Context, r *http.Request) (interface{}, *apiError)) http.HandlerFunc {
func (s *Server) handleAPIPossiblyNotConnected(f apiRequestFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// we must pre-read request body before acquiring the lock as it sometimes leads to deadlock
// in HTTP/2 server.
// See https://github.com/golang/go/issues/40816
body, berr := ioutil.ReadAll(r.Body)
if berr != nil {
http.Error(w, "error reading request body", http.StatusInternalServerError)
return
}
s.mu.RLock()
defer s.mu.RUnlock()
ctx := r.Context()
log(ctx).Debugf("request %v", r.URL)
log(ctx).Debugf("request %v (%v bytes)", r.URL, len(body))
w.Header().Set("Content-Type", "application/json")
e := json.NewEncoder(w)
e.SetIndent("", " ")
v, err := f(ctx, r)
v, err := f(ctx, r, body)
if err == nil {
if b, ok := v.([]byte); ok {
@@ -134,7 +146,7 @@ func (s *Server) handleAPIPossiblyNotConnected(f func(ctx context.Context, r *ht
}
}
func (s *Server) handleRefresh(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handleRefresh(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
if err := s.rep.Refresh(ctx); err != nil {
return nil, internalServerError(err)
}
@@ -142,7 +154,7 @@ func (s *Server) handleRefresh(ctx context.Context, r *http.Request) (interface{
return &serverapi.Empty{}, nil
}
func (s *Server) handleFlush(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handleFlush(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
if err := s.rep.Flush(ctx); err != nil {
return nil, internalServerError(err)
}
@@ -150,7 +162,7 @@ func (s *Server) handleFlush(ctx context.Context, r *http.Request) (interface{},
return &serverapi.Empty{}, nil
}
func (s *Server) handleShutdown(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handleShutdown(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
log(ctx).Infof("shutting down due to API request")
if f := s.OnShutdown; f != nil {
@@ -180,11 +192,11 @@ func (s *Server) forAllSourceManagersMatchingURLFilter(ctx context.Context, c fu
return resp, nil
}
func (s *Server) handleUpload(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handleUpload(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
return s.forAllSourceManagersMatchingURLFilter(ctx, (*sourceManager).upload, r.URL.Query())
}
func (s *Server) handleCancel(ctx context.Context, r *http.Request) (interface{}, *apiError) {
func (s *Server) handleCancel(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
return s.forAllSourceManagersMatchingURLFilter(ctx, (*sourceManager).cancel, r.URL.Query())
}