mirror of
https://github.com/syncthing/syncthing.git
synced 2025-12-23 22:18:14 -05:00
1152 lines
31 KiB
Go
1152 lines
31 KiB
Go
// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
|
|
//go:generate -command counterfeiter go run github.com/maxbrunsfeld/counterfeiter/v6

// Prevents import loop, for internal testing
//go:generate counterfeiter -o mocked_connection_info_test.go --fake-name mockedConnectionInfo . ConnectionInfo
//go:generate go run ../../script/prune_mocks.go -t mocked_connection_info_test.go

//go:generate counterfeiter -o mocks/connection_info.go --fake-name ConnectionInfo . ConnectionInfo
//go:generate counterfeiter -o mocks/connection.go --fake-name Connection . Connection
|
|
|
|
package protocol
|
|
|
|
import (
|
|
"context"
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"path"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
lz4 "github.com/pierrec/lz4/v4"
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
"github.com/syncthing/syncthing/internal/gen/bep"
|
|
"github.com/syncthing/syncthing/internal/protoutil"
|
|
)
|
|
|
|
const (
	// Shift amounts, for expressing sizes as e.g. 128 << KiB.
	KiB = 10
	MiB = 20
	GiB = 30
)

const (
	// MaxMessageLen is the largest message size allowed on the wire. (500 MB)
	MaxMessageLen = 500 * 1000 * 1000

	// MinBlockSize is the minimum block size allowed
	MinBlockSize = 128 << KiB

	// MaxBlockSize is the maximum block size allowed
	MaxBlockSize = 16 << MiB

	// DesiredPerFileBlocks is the number of blocks we aim for per file
	DesiredPerFileBlocks = 2000

	// SyntheticDirectorySize is not used in this file.
	// NOTE(review): presumably the size reported for directory entries;
	// confirm against its users.
	SyntheticDirectorySize = 128

	// don't bother compressing messages smaller than this many bytes
	compressionThreshold = 128
)
|
|
|
|
// errNotCompressible is returned by lz4Compress when the compressor
// produced no output for the given input.
var errNotCompressible = errors.New("not compressible")

// States of the dispatcher loop. It starts in stateInitial and moves to
// stateReady once the first cluster config message has been received.
const (
	stateInitial = iota
	stateReady
)

var (
	// ErrClosed is returned by operations on a connection that has been
	// closed.
	ErrClosed = errors.New("connection closed")
	// ErrTimeout is the close reason used when nothing was received from
	// the peer within ReceiveTimeout.
	ErrTimeout = errors.New("read timeout")

	errUnknownMessage     = errors.New("unknown message")
	errInvalidFilename    = errors.New("filename is invalid")
	errUncleanFilename    = errors.New("filename not in canonical format")
	errDeletedHasBlocks   = errors.New("deleted file with non-empty block list")
	errDirectoryHasBlocks = errors.New("directory with non-empty block list")
	errFileHasNoBlocks    = errors.New("file with empty block list")
)
|
|
|
|
type Model interface {
|
|
// An index was received from the peer device
|
|
Index(conn Connection, idx *Index) error
|
|
// An index update was received from the peer device
|
|
IndexUpdate(conn Connection, idxUp *IndexUpdate) error
|
|
// A request was made by the peer device
|
|
Request(conn Connection, req *Request) (RequestResponse, error)
|
|
// A cluster configuration message was received
|
|
ClusterConfig(conn Connection, config *ClusterConfig) error
|
|
// The peer device closed the connection or an error occurred
|
|
Closed(conn Connection, err error)
|
|
// The peer device sent progress updates for the files it is currently downloading
|
|
DownloadProgress(conn Connection, p *DownloadProgress) error
|
|
}
|
|
|
|
// rawModel is the Model interface, but without the initial Connection
|
|
// parameter. Internal use only.
|
|
type rawModel interface {
|
|
Index(*Index) error
|
|
IndexUpdate(*IndexUpdate) error
|
|
Request(*Request) (RequestResponse, error)
|
|
ClusterConfig(*ClusterConfig) error
|
|
Closed(err error)
|
|
DownloadProgress(*DownloadProgress) error
|
|
}
|
|
|
|
// RequestResponse is what a model returns for a successfully served
// request: the data to send, plus lifecycle hooks for the backing buffer.
type RequestResponse interface {
	Data() []byte
	Close() // Must always be called once the byte slice is no longer in use
	Wait()  // Blocks until Close is called
}
|
|
|
|
type Connection interface {
|
|
// Send an Index message to the peer device. The message in the
|
|
// parameter may be altered by the connection and should not be used
|
|
// further by the caller.
|
|
Index(ctx context.Context, idx *Index) error
|
|
|
|
// Send an Index Update message to the peer device. The message in the
|
|
// parameter may be altered by the connection and should not be used
|
|
// further by the caller.
|
|
IndexUpdate(ctx context.Context, idxUp *IndexUpdate) error
|
|
|
|
// Send a Request message to the peer device. The message in the
|
|
// parameter may be altered by the connection and should not be used
|
|
// further by the caller.
|
|
Request(ctx context.Context, req *Request) ([]byte, error)
|
|
|
|
// Send a Cluster Configuration message to the peer device. The message
|
|
// in the parameter may be altered by the connection and should not be
|
|
// used further by the caller.
|
|
// For any folder that must be encrypted for the connected device, the
|
|
// password must be provided.
|
|
ClusterConfig(config *ClusterConfig, passwords map[string]string)
|
|
|
|
// Send a Download Progress message to the peer device. The message in
|
|
// the parameter may be altered by the connection and should not be used
|
|
// further by the caller.
|
|
DownloadProgress(ctx context.Context, dp *DownloadProgress)
|
|
|
|
Start()
|
|
Close(err error)
|
|
DeviceID() DeviceID
|
|
Statistics() Statistics
|
|
Closed() <-chan struct{}
|
|
|
|
ConnectionInfo
|
|
}
|
|
|
|
// ConnectionInfo describes the underlying transport of a connection. It is
// supplied by the caller of NewConnection and embedded in Connection.
type ConnectionInfo interface {
	Type() string
	Transport() string
	IsLocal() bool
	RemoteAddr() net.Addr
	Priority() int
	String() string
	Crypto() string
	EstablishedAt() time.Time
	ConnectionID() string
}
|
|
|
|
type rawConnection struct {
|
|
ConnectionInfo
|
|
|
|
deviceID DeviceID
|
|
idString string
|
|
model rawModel
|
|
startTime time.Time
|
|
started chan struct{}
|
|
|
|
cr *countingReader
|
|
cw *countingWriter
|
|
closer io.Closer // Closing the underlying connection and thus cr and cw
|
|
|
|
awaitingMut sync.Mutex // Protects awaiting and nextID.
|
|
awaiting map[int]chan asyncResult
|
|
nextID int
|
|
|
|
idxMut sync.Mutex // ensures serialization of Index calls
|
|
|
|
inbox chan proto.Message
|
|
outbox chan asyncMessage
|
|
closeBox chan asyncMessage
|
|
clusterConfigBox chan *ClusterConfig
|
|
dispatcherLoopStopped chan struct{}
|
|
closed chan struct{}
|
|
closeOnce sync.Once
|
|
sendCloseOnce sync.Once
|
|
compression Compression
|
|
startStopMut sync.Mutex // start and stop must be serialized
|
|
|
|
loopWG sync.WaitGroup // Need to ensure no leftover routines in testing
|
|
}
|
|
|
|
// asyncResult is the outcome of a Request: the data from the matching
// response, or the error derived from its response code.
type asyncResult struct {
	val []byte
	err error
}
|
|
|
|
type asyncMessage struct {
|
|
msg proto.Message
|
|
done chan struct{} // done closes when we're done sending the message
|
|
}
|
|
|
|
const (
	// PingSendInterval is how often we make sure to send a message, by
	// triggering pings if necessary.
	PingSendInterval = 90 * time.Second
	// ReceiveTimeout is the longest we'll wait for a message from the other
	// side before closing the connection.
	ReceiveTimeout = 300 * time.Second
)

// CloseTimeout is the longest we'll wait when trying to send the close
// message before just closing the connection.
// Should not be modified in production code, just for testing.
var CloseTimeout = 10 * time.Second
|
|
|
|
func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, closer io.Closer, model Model, connInfo ConnectionInfo, compress Compression, keyGen *KeyGenerator) Connection {
|
|
// We create the wrapper for the model first, as it needs to be passed
|
|
// in at the lowest level in the stack. At the end of construction,
|
|
// before returning, we add the connection to cwm so that it can be used
|
|
// by the model.
|
|
cwm := &connectionWrappingModel{model: model}
|
|
|
|
// Encryption / decryption is first (outermost) before conversion to
|
|
// native path formats.
|
|
nm := makeNative(cwm)
|
|
em := newEncryptedModel(nm, keyGen)
|
|
|
|
// We do the wire format conversion first (outermost) so that the
|
|
// metadata is in wire format when it reaches the encryption step.
|
|
rc := newRawConnection(deviceID, reader, writer, closer, em, connInfo, compress)
|
|
ec := newEncryptedConnection(rc, rc, em.folderKeys, keyGen)
|
|
wc := wireFormatConnection{ec}
|
|
|
|
cwm.conn = wc
|
|
return wc
|
|
}
|
|
|
|
func newRawConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, closer io.Closer, receiver rawModel, connInfo ConnectionInfo, compress Compression) *rawConnection {
|
|
idString := deviceID.String()
|
|
cr := &countingReader{Reader: reader, idString: idString}
|
|
cw := &countingWriter{Writer: writer, idString: idString}
|
|
registerDeviceMetrics(idString)
|
|
|
|
return &rawConnection{
|
|
ConnectionInfo: connInfo,
|
|
deviceID: deviceID,
|
|
idString: deviceID.String(),
|
|
model: receiver,
|
|
started: make(chan struct{}),
|
|
cr: cr,
|
|
cw: cw,
|
|
closer: closer,
|
|
awaiting: make(map[int]chan asyncResult),
|
|
inbox: make(chan proto.Message),
|
|
outbox: make(chan asyncMessage),
|
|
closeBox: make(chan asyncMessage),
|
|
clusterConfigBox: make(chan *ClusterConfig),
|
|
dispatcherLoopStopped: make(chan struct{}),
|
|
closed: make(chan struct{}),
|
|
compression: compress,
|
|
loopWG: sync.WaitGroup{},
|
|
}
|
|
}
|
|
|
|
// Start creates the goroutines for sending and receiving of messages. It must
|
|
// be called once after creating a connection. It should only be called once,
|
|
// subsequent calls will have no effect.
|
|
func (c *rawConnection) Start() {
|
|
c.startStopMut.Lock()
|
|
defer c.startStopMut.Unlock()
|
|
|
|
select {
|
|
case <-c.started:
|
|
return
|
|
case <-c.closed:
|
|
// we have already closed the connection before starting processing
|
|
// on it.
|
|
return
|
|
default:
|
|
}
|
|
|
|
c.loopWG.Add(5)
|
|
go func() {
|
|
c.readerLoop()
|
|
c.loopWG.Done()
|
|
}()
|
|
go func() {
|
|
err := c.dispatcherLoop()
|
|
c.Close(err)
|
|
c.loopWG.Done()
|
|
}()
|
|
go func() {
|
|
c.writerLoop()
|
|
c.loopWG.Done()
|
|
}()
|
|
go func() {
|
|
c.pingSender()
|
|
c.loopWG.Done()
|
|
}()
|
|
go func() {
|
|
c.pingReceiver()
|
|
c.loopWG.Done()
|
|
}()
|
|
|
|
c.startTime = time.Now().Truncate(time.Second)
|
|
close(c.started)
|
|
}
|
|
|
|
func (c *rawConnection) DeviceID() DeviceID {
|
|
return c.deviceID
|
|
}
|
|
|
|
// Index writes the list of file information to the connected peer device
|
|
func (c *rawConnection) Index(ctx context.Context, idx *Index) error {
|
|
select {
|
|
case <-c.closed:
|
|
return ErrClosed
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
c.idxMut.Lock()
|
|
c.send(ctx, idx.toWire(), nil)
|
|
c.idxMut.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// IndexUpdate writes the list of file information to the connected peer device as an update
|
|
func (c *rawConnection) IndexUpdate(ctx context.Context, idxUp *IndexUpdate) error {
|
|
select {
|
|
case <-c.closed:
|
|
return ErrClosed
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
c.idxMut.Lock()
|
|
c.send(ctx, idxUp.toWire(), nil)
|
|
c.idxMut.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// Request returns the bytes for the specified block after fetching them from the connected peer.
|
|
func (c *rawConnection) Request(ctx context.Context, req *Request) ([]byte, error) {
|
|
select {
|
|
case <-c.closed:
|
|
return nil, ErrClosed
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
default:
|
|
}
|
|
|
|
rc := make(chan asyncResult, 1)
|
|
|
|
c.awaitingMut.Lock()
|
|
id := c.nextID
|
|
c.nextID++
|
|
if _, ok := c.awaiting[id]; ok {
|
|
c.awaitingMut.Unlock()
|
|
panic("id taken")
|
|
}
|
|
c.awaiting[id] = rc
|
|
c.awaitingMut.Unlock()
|
|
|
|
req.ID = id
|
|
ok := c.send(ctx, req.toWire(), nil)
|
|
if !ok {
|
|
return nil, ErrClosed
|
|
}
|
|
|
|
select {
|
|
case res, ok := <-rc:
|
|
if !ok {
|
|
return nil, ErrClosed
|
|
}
|
|
return res.val, res.err
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
}
|
|
|
|
// ClusterConfig sends the cluster configuration message to the peer.
|
|
func (c *rawConnection) ClusterConfig(config *ClusterConfig, _ map[string]string) {
|
|
select {
|
|
case c.clusterConfigBox <- config:
|
|
case <-c.closed:
|
|
}
|
|
}
|
|
|
|
func (c *rawConnection) Closed() <-chan struct{} {
|
|
return c.closed
|
|
}
|
|
|
|
// DownloadProgress sends the progress updates for the files that are currently being downloaded.
|
|
func (c *rawConnection) DownloadProgress(ctx context.Context, dp *DownloadProgress) {
|
|
c.send(ctx, dp.toWire(), nil)
|
|
}
|
|
|
|
func (c *rawConnection) ping() bool {
|
|
return c.send(context.Background(), &bep.Ping{}, nil)
|
|
}
|
|
|
|
func (c *rawConnection) readerLoop() {
|
|
fourByteBuf := make([]byte, 4)
|
|
for {
|
|
msg, err := c.readMessage(fourByteBuf)
|
|
if err != nil {
|
|
if err == errUnknownMessage {
|
|
// Unknown message types are skipped, for future extensibility.
|
|
continue
|
|
}
|
|
c.internalClose(err)
|
|
return
|
|
}
|
|
select {
|
|
case c.inbox <- msg:
|
|
case <-c.closed:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *rawConnection) dispatcherLoop() (err error) {
|
|
defer close(c.dispatcherLoopStopped)
|
|
var msg proto.Message
|
|
state := stateInitial
|
|
for {
|
|
select {
|
|
case <-c.closed:
|
|
return ErrClosed
|
|
default:
|
|
}
|
|
select {
|
|
case msg = <-c.inbox:
|
|
case <-c.closed:
|
|
return ErrClosed
|
|
}
|
|
|
|
metricDeviceRecvMessages.WithLabelValues(c.idString).Inc()
|
|
|
|
msgContext, err := messageContext(msg)
|
|
if err != nil {
|
|
return fmt.Errorf("protocol error: %w", err)
|
|
}
|
|
l.Debugf("handle %v message", msgContext)
|
|
|
|
switch msg := msg.(type) {
|
|
case *bep.ClusterConfig:
|
|
if state == stateInitial {
|
|
state = stateReady
|
|
}
|
|
case *bep.Close:
|
|
return fmt.Errorf("closed by remote: %v", msg.Reason)
|
|
default:
|
|
if state != stateReady {
|
|
return newProtocolError(fmt.Errorf("invalid state %d", state), msgContext)
|
|
}
|
|
}
|
|
|
|
switch msg := msg.(type) {
|
|
case *bep.Request:
|
|
err = checkFilename(msg.Name)
|
|
}
|
|
if err != nil {
|
|
return newProtocolError(err, msgContext)
|
|
}
|
|
|
|
switch msg := msg.(type) {
|
|
case *bep.ClusterConfig:
|
|
err = c.model.ClusterConfig(clusterConfigFromWire(msg))
|
|
|
|
case *bep.Index:
|
|
idx := indexFromWire(msg)
|
|
if err := checkIndexConsistency(idx.Files); err != nil {
|
|
return newProtocolError(err, msgContext)
|
|
}
|
|
err = c.handleIndex(idx)
|
|
|
|
case *bep.IndexUpdate:
|
|
idxUp := indexUpdateFromWire(msg)
|
|
if err := checkIndexConsistency(idxUp.Files); err != nil {
|
|
return newProtocolError(err, msgContext)
|
|
}
|
|
err = c.handleIndexUpdate(idxUp)
|
|
|
|
case *bep.Request:
|
|
go c.handleRequest(requestFromWire(msg))
|
|
|
|
case *bep.Response:
|
|
c.handleResponse(responseFromWire(msg))
|
|
|
|
case *bep.DownloadProgress:
|
|
err = c.model.DownloadProgress(downloadProgressFromWire(msg))
|
|
}
|
|
if err != nil {
|
|
return newHandleError(err, msgContext)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *rawConnection) readMessage(fourByteBuf []byte) (proto.Message, error) {
|
|
hdr, err := c.readHeader(fourByteBuf)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return c.readMessageAfterHeader(hdr, fourByteBuf)
|
|
}
|
|
|
|
func (c *rawConnection) readMessageAfterHeader(hdr *bep.Header, fourByteBuf []byte) (proto.Message, error) {
|
|
// First comes a 4 byte message length
|
|
|
|
if _, err := io.ReadFull(c.cr, fourByteBuf[:4]); err != nil {
|
|
return nil, fmt.Errorf("reading message length: %w", err)
|
|
}
|
|
msgLen := int32(binary.BigEndian.Uint32(fourByteBuf))
|
|
if msgLen < 0 {
|
|
return nil, fmt.Errorf("negative message length %d", msgLen)
|
|
} else if msgLen > MaxMessageLen {
|
|
return nil, fmt.Errorf("message length %d exceeds maximum %d", msgLen, MaxMessageLen)
|
|
}
|
|
|
|
// Then comes the message
|
|
|
|
buf := BufferPool.Get(int(msgLen))
|
|
defer BufferPool.Put(buf)
|
|
|
|
if _, err := io.ReadFull(c.cr, buf); err != nil {
|
|
return nil, fmt.Errorf("reading message: %w", err)
|
|
}
|
|
|
|
// ... which might be compressed
|
|
|
|
switch hdr.Compression {
|
|
case bep.MessageCompression_MESSAGE_COMPRESSION_NONE:
|
|
// Nothing
|
|
|
|
case bep.MessageCompression_MESSAGE_COMPRESSION_LZ4:
|
|
decomp, err := lz4Decompress(buf)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("decompressing message: %w", err)
|
|
}
|
|
buf = decomp
|
|
|
|
default:
|
|
return nil, fmt.Errorf("unknown message compression %d", hdr.Compression)
|
|
}
|
|
|
|
// ... and is then unmarshalled
|
|
|
|
metricDeviceRecvDecompressedBytes.WithLabelValues(c.idString).Add(float64(4 + len(buf)))
|
|
|
|
msg, err := newMessage(hdr.Type)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if err := proto.Unmarshal(buf, msg); err != nil {
|
|
return nil, fmt.Errorf("unmarshalling message: %w", err)
|
|
}
|
|
|
|
return msg, nil
|
|
}
|
|
|
|
func (c *rawConnection) readHeader(fourByteBuf []byte) (*bep.Header, error) {
|
|
// First comes a 2 byte header length
|
|
|
|
if _, err := io.ReadFull(c.cr, fourByteBuf[:2]); err != nil {
|
|
return nil, fmt.Errorf("reading length: %w", err)
|
|
}
|
|
hdrLen := int16(binary.BigEndian.Uint16(fourByteBuf))
|
|
if hdrLen < 0 {
|
|
return nil, fmt.Errorf("negative header length %d", hdrLen)
|
|
}
|
|
|
|
// Then comes the header
|
|
|
|
buf := BufferPool.Get(int(hdrLen))
|
|
defer BufferPool.Put(buf)
|
|
|
|
if _, err := io.ReadFull(c.cr, buf); err != nil {
|
|
return nil, fmt.Errorf("reading header: %w", err)
|
|
}
|
|
|
|
var hdr bep.Header
|
|
err := proto.Unmarshal(buf, &hdr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unmarshalling header: %w", err)
|
|
}
|
|
|
|
metricDeviceRecvDecompressedBytes.WithLabelValues(c.idString).Add(float64(2 + len(buf)))
|
|
|
|
return &hdr, nil
|
|
}
|
|
|
|
func (c *rawConnection) handleIndex(im *Index) error {
|
|
l.Debugf("Index(%v, %v, %d file)", c.deviceID, im.Folder, len(im.Files))
|
|
return c.model.Index(im)
|
|
}
|
|
|
|
func (c *rawConnection) handleIndexUpdate(im *IndexUpdate) error {
|
|
l.Debugf("queueing IndexUpdate(%v, %v, %d files)", c.deviceID, im.Folder, len(im.Files))
|
|
return c.model.IndexUpdate(im)
|
|
}
|
|
|
|
// checkIndexConsistency verifies a number of invariants on FileInfos received in
|
|
// index messages.
|
|
func checkIndexConsistency(fs []FileInfo) error {
|
|
for _, f := range fs {
|
|
if err := checkFileInfoConsistency(f); err != nil {
|
|
return fmt.Errorf("%q: %w", f.Name, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// checkFileInfoConsistency verifies a number of invariants on the given FileInfo
|
|
func checkFileInfoConsistency(f FileInfo) error {
|
|
if err := checkFilename(f.Name); err != nil {
|
|
return err
|
|
}
|
|
|
|
switch {
|
|
case f.Deleted && len(f.Blocks) != 0:
|
|
// Deleted files should have no blocks
|
|
return errDeletedHasBlocks
|
|
|
|
case f.Type == FileInfoTypeDirectory && len(f.Blocks) != 0:
|
|
// Directories should have no blocks
|
|
return errDirectoryHasBlocks
|
|
|
|
case !f.Deleted && !f.IsInvalid() && f.Type == FileInfoTypeFile && len(f.Blocks) == 0:
|
|
// Non-deleted, non-invalid files should have at least one block
|
|
return errFileHasNoBlocks
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// checkFilename verifies that the given filename is valid according to the
|
|
// spec on what's allowed over the wire. A filename failing this test is
|
|
// grounds for disconnecting the device.
|
|
func checkFilename(name string) error {
|
|
cleanedName := path.Clean(name)
|
|
if cleanedName != name {
|
|
// The filename on the wire should be in canonical format. If
|
|
// Clean() managed to clean it up, there was something wrong with
|
|
// it.
|
|
return errUncleanFilename
|
|
}
|
|
|
|
switch name {
|
|
case "", ".", "..":
|
|
// These names are always invalid.
|
|
return errInvalidFilename
|
|
}
|
|
if strings.HasPrefix(name, "/") {
|
|
// Names are folder relative, not absolute.
|
|
return errInvalidFilename
|
|
}
|
|
if strings.HasPrefix(name, "../") {
|
|
// Starting with a dotdot is not allowed. Any other dotdots would
|
|
// have been handled by the Clean() call at the top.
|
|
return errInvalidFilename
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *rawConnection) handleRequest(req *Request) {
|
|
res, err := c.model.Request(req)
|
|
if err != nil {
|
|
resp := &Response{
|
|
ID: req.ID,
|
|
Code: errorToCode(err),
|
|
}
|
|
c.send(context.Background(), resp.toWire(), nil)
|
|
return
|
|
}
|
|
done := make(chan struct{})
|
|
resp := &Response{
|
|
ID: req.ID,
|
|
Data: res.Data(),
|
|
Code: errorToCode(nil),
|
|
}
|
|
c.send(context.Background(), resp.toWire(), done)
|
|
<-done
|
|
res.Close()
|
|
}
|
|
|
|
func (c *rawConnection) handleResponse(resp *Response) {
|
|
c.awaitingMut.Lock()
|
|
if rc := c.awaiting[resp.ID]; rc != nil {
|
|
delete(c.awaiting, resp.ID)
|
|
rc <- asyncResult{resp.Data, codeToError(resp.Code)}
|
|
close(rc)
|
|
}
|
|
c.awaitingMut.Unlock()
|
|
}
|
|
|
|
func (c *rawConnection) send(ctx context.Context, msg proto.Message, done chan struct{}) bool {
|
|
select {
|
|
case c.outbox <- asyncMessage{msg, done}:
|
|
return true
|
|
case <-c.closed:
|
|
case <-ctx.Done():
|
|
}
|
|
if done != nil {
|
|
close(done)
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (c *rawConnection) writerLoop() {
|
|
select {
|
|
case cc := <-c.clusterConfigBox:
|
|
err := c.writeMessage(cc.toWire())
|
|
if err != nil {
|
|
c.internalClose(err)
|
|
return
|
|
}
|
|
case hm := <-c.closeBox:
|
|
_ = c.writeMessage(hm.msg)
|
|
close(hm.done)
|
|
return
|
|
case <-c.closed:
|
|
return
|
|
}
|
|
for {
|
|
// When the connection is closing or closed, that should happen
|
|
// immediately, not compete with the (potentially very busy) outbox.
|
|
select {
|
|
case hm := <-c.closeBox:
|
|
_ = c.writeMessage(hm.msg)
|
|
close(hm.done)
|
|
return
|
|
case <-c.closed:
|
|
return
|
|
default:
|
|
}
|
|
select {
|
|
case cc := <-c.clusterConfigBox:
|
|
err := c.writeMessage(cc.toWire())
|
|
if err != nil {
|
|
c.internalClose(err)
|
|
return
|
|
}
|
|
case hm := <-c.outbox:
|
|
err := c.writeMessage(hm.msg)
|
|
if hm.done != nil {
|
|
close(hm.done)
|
|
}
|
|
if err != nil {
|
|
c.internalClose(err)
|
|
return
|
|
}
|
|
|
|
case hm := <-c.closeBox:
|
|
_ = c.writeMessage(hm.msg)
|
|
close(hm.done)
|
|
return
|
|
|
|
case <-c.closed:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *rawConnection) writeMessage(msg proto.Message) error {
|
|
msgContext, _ := messageContext(msg)
|
|
l.Debugf("Writing %v", msgContext)
|
|
|
|
defer func() {
|
|
metricDeviceSentMessages.WithLabelValues(c.idString).Inc()
|
|
}()
|
|
|
|
size := proto.Size(msg)
|
|
hdr := &bep.Header{
|
|
Type: typeOf(msg),
|
|
}
|
|
hdrSize := proto.Size(hdr)
|
|
if hdrSize > 1<<16-1 {
|
|
panic("impossibly large header")
|
|
}
|
|
|
|
overhead := 2 + hdrSize + 4
|
|
totSize := overhead + size
|
|
buf := BufferPool.Get(totSize)
|
|
defer BufferPool.Put(buf)
|
|
|
|
// Message
|
|
if _, err := protoutil.MarshalTo(buf[overhead:], msg); err != nil {
|
|
return fmt.Errorf("marshalling message: %w", err)
|
|
}
|
|
|
|
if c.shouldCompressMessage(msg) {
|
|
ok, err := c.writeCompressedMessage(msg, buf[overhead:])
|
|
if ok {
|
|
return err
|
|
}
|
|
}
|
|
|
|
metricDeviceSentUncompressedBytes.WithLabelValues(c.idString).Add(float64(totSize))
|
|
|
|
// Header length
|
|
binary.BigEndian.PutUint16(buf, uint16(hdrSize))
|
|
// Header
|
|
if _, err := protoutil.MarshalTo(buf[2:], hdr); err != nil {
|
|
return fmt.Errorf("marshalling header: %w", err)
|
|
}
|
|
// Message length
|
|
binary.BigEndian.PutUint32(buf[2+hdrSize:], uint32(size))
|
|
|
|
n, err := c.cw.Write(buf)
|
|
|
|
l.Debugf("wrote %d bytes on the wire (2 bytes length, %d bytes header, 4 bytes message length, %d bytes message), err=%v", n, hdrSize, size, err)
|
|
if err != nil {
|
|
return fmt.Errorf("writing message: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Write msg out compressed, given its uncompressed marshaled payload.
|
|
//
|
|
// The first return value indicates whether compression succeeded.
|
|
// If not, the caller should retry without compression.
|
|
func (c *rawConnection) writeCompressedMessage(msg proto.Message, marshaled []byte) (ok bool, err error) {
|
|
hdr := &bep.Header{
|
|
Type: typeOf(msg),
|
|
Compression: bep.MessageCompression_MESSAGE_COMPRESSION_LZ4,
|
|
}
|
|
hdrSize := proto.Size(hdr)
|
|
if hdrSize > 1<<16-1 {
|
|
panic("impossibly large header")
|
|
}
|
|
|
|
cOverhead := 2 + hdrSize + 4
|
|
|
|
metricDeviceSentUncompressedBytes.WithLabelValues(c.idString).Add(float64(cOverhead + len(marshaled)))
|
|
|
|
// The compressed size may be at most n-n/32 = .96875*n bytes,
|
|
// I.e., if we can't save at least 3.125% bandwidth, we forgo compression.
|
|
// This number is arbitrary but cheap to compute.
|
|
maxCompressed := cOverhead + len(marshaled) - len(marshaled)/32
|
|
buf := BufferPool.Get(maxCompressed)
|
|
defer BufferPool.Put(buf)
|
|
|
|
compressedSize, err := lz4Compress(marshaled, buf[cOverhead:])
|
|
totSize := compressedSize + cOverhead
|
|
if err != nil {
|
|
return false, nil
|
|
}
|
|
|
|
// Header length
|
|
binary.BigEndian.PutUint16(buf, uint16(hdrSize))
|
|
// Header
|
|
if _, err := protoutil.MarshalTo(buf[2:], hdr); err != nil {
|
|
return true, fmt.Errorf("marshalling header: %w", err)
|
|
}
|
|
// Message length
|
|
binary.BigEndian.PutUint32(buf[2+hdrSize:], uint32(compressedSize))
|
|
|
|
n, err := c.cw.Write(buf[:totSize])
|
|
l.Debugf("wrote %d bytes on the wire (2 bytes length, %d bytes header, 4 bytes message length, %d bytes message (%d uncompressed)), err=%v", n, hdrSize, compressedSize, len(marshaled), err)
|
|
if err != nil {
|
|
return true, fmt.Errorf("writing message: %w", err)
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
func typeOf(msg proto.Message) bep.MessageType {
|
|
switch msg.(type) {
|
|
case *bep.ClusterConfig:
|
|
return bep.MessageType_MESSAGE_TYPE_CLUSTER_CONFIG
|
|
case *bep.Index:
|
|
return bep.MessageType_MESSAGE_TYPE_INDEX
|
|
case *bep.IndexUpdate:
|
|
return bep.MessageType_MESSAGE_TYPE_INDEX_UPDATE
|
|
case *bep.Request:
|
|
return bep.MessageType_MESSAGE_TYPE_REQUEST
|
|
case *bep.Response:
|
|
return bep.MessageType_MESSAGE_TYPE_RESPONSE
|
|
case *bep.DownloadProgress:
|
|
return bep.MessageType_MESSAGE_TYPE_DOWNLOAD_PROGRESS
|
|
case *bep.Ping:
|
|
return bep.MessageType_MESSAGE_TYPE_PING
|
|
case *bep.Close:
|
|
return bep.MessageType_MESSAGE_TYPE_CLOSE
|
|
default:
|
|
panic("bug: unknown message type")
|
|
}
|
|
}
|
|
|
|
func newMessage(t bep.MessageType) (proto.Message, error) {
|
|
switch t {
|
|
case bep.MessageType_MESSAGE_TYPE_CLUSTER_CONFIG:
|
|
return new(bep.ClusterConfig), nil
|
|
case bep.MessageType_MESSAGE_TYPE_INDEX:
|
|
return new(bep.Index), nil
|
|
case bep.MessageType_MESSAGE_TYPE_INDEX_UPDATE:
|
|
return new(bep.IndexUpdate), nil
|
|
case bep.MessageType_MESSAGE_TYPE_REQUEST:
|
|
return new(bep.Request), nil
|
|
case bep.MessageType_MESSAGE_TYPE_RESPONSE:
|
|
return new(bep.Response), nil
|
|
case bep.MessageType_MESSAGE_TYPE_DOWNLOAD_PROGRESS:
|
|
return new(bep.DownloadProgress), nil
|
|
case bep.MessageType_MESSAGE_TYPE_PING:
|
|
return new(bep.Ping), nil
|
|
case bep.MessageType_MESSAGE_TYPE_CLOSE:
|
|
return new(bep.Close), nil
|
|
default:
|
|
return nil, errUnknownMessage
|
|
}
|
|
}
|
|
|
|
func (c *rawConnection) shouldCompressMessage(msg proto.Message) bool {
|
|
switch c.compression {
|
|
case CompressionNever:
|
|
return false
|
|
|
|
case CompressionAlways:
|
|
// Use compression for large enough messages
|
|
return proto.Size(msg) >= compressionThreshold
|
|
|
|
case CompressionMetadata:
|
|
_, isResponse := msg.(*bep.Response)
|
|
// Compress if it's large enough and not a response message
|
|
return !isResponse && proto.Size(msg) >= compressionThreshold
|
|
|
|
default:
|
|
panic("unknown compression setting")
|
|
}
|
|
}
|
|
|
|
// Close is called when the connection is regularly closed and thus the Close
|
|
// BEP message is sent before terminating the actual connection. The error
|
|
// argument specifies the reason for closing the connection.
|
|
func (c *rawConnection) Close(err error) {
|
|
c.sendCloseOnce.Do(func() {
|
|
done := make(chan struct{})
|
|
timeout := time.NewTimer(CloseTimeout)
|
|
select {
|
|
case c.closeBox <- asyncMessage{&bep.Close{Reason: err.Error()}, done}:
|
|
select {
|
|
case <-done:
|
|
case <-timeout.C:
|
|
case <-c.closed:
|
|
}
|
|
case <-timeout.C:
|
|
case <-c.closed:
|
|
}
|
|
})
|
|
|
|
// Close might be called from a method that is called from within
|
|
// dispatcherLoop, resulting in a deadlock.
|
|
// The sending above must happen before spawning the routine, to prevent
|
|
// the underlying connection from terminating before sending the close msg.
|
|
go c.internalClose(err)
|
|
}
|
|
|
|
// internalClose is called if there is an unexpected error during normal operation.
|
|
func (c *rawConnection) internalClose(err error) {
|
|
c.closeOnce.Do(func() {
|
|
c.startStopMut.Lock()
|
|
|
|
l.Debugf("close connection to %s at %s due to %v", c.deviceID.Short(), c.ConnectionInfo, err)
|
|
if cerr := c.closer.Close(); cerr != nil {
|
|
l.Debugf("failed to close underlying conn %s at %s %v:", c.deviceID.Short(), c.ConnectionInfo, cerr)
|
|
}
|
|
close(c.closed)
|
|
|
|
c.awaitingMut.Lock()
|
|
for i, ch := range c.awaiting {
|
|
if ch != nil {
|
|
close(ch)
|
|
delete(c.awaiting, i)
|
|
}
|
|
}
|
|
c.awaitingMut.Unlock()
|
|
|
|
if !c.startTime.IsZero() {
|
|
// Wait for the dispatcher loop to exit, if it was started to
|
|
// begin with.
|
|
<-c.dispatcherLoopStopped
|
|
}
|
|
|
|
c.startStopMut.Unlock()
|
|
|
|
// We don't want to call into the model while holding the
|
|
// startStopMut.
|
|
c.model.Closed(err)
|
|
})
|
|
}
|
|
|
|
// The pingSender makes sure that we've sent a message within the last
|
|
// PingSendInterval. If we already have something sent in the last
|
|
// PingSendInterval/2, we do nothing. Otherwise we send a ping message. This
|
|
// results in an effecting ping interval of somewhere between
|
|
// PingSendInterval/2 and PingSendInterval.
|
|
func (c *rawConnection) pingSender() {
|
|
ticker := time.NewTicker(PingSendInterval / 2)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
d := time.Since(c.cw.Last())
|
|
if d < PingSendInterval/2 {
|
|
l.Debugln(c.deviceID, "ping skipped after wr", d)
|
|
continue
|
|
}
|
|
|
|
l.Debugln(c.deviceID, "ping -> after", d)
|
|
c.ping()
|
|
|
|
case <-c.closed:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// The pingReceiver checks that we've received a message (any message will do,
|
|
// but we expect pings in the absence of other messages) within the last
|
|
// ReceiveTimeout. If not, we close the connection with an ErrTimeout.
|
|
func (c *rawConnection) pingReceiver() {
|
|
ticker := time.NewTicker(ReceiveTimeout / 2)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
d := time.Since(c.cr.Last())
|
|
if d > ReceiveTimeout {
|
|
l.Debugln(c.deviceID, "ping timeout", d)
|
|
c.internalClose(ErrTimeout)
|
|
}
|
|
|
|
l.Debugln(c.deviceID, "last read within", d)
|
|
|
|
case <-c.closed:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Statistics is a snapshot of a connection's traffic counters.
type Statistics struct {
	At            time.Time `json:"at"`            // when the snapshot was taken
	InBytesTotal  int64     `json:"inBytesTotal"`  // total bytes read so far
	OutBytesTotal int64     `json:"outBytesTotal"` // total bytes written so far
	StartedAt     time.Time `json:"startedAt"`     // when the connection was started
}
|
|
|
|
func (c *rawConnection) Statistics() Statistics {
|
|
return Statistics{
|
|
At: time.Now().Truncate(time.Second),
|
|
InBytesTotal: c.cr.Tot(),
|
|
OutBytesTotal: c.cw.Tot(),
|
|
StartedAt: c.startTime,
|
|
}
|
|
}
|
|
|
|
func lz4Compress(src, buf []byte) (int, error) {
|
|
n, err := lz4.CompressBlock(src, buf[4:], nil)
|
|
if err != nil {
|
|
return -1, err
|
|
} else if n == 0 {
|
|
return -1, errNotCompressible
|
|
}
|
|
|
|
// The compressed block is prefixed by the size of the uncompressed data.
|
|
binary.BigEndian.PutUint32(buf, uint32(len(src)))
|
|
|
|
return n + 4, nil
|
|
}
|
|
|
|
func lz4Decompress(src []byte) ([]byte, error) {
|
|
size := binary.BigEndian.Uint32(src)
|
|
buf := BufferPool.Get(int(size))
|
|
|
|
n, err := lz4.UncompressBlock(src[4:], buf)
|
|
if err != nil {
|
|
BufferPool.Put(buf)
|
|
return nil, err
|
|
}
|
|
|
|
return buf[:n], nil
|
|
}
|
|
|
|
// newProtocolError wraps err as a protocol violation by the peer in the
// message described by msgContext.
func newProtocolError(err error, msgContext string) error {
	return fmt.Errorf("protocol error on %v: %w", msgContext, err)
}

// newHandleError wraps err as a failure to handle the message described by
// msgContext.
func newHandleError(err error, msgContext string) error {
	return fmt.Errorf("handling %v: %w", msgContext, err)
}
|
|
|
|
func messageContext(msg proto.Message) (string, error) {
|
|
switch msg := msg.(type) {
|
|
case *bep.ClusterConfig:
|
|
return "cluster-config", nil
|
|
case *bep.Index:
|
|
return fmt.Sprintf("index for %v", msg.Folder), nil
|
|
case *bep.IndexUpdate:
|
|
return fmt.Sprintf("index-update for %v", msg.Folder), nil
|
|
case *bep.Request:
|
|
return fmt.Sprintf(`request for "%v" in %v`, msg.Name, msg.Folder), nil
|
|
case *bep.Response:
|
|
return "response", nil
|
|
case *bep.DownloadProgress:
|
|
return fmt.Sprintf("download-progress for %v", msg.Folder), nil
|
|
case *bep.Ping:
|
|
return "ping", nil
|
|
case *bep.Close:
|
|
return "close", nil
|
|
default:
|
|
return "", errors.New("unknown or empty message")
|
|
}
|
|
}
|
|
|
|
// connectionWrappingModel takes the Model interface from the model package,
|
|
// which expects the Connection as the first parameter in all methods, and
|
|
// wraps it to conform to the rawModel interface.
|
|
type connectionWrappingModel struct {
|
|
conn Connection
|
|
model Model
|
|
}
|
|
|
|
func (c *connectionWrappingModel) Index(m *Index) error {
|
|
return c.model.Index(c.conn, m)
|
|
}
|
|
|
|
func (c *connectionWrappingModel) IndexUpdate(idxUp *IndexUpdate) error {
|
|
return c.model.IndexUpdate(c.conn, idxUp)
|
|
}
|
|
|
|
func (c *connectionWrappingModel) Request(req *Request) (RequestResponse, error) {
|
|
return c.model.Request(c.conn, req)
|
|
}
|
|
|
|
func (c *connectionWrappingModel) ClusterConfig(config *ClusterConfig) error {
|
|
return c.model.ClusterConfig(c.conn, config)
|
|
}
|
|
|
|
func (c *connectionWrappingModel) Closed(err error) {
|
|
c.model.Closed(c.conn, err)
|
|
}
|
|
|
|
func (c *connectionWrappingModel) DownloadProgress(p *DownloadProgress) error {
|
|
return c.model.DownloadProgress(c.conn, p)
|
|
}
|