mirror of
https://github.com/kopia/kopia.git
synced 2026-03-27 10:32:08 -04:00
server: pre-read request body to fix HTTP/2 deadlock
Fixes #538 (hopefully)
This commit is contained in:
@@ -3,7 +3,6 @@
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
@@ -13,7 +12,7 @@
|
||||
"github.com/kopia/kopia/repo/content"
|
||||
)
|
||||
|
||||
func (s *Server) handleContentGet(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handleContentGet(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
dr, ok := s.rep.(*repo.DirectRepository)
|
||||
if !ok {
|
||||
return nil, notFoundError("content not found")
|
||||
@@ -29,7 +28,7 @@ func (s *Server) handleContentGet(ctx context.Context, r *http.Request) (interfa
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func (s *Server) handleContentInfo(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handleContentInfo(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
dr, ok := s.rep.(*repo.DirectRepository)
|
||||
if !ok {
|
||||
return nil, notFoundError("content not found")
|
||||
@@ -50,7 +49,7 @@ func (s *Server) handleContentInfo(ctx context.Context, r *http.Request) (interf
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handleContentPut(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handleContentPut(ctx context.Context, r *http.Request, data []byte) (interface{}, *apiError) {
|
||||
dr, ok := s.rep.(*repo.DirectRepository)
|
||||
if !ok {
|
||||
return nil, notFoundError("content not found")
|
||||
@@ -59,11 +58,6 @@ func (s *Server) handleContentPut(ctx context.Context, r *http.Request) (interfa
|
||||
cid := content.ID(mux.Vars(r)["contentID"])
|
||||
prefix := cid.Prefix()
|
||||
|
||||
data, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request body")
|
||||
}
|
||||
|
||||
actualCID, err := dr.Content.WriteContent(ctx, data, prefix)
|
||||
if err != nil {
|
||||
return nil, internalServerError(err)
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
"github.com/kopia/kopia/repo/manifest"
|
||||
)
|
||||
|
||||
func (s *Server) handleManifestGet(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handleManifestGet(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
// password already validated by a wrapper, no need to check here.
|
||||
userAtHost, _, _ := r.BasicAuth()
|
||||
|
||||
@@ -40,7 +40,7 @@ func (s *Server) handleManifestGet(ctx context.Context, r *http.Request) (interf
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Server) handleManifestDelete(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handleManifestDelete(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
mid := manifest.ID(mux.Vars(r)["manifestID"])
|
||||
|
||||
err := s.rep.DeleteManifest(ctx, mid)
|
||||
@@ -55,7 +55,7 @@ func (s *Server) handleManifestDelete(ctx context.Context, r *http.Request) (int
|
||||
return &serverapi.Empty{}, nil
|
||||
}
|
||||
|
||||
func (s *Server) handleManifestList(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handleManifestList(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
// password already validated by a wrapper, no need to check here.
|
||||
userAtHost, _, _ := r.BasicAuth()
|
||||
|
||||
@@ -95,10 +95,10 @@ func filterManifests(manifests []*manifest.EntryMetadata, userAtHost string) []*
|
||||
return result
|
||||
}
|
||||
|
||||
func (s *Server) handleManifestCreate(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handleManifestCreate(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
var req remoterepoapi.ManifestWithMetadata
|
||||
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
if err := json.Unmarshal(body, &req); err != nil {
|
||||
return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request")
|
||||
}
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
"github.com/kopia/kopia/snapshot/policy"
|
||||
)
|
||||
|
||||
func (s *Server) handlePolicyList(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handlePolicyList(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
policies, err := policy.ListPolicies(ctx, s.rep)
|
||||
if err != nil {
|
||||
return nil, internalServerError(err)
|
||||
@@ -50,7 +50,7 @@ func getPolicyTargetFromURL(u *url.URL) snapshot.SourceInfo {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handlePolicyGet(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handlePolicyGet(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
pol, err := policy.GetDefinedPolicy(ctx, s.rep, getPolicyTargetFromURL(r.URL))
|
||||
if errors.Is(err, policy.ErrPolicyNotFound) {
|
||||
return nil, requestError(serverapi.ErrorNotFound, "policy not found")
|
||||
@@ -59,7 +59,7 @@ func (s *Server) handlePolicyGet(ctx context.Context, r *http.Request) (interfac
|
||||
return pol, nil
|
||||
}
|
||||
|
||||
func (s *Server) handlePolicyDelete(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handlePolicyDelete(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
if err := policy.RemovePolicy(ctx, s.rep, getPolicyTargetFromURL(r.URL)); err != nil {
|
||||
return nil, internalServerError(err)
|
||||
}
|
||||
@@ -71,9 +71,9 @@ func (s *Server) handlePolicyDelete(ctx context.Context, r *http.Request) (inter
|
||||
return &serverapi.Empty{}, nil
|
||||
}
|
||||
|
||||
func (s *Server) handlePolicyPut(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handlePolicyPut(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
newPolicy := &policy.Policy{}
|
||||
if err := json.NewDecoder(r.Body).Decode(newPolicy); err != nil {
|
||||
if err := json.Unmarshal(body, newPolicy); err != nil {
|
||||
return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request body")
|
||||
}
|
||||
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
"github.com/kopia/kopia/snapshot/policy"
|
||||
)
|
||||
|
||||
func (s *Server) handleRepoParameters(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handleRepoParameters(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
dr, ok := s.rep.(*repo.DirectRepository)
|
||||
if !ok {
|
||||
return &serverapi.StatusResponse{
|
||||
@@ -37,7 +37,7 @@ func (s *Server) handleRepoParameters(ctx context.Context, r *http.Request) (int
|
||||
return rp, nil
|
||||
}
|
||||
|
||||
func (s *Server) handleRepoStatus(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handleRepoStatus(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
if s.rep == nil {
|
||||
return &serverapi.StatusResponse{
|
||||
Connected: false,
|
||||
@@ -79,14 +79,14 @@ func maybeDecodeToken(req *serverapi.ConnectRepositoryRequest) *apiError {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) handleRepoCreate(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handleRepoCreate(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
if s.rep != nil {
|
||||
return nil, requestError(serverapi.ErrorAlreadyConnected, "already connected")
|
||||
}
|
||||
|
||||
var req serverapi.CreateRepositoryRequest
|
||||
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
if err := json.Unmarshal(body, &req); err != nil {
|
||||
return nil, requestError(serverapi.ErrorMalformedRequest, "unable to decode request: "+err.Error())
|
||||
}
|
||||
|
||||
@@ -125,17 +125,17 @@ func (s *Server) handleRepoCreate(ctx context.Context, r *http.Request) (interfa
|
||||
return nil, internalServerError(errors.Wrap(err, "flush"))
|
||||
}
|
||||
|
||||
return s.handleRepoStatus(ctx, r)
|
||||
return s.handleRepoStatus(ctx, r, nil)
|
||||
}
|
||||
|
||||
func (s *Server) handleRepoConnect(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handleRepoConnect(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
if s.rep != nil {
|
||||
return nil, requestError(serverapi.ErrorAlreadyConnected, "already connected")
|
||||
}
|
||||
|
||||
var req serverapi.ConnectRepositoryRequest
|
||||
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
if err := json.Unmarshal(body, &req); err != nil {
|
||||
return nil, requestError(serverapi.ErrorMalformedRequest, "unable to decode request: "+err.Error())
|
||||
}
|
||||
|
||||
@@ -147,10 +147,10 @@ func (s *Server) handleRepoConnect(ctx context.Context, r *http.Request) (interf
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s.handleRepoStatus(ctx, r)
|
||||
return s.handleRepoStatus(ctx, r, nil)
|
||||
}
|
||||
|
||||
func (s *Server) handleRepoSupportedAlgorithms(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handleRepoSupportedAlgorithms(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
res := &serverapi.SupportedAlgorithmsResponse{
|
||||
DefaultHashAlgorithm: hashing.DefaultAlgorithm,
|
||||
HashAlgorithms: hashing.SupportedAlgorithms(),
|
||||
@@ -200,7 +200,7 @@ func (s *Server) connectAndOpen(ctx context.Context, conn blob.ConnectionInfo, p
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) handleRepoDisconnect(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handleRepoDisconnect(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
// release shared lock so that SetRepository can acquire exclusive lock
|
||||
s.mu.RUnlock()
|
||||
err := s.SetRepository(ctx, nil)
|
||||
@@ -217,7 +217,7 @@ func (s *Server) handleRepoDisconnect(ctx context.Context, r *http.Request) (int
|
||||
return &serverapi.Empty{}, nil
|
||||
}
|
||||
|
||||
func (s *Server) handleRepoSync(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handleRepoSync(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
if err := s.rep.Refresh(ctx); err != nil {
|
||||
return nil, internalServerError(errors.Wrap(err, "unable to refresh repository"))
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
"github.com/kopia/kopia/snapshot/policy"
|
||||
)
|
||||
|
||||
func (s *Server) handleSnapshotList(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handleSnapshotList(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
manifestIDs, err := snapshot.ListSnapshotManifests(ctx, s.rep, nil)
|
||||
if err != nil {
|
||||
return nil, internalServerError(err)
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
"github.com/kopia/kopia/snapshot/policy"
|
||||
)
|
||||
|
||||
func (s *Server) handleSourcesList(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handleSourcesList(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
resp := &serverapi.SourcesResponse{
|
||||
Sources: []*serverapi.SourceStatus{},
|
||||
LocalHost: s.rep.Hostname(),
|
||||
@@ -37,10 +37,10 @@ func (s *Server) handleSourcesList(ctx context.Context, r *http.Request) (interf
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *Server) handleSourcesCreate(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handleSourcesCreate(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
var req serverapi.CreateSnapshotSourceRequest
|
||||
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
if err := json.Unmarshal(body, &req); err != nil {
|
||||
return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request body")
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sync"
|
||||
@@ -25,6 +26,8 @@
|
||||
|
||||
const maintenanceAttemptFrequency = 10 * time.Minute
|
||||
|
||||
type apiRequestFunc func(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError)
|
||||
|
||||
// Server exposes simple HTTP API for programmatically accessing Kopia features.
|
||||
type Server struct {
|
||||
OnShutdown func(ctx context.Context) error
|
||||
@@ -85,30 +88,39 @@ func (s *Server) APIHandlers() http.Handler {
|
||||
return m
|
||||
}
|
||||
|
||||
func (s *Server) handleAPI(f func(ctx context.Context, r *http.Request) (interface{}, *apiError)) http.HandlerFunc {
|
||||
return s.handleAPIPossiblyNotConnected(func(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handleAPI(f apiRequestFunc) http.HandlerFunc {
|
||||
return s.handleAPIPossiblyNotConnected(func(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
if s.rep == nil {
|
||||
return nil, requestError(serverapi.ErrorNotConnected, "not connected")
|
||||
}
|
||||
|
||||
return f(ctx, r)
|
||||
return f(ctx, r, body)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Server) handleAPIPossiblyNotConnected(f func(ctx context.Context, r *http.Request) (interface{}, *apiError)) http.HandlerFunc {
|
||||
func (s *Server) handleAPIPossiblyNotConnected(f apiRequestFunc) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
// we must pre-read request body before acquiring the lock as it sometimes leads to deadlock
|
||||
// in HTTP/2 server.
|
||||
// See https://github.com/golang/go/issues/40816
|
||||
body, berr := ioutil.ReadAll(r.Body)
|
||||
if berr != nil {
|
||||
http.Error(w, "error reading request body", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
ctx := r.Context()
|
||||
|
||||
log(ctx).Debugf("request %v", r.URL)
|
||||
log(ctx).Debugf("request %v (%v bytes)", r.URL, len(body))
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
e := json.NewEncoder(w)
|
||||
e.SetIndent("", " ")
|
||||
|
||||
v, err := f(ctx, r)
|
||||
v, err := f(ctx, r, body)
|
||||
|
||||
if err == nil {
|
||||
if b, ok := v.([]byte); ok {
|
||||
@@ -134,7 +146,7 @@ func (s *Server) handleAPIPossiblyNotConnected(f func(ctx context.Context, r *ht
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handleRefresh(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handleRefresh(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
if err := s.rep.Refresh(ctx); err != nil {
|
||||
return nil, internalServerError(err)
|
||||
}
|
||||
@@ -142,7 +154,7 @@ func (s *Server) handleRefresh(ctx context.Context, r *http.Request) (interface{
|
||||
return &serverapi.Empty{}, nil
|
||||
}
|
||||
|
||||
func (s *Server) handleFlush(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handleFlush(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
if err := s.rep.Flush(ctx); err != nil {
|
||||
return nil, internalServerError(err)
|
||||
}
|
||||
@@ -150,7 +162,7 @@ func (s *Server) handleFlush(ctx context.Context, r *http.Request) (interface{},
|
||||
return &serverapi.Empty{}, nil
|
||||
}
|
||||
|
||||
func (s *Server) handleShutdown(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handleShutdown(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
log(ctx).Infof("shutting down due to API request")
|
||||
|
||||
if f := s.OnShutdown; f != nil {
|
||||
@@ -180,11 +192,11 @@ func (s *Server) forAllSourceManagersMatchingURLFilter(ctx context.Context, c fu
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *Server) handleUpload(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handleUpload(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
return s.forAllSourceManagersMatchingURLFilter(ctx, (*sourceManager).upload, r.URL.Query())
|
||||
}
|
||||
|
||||
func (s *Server) handleCancel(ctx context.Context, r *http.Request) (interface{}, *apiError) {
|
||||
func (s *Server) handleCancel(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) {
|
||||
return s.forAllSourceManagersMatchingURLFilter(ctx, (*sourceManager).cancel, r.URL.Query())
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user