mirror of
https://github.com/syncthing/syncthing.git
synced 2026-05-18 13:52:00 -04:00
fix(model): correct bufferpool handling; simplify (#10113)
The copier routine refactor resulted in bad buffer pool handling, putting a buffer back into the pool twice. This simplifies and removes the danger prone Upgrade() method.
This commit is contained in:
@@ -1286,11 +1286,6 @@ func (f *sendReceiveFolder) shortcutFile(file protocol.FileInfo, dbUpdateChan ch
|
||||
// copierRoutine reads copierStates until the in channel closes and performs
|
||||
// the relevant copies when possible, or passes it to the puller routine.
|
||||
func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) {
|
||||
buf := protocol.BufferPool.Get(protocol.MinBlockSize)
|
||||
defer func() {
|
||||
protocol.BufferPool.Put(buf)
|
||||
}()
|
||||
|
||||
otherFolderFilesystems := make(map[string]fs.Filesystem)
|
||||
for folder, cfg := range f.model.cfg.Folders() {
|
||||
if folder == f.ID {
|
||||
@@ -1333,7 +1328,7 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
|
||||
continue
|
||||
}
|
||||
|
||||
if f.copyBlock(block, state, otherFolderFilesystems, buf) {
|
||||
if f.copyBlock(block, state, otherFolderFilesystems) {
|
||||
state.copyDone(block)
|
||||
continue
|
||||
}
|
||||
@@ -1362,8 +1357,9 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
|
||||
}
|
||||
|
||||
// Returns true when the block was successfully copied.
|
||||
func (f *sendReceiveFolder) copyBlock(block protocol.BlockInfo, state copyBlocksState, otherFolderFilesystems map[string]fs.Filesystem, buf []byte) bool {
|
||||
buf = protocol.BufferPool.Upgrade(buf, int(block.Size))
|
||||
func (f *sendReceiveFolder) copyBlock(block protocol.BlockInfo, state copyBlocksState, otherFolderFilesystems map[string]fs.Filesystem) bool {
|
||||
buf := protocol.BufferPool.Get(int(block.Size))
|
||||
defer protocol.BufferPool.Put(buf)
|
||||
|
||||
// Hope that it's usually in the same folder, so start with that
|
||||
// one. Also possibly more efficient copy (same filesystem).
|
||||
@@ -1978,6 +1974,9 @@ func (f *sendReceiveFolder) deleteDirOnDiskHandleChildren(dir string, scanChan c
|
||||
return nil
|
||||
}
|
||||
cf, ok, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
switch {
|
||||
case !ok || cf.IsDeleted():
|
||||
// Something appeared in the dir that we either are not
|
||||
|
||||
@@ -59,7 +59,7 @@ func (p *bufferPool) Get(size int) []byte {
|
||||
}
|
||||
|
||||
// Put makes the given byte slice available again in the global pool.
|
||||
// You must only Put() slices that were returned by Get() or Upgrade().
|
||||
// You must only Put() slices that were returned by Get().
|
||||
func (p *bufferPool) Put(bs []byte) {
|
||||
// Don't buffer slices outside of our pool range
|
||||
if cap(bs) > MaxBlockSize || cap(bs) < MinBlockSize {
|
||||
@@ -72,20 +72,6 @@ func (p *bufferPool) Put(bs []byte) {
|
||||
p.pools[bkt].Put(&bs)
|
||||
}
|
||||
|
||||
// Upgrade grows the buffer to the requested size, while attempting to reuse
|
||||
// it if possible.
|
||||
func (p *bufferPool) Upgrade(bs []byte, size int) []byte {
|
||||
if cap(bs) >= size {
|
||||
// Reslicing is enough, lets go!
|
||||
return bs[:size]
|
||||
}
|
||||
|
||||
// It was too small. But it pack into the pool and try to get another
|
||||
// buffer.
|
||||
p.Put(bs)
|
||||
return p.Get(size)
|
||||
}
|
||||
|
||||
// getBucketForLen returns the bucket where we should get a slice of a
|
||||
// certain length. Each bucket is guaranteed to hold slices that are
|
||||
// precisely the block size for that bucket, so if the block size is larger
|
||||
|
||||
@@ -512,8 +512,9 @@ func (c *rawConnection) readMessageAfterHeader(hdr *bep.Header, fourByteBuf []by
|
||||
// Then comes the message
|
||||
|
||||
buf := BufferPool.Get(int(msgLen))
|
||||
defer BufferPool.Put(buf)
|
||||
|
||||
if _, err := io.ReadFull(c.cr, buf); err != nil {
|
||||
BufferPool.Put(buf)
|
||||
return nil, fmt.Errorf("reading message: %w", err)
|
||||
}
|
||||
|
||||
@@ -525,7 +526,6 @@ func (c *rawConnection) readMessageAfterHeader(hdr *bep.Header, fourByteBuf []by
|
||||
|
||||
case bep.MessageCompression_MESSAGE_COMPRESSION_LZ4:
|
||||
decomp, err := lz4Decompress(buf)
|
||||
BufferPool.Put(buf)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("decompressing message: %w", err)
|
||||
}
|
||||
@@ -541,14 +541,11 @@ func (c *rawConnection) readMessageAfterHeader(hdr *bep.Header, fourByteBuf []by
|
||||
|
||||
msg, err := newMessage(hdr.Type)
|
||||
if err != nil {
|
||||
BufferPool.Put(buf)
|
||||
return nil, err
|
||||
}
|
||||
if err := proto.Unmarshal(buf, msg); err != nil {
|
||||
BufferPool.Put(buf)
|
||||
return nil, fmt.Errorf("unmarshalling message: %w", err)
|
||||
}
|
||||
BufferPool.Put(buf)
|
||||
|
||||
return msg, nil
|
||||
}
|
||||
@@ -567,16 +564,16 @@ func (c *rawConnection) readHeader(fourByteBuf []byte) (*bep.Header, error) {
|
||||
// Then comes the header
|
||||
|
||||
buf := BufferPool.Get(int(hdrLen))
|
||||
defer BufferPool.Put(buf)
|
||||
|
||||
if _, err := io.ReadFull(c.cr, buf); err != nil {
|
||||
BufferPool.Put(buf)
|
||||
return nil, fmt.Errorf("reading header: %w", err)
|
||||
}
|
||||
|
||||
var hdr bep.Header
|
||||
err := proto.Unmarshal(buf, &hdr)
|
||||
BufferPool.Put(buf)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unmarshalling header: %w %x", err, buf)
|
||||
return nil, fmt.Errorf("unmarshalling header: %w", err)
|
||||
}
|
||||
|
||||
metricDeviceRecvDecompressedBytes.WithLabelValues(c.idString).Add(float64(2 + len(buf)))
|
||||
|
||||
Reference in New Issue
Block a user