From d9c9650afd9110f02d0aa9366e0061dfe141bacf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Tue, 5 Mar 2024 12:03:56 +0100 Subject: [PATCH] always select next before making calls MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- .../unreleased/next-before-making-calls.md | 5 ++++ services/clientlog/pkg/service/service.go | 10 ++++++++ .../pkg/service/v0/api_drives_drive_item.go | 8 ++++++ services/storage-users/pkg/task/trash_bin.go | 12 +++++++++ services/userlog/pkg/service/conversion.go | 25 ++++++++++++++----- services/userlog/pkg/service/http.go | 3 +-- services/userlog/pkg/service/service.go | 11 +++++--- 7 files changed, 63 insertions(+), 11 deletions(-) create mode 100644 changelog/unreleased/next-before-making-calls.md diff --git a/changelog/unreleased/next-before-making-calls.md b/changelog/unreleased/next-before-making-calls.md new file mode 100644 index 0000000000..7c1c3b1869 --- /dev/null +++ b/changelog/unreleased/next-before-making-calls.md @@ -0,0 +1,5 @@ +Bugfix: always select next before making calls + +We now select the next client more often to spread out load + +https://github.com/owncloud/ocis/pull/8578 diff --git a/services/clientlog/pkg/service/service.go b/services/clientlog/pkg/service/service.go index b5455f5717..391b4f8266 100644 --- a/services/clientlog/pkg/service/service.go +++ b/services/clientlog/pkg/service/service.go @@ -90,6 +90,11 @@ func (cl *ClientlogService) processEvent(event events.Event) { return } + gwc, err = cl.gatewaySelector.Next() + if err != nil { + cl.log.Error().Err(err).Interface("event", event).Msg("error getting gateway client") + return + } var ( users []string evType string @@ -129,6 +134,11 @@ func (cl *ClientlogService) processEvent(event events.Event) { InitiatorID: event.InitiatorID, } + gwc, err := cl.gatewaySelector.Next() + if err != nil { + cl.log.Error().Err(err).Interface("event", event).Msg("error getting gateway client") + return + } users, err = utils.GetSpaceMembers(ctx, e.ID.GetSpaceId(), gwc, utils.ViewerRole) break } diff --git a/services/graph/pkg/service/v0/api_drives_drive_item.go b/services/graph/pkg/service/v0/api_drives_drive_item.go index 76c2516ffb..da2f214263 100644 --- a/services/graph/pkg/service/v0/api_drives_drive_item.go +++ b/services/graph/pkg/service/v0/api_drives_drive_item.go @@ -79,6 +79,10 @@ func (s DrivesDriveItemService) UnmountShare(ctx context.Context, resourceID sto } // Find all accepted shares for this resource + gatewayClient, err = s.gatewaySelector.Next() + if err != nil { + return err + } receivedSharesResponse, err := gatewayClient.ListReceivedShares(ctx, &collaboration.ListReceivedSharesRequest{ Filters: []*collaboration.Filter{ { @@ -188,6 +192,10 @@ func (s DrivesDriveItemService) MountShare(ctx context.Context, resourceID stora UpdateMask: updateMask, } + gatewayClient, err = s.gatewaySelector.Next() + if err != nil { + return libregraph.DriveItem{}, err + } updateReceivedShareResponse, err := gatewayClient.UpdateReceivedShare(ctx, updateReceivedShareRequest) switch errCode := errorcode.FromCS3Status(updateReceivedShareResponse.GetStatus(), err); { case errCode == nil: diff --git a/services/storage-users/pkg/task/trash_bin.go b/services/storage-users/pkg/task/trash_bin.go index e7bb2cb63d..7cccf4032c 100644 --- a/services/storage-users/pkg/task/trash_bin.go +++ b/services/storage-users/pkg/task/trash_bin.go @@ -26,6 +26,10 @@ func PurgeTrashBin(serviceAccountID string, deleteBefore time.Time, spaceType Sp return err } + gatewayClient, err = gatewaySelector.Next() + if err != nil { + return err + } listStorageSpacesResponse, err := gatewayClient.ListStorageSpaces(ctx, &apiProvider.ListStorageSpacesRequest{ Filters: []*apiProvider.ListStorageSpacesRequest_Filter{ { @@ -49,6 +53,10 @@ func PurgeTrashBin(serviceAccountID string, deleteBefore time.Time, spaceType Sp ResourceId: storageSpace.GetRoot(), } + gatewayClient, err = gatewaySelector.Next() + if err != nil { + return err + } listRecycleResponse, err := gatewayClient.ListRecycle(ctx, &apiProvider.ListRecycleRequest{Ref: storageSpaceReference}) if err != nil { return err @@ -60,6 +68,10 @@ func PurgeTrashBin(serviceAccountID string, deleteBefore time.Time, spaceType Sp continue } + gatewayClient, err = gatewaySelector.Next() + if err != nil { + return err + } purgeRecycleResponse, err := gatewayClient.PurgeRecycle(ctx, &apiProvider.PurgeRecycleRequest{ Ref: storageSpaceReference, Key: recycleItem.Key, diff --git a/services/userlog/pkg/service/conversion.go b/services/userlog/pkg/service/conversion.go index 967e75cdcf..c0075aeb52 100644 --- a/services/userlog/pkg/service/conversion.go +++ b/services/userlog/pkg/service/conversion.go @@ -15,6 +15,7 @@ import ( collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1" storageprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/cs3org/reva/v2/pkg/utils" "github.com/owncloud/ocis/v2/ocis-pkg/l10n" @@ -51,7 +52,7 @@ type OC10Notification struct { // Converter is responsible for converting eventhistory events to OC10Notifications type Converter struct { locale string - gwc gateway.GatewayAPIClient + gatewaySelector pool.Selectable[gateway.GatewayAPIClient] serviceName string translationPath string defaultLanguage string @@ -64,10 +65,10 @@ type Converter struct { } // NewConverter returns a new Converter -func NewConverter(ctx context.Context, loc string, gwc gateway.GatewayAPIClient, name, translationPath, defaultLanguage string) *Converter { +func NewConverter(ctx context.Context, loc string, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], name, translationPath, defaultLanguage string) *Converter { return &Converter{ locale: loc, - gwc: gwc, + gatewaySelector: gatewaySelector, serviceName: name, translationPath: translationPath, defaultLanguage: defaultLanguage, @@ -319,7 +320,11 @@ func (c *Converter) getSpace(ctx context.Context, spaceID string) (*storageprovi if space, ok := c.spaces[spaceID]; ok { return space, nil } - space, err := utils.GetSpace(ctx, spaceID, c.gwc) + gwc, err := c.gatewaySelector.Next() + if err != nil { + return nil, err + } + space, err := utils.GetSpace(ctx, spaceID, gwc) if err == nil { c.spaces[spaceID] = space } @@ -330,7 +335,11 @@ func (c *Converter) getResource(ctx context.Context, resourceID *storageprovider if r, ok := c.resources[resourceID.GetOpaqueId()]; ok { return r, nil } - resource, err := utils.GetResourceByID(ctx, resourceID, c.gwc) + gwc, err := c.gatewaySelector.Next() + if err != nil { + return nil, err + } + resource, err := utils.GetResourceByID(ctx, resourceID, gwc) if err == nil { c.resources[resourceID.GetOpaqueId()] = resource } @@ -341,7 +350,11 @@ func (c *Converter) getUser(_ context.Context, userID *user.UserId) (*user.User, if u, ok := c.users[userID.GetOpaqueId()]; ok { return u, nil } - usr, err := utils.GetUser(userID, c.gwc) + gwc, err := c.gatewaySelector.Next() + if err != nil { + return nil, err + } + usr, err := utils.GetUser(userID, gwc) if err == nil { c.users[userID.GetOpaqueId()] = usr } diff --git a/services/userlog/pkg/service/http.go b/services/userlog/pkg/service/http.go index 74a8db8d30..6f8eacf81e 100644 --- a/services/userlog/pkg/service/http.go +++ b/services/userlog/pkg/service/http.go @@ -51,7 +51,6 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request w.WriteHeader(http.StatusInternalServerError) return } - ctx, err = utils.GetServiceUserContext(ul.cfg.ServiceAccount.ServiceAccountID, gwc, ul.cfg.ServiceAccount.ServiceAccountSecret) if err != nil { ul.log.Error().Err(err).Msg("cant get service account") @@ -59,7 +58,7 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request return } - conv := NewConverter(ctx, r.Header.Get(HeaderAcceptLanguage), gwc, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.cfg.DefaultLanguage) + conv := NewConverter(ctx, r.Header.Get(HeaderAcceptLanguage), ul.gatewaySelector, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.cfg.DefaultLanguage) var outdatedEvents []string resp := GetEventResponseOC10{} diff --git a/services/userlog/pkg/service/service.go b/services/userlog/pkg/service/service.go index f34e3ca157..f18e4c18c6 100644 --- a/services/userlog/pkg/service/service.go +++ b/services/userlog/pkg/service/service.go @@ -122,6 +122,11 @@ func (ul *UserlogService) processEvent(event events.Event) { return } + gwc, err = ul.gatewaySelector.Next() + if err != nil { + ul.log.Error().Err(err).Msg("cannot get gateway client") + return + } switch e := event.Event.(type) { default: err = errors.New("unhandled event") @@ -196,7 +201,7 @@ func (ul *UserlogService) processEvent(event events.Event) { // IV) send sses if !ul.cfg.DisableSSE { - if err := ul.sendSSE(ctx, users, event, gwc); err != nil { + if err := ul.sendSSE(ctx, users, event, ul.gatewaySelector); err != nil { ul.log.Error().Err(err).Interface("userid", users).Str("eventid", event.ID).Msg("cannot create sse event") } } @@ -337,7 +342,7 @@ func (ul *UserlogService) addEventToUser(userid string, event events.Event) erro }) } -func (ul *UserlogService) sendSSE(ctx context.Context, userIDs []string, event events.Event, gwc gateway.GatewayAPIClient) error { +func (ul *UserlogService) sendSSE(ctx context.Context, userIDs []string, event events.Event, gatewaySelector pool.Selectable[gateway.GatewayAPIClient]) error { m := make(map[string]events.SendSSE) for _, userid := range userIDs { @@ -348,7 +353,7 @@ func (ul *UserlogService) sendSSE(ctx context.Context, userIDs []string, event e continue } - ev, err := NewConverter(ctx, loc, gwc, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.cfg.DefaultLanguage).ConvertEvent(event.ID, event.Event) + ev, err := NewConverter(ctx, loc, gatewaySelector, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.cfg.DefaultLanguage).ConvertEvent(event.ID, event.Event) if err != nil { if utils.IsErrNotFound(err) || utils.IsErrPermissionDenied(err) { // the resource was not found, we assume it is deleted