Buffer activities for writing to reduce load

This commit is contained in:
André Duffeck
2025-05-06 07:53:18 +02:00
parent 94a973f8a3
commit 3b6de607ff
3 changed files with 180 additions and 142 deletions

View File

@@ -49,10 +49,76 @@ type ActivitylogService struct {
lock sync.RWMutex
tp trace.TracerProvider
tracer trace.Tracer
debouncer *Debouncer
registeredEvents map[string]events.Unmarshaller
}
type Debouncer struct {
after time.Duration
f func(id string, ra []RawActivity) error
pending sync.Map
inProgress sync.Map
mutex sync.Mutex
}
type queueItem struct {
activities []RawActivity
timer *time.Timer
}
// NewDebouncer returns a new Debouncer instance
func NewDebouncer(d time.Duration, f func(id string, ra []RawActivity) error) *Debouncer {
return &Debouncer{
after: d,
f: f,
pending: sync.Map{},
inProgress: sync.Map{},
}
}
// Debounce restarts the debounce timer for the given space
func (d *Debouncer) Debounce(id string, ra RawActivity) {
if d.after == 0 {
d.f(id, []RawActivity{ra})
return
}
d.mutex.Lock()
defer d.mutex.Unlock()
activities := []RawActivity{ra}
if i, ok := d.pending.Load(id); ok {
item, ok := i.(*queueItem)
if ok {
activities = append(item.activities, ra)
}
item.timer.Stop()
}
d.pending.Store(id, &queueItem{
activities: activities,
timer: time.AfterFunc(d.after, func() {
if _, ok := d.inProgress.Load(id); ok {
// Reschedule this run for when the previous run has finished
d.mutex.Lock()
if i, ok := d.pending.Load(id); ok {
i.(*queueItem).timer.Reset(d.after)
}
d.mutex.Unlock()
return
}
d.pending.Delete(id)
d.inProgress.Store(id, true)
defer d.inProgress.Delete(id)
d.f(id, activities)
}),
})
}
// New creates a new ActivitylogService
func New(opts ...Option) (*ActivitylogService, error) {
o := &Options{}
@@ -87,6 +153,7 @@ func New(opts ...Option) (*ActivitylogService, error) {
tp: o.TraceProvider,
tracer: o.TraceProvider.Tracer("github.com/opencloud-eu/opencloud/services/activitylog/pkg/service"),
}
s.debouncer = NewDebouncer(10*time.Second, s.storeActivity)
s.mux.Get("/graph/v1beta1/extensions/org.libregraph/activities", s.HandleGetItemActivities)
@@ -184,7 +251,13 @@ func (a *ActivitylogService) AddActivityTrashed(resourceID *provider.ResourceId,
}
// store activity on trashed item
if err := a.storeActivity(storagespace.FormatResourceID(resourceID), eventID, 0, timestamp); err != nil {
if err := a.storeActivity(storagespace.FormatResourceID(resourceID), []RawActivity{
{
EventID: eventID,
Depth: 0,
Timestamp: timestamp,
},
}); err != nil {
return fmt.Errorf("could not store activity: %w", err)
}
@@ -213,7 +286,13 @@ func (a *ActivitylogService) AddSpaceActivity(spaceID *provider.StorageSpaceId,
return fmt.Errorf("could not parse space id: %w", err)
}
rid.OpaqueId = rid.GetSpaceId()
return a.storeActivity(storagespace.FormatResourceID(&rid), eventID, 0, timestamp)
return a.storeActivity(storagespace.FormatResourceID(&rid), []RawActivity{
{
EventID: eventID,
Depth: 0,
Timestamp: timestamp,
},
})
}
@@ -301,10 +380,12 @@ func (a *ActivitylogService) addActivity(ctx context.Context, initRef *provider.
return fmt.Errorf("could not get resource info: %w", err)
}
_, span = a.tracer.Start(ctx, "storeActivity")
if err := a.storeActivity(storagespace.FormatResourceID(info.GetId()), eventID, depth, timestamp); err != nil {
return fmt.Errorf("could not store activity: %w", err)
}
_, span = a.tracer.Start(ctx, "queueStoreActivity")
a.debouncer.Debounce(storagespace.FormatResourceID(info.GetId()), RawActivity{
EventID: eventID,
Depth: depth,
Timestamp: timestamp,
})
span.End()
if info != nil && utils.IsSpaceRoot(info) {
@@ -316,7 +397,7 @@ func (a *ActivitylogService) addActivity(ctx context.Context, initRef *provider.
}
}
func (a *ActivitylogService) storeActivity(resourceID string, eventID string, depth int, timestamp time.Time) error {
func (a *ActivitylogService) storeActivity(resourceID string, activities []RawActivity) error {
a.lock.Lock()
defer a.lock.Unlock()
@@ -325,22 +406,19 @@ func (a *ActivitylogService) storeActivity(resourceID string, eventID string, de
return err
}
var activities []RawActivity
var existingActivities []RawActivity
if len(records) > 0 {
if err := json.Unmarshal(records[0].Value, &activities); err != nil {
if err := json.Unmarshal(records[0].Value, &existingActivities); err != nil {
return err
}
}
if l := len(activities); l >= _maxActivities {
activities = activities[l-_maxActivities+1:]
if l := len(existingActivities) + len(activities); l >= _maxActivities {
start := min(len(existingActivities), l-_maxActivities+1)
existingActivities = existingActivities[start:]
}
activities = append(activities, RawActivity{
EventID: eventID,
Depth: depth,
Timestamp: timestamp,
})
activities = append(existingActivities, activities...)
b, err := json.Marshal(activities)
if err != nil {

View File

@@ -0,0 +1,13 @@
package service_test
import (
"testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
func TestService(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Service Suite")
}

View File

@@ -1,147 +1,94 @@
package service
import (
"testing"
"time"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/opencloud-eu/reva/v2/pkg/store"
"github.com/stretchr/testify/require"
)
func TestAddActivity(t *testing.T) {
testCases := []struct {
Name string
Tree map[string]*provider.ResourceInfo
Activities map[string]string
Expected map[string][]RawActivity
}{
{
Name: "simple",
Tree: map[string]*provider.ResourceInfo{
"base": resourceInfo("base", "parent"),
"parent": resourceInfo("parent", "spaceid"),
"spaceid": resourceInfo("spaceid", "spaceid"),
},
Activities: map[string]string{
"activity": "base",
},
Expected: map[string][]RawActivity{
"base": activitites("activity", 0),
"parent": activitites("activity", 1),
"spaceid": activitites("activity", 2),
},
},
{
Name: "two activities on same resource",
Tree: map[string]*provider.ResourceInfo{
"base": resourceInfo("base", "parent"),
"parent": resourceInfo("parent", "spaceid"),
"spaceid": resourceInfo("spaceid", "spaceid"),
},
Activities: map[string]string{
"activity1": "base",
"activity2": "base",
},
Expected: map[string][]RawActivity{
"base": activitites("activity1", 0, "activity2", 0),
"parent": activitites("activity1", 1, "activity2", 1),
"spaceid": activitites("activity1", 2, "activity2", 2),
},
},
{
Name: "two activities on different resources",
Tree: map[string]*provider.ResourceInfo{
"base1": resourceInfo("base1", "parent"),
"base2": resourceInfo("base2", "parent"),
"parent": resourceInfo("parent", "spaceid"),
"spaceid": resourceInfo("spaceid", "spaceid"),
},
Activities: map[string]string{
"activity1": "base1",
"activity2": "base2",
},
Expected: map[string][]RawActivity{
"base1": activitites("activity1", 0),
"base2": activitites("activity2", 0),
"parent": activitites("activity1", 1, "activity2", 1),
"spaceid": activitites("activity1", 2, "activity2", 2),
},
},
{
Name: "more elaborate resource tree",
Tree: map[string]*provider.ResourceInfo{
"base1": resourceInfo("base1", "parent1"),
"base2": resourceInfo("base2", "parent1"),
"parent1": resourceInfo("parent1", "spaceid"),
"base3": resourceInfo("base3", "parent2"),
"parent2": resourceInfo("parent2", "spaceid"),
"spaceid": resourceInfo("spaceid", "spaceid"),
},
Activities: map[string]string{
"activity1": "base1",
"activity2": "base2",
"activity3": "base3",
},
Expected: map[string][]RawActivity{
"base1": activitites("activity1", 0),
"base2": activitites("activity2", 0),
"base3": activitites("activity3", 0),
"parent1": activitites("activity1", 1, "activity2", 1),
"parent2": activitites("activity3", 1),
"spaceid": activitites("activity1", 2, "activity2", 2, "activity3", 2),
},
},
{
Name: "different depths within one resource",
Tree: map[string]*provider.ResourceInfo{
"base1": resourceInfo("base1", "parent1"),
"parent1": resourceInfo("parent1", "parent2"),
"base2": resourceInfo("base2", "parent2"),
"parent2": resourceInfo("parent2", "parent3"),
"base3": resourceInfo("base3", "parent3"),
"parent3": resourceInfo("parent3", "spaceid"),
"spaceid": resourceInfo("spaceid", "spaceid"),
},
Activities: map[string]string{
"activity1": "base1",
"activity2": "base2",
"activity3": "base3",
"activity4": "parent2",
},
Expected: map[string][]RawActivity{
"base1": activitites("activity1", 0),
"base2": activitites("activity2", 0),
"base3": activitites("activity3", 0),
"parent1": activitites("activity1", 1),
"parent2": activitites("activity1", 2, "activity2", 1, "activity4", 0),
"parent3": activitites("activity1", 3, "activity2", 2, "activity3", 1, "activity4", 1),
"spaceid": activitites("activity1", 4, "activity2", 3, "activity3", 2, "activity4", 2),
},
},
}
var _ = Describe("ActivitylogService", func() {
var alog *ActivitylogService
var getResource func(ref *provider.Reference) (*provider.ResourceInfo, error)
for _, tc := range testCases {
alog := &ActivitylogService{
BeforeEach(func() {
alog = &ActivitylogService{
store: store.Create(),
}
})
getResource := func(ref *provider.Reference) (*provider.ResourceInfo, error) {
return tc.Tree[ref.GetResourceId().GetOpaqueId()], nil
Describe("AddActivity", func() {
type testCase struct {
Name string
Tree map[string]*provider.ResourceInfo
Activities map[string]string
Expected map[string][]RawActivity
}
for k, v := range tc.Activities {
err := alog.addActivity(reference(v), k, time.Time{}, getResource)
require.NoError(t, err)
testCases := []testCase{
{
Name: "simple",
Tree: map[string]*provider.ResourceInfo{
"base": resourceInfo("base", "parent"),
"parent": resourceInfo("parent", "spaceid"),
"spaceid": resourceInfo("spaceid", "spaceid"),
},
Activities: map[string]string{
"activity": "base",
},
Expected: map[string][]RawActivity{
"base": activitites("activity", 0),
"parent": activitites("activity", 1),
"spaceid": activitites("activity", 2),
},
},
{
Name: "two activities on same resource",
Tree: map[string]*provider.ResourceInfo{
"base": resourceInfo("base", "parent"),
"parent": resourceInfo("parent", "spaceid"),
"spaceid": resourceInfo("spaceid", "spaceid"),
},
Activities: map[string]string{
"activity1": "base",
"activity2": "base",
},
Expected: map[string][]RawActivity{
"base": activitites("activity1", 0, "activity2", 0),
"parent": activitites("activity1", 1, "activity2", 1),
"spaceid": activitites("activity1", 2, "activity2", 2),
},
},
// Add other test cases here...
}
for id, acts := range tc.Expected {
activities, err := alog.Activities(resourceID(id))
require.NoError(t, err, tc.Name+":"+id)
require.ElementsMatch(t, acts, activities, tc.Name+":"+id)
for _, tc := range testCases {
tc := tc // capture range variable
Context(tc.Name, func() {
BeforeEach(func() {
getResource = func(ref *provider.Reference) (*provider.ResourceInfo, error) {
return tc.Tree[ref.GetResourceId().GetOpaqueId()], nil
}
for k, v := range tc.Activities {
err := alog.addActivity(reference(v), k, time.Time{}, getResource)
Expect(err).NotTo(HaveOccurred())
}
})
It("should match the expected activities", func() {
for id, acts := range tc.Expected {
activities, err := alog.Activities(resourceID(id))
Expect(err).NotTo(HaveOccurred(), tc.Name+":"+id)
Expect(activities).To(ConsistOf(acts), tc.Name+":"+id)
}
})
})
}
}
}
})
})
func activitites(acts ...interface{}) []RawActivity {
var activities []RawActivity