always select next before making calls

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
This commit is contained in:
Jörn Friedrich Dreyer
2024-03-05 12:03:56 +01:00
parent 88f1c329a3
commit d9c9650afd
7 changed files with 63 additions and 11 deletions

View File

@@ -0,0 +1,5 @@
Bugfix: always select next before making calls
We now select the next client more often to spread out load
https://github.com/owncloud/ocis/pull/8578

View File

@@ -90,6 +90,11 @@ func (cl *ClientlogService) processEvent(event events.Event) {
return
}
gwc, err = cl.gatewaySelector.Next()
if err != nil {
cl.log.Error().Err(err).Interface("event", event).Msg("error getting gateway client")
return
}
var (
users []string
evType string
@@ -129,6 +134,11 @@ func (cl *ClientlogService) processEvent(event events.Event) {
InitiatorID: event.InitiatorID,
}
gwc, err := cl.gatewaySelector.Next()
if err != nil {
cl.log.Error().Err(err).Interface("event", event).Msg("error getting gateway client")
return
}
users, err = utils.GetSpaceMembers(ctx, e.ID.GetSpaceId(), gwc, utils.ViewerRole)
break
}

View File

@@ -79,6 +79,10 @@ func (s DrivesDriveItemService) UnmountShare(ctx context.Context, resourceID sto
}
// Find all accepted shares for this resource
gatewayClient, err = s.gatewaySelector.Next()
if err != nil {
return err
}
receivedSharesResponse, err := gatewayClient.ListReceivedShares(ctx, &collaboration.ListReceivedSharesRequest{
Filters: []*collaboration.Filter{
{
@@ -188,6 +192,10 @@ func (s DrivesDriveItemService) MountShare(ctx context.Context, resourceID stora
UpdateMask: updateMask,
}
gatewayClient, err = s.gatewaySelector.Next()
if err != nil {
return libregraph.DriveItem{}, err
}
updateReceivedShareResponse, err := gatewayClient.UpdateReceivedShare(ctx, updateReceivedShareRequest)
switch errCode := errorcode.FromCS3Status(updateReceivedShareResponse.GetStatus(), err); {
case errCode == nil:

View File

@@ -26,6 +26,10 @@ func PurgeTrashBin(serviceAccountID string, deleteBefore time.Time, spaceType Sp
return err
}
gatewayClient, err = gatewaySelector.Next()
if err != nil {
return err
}
listStorageSpacesResponse, err := gatewayClient.ListStorageSpaces(ctx, &apiProvider.ListStorageSpacesRequest{
Filters: []*apiProvider.ListStorageSpacesRequest_Filter{
{
@@ -49,6 +53,10 @@ func PurgeTrashBin(serviceAccountID string, deleteBefore time.Time, spaceType Sp
ResourceId: storageSpace.GetRoot(),
}
gatewayClient, err = gatewaySelector.Next()
if err != nil {
return err
}
listRecycleResponse, err := gatewayClient.ListRecycle(ctx, &apiProvider.ListRecycleRequest{Ref: storageSpaceReference})
if err != nil {
return err
@@ -60,6 +68,10 @@ func PurgeTrashBin(serviceAccountID string, deleteBefore time.Time, spaceType Sp
continue
}
gatewayClient, err = gatewaySelector.Next()
if err != nil {
return err
}
purgeRecycleResponse, err := gatewayClient.PurgeRecycle(ctx, &apiProvider.PurgeRecycleRequest{
Ref: storageSpaceReference,
Key: recycleItem.Key,

View File

@@ -15,6 +15,7 @@ import (
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
storageprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/owncloud/ocis/v2/ocis-pkg/l10n"
@@ -51,7 +52,7 @@ type OC10Notification struct {
// Converter is responsible for converting eventhistory events to OC10Notifications
type Converter struct {
locale string
gwc gateway.GatewayAPIClient
gatewaySelector pool.Selectable[gateway.GatewayAPIClient]
serviceName string
translationPath string
defaultLanguage string
@@ -64,10 +65,10 @@ type Converter struct {
}
// NewConverter returns a new Converter
func NewConverter(ctx context.Context, loc string, gwc gateway.GatewayAPIClient, name, translationPath, defaultLanguage string) *Converter {
func NewConverter(ctx context.Context, loc string, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], name, translationPath, defaultLanguage string) *Converter {
return &Converter{
locale: loc,
gwc: gwc,
gatewaySelector: gatewaySelector,
serviceName: name,
translationPath: translationPath,
defaultLanguage: defaultLanguage,
@@ -319,7 +320,11 @@ func (c *Converter) getSpace(ctx context.Context, spaceID string) (*storageprovi
if space, ok := c.spaces[spaceID]; ok {
return space, nil
}
space, err := utils.GetSpace(ctx, spaceID, c.gwc)
gwc, err := c.gatewaySelector.Next()
if err != nil {
return nil, err
}
space, err := utils.GetSpace(ctx, spaceID, gwc)
if err == nil {
c.spaces[spaceID] = space
}
@@ -330,7 +335,11 @@ func (c *Converter) getResource(ctx context.Context, resourceID *storageprovider
if r, ok := c.resources[resourceID.GetOpaqueId()]; ok {
return r, nil
}
resource, err := utils.GetResourceByID(ctx, resourceID, c.gwc)
gwc, err := c.gatewaySelector.Next()
if err != nil {
return nil, err
}
resource, err := utils.GetResourceByID(ctx, resourceID, gwc)
if err == nil {
c.resources[resourceID.GetOpaqueId()] = resource
}
@@ -341,7 +350,11 @@ func (c *Converter) getUser(_ context.Context, userID *user.UserId) (*user.User,
if u, ok := c.users[userID.GetOpaqueId()]; ok {
return u, nil
}
usr, err := utils.GetUser(userID, c.gwc)
gwc, err := c.gatewaySelector.Next()
if err != nil {
return nil, err
}
usr, err := utils.GetUser(userID, gwc)
if err == nil {
c.users[userID.GetOpaqueId()] = usr
}

View File

@@ -51,7 +51,6 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request
w.WriteHeader(http.StatusInternalServerError)
return
}
ctx, err = utils.GetServiceUserContext(ul.cfg.ServiceAccount.ServiceAccountID, gwc, ul.cfg.ServiceAccount.ServiceAccountSecret)
if err != nil {
ul.log.Error().Err(err).Msg("cant get service account")
@@ -59,7 +58,7 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request
return
}
conv := NewConverter(ctx, r.Header.Get(HeaderAcceptLanguage), gwc, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.cfg.DefaultLanguage)
conv := NewConverter(ctx, r.Header.Get(HeaderAcceptLanguage), ul.gatewaySelector, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.cfg.DefaultLanguage)
var outdatedEvents []string
resp := GetEventResponseOC10{}

View File

@@ -122,6 +122,11 @@ func (ul *UserlogService) processEvent(event events.Event) {
return
}
gwc, err = ul.gatewaySelector.Next()
if err != nil {
ul.log.Error().Err(err).Msg("cannot get gateway client")
return
}
switch e := event.Event.(type) {
default:
err = errors.New("unhandled event")
@@ -196,7 +201,7 @@ func (ul *UserlogService) processEvent(event events.Event) {
// IV) send sses
if !ul.cfg.DisableSSE {
if err := ul.sendSSE(ctx, users, event, gwc); err != nil {
if err := ul.sendSSE(ctx, users, event, ul.gatewaySelector); err != nil {
ul.log.Error().Err(err).Interface("userid", users).Str("eventid", event.ID).Msg("cannot create sse event")
}
}
@@ -337,7 +342,7 @@ func (ul *UserlogService) addEventToUser(userid string, event events.Event) erro
})
}
func (ul *UserlogService) sendSSE(ctx context.Context, userIDs []string, event events.Event, gwc gateway.GatewayAPIClient) error {
func (ul *UserlogService) sendSSE(ctx context.Context, userIDs []string, event events.Event, gatewaySelector pool.Selectable[gateway.GatewayAPIClient]) error {
m := make(map[string]events.SendSSE)
for _, userid := range userIDs {
@@ -348,7 +353,7 @@ func (ul *UserlogService) sendSSE(ctx context.Context, userIDs []string, event e
continue
}
ev, err := NewConverter(ctx, loc, gwc, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.cfg.DefaultLanguage).ConvertEvent(event.ID, event.Event)
ev, err := NewConverter(ctx, loc, gatewaySelector, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.cfg.DefaultLanguage).ConvertEvent(event.ID, event.Event)
if err != nil {
if utils.IsErrNotFound(err) || utils.IsErrPermissionDenied(err) {
// the resource was not found, we assume it is deleted