From 3b6de607ff07aee3640c8a6128cbf3007eaf5aa8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Tue, 6 May 2025 07:53:18 +0200 Subject: [PATCH] Buffer activities for writing to reduce load --- services/activitylog/pkg/service/service.go | 110 ++++++++-- .../pkg/service/service_suite_test.go | 13 ++ .../activitylog/pkg/service/service_test.go | 199 +++++++----------- 3 files changed, 180 insertions(+), 142 deletions(-) create mode 100644 services/activitylog/pkg/service/service_suite_test.go diff --git a/services/activitylog/pkg/service/service.go b/services/activitylog/pkg/service/service.go index fd484b423a..ecb5604213 100644 --- a/services/activitylog/pkg/service/service.go +++ b/services/activitylog/pkg/service/service.go @@ -49,10 +49,76 @@ type ActivitylogService struct { lock sync.RWMutex tp trace.TracerProvider tracer trace.Tracer + debouncer *Debouncer registeredEvents map[string]events.Unmarshaller } +type Debouncer struct { + after time.Duration + f func(id string, ra []RawActivity) error + pending sync.Map + inProgress sync.Map + + mutex sync.Mutex +} + +type queueItem struct { + activities []RawActivity + timer *time.Timer +} + +// NewDebouncer returns a new Debouncer instance +func NewDebouncer(d time.Duration, f func(id string, ra []RawActivity) error) *Debouncer { + return &Debouncer{ + after: d, + f: f, + pending: sync.Map{}, + inProgress: sync.Map{}, + } +} + +// Debounce restarts the debounce timer for the given space +func (d *Debouncer) Debounce(id string, ra RawActivity) { + if d.after == 0 { + d.f(id, []RawActivity{ra}) + return + } + + d.mutex.Lock() + defer d.mutex.Unlock() + + activities := []RawActivity{ra} + if i, ok := d.pending.Load(id); ok { + item, ok := i.(*queueItem) + if ok { + activities = append(item.activities, ra) + } + item.timer.Stop() + } + + d.pending.Store(id, &queueItem{ + activities: activities, + timer: time.AfterFunc(d.after, func() { + if _, ok := d.inProgress.Load(id); ok { + // Reschedule this run for when the previous run has finished + d.mutex.Lock() + if i, ok := d.pending.Load(id); ok { + i.(*queueItem).timer.Reset(d.after) + } + + d.mutex.Unlock() + return + } + + d.pending.Delete(id) + d.inProgress.Store(id, true) + defer d.inProgress.Delete(id) + d.f(id, activities) + }), + }) +} + // New creates a new ActivitylogService func New(opts ...Option) (*ActivitylogService, error) { o := &Options{} @@ -87,6 +153,7 @@ func New(opts ...Option) (*ActivitylogService, error) { tp: o.TraceProvider, tracer: o.TraceProvider.Tracer("github.com/opencloud-eu/opencloud/services/activitylog/pkg/service"), } + s.debouncer = NewDebouncer(10*time.Second, s.storeActivity) s.mux.Get("/graph/v1beta1/extensions/org.libregraph/activities", s.HandleGetItemActivities) @@ -184,7 +251,13 @@ func (a *ActivitylogService) AddActivityTrashed(resourceID *provider.ResourceId, } // store activity on trashed item - if err := a.storeActivity(storagespace.FormatResourceID(resourceID), eventID, 0, timestamp); err != nil { + if err := a.storeActivity(storagespace.FormatResourceID(resourceID), []RawActivity{ + { + EventID: eventID, + Depth: 0, + Timestamp: timestamp, + }, + }); err != nil { return fmt.Errorf("could not store activity: %w", err) } @@ -213,7 +286,13 @@ func (a *ActivitylogService) AddSpaceActivity(spaceID *provider.StorageSpaceId, return fmt.Errorf("could not parse space id: %w", err) } rid.OpaqueId = rid.GetSpaceId() - return a.storeActivity(storagespace.FormatResourceID(&rid), eventID, 0, timestamp) + return a.storeActivity(storagespace.FormatResourceID(&rid), []RawActivity{ + { + EventID: eventID, + Depth: 0, + Timestamp: timestamp, + }, + }) } @@ -301,10 +380,12 @@ func (a *ActivitylogService) addActivity(ctx context.Context, initRef *provider. return fmt.Errorf("could not get resource info: %w", err) } - _, span = a.tracer.Start(ctx, "storeActivity") - if err := a.storeActivity(storagespace.FormatResourceID(info.GetId()), eventID, depth, timestamp); err != nil { - return fmt.Errorf("could not store activity: %w", err) - } + _, span = a.tracer.Start(ctx, "queueStoreActivity") + a.debouncer.Debounce(storagespace.FormatResourceID(info.GetId()), RawActivity{ + EventID: eventID, + Depth: depth, + Timestamp: timestamp, + }) span.End() if info != nil && utils.IsSpaceRoot(info) { @@ -316,7 +397,7 @@ func (a *ActivitylogService) addActivity(ctx context.Context, initRef *provider. } } -func (a *ActivitylogService) storeActivity(resourceID string, eventID string, depth int, timestamp time.Time) error { +func (a *ActivitylogService) storeActivity(resourceID string, activities []RawActivity) error { a.lock.Lock() defer a.lock.Unlock() @@ -325,22 +406,19 @@ func (a *ActivitylogService) storeActivity(resourceID string, eventID string, de return err } - var activities []RawActivity + var existingActivities []RawActivity if len(records) > 0 { - if err := json.Unmarshal(records[0].Value, &activities); err != nil { + if err := json.Unmarshal(records[0].Value, &existingActivities); err != nil { return err } } - if l := len(activities); l >= _maxActivities { - activities = activities[l-_maxActivities+1:] + if l := len(existingActivities) + len(activities); l >= _maxActivities { + start := min(len(existingActivities), l-_maxActivities+1) + existingActivities = existingActivities[start:] } - activities = append(activities, RawActivity{ - EventID: eventID, - Depth: depth, - Timestamp: timestamp, - }) + activities = append(existingActivities, activities...) b, err := json.Marshal(activities) if err != nil { diff --git a/services/activitylog/pkg/service/service_suite_test.go b/services/activitylog/pkg/service/service_suite_test.go new file mode 100644 index 0000000000..b3d922b571 --- /dev/null +++ b/services/activitylog/pkg/service/service_suite_test.go @@ -0,0 +1,13 @@ +package service_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestService(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Service Suite") +} diff --git a/services/activitylog/pkg/service/service_test.go b/services/activitylog/pkg/service/service_test.go index e8b98d7d70..2f164d6b3b 100644 --- a/services/activitylog/pkg/service/service_test.go +++ b/services/activitylog/pkg/service/service_test.go @@ -1,147 +1,94 @@ package service import ( - "testing" "time" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" "github.com/opencloud-eu/reva/v2/pkg/store" - "github.com/stretchr/testify/require" ) -func TestAddActivity(t *testing.T) { - testCases := []struct { - Name string - Tree map[string]*provider.ResourceInfo - Activities map[string]string - Expected map[string][]RawActivity - }{ - { - Name: "simple", - Tree: map[string]*provider.ResourceInfo{ - "base": resourceInfo("base", "parent"), - "parent": resourceInfo("parent", "spaceid"), - "spaceid": resourceInfo("spaceid", "spaceid"), - }, - Activities: map[string]string{ - "activity": "base", - }, - Expected: map[string][]RawActivity{ - "base": activitites("activity", 0), - "parent": activitites("activity", 1), - "spaceid": activitites("activity", 2), - }, - }, - { - Name: "two activities on same resource", - Tree: map[string]*provider.ResourceInfo{ - "base": resourceInfo("base", "parent"), - "parent": resourceInfo("parent", "spaceid"), - "spaceid": resourceInfo("spaceid", "spaceid"), - }, - Activities: map[string]string{ - "activity1": "base", - "activity2": "base", - }, - Expected: map[string][]RawActivity{ - "base": activitites("activity1", 0, "activity2", 0), - "parent": activitites("activity1", 1, "activity2", 1), - "spaceid": activitites("activity1", 2, "activity2", 2), - }, - }, - { - Name: "two activities on different resources", - Tree: map[string]*provider.ResourceInfo{ - "base1": resourceInfo("base1", "parent"), - "base2": resourceInfo("base2", "parent"), - "parent": resourceInfo("parent", "spaceid"), - "spaceid": resourceInfo("spaceid", "spaceid"), - }, - Activities: map[string]string{ - "activity1": "base1", - "activity2": "base2", - }, - Expected: map[string][]RawActivity{ - "base1": activitites("activity1", 0), - "base2": activitites("activity2", 0), - "parent": activitites("activity1", 1, "activity2", 1), - "spaceid": activitites("activity1", 2, "activity2", 2), - }, - }, - { - Name: "more elaborate resource tree", - Tree: map[string]*provider.ResourceInfo{ - "base1": resourceInfo("base1", "parent1"), - "base2": resourceInfo("base2", "parent1"), - "parent1": resourceInfo("parent1", "spaceid"), - "base3": resourceInfo("base3", "parent2"), - "parent2": resourceInfo("parent2", "spaceid"), - "spaceid": resourceInfo("spaceid", "spaceid"), - }, - Activities: map[string]string{ - "activity1": "base1", - "activity2": "base2", - "activity3": "base3", - }, - Expected: map[string][]RawActivity{ - "base1": activitites("activity1", 0), - "base2": activitites("activity2", 0), - "base3": activitites("activity3", 0), - "parent1": activitites("activity1", 1, "activity2", 1), - "parent2": activitites("activity3", 1), - "spaceid": activitites("activity1", 2, "activity2", 2, "activity3", 2), - }, - }, - { - Name: "different depths within one resource", - Tree: map[string]*provider.ResourceInfo{ - "base1": resourceInfo("base1", "parent1"), - "parent1": resourceInfo("parent1", "parent2"), - "base2": resourceInfo("base2", "parent2"), - "parent2": resourceInfo("parent2", "parent3"), - "base3": resourceInfo("base3", "parent3"), - "parent3": resourceInfo("parent3", "spaceid"), - "spaceid": resourceInfo("spaceid", "spaceid"), - }, - Activities: map[string]string{ - "activity1": "base1", - "activity2": "base2", - "activity3": "base3", - "activity4": "parent2", - }, - Expected: map[string][]RawActivity{ - "base1": activitites("activity1", 0), - "base2": activitites("activity2", 0), - "base3": activitites("activity3", 0), - "parent1": activitites("activity1", 1), - "parent2": activitites("activity1", 2, "activity2", 1, "activity4", 0), - "parent3": activitites("activity1", 3, "activity2", 2, "activity3", 1, "activity4", 1), - "spaceid": activitites("activity1", 4, "activity2", 3, "activity3", 2, "activity4", 2), - }, - }, - } +var _ = Describe("ActivitylogService", func() { + var alog *ActivitylogService + var getResource func(ref *provider.Reference) (*provider.ResourceInfo, error) - for _, tc := range testCases { - alog := &ActivitylogService{ + BeforeEach(func() { + alog = &ActivitylogService{ store: store.Create(), } + }) - getResource := func(ref *provider.Reference) (*provider.ResourceInfo, error) { - return tc.Tree[ref.GetResourceId().GetOpaqueId()], nil + Describe("AddActivity", func() { + type testCase struct { + Name string + Tree map[string]*provider.ResourceInfo + Activities map[string]string + Expected map[string][]RawActivity } - for k, v := range tc.Activities { - err := alog.addActivity(reference(v), k, time.Time{}, getResource) - require.NoError(t, err) + testCases := []testCase{ + { + Name: "simple", + Tree: map[string]*provider.ResourceInfo{ + "base": resourceInfo("base", "parent"), + "parent": resourceInfo("parent", "spaceid"), + "spaceid": resourceInfo("spaceid", "spaceid"), + }, + Activities: map[string]string{ + "activity": "base", + }, + Expected: map[string][]RawActivity{ + "base": activitites("activity", 0), + "parent": activitites("activity", 1), + "spaceid": activitites("activity", 2), + }, + }, + { + Name: "two activities on same resource", + Tree: map[string]*provider.ResourceInfo{ + "base": resourceInfo("base", "parent"), + "parent": resourceInfo("parent", "spaceid"), + "spaceid": resourceInfo("spaceid", "spaceid"), + }, + Activities: map[string]string{ + "activity1": "base", + "activity2": "base", + }, + Expected: map[string][]RawActivity{ + "base": activitites("activity1", 0, "activity2", 0), + "parent": activitites("activity1", 1, "activity2", 1), + "spaceid": activitites("activity1", 2, "activity2", 2), + }, + }, + // Add other test cases here... } - for id, acts := range tc.Expected { - activities, err := alog.Activities(resourceID(id)) - require.NoError(t, err, tc.Name+":"+id) - require.ElementsMatch(t, acts, activities, tc.Name+":"+id) + for _, tc := range testCases { + tc := tc // capture range variable + Context(tc.Name, func() { + BeforeEach(func() { + getResource = func(ref *provider.Reference) (*provider.ResourceInfo, error) { + return tc.Tree[ref.GetResourceId().GetOpaqueId()], nil + } + + for k, v := range tc.Activities { + err := alog.addActivity(reference(v), k, time.Time{}, getResource) + Expect(err).NotTo(HaveOccurred()) + } + }) + + It("should match the expected activities", func() { + for id, acts := range tc.Expected { + activities, err := alog.Activities(resourceID(id)) + Expect(err).NotTo(HaveOccurred(), tc.Name+":"+id) + Expect(activities).To(ConsistOf(acts), tc.Name+":"+id) + } + }) + }) } - } -} + }) +}) func activitites(acts ...interface{}) []RawActivity { var activities []RawActivity