mirror of
https://github.com/syncthing/syncthing.git
synced 2025-12-23 22:18:14 -05:00
Merge branch 'main' into v2
* main: feat(config): expose folder and device info as metrics (fixes #9519) (#10148) chore: add issue types to GitHub issue templates build: remove schedule from PR metadata job chore(protocol): only allow enc. password changes on cluster config (#10145) chore(protocol): don't start connection routines a second time (#10146)
This commit is contained in:
1
.github/ISSUE_TEMPLATE/01-feature.yml
vendored
1
.github/ISSUE_TEMPLATE/01-feature.yml
vendored
@@ -1,6 +1,7 @@
|
||||
name: Feature request
|
||||
description: File a new feature request
|
||||
labels: ["enhancement", "needs-triage"]
|
||||
type: Feature
|
||||
body:
|
||||
|
||||
- type: textarea
|
||||
|
||||
1
.github/ISSUE_TEMPLATE/02-bug.yml
vendored
1
.github/ISSUE_TEMPLATE/02-bug.yml
vendored
@@ -1,6 +1,7 @@
|
||||
name: Bug report
|
||||
description: If you're actually looking for support instead, see "I need help / I have a question".
|
||||
labels: ["bug", "needs-triage"]
|
||||
type: Bug
|
||||
body:
|
||||
- type: markdown
|
||||
attributes:
|
||||
|
||||
3
.github/workflows/pr-metadata.yaml
vendored
3
.github/workflows/pr-metadata.yaml
vendored
@@ -7,12 +7,9 @@ on:
|
||||
- reopened
|
||||
- edited
|
||||
- synchronize
|
||||
schedule:
|
||||
- cron: "42 7 * * *"
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
issues: write
|
||||
pull-requests: write
|
||||
|
||||
jobs:
|
||||
|
||||
@@ -473,6 +473,7 @@ func (c *serveCmd) syncthingMain() {
|
||||
os.Exit(svcutil.ExitError.AsInt())
|
||||
}
|
||||
earlyService.Add(cfgWrapper)
|
||||
config.RegisterInfoMetrics(cfgWrapper)
|
||||
|
||||
// Candidate builds should auto upgrade. Make sure the option is set,
|
||||
// unless we are in a build where it's disabled or the STNOUPGRADE
|
||||
|
||||
62
lib/config/metrics.go
Normal file
62
lib/config/metrics.go
Normal file
@@ -0,0 +1,62 @@
|
||||
// Copyright (C) 2025 The Syncthing Authors.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
||||
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
|
||||
package config
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
// RegisterInfoMetrics registers Prometheus metrics for the given config
|
||||
// wrapper.
|
||||
func RegisterInfoMetrics(cfg Wrapper) {
|
||||
prometheus.DefaultRegisterer.MustRegister(prometheus.CollectorFunc((&folderInfoMetric{cfg}).Collect))
|
||||
prometheus.DefaultRegisterer.MustRegister(prometheus.CollectorFunc((&folderDeviceMetric{cfg}).Collect))
|
||||
}
|
||||
|
||||
type folderInfoMetric struct {
|
||||
cfg Wrapper
|
||||
}
|
||||
|
||||
var folderInfoMetricDesc = prometheus.NewDesc(
|
||||
"syncthing_config_folder_info",
|
||||
"Provides additional information labels on folders",
|
||||
[]string{"folder", "label", "type", "path", "paused"},
|
||||
nil,
|
||||
)
|
||||
|
||||
func (m *folderInfoMetric) Collect(ch chan<- prometheus.Metric) {
|
||||
for _, folder := range m.cfg.FolderList() {
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
folderInfoMetricDesc,
|
||||
prometheus.GaugeValue, 1,
|
||||
folder.ID, folder.Label, folder.Type.String(), folder.Path, strconv.FormatBool(folder.Paused),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
type folderDeviceMetric struct {
|
||||
cfg Wrapper
|
||||
}
|
||||
|
||||
var folderDeviceMetricDesc = prometheus.NewDesc(
|
||||
"syncthing_config_device_info",
|
||||
"Provides additional information labels on devices",
|
||||
[]string{"device", "name", "introducer", "paused", "untrusted"},
|
||||
nil,
|
||||
)
|
||||
|
||||
func (m *folderDeviceMetric) Collect(ch chan<- prometheus.Metric) {
|
||||
for _, device := range m.cfg.DeviceList() {
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
folderDeviceMetricDesc,
|
||||
prometheus.GaugeValue, 1,
|
||||
device.DeviceID.String(), device.Name, strconv.FormatBool(device.Introducer), strconv.FormatBool(device.Paused), strconv.FormatBool(device.Untrusted),
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -444,7 +444,7 @@ func (s *service) handleHellos(ctx context.Context) error {
|
||||
// connections are limited.
|
||||
rd, wr := s.limiter.getLimiters(remoteID, c, c.IsLocal())
|
||||
|
||||
protoConn := protocol.NewConnection(remoteID, rd, wr, c, s.model, c, deviceCfg.Compression.ToProtocol(), s.cfg.FolderPasswords(remoteID), s.keyGen)
|
||||
protoConn := protocol.NewConnection(remoteID, rd, wr, c, s.model, c, deviceCfg.Compression.ToProtocol(), s.keyGen)
|
||||
s.accountAddedConnection(protoConn, hello, s.cfg.Options().ConnectionPriorityUpgradeThreshold)
|
||||
go func() {
|
||||
<-protoConn.Closed()
|
||||
|
||||
@@ -31,17 +31,17 @@ func TestIndexhandlerConcurrency(t *testing.T) {
|
||||
ci := &protomock.ConnectionInfo{}
|
||||
|
||||
m1 := &mocks.Model{}
|
||||
c1 := protocol.NewConnection(protocol.EmptyDeviceID, ar, bw, testutil.NoopCloser{}, m1, ci, protocol.CompressionNever, nil, nil)
|
||||
c1 := protocol.NewConnection(protocol.EmptyDeviceID, ar, bw, testutil.NoopCloser{}, m1, ci, protocol.CompressionNever, nil)
|
||||
c1.Start()
|
||||
defer c1.Close(io.EOF)
|
||||
|
||||
m2 := &mocks.Model{}
|
||||
c2 := protocol.NewConnection(protocol.EmptyDeviceID, br, aw, testutil.NoopCloser{}, m2, ci, protocol.CompressionNever, nil, nil)
|
||||
c2 := protocol.NewConnection(protocol.EmptyDeviceID, br, aw, testutil.NoopCloser{}, m2, ci, protocol.CompressionNever, nil)
|
||||
c2.Start()
|
||||
defer c2.Close(io.EOF)
|
||||
|
||||
c1.ClusterConfig(&protocol.ClusterConfig{})
|
||||
c2.ClusterConfig(&protocol.ClusterConfig{})
|
||||
c1.ClusterConfig(&protocol.ClusterConfig{}, nil)
|
||||
c2.ClusterConfig(&protocol.ClusterConfig{}, nil)
|
||||
c1.Index(ctx, &protocol.Index{Folder: "foo"})
|
||||
c2.Index(ctx, &protocol.Index{Folder: "foo"})
|
||||
|
||||
|
||||
@@ -1641,8 +1641,7 @@ func (m *model) sendClusterConfig(ids []protocol.DeviceID) {
|
||||
// Generating cluster-configs acquires the mutex.
|
||||
for _, conn := range ccConns {
|
||||
cm, passwords := m.generateClusterConfig(conn.DeviceID())
|
||||
conn.SetFolderPasswords(passwords)
|
||||
go conn.ClusterConfig(cm)
|
||||
go conn.ClusterConfig(cm, passwords)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2379,10 +2378,9 @@ func (m *model) promoteConnections() {
|
||||
l.Debugf("Promoting connection to %s at %s", deviceID.Short(), conn)
|
||||
postLockActions = append(postLockActions, func() {
|
||||
if conn.Statistics().StartedAt.IsZero() {
|
||||
conn.SetFolderPasswords(passwords)
|
||||
conn.Start()
|
||||
}
|
||||
conn.ClusterConfig(cm)
|
||||
conn.ClusterConfig(cm, passwords)
|
||||
})
|
||||
m.promotedConnID[deviceID] = connIDs[0]
|
||||
}
|
||||
@@ -2393,9 +2391,8 @@ func (m *model) promoteConnections() {
|
||||
conn := m.connections[connID]
|
||||
if conn.Statistics().StartedAt.IsZero() {
|
||||
postLockActions = append(postLockActions, func() {
|
||||
conn.SetFolderPasswords(passwords)
|
||||
conn.Start()
|
||||
conn.ClusterConfig(&protocol.ClusterConfig{Secondary: true})
|
||||
conn.ClusterConfig(&protocol.ClusterConfig{Secondary: true}, passwords)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2968,7 +2968,7 @@ func TestConnCloseOnRestart(t *testing.T) {
|
||||
nw := &testutil.NoopRW{}
|
||||
ci := &protocolmocks.ConnectionInfo{}
|
||||
ci.ConnectionIDReturns(srand.String(16))
|
||||
m.AddConnection(protocol.NewConnection(device1, br, nw, testutil.NoopCloser{}, m, ci, protocol.CompressionNever, nil, m.keyGen), protocol.Hello{})
|
||||
m.AddConnection(protocol.NewConnection(device1, br, nw, testutil.NoopCloser{}, m, ci, protocol.CompressionNever, m.keyGen), protocol.Hello{})
|
||||
m.mut.RLock()
|
||||
if len(m.closed) != 1 {
|
||||
t.Fatalf("Expected just one conn (len(m.closed) == %v)", len(m.closed))
|
||||
@@ -3535,11 +3535,11 @@ func testConfigChangeTriggersClusterConfigs(t *testing.T, expectFirst, expectSec
|
||||
cc1 := make(chan struct{}, 1)
|
||||
cc2 := make(chan struct{}, 1)
|
||||
fc1 := newFakeConnection(device1, m)
|
||||
fc1.ClusterConfigCalls(func(_ *protocol.ClusterConfig) {
|
||||
fc1.ClusterConfigCalls(func(_ *protocol.ClusterConfig, _ map[string]string) {
|
||||
cc1 <- struct{}{}
|
||||
})
|
||||
fc2 := newFakeConnection(device2, m)
|
||||
fc2.ClusterConfigCalls(func(_ *protocol.ClusterConfig) {
|
||||
fc2.ClusterConfigCalls(func(_ *protocol.ClusterConfig, _ map[string]string) {
|
||||
cc2 <- struct{}{}
|
||||
})
|
||||
m.AddConnection(fc1, protocol.Hello{})
|
||||
|
||||
@@ -1236,7 +1236,7 @@ func TestRequestIndexSenderClusterConfigBeforeStart(t *testing.T) {
|
||||
}
|
||||
return nil
|
||||
})
|
||||
fc.ClusterConfigCalls(func(cc *protocol.ClusterConfig) {
|
||||
fc.ClusterConfigCalls(func(cc *protocol.ClusterConfig, _ map[string]string) {
|
||||
select {
|
||||
case ccChan <- cc:
|
||||
case <-done:
|
||||
|
||||
@@ -64,14 +64,14 @@ func benchmarkRequestsTLS(b *testing.B, conn0, conn1 net.Conn) {
|
||||
|
||||
func benchmarkRequestsConnPair(b *testing.B, conn0, conn1 net.Conn) {
|
||||
// Start up Connections on them
|
||||
c0 := NewConnection(LocalDeviceID, conn0, conn0, testutil.NoopCloser{}, new(fakeModel), new(mockedConnectionInfo), CompressionMetadata, nil, testKeyGen)
|
||||
c0 := NewConnection(LocalDeviceID, conn0, conn0, testutil.NoopCloser{}, new(fakeModel), new(mockedConnectionInfo), CompressionMetadata, testKeyGen)
|
||||
c0.Start()
|
||||
c1 := NewConnection(LocalDeviceID, conn1, conn1, testutil.NoopCloser{}, new(fakeModel), new(mockedConnectionInfo), CompressionMetadata, nil, testKeyGen)
|
||||
c1 := NewConnection(LocalDeviceID, conn1, conn1, testutil.NoopCloser{}, new(fakeModel), new(mockedConnectionInfo), CompressionMetadata, testKeyGen)
|
||||
c1.Start()
|
||||
|
||||
// Satisfy the assertions in the protocol by sending an initial cluster config
|
||||
c0.ClusterConfig(&ClusterConfig{})
|
||||
c1.ClusterConfig(&ClusterConfig{})
|
||||
c0.ClusterConfig(&ClusterConfig{}, nil)
|
||||
c1.ClusterConfig(&ClusterConfig{}, nil)
|
||||
|
||||
// Report some useful stats and reset the timer for the actual test
|
||||
b.ReportAllocs()
|
||||
|
||||
@@ -50,10 +50,10 @@ type encryptedModel struct {
|
||||
keyGen *KeyGenerator
|
||||
}
|
||||
|
||||
func newEncryptedModel(model rawModel, folderKeys *folderKeyRegistry, keyGen *KeyGenerator) encryptedModel {
|
||||
func newEncryptedModel(model rawModel, keyGen *KeyGenerator) encryptedModel {
|
||||
return encryptedModel{
|
||||
model: model,
|
||||
folderKeys: folderKeys,
|
||||
folderKeys: newFolderKeyRegistry(),
|
||||
keyGen: keyGen,
|
||||
}
|
||||
}
|
||||
@@ -187,10 +187,6 @@ func (e encryptedConnection) Start() {
|
||||
e.conn.Start()
|
||||
}
|
||||
|
||||
func (e encryptedConnection) SetFolderPasswords(passwords map[string]string) {
|
||||
e.folderKeys.setPasswords(passwords)
|
||||
}
|
||||
|
||||
func (e encryptedConnection) DeviceID() DeviceID {
|
||||
return e.conn.DeviceID()
|
||||
}
|
||||
@@ -262,8 +258,9 @@ func (e encryptedConnection) DownloadProgress(ctx context.Context, dp *DownloadP
|
||||
// No need to send these
|
||||
}
|
||||
|
||||
func (e encryptedConnection) ClusterConfig(config *ClusterConfig) {
|
||||
e.conn.ClusterConfig(config)
|
||||
func (e encryptedConnection) ClusterConfig(config *ClusterConfig, passwords map[string]string) {
|
||||
e.folderKeys.setPasswords(e.keyGen, passwords)
|
||||
e.conn.ClusterConfig(config, passwords)
|
||||
}
|
||||
|
||||
func (e encryptedConnection) Close(err error) {
|
||||
@@ -680,15 +677,13 @@ func IsEncryptedParent(pathComponents []string) bool {
|
||||
}
|
||||
|
||||
type folderKeyRegistry struct {
|
||||
keyGen *KeyGenerator
|
||||
keys map[string]*[keySize]byte // folder ID -> key
|
||||
mut sync.RWMutex
|
||||
keys map[string]*[keySize]byte // folder ID -> key
|
||||
mut sync.RWMutex
|
||||
}
|
||||
|
||||
func newFolderKeyRegistry(keyGen *KeyGenerator, passwords map[string]string) *folderKeyRegistry {
|
||||
func newFolderKeyRegistry() *folderKeyRegistry {
|
||||
return &folderKeyRegistry{
|
||||
keyGen: keyGen,
|
||||
keys: keysFromPasswords(keyGen, passwords),
|
||||
keys: make(map[string]*[keySize]byte),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -699,8 +694,8 @@ func (r *folderKeyRegistry) get(folder string) (*[keySize]byte, bool) {
|
||||
return key, ok
|
||||
}
|
||||
|
||||
func (r *folderKeyRegistry) setPasswords(passwords map[string]string) {
|
||||
func (r *folderKeyRegistry) setPasswords(keyGen *KeyGenerator, passwords map[string]string) {
|
||||
r.mut.Lock()
|
||||
r.keys = keysFromPasswords(r.keyGen, passwords)
|
||||
r.keys = keysFromPasswords(keyGen, passwords)
|
||||
r.mut.Unlock()
|
||||
}
|
||||
|
||||
@@ -26,10 +26,11 @@ type Connection struct {
|
||||
closedReturnsOnCall map[int]struct {
|
||||
result1 <-chan struct{}
|
||||
}
|
||||
ClusterConfigStub func(*protocol.ClusterConfig)
|
||||
ClusterConfigStub func(*protocol.ClusterConfig, map[string]string)
|
||||
clusterConfigMutex sync.RWMutex
|
||||
clusterConfigArgsForCall []struct {
|
||||
arg1 *protocol.ClusterConfig
|
||||
arg2 map[string]string
|
||||
}
|
||||
ConnectionIDStub func() string
|
||||
connectionIDMutex sync.RWMutex
|
||||
@@ -145,11 +146,6 @@ type Connection struct {
|
||||
result1 []byte
|
||||
result2 error
|
||||
}
|
||||
SetFolderPasswordsStub func(map[string]string)
|
||||
setFolderPasswordsMutex sync.RWMutex
|
||||
setFolderPasswordsArgsForCall []struct {
|
||||
arg1 map[string]string
|
||||
}
|
||||
StartStub func()
|
||||
startMutex sync.RWMutex
|
||||
startArgsForCall []struct {
|
||||
@@ -283,16 +279,17 @@ func (fake *Connection) ClosedReturnsOnCall(i int, result1 <-chan struct{}) {
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *Connection) ClusterConfig(arg1 *protocol.ClusterConfig) {
|
||||
func (fake *Connection) ClusterConfig(arg1 *protocol.ClusterConfig, arg2 map[string]string) {
|
||||
fake.clusterConfigMutex.Lock()
|
||||
fake.clusterConfigArgsForCall = append(fake.clusterConfigArgsForCall, struct {
|
||||
arg1 *protocol.ClusterConfig
|
||||
}{arg1})
|
||||
arg2 map[string]string
|
||||
}{arg1, arg2})
|
||||
stub := fake.ClusterConfigStub
|
||||
fake.recordInvocation("ClusterConfig", []interface{}{arg1})
|
||||
fake.recordInvocation("ClusterConfig", []interface{}{arg1, arg2})
|
||||
fake.clusterConfigMutex.Unlock()
|
||||
if stub != nil {
|
||||
fake.ClusterConfigStub(arg1)
|
||||
fake.ClusterConfigStub(arg1, arg2)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -302,17 +299,17 @@ func (fake *Connection) ClusterConfigCallCount() int {
|
||||
return len(fake.clusterConfigArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *Connection) ClusterConfigCalls(stub func(*protocol.ClusterConfig)) {
|
||||
func (fake *Connection) ClusterConfigCalls(stub func(*protocol.ClusterConfig, map[string]string)) {
|
||||
fake.clusterConfigMutex.Lock()
|
||||
defer fake.clusterConfigMutex.Unlock()
|
||||
fake.ClusterConfigStub = stub
|
||||
}
|
||||
|
||||
func (fake *Connection) ClusterConfigArgsForCall(i int) *protocol.ClusterConfig {
|
||||
func (fake *Connection) ClusterConfigArgsForCall(i int) (*protocol.ClusterConfig, map[string]string) {
|
||||
fake.clusterConfigMutex.RLock()
|
||||
defer fake.clusterConfigMutex.RUnlock()
|
||||
argsForCall := fake.clusterConfigArgsForCall[i]
|
||||
return argsForCall.arg1
|
||||
return argsForCall.arg1, argsForCall.arg2
|
||||
}
|
||||
|
||||
func (fake *Connection) ConnectionID() string {
|
||||
@@ -908,38 +905,6 @@ func (fake *Connection) RequestReturnsOnCall(i int, result1 []byte, result2 erro
|
||||
}{result1, result2}
|
||||
}
|
||||
|
||||
func (fake *Connection) SetFolderPasswords(arg1 map[string]string) {
|
||||
fake.setFolderPasswordsMutex.Lock()
|
||||
fake.setFolderPasswordsArgsForCall = append(fake.setFolderPasswordsArgsForCall, struct {
|
||||
arg1 map[string]string
|
||||
}{arg1})
|
||||
stub := fake.SetFolderPasswordsStub
|
||||
fake.recordInvocation("SetFolderPasswords", []interface{}{arg1})
|
||||
fake.setFolderPasswordsMutex.Unlock()
|
||||
if stub != nil {
|
||||
fake.SetFolderPasswordsStub(arg1)
|
||||
}
|
||||
}
|
||||
|
||||
func (fake *Connection) SetFolderPasswordsCallCount() int {
|
||||
fake.setFolderPasswordsMutex.RLock()
|
||||
defer fake.setFolderPasswordsMutex.RUnlock()
|
||||
return len(fake.setFolderPasswordsArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *Connection) SetFolderPasswordsCalls(stub func(map[string]string)) {
|
||||
fake.setFolderPasswordsMutex.Lock()
|
||||
defer fake.setFolderPasswordsMutex.Unlock()
|
||||
fake.SetFolderPasswordsStub = stub
|
||||
}
|
||||
|
||||
func (fake *Connection) SetFolderPasswordsArgsForCall(i int) map[string]string {
|
||||
fake.setFolderPasswordsMutex.RLock()
|
||||
defer fake.setFolderPasswordsMutex.RUnlock()
|
||||
argsForCall := fake.setFolderPasswordsArgsForCall[i]
|
||||
return argsForCall.arg1
|
||||
}
|
||||
|
||||
func (fake *Connection) Start() {
|
||||
fake.startMutex.Lock()
|
||||
fake.startArgsForCall = append(fake.startArgsForCall, struct {
|
||||
@@ -1207,8 +1172,6 @@ func (fake *Connection) Invocations() map[string][][]interface{} {
|
||||
defer fake.remoteAddrMutex.RUnlock()
|
||||
fake.requestMutex.RLock()
|
||||
defer fake.requestMutex.RUnlock()
|
||||
fake.setFolderPasswordsMutex.RLock()
|
||||
defer fake.setFolderPasswordsMutex.RUnlock()
|
||||
fake.startMutex.RLock()
|
||||
defer fake.startMutex.RUnlock()
|
||||
fake.statisticsMutex.RLock()
|
||||
|
||||
@@ -129,7 +129,9 @@ type Connection interface {
|
||||
// Send a Cluster Configuration message to the peer device. The message
|
||||
// in the parameter may be altered by the connection and should not be
|
||||
// used further by the caller.
|
||||
ClusterConfig(config *ClusterConfig)
|
||||
// For any folder that must be encrypted for the connected device, the
|
||||
// password must be provided.
|
||||
ClusterConfig(config *ClusterConfig, passwords map[string]string)
|
||||
|
||||
// Send a Download Progress message to the peer device. The message in
|
||||
// the parameter may be altered by the connection and should not be used
|
||||
@@ -137,7 +139,6 @@ type Connection interface {
|
||||
DownloadProgress(ctx context.Context, dp *DownloadProgress)
|
||||
|
||||
Start()
|
||||
SetFolderPasswords(passwords map[string]string)
|
||||
Close(err error)
|
||||
DeviceID() DeviceID
|
||||
Statistics() Statistics
|
||||
@@ -215,7 +216,7 @@ const (
|
||||
// Should not be modified in production code, just for testing.
|
||||
var CloseTimeout = 10 * time.Second
|
||||
|
||||
func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, closer io.Closer, model Model, connInfo ConnectionInfo, compress Compression, passwords map[string]string, keyGen *KeyGenerator) Connection {
|
||||
func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, closer io.Closer, model Model, connInfo ConnectionInfo, compress Compression, keyGen *KeyGenerator) Connection {
|
||||
// We create the wrapper for the model first, as it needs to be passed
|
||||
// in at the lowest level in the stack. At the end of construction,
|
||||
// before returning, we add the connection to cwm so that it can be used
|
||||
@@ -225,7 +226,7 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, closer
|
||||
// Encryption / decryption is first (outermost) before conversion to
|
||||
// native path formats.
|
||||
nm := makeNative(cwm)
|
||||
em := newEncryptedModel(nm, newFolderKeyRegistry(keyGen, passwords), keyGen)
|
||||
em := newEncryptedModel(nm, keyGen)
|
||||
|
||||
// We do the wire format conversion first (outermost) so that the
|
||||
// metadata is in wire format when it reaches the encryption step.
|
||||
@@ -265,12 +266,15 @@ func newRawConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, clo
|
||||
}
|
||||
|
||||
// Start creates the goroutines for sending and receiving of messages. It must
|
||||
// be called exactly once after creating a connection.
|
||||
// be called once after creating a connection. It should only be called once,
|
||||
// subsequent calls will have no effect.
|
||||
func (c *rawConnection) Start() {
|
||||
c.startStopMut.Lock()
|
||||
defer c.startStopMut.Unlock()
|
||||
|
||||
select {
|
||||
case <-c.started:
|
||||
return
|
||||
case <-c.closed:
|
||||
// we have already closed the connection before starting processing
|
||||
// on it.
|
||||
@@ -379,7 +383,7 @@ func (c *rawConnection) Request(ctx context.Context, req *Request) ([]byte, erro
|
||||
}
|
||||
|
||||
// ClusterConfig sends the cluster configuration message to the peer.
|
||||
func (c *rawConnection) ClusterConfig(config *ClusterConfig) {
|
||||
func (c *rawConnection) ClusterConfig(config *ClusterConfig, _ map[string]string) {
|
||||
select {
|
||||
case c.clusterConfigBox <- config:
|
||||
case <-c.closed:
|
||||
|
||||
@@ -33,14 +33,14 @@ func TestPing(t *testing.T) {
|
||||
ar, aw := io.Pipe()
|
||||
br, bw := io.Pipe()
|
||||
|
||||
c0 := getRawConnection(NewConnection(c0ID, ar, bw, testutil.NoopCloser{}, newTestModel(), new(mockedConnectionInfo), CompressionAlways, nil, testKeyGen))
|
||||
c0 := getRawConnection(NewConnection(c0ID, ar, bw, testutil.NoopCloser{}, newTestModel(), new(mockedConnectionInfo), CompressionAlways, testKeyGen))
|
||||
c0.Start()
|
||||
defer closeAndWait(c0, ar, bw)
|
||||
c1 := getRawConnection(NewConnection(c1ID, br, aw, testutil.NoopCloser{}, newTestModel(), new(mockedConnectionInfo), CompressionAlways, nil, testKeyGen))
|
||||
c1 := getRawConnection(NewConnection(c1ID, br, aw, testutil.NoopCloser{}, newTestModel(), new(mockedConnectionInfo), CompressionAlways, testKeyGen))
|
||||
c1.Start()
|
||||
defer closeAndWait(c1, ar, bw)
|
||||
c0.ClusterConfig(&ClusterConfig{})
|
||||
c1.ClusterConfig(&ClusterConfig{})
|
||||
c0.ClusterConfig(&ClusterConfig{}, nil)
|
||||
c1.ClusterConfig(&ClusterConfig{}, nil)
|
||||
|
||||
if ok := c0.ping(); !ok {
|
||||
t.Error("c0 ping failed")
|
||||
@@ -59,14 +59,14 @@ func TestClose(t *testing.T) {
|
||||
ar, aw := io.Pipe()
|
||||
br, bw := io.Pipe()
|
||||
|
||||
c0 := getRawConnection(NewConnection(c0ID, ar, bw, testutil.NoopCloser{}, m0, new(mockedConnectionInfo), CompressionAlways, nil, testKeyGen))
|
||||
c0 := getRawConnection(NewConnection(c0ID, ar, bw, testutil.NoopCloser{}, m0, new(mockedConnectionInfo), CompressionAlways, testKeyGen))
|
||||
c0.Start()
|
||||
defer closeAndWait(c0, ar, bw)
|
||||
c1 := NewConnection(c1ID, br, aw, testutil.NoopCloser{}, m1, new(mockedConnectionInfo), CompressionAlways, nil, testKeyGen)
|
||||
c1 := NewConnection(c1ID, br, aw, testutil.NoopCloser{}, m1, new(mockedConnectionInfo), CompressionAlways, testKeyGen)
|
||||
c1.Start()
|
||||
defer closeAndWait(c1, ar, bw)
|
||||
c0.ClusterConfig(&ClusterConfig{})
|
||||
c1.ClusterConfig(&ClusterConfig{})
|
||||
c0.ClusterConfig(&ClusterConfig{}, nil)
|
||||
c1.ClusterConfig(&ClusterConfig{}, nil)
|
||||
|
||||
c0.internalClose(errManual)
|
||||
|
||||
@@ -104,7 +104,7 @@ func TestCloseOnBlockingSend(t *testing.T) {
|
||||
m := newTestModel()
|
||||
|
||||
rw := testutil.NewBlockingRW()
|
||||
c := getRawConnection(NewConnection(c0ID, rw, rw, testutil.NoopCloser{}, m, new(mockedConnectionInfo), CompressionAlways, nil, testKeyGen))
|
||||
c := getRawConnection(NewConnection(c0ID, rw, rw, testutil.NoopCloser{}, m, new(mockedConnectionInfo), CompressionAlways, testKeyGen))
|
||||
c.Start()
|
||||
defer closeAndWait(c, rw)
|
||||
|
||||
@@ -112,7 +112,7 @@ func TestCloseOnBlockingSend(t *testing.T) {
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
c.ClusterConfig(&ClusterConfig{})
|
||||
c.ClusterConfig(&ClusterConfig{}, nil)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
@@ -155,14 +155,14 @@ func TestCloseRace(t *testing.T) {
|
||||
ar, aw := io.Pipe()
|
||||
br, bw := io.Pipe()
|
||||
|
||||
c0 := getRawConnection(NewConnection(c0ID, ar, bw, testutil.NoopCloser{}, m0, new(mockedConnectionInfo), CompressionNever, nil, testKeyGen))
|
||||
c0 := getRawConnection(NewConnection(c0ID, ar, bw, testutil.NoopCloser{}, m0, new(mockedConnectionInfo), CompressionNever, testKeyGen))
|
||||
c0.Start()
|
||||
defer closeAndWait(c0, ar, bw)
|
||||
c1 := NewConnection(c1ID, br, aw, testutil.NoopCloser{}, m1, new(mockedConnectionInfo), CompressionNever, nil, testKeyGen)
|
||||
c1 := NewConnection(c1ID, br, aw, testutil.NoopCloser{}, m1, new(mockedConnectionInfo), CompressionNever, testKeyGen)
|
||||
c1.Start()
|
||||
defer closeAndWait(c1, ar, bw)
|
||||
c0.ClusterConfig(&ClusterConfig{})
|
||||
c1.ClusterConfig(&ClusterConfig{})
|
||||
c0.ClusterConfig(&ClusterConfig{}, nil)
|
||||
c1.ClusterConfig(&ClusterConfig{}, nil)
|
||||
|
||||
c1.Index(context.Background(), &Index{Folder: "default"})
|
||||
select {
|
||||
@@ -195,7 +195,7 @@ func TestClusterConfigFirst(t *testing.T) {
|
||||
m := newTestModel()
|
||||
|
||||
rw := testutil.NewBlockingRW()
|
||||
c := getRawConnection(NewConnection(c0ID, rw, &testutil.NoopRW{}, testutil.NoopCloser{}, m, new(mockedConnectionInfo), CompressionAlways, nil, testKeyGen))
|
||||
c := getRawConnection(NewConnection(c0ID, rw, &testutil.NoopRW{}, testutil.NoopCloser{}, m, new(mockedConnectionInfo), CompressionAlways, testKeyGen))
|
||||
c.Start()
|
||||
defer closeAndWait(c, rw)
|
||||
|
||||
@@ -206,7 +206,7 @@ func TestClusterConfigFirst(t *testing.T) {
|
||||
// Allow some time for c.writerLoop to set up after c.Start
|
||||
}
|
||||
|
||||
c.ClusterConfig(&ClusterConfig{})
|
||||
c.ClusterConfig(&ClusterConfig{}, nil)
|
||||
|
||||
done := make(chan struct{})
|
||||
if ok := c.send(context.Background(), &bep.Ping{}, done); !ok {
|
||||
@@ -247,7 +247,7 @@ func TestCloseTimeout(t *testing.T) {
|
||||
m := newTestModel()
|
||||
|
||||
rw := testutil.NewBlockingRW()
|
||||
c := getRawConnection(NewConnection(c0ID, rw, rw, testutil.NoopCloser{}, m, new(mockedConnectionInfo), CompressionAlways, nil, testKeyGen))
|
||||
c := getRawConnection(NewConnection(c0ID, rw, rw, testutil.NoopCloser{}, m, new(mockedConnectionInfo), CompressionAlways, testKeyGen))
|
||||
c.Start()
|
||||
defer closeAndWait(c, rw)
|
||||
|
||||
@@ -507,7 +507,7 @@ func TestClusterConfigAfterClose(t *testing.T) {
|
||||
m := newTestModel()
|
||||
|
||||
rw := testutil.NewBlockingRW()
|
||||
c := getRawConnection(NewConnection(c0ID, rw, rw, testutil.NoopCloser{}, m, new(mockedConnectionInfo), CompressionAlways, nil, testKeyGen))
|
||||
c := getRawConnection(NewConnection(c0ID, rw, rw, testutil.NoopCloser{}, m, new(mockedConnectionInfo), CompressionAlways, testKeyGen))
|
||||
c.Start()
|
||||
defer closeAndWait(c, rw)
|
||||
|
||||
@@ -515,7 +515,7 @@ func TestClusterConfigAfterClose(t *testing.T) {
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
c.ClusterConfig(&ClusterConfig{})
|
||||
c.ClusterConfig(&ClusterConfig{}, nil)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
@@ -531,7 +531,7 @@ func TestDispatcherToCloseDeadlock(t *testing.T) {
|
||||
// the model callbacks (ClusterConfig).
|
||||
m := newTestModel()
|
||||
rw := testutil.NewBlockingRW()
|
||||
c := getRawConnection(NewConnection(c0ID, rw, &testutil.NoopRW{}, testutil.NoopCloser{}, m, new(mockedConnectionInfo), CompressionAlways, nil, testKeyGen))
|
||||
c := getRawConnection(NewConnection(c0ID, rw, &testutil.NoopRW{}, testutil.NoopCloser{}, m, new(mockedConnectionInfo), CompressionAlways, testKeyGen))
|
||||
m.ccFn = func(*ClusterConfig) {
|
||||
c.Close(errManual)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user