From 964c8d7d6522a3bebcf5e8de61f2bb914f987e50 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Fri, 16 May 2025 22:50:13 +0200 Subject: [PATCH] fix(model): correct bufferpool handling; simplify (#10113) The copier routine refactor resulted in bad buffer pool handling, putting a buffer back into the pool twice. This simplifies and removes the danger prone Upgrade() method. --- lib/model/folder_sendrecv.go | 15 +++++++-------- lib/protocol/bufferpool.go | 16 +--------------- lib/protocol/protocol.go | 13 +++++-------- 3 files changed, 13 insertions(+), 31 deletions(-) diff --git a/lib/model/folder_sendrecv.go b/lib/model/folder_sendrecv.go index eb1cf83b4..8ab78400f 100644 --- a/lib/model/folder_sendrecv.go +++ b/lib/model/folder_sendrecv.go @@ -1286,11 +1286,6 @@ func (f *sendReceiveFolder) shortcutFile(file protocol.FileInfo, dbUpdateChan ch // copierRoutine reads copierStates until the in channel closes and performs // the relevant copies when possible, or passes it to the puller routine. func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) { - buf := protocol.BufferPool.Get(protocol.MinBlockSize) - defer func() { - protocol.BufferPool.Put(buf) - }() - otherFolderFilesystems := make(map[string]fs.Filesystem) for folder, cfg := range f.model.cfg.Folders() { if folder == f.ID { @@ -1333,7 +1328,7 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch continue } - if f.copyBlock(block, state, otherFolderFilesystems, buf) { + if f.copyBlock(block, state, otherFolderFilesystems) { state.copyDone(block) continue } @@ -1362,8 +1357,9 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch } // Returns true when the block was successfully copied. -func (f *sendReceiveFolder) copyBlock(block protocol.BlockInfo, state copyBlocksState, otherFolderFilesystems map[string]fs.Filesystem, buf []byte) bool { - buf = protocol.BufferPool.Upgrade(buf, int(block.Size)) +func (f *sendReceiveFolder) copyBlock(block protocol.BlockInfo, state copyBlocksState, otherFolderFilesystems map[string]fs.Filesystem) bool { + buf := protocol.BufferPool.Get(int(block.Size)) + defer protocol.BufferPool.Put(buf) // Hope that it's usually in the same folder, so start with that // one. Also possibly more efficient copy (same filesystem). @@ -1978,6 +1974,9 @@ func (f *sendReceiveFolder) deleteDirOnDiskHandleChildren(dir string, scanChan c return nil } cf, ok, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, path) + if err != nil { + return err + } switch { case !ok || cf.IsDeleted(): // Something appeared in the dir that we either are not diff --git a/lib/protocol/bufferpool.go b/lib/protocol/bufferpool.go index cf7b797b6..fa8722d65 100644 --- a/lib/protocol/bufferpool.go +++ b/lib/protocol/bufferpool.go @@ -59,7 +59,7 @@ func (p *bufferPool) Get(size int) []byte { } // Put makes the given byte slice available again in the global pool. -// You must only Put() slices that were returned by Get() or Upgrade(). +// You must only Put() slices that were returned by Get(). func (p *bufferPool) Put(bs []byte) { // Don't buffer slices outside of our pool range if cap(bs) > MaxBlockSize || cap(bs) < MinBlockSize { @@ -72,20 +72,6 @@ func (p *bufferPool) Put(bs []byte) { p.pools[bkt].Put(&bs) } -// Upgrade grows the buffer to the requested size, while attempting to reuse -// it if possible. -func (p *bufferPool) Upgrade(bs []byte, size int) []byte { - if cap(bs) >= size { - // Reslicing is enough, lets go! - return bs[:size] - } - - // It was too small. But it pack into the pool and try to get another - // buffer. - p.Put(bs) - return p.Get(size) -} - // getBucketForLen returns the bucket where we should get a slice of a // certain length. Each bucket is guaranteed to hold slices that are // precisely the block size for that bucket, so if the block size is larger diff --git a/lib/protocol/protocol.go b/lib/protocol/protocol.go index bf35f50b6..5a0390172 100644 --- a/lib/protocol/protocol.go +++ b/lib/protocol/protocol.go @@ -512,8 +512,9 @@ func (c *rawConnection) readMessageAfterHeader(hdr *bep.Header, fourByteBuf []by // Then comes the message buf := BufferPool.Get(int(msgLen)) + defer BufferPool.Put(buf) + if _, err := io.ReadFull(c.cr, buf); err != nil { - BufferPool.Put(buf) return nil, fmt.Errorf("reading message: %w", err) } @@ -525,7 +526,6 @@ func (c *rawConnection) readMessageAfterHeader(hdr *bep.Header, fourByteBuf []by case bep.MessageCompression_MESSAGE_COMPRESSION_LZ4: decomp, err := lz4Decompress(buf) - BufferPool.Put(buf) if err != nil { return nil, fmt.Errorf("decompressing message: %w", err) } @@ -541,14 +541,11 @@ func (c *rawConnection) readMessageAfterHeader(hdr *bep.Header, fourByteBuf []by msg, err := newMessage(hdr.Type) if err != nil { - BufferPool.Put(buf) return nil, err } if err := proto.Unmarshal(buf, msg); err != nil { - BufferPool.Put(buf) return nil, fmt.Errorf("unmarshalling message: %w", err) } - BufferPool.Put(buf) return msg, nil } @@ -567,16 +564,16 @@ func (c *rawConnection) readHeader(fourByteBuf []byte) (*bep.Header, error) { // Then comes the header buf := BufferPool.Get(int(hdrLen)) + defer BufferPool.Put(buf) + if _, err := io.ReadFull(c.cr, buf); err != nil { - BufferPool.Put(buf) return nil, fmt.Errorf("reading header: %w", err) } var hdr bep.Header err := proto.Unmarshal(buf, &hdr) - BufferPool.Put(buf) if err != nil { - return nil, fmt.Errorf("unmarshalling header: %w %x", err, buf) + return nil, fmt.Errorf("unmarshalling header: %w", err) } metricDeviceRecvDecompressedBytes.WithLabelValues(c.idString).Add(float64(2 + len(buf)))