From c5a3ae314cff184fc93e2628e46c52c4f4f7b3ea Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Sat, 20 Jun 2020 21:29:39 +0100 Subject: [PATCH] UR now web scale --- cmd/uraggregate/main.go | 52 ++++---- cmd/ursrv/main.go | 238 ++++++++++-------------------------- lib/ur/contract/contract.go | 19 ++- 3 files changed, 108 insertions(+), 201 deletions(-) diff --git a/cmd/uraggregate/main.go b/cmd/uraggregate/main.go index f2fef605c..adcdf465e 100644 --- a/cmd/uraggregate/main.go +++ b/cmd/uraggregate/main.go @@ -175,13 +175,13 @@ func aggregateVersionSummary(db *sql.DB, since time.Time) (int64, error) { res, err := db.Exec(`INSERT INTO VersionSummary ( SELECT DATE_TRUNC('day', Received) AS Day, - SUBSTRING(Version FROM '^v\d.\d+') AS Ver, + SUBSTRING(Report->>'version' FROM '^v\d.\d+') AS Ver, COUNT(*) AS Count - FROM Reports + FROM ReportsJson WHERE DATE_TRUNC('day', Received) > $1 AND DATE_TRUNC('day', Received) < DATE_TRUNC('day', NOW()) - AND Version like 'v_.%' + AND Report->>'version' like 'v_.%' GROUP BY Day, Ver ); `, since) @@ -195,11 +195,11 @@ func aggregateVersionSummary(db *sql.DB, since time.Time) (int64, error) { func aggregateUserMovement(db *sql.DB) (int64, error) { rows, err := db.Query(`SELECT DATE_TRUNC('day', Received) AS Day, - UniqueID - FROM Reports + Report->>'uniqueID' + FROM ReportsJson WHERE DATE_TRUNC('day', Received) < DATE_TRUNC('day', NOW()) - AND Version like 'v_.%' + AND Report->>'version' like 'v_.%' ORDER BY Day `) if err != nil { @@ -276,16 +276,16 @@ func aggregatePerformance(db *sql.DB, since time.Time) (int64, error) { res, err := db.Exec(`INSERT INTO Performance ( SELECT DATE_TRUNC('day', Received) AS Day, - AVG(TotFiles) As TotFiles, - AVG(TotMiB) As TotMiB, - AVG(SHA256Perf) As SHA256Perf, - AVG(MemorySize) As MemorySize, - AVG(MemoryUsageMiB) As MemoryUsageMiB - FROM Reports + AVG((Report->>'totFiles')::numeric) As TotFiles, + AVG((Report->>'totMiB')::numeric) As TotMiB, + AVG((Report->>'sha256Perf')::numeric) As 
SHA256Perf, + AVG((Report->>'memorySize')::numeric) As MemorySize, + AVG((Report->>'memoryUsageMiB')::numeric) As MemoryUsageMiB + FROM ReportsJson WHERE DATE_TRUNC('day', Received) > $1 AND DATE_TRUNC('day', Received) < DATE_TRUNC('day', NOW()) - AND Version like 'v_.%' + AND Report->>'version' like 'v_.%' GROUP BY Day ); `, since) @@ -303,22 +303,22 @@ func aggregateBlockStats(db *sql.DB, since time.Time) (int64, error) { SELECT DATE_TRUNC('day', Received) AS Day, COUNT(1) As Reports, - SUM(BlocksTotal) AS Total, - SUM(BlocksRenamed) AS Renamed, - SUM(BlocksReused) AS Reused, - SUM(BlocksPulled) AS Pulled, - SUM(BlocksCopyOrigin) AS CopyOrigin, - SUM(BlocksCopyOriginShifted) AS CopyOriginShifted, - SUM(BlocksCopyElsewhere) AS CopyElsewhere - FROM Reports + SUM((Report->'blockStats'->>'total')::numeric) AS Total, + SUM((Report->'blockStats'->>'renamed')::numeric) AS Renamed, + SUM((Report->'blockStats'->>'reused')::numeric) AS Reused, + SUM((Report->'blockStats'->>'pulled')::numeric) AS Pulled, + SUM((Report->'blockStats'->>'copyOrigin')::numeric) AS CopyOrigin, + SUM((Report->'blockStats'->>'copyOriginShifted')::numeric) AS CopyOriginShifted, + SUM((Report->'blockStats'->>'copyElsewhere')::numeric) AS CopyElsewhere + FROM ReportsJson WHERE DATE_TRUNC('day', Received) > $1 AND DATE_TRUNC('day', Received) < DATE_TRUNC('day', NOW()) - AND ReportVersion = 3 - AND Version like 'v_.%' - AND Version NOT LIKE 'v0.14.40%' - AND Version NOT LIKE 'v0.14.39%' - AND Version NOT LIKE 'v0.14.38%' + AND (Report->>'urVersion')::numeric >= 3 + AND Report->>'version' like 'v_.%' + AND Report->>'version' NOT LIKE 'v0.14.40%' + AND Report->>'version' NOT LIKE 'v0.14.39%' + AND Report->>'version' NOT LIKE 'v0.14.38%' GROUP BY Day ); `, since) diff --git a/cmd/ursrv/main.go b/cmd/ursrv/main.go index 13dfd276c..71e292ea0 100644 --- a/cmd/ursrv/main.go +++ b/cmd/ursrv/main.go @@ -11,7 +11,6 @@ import ( "crypto/tls" "database/sql" "encoding/json" - "fmt" "html/template" "io" "io/ioutil" 
@@ -27,7 +26,8 @@ import ( "time" "unicode" - geoip2 "github.com/oschwald/geoip2-golang" + "github.com/lib/pq" + "github.com/oschwald/geoip2-golang" "github.com/syncthing/syncthing/lib/ur/contract" ) @@ -103,198 +103,100 @@ func getEnvDefault(key, def string) string { } func setupDB(db *sql.DB) error { - _, err := db.Exec(`CREATE TABLE IF NOT EXISTS Reports ( + _, err := db.Exec(`CREATE TABLE IF NOT EXISTS ReportsJson ( Received TIMESTAMP NOT NULL, - UniqueID VARCHAR(32) NOT NULL, - Version VARCHAR(32) NOT NULL, - LongVersion VARCHAR(256) NOT NULL, - Platform VARCHAR(32) NOT NULL, - NumFolders INTEGER NOT NULL, - NumDevices INTEGER NOT NULL, - TotFiles INTEGER NOT NULL, - FolderMaxFiles INTEGER NOT NULL, - TotMiB INTEGER NOT NULL, - FolderMaxMiB INTEGER NOT NULL, - MemoryUsageMiB INTEGER NOT NULL, - SHA256Perf DOUBLE PRECISION NOT NULL, - MemorySize INTEGER NOT NULL, - Date VARCHAR(8) NOT NULL + Report JSONB NOT NULL )`) if err != nil { return err } var t string - row := db.QueryRow(`SELECT 'UniqueIDIndex'::regclass`) - if err := row.Scan(&t); err != nil { - if _, err = db.Exec(`CREATE UNIQUE INDEX UniqueIDIndex ON Reports (Date, UniqueID)`); err != nil { + if err := db.QueryRow(`SELECT 'UniqueIDJsonIndex'::regclass`).Scan(&t); err != nil { + if _, err = db.Exec(`CREATE UNIQUE INDEX UniqueIDJsonIndex ON ReportsJson ((Report->>'date'), (Report->>'uniqueID'))`); err != nil { return err } } - row = db.QueryRow(`SELECT 'ReceivedIndex'::regclass`) - if err := row.Scan(&t); err != nil { - if _, err = db.Exec(`CREATE INDEX ReceivedIndex ON Reports (Received)`); err != nil { + if err := db.QueryRow(`SELECT 'ReceivedJsonIndex'::regclass`).Scan(&t); err != nil { + if _, err = db.Exec(`CREATE INDEX ReceivedJsonIndex ON ReportsJson (Received)`); err != nil { return err } } - // V2 + if err := db.QueryRow(`SELECT 'ReportVersionJsonIndex'::regclass`).Scan(&t); err != nil { + if _, err = db.Exec(`CREATE INDEX ReportVersionJsonIndex ON ReportsJson (cast((Report->>'urVersion') as 
numeric))`); err != nil { + return err + } + } - row = db.QueryRow(`SELECT attname FROM pg_attribute WHERE attrelid = (SELECT oid FROM pg_class WHERE relname = 'reports') AND attname = 'reportversion'`) - if err := row.Scan(&t); err != nil { - // The ReportVersion column doesn't exist; add the new columns. - _, err = db.Exec(`ALTER TABLE Reports - ADD COLUMN ReportVersion INTEGER NOT NULL DEFAULT 0, - ADD COLUMN NumCPU INTEGER NOT NULL DEFAULT 0, - ADD COLUMN FolderRO INTEGER NOT NULL DEFAULT 0, - ADD COLUMN FolderIgnorePerms INTEGER NOT NULL DEFAULT 0, - ADD COLUMN FolderIgnoreDelete INTEGER NOT NULL DEFAULT 0, - ADD COLUMN FolderAutoNormalize INTEGER NOT NULL DEFAULT 0, - ADD COLUMN DeviceIntroducer INTEGER NOT NULL DEFAULT 0, - ADD COLUMN DeviceCustomCertName INTEGER NOT NULL DEFAULT 0, - ADD COLUMN DeviceCompressAlways INTEGER NOT NULL DEFAULT 0, - ADD COLUMN DeviceCompressMetadata INTEGER NOT NULL DEFAULT 0, - ADD COLUMN DeviceCompressNever INTEGER NOT NULL DEFAULT 0, - ADD COLUMN DeviceDynamicAddr INTEGER NOT NULL DEFAULT 0, - ADD COLUMN DeviceStaticAddr INTEGER NOT NULL DEFAULT 0, - ADD COLUMN AnnounceGlobalEnabled BOOLEAN NOT NULL DEFAULT FALSE, - ADD COLUMN AnnounceLocalEnabled BOOLEAN NOT NULL DEFAULT FALSE, - ADD COLUMN AnnounceDefaultServersDNS INTEGER NOT NULL DEFAULT 0, - ADD COLUMN AnnounceDefaultServersIP INTEGER NOT NULL DEFAULT 0, - ADD COLUMN AnnounceOtherServers INTEGER NOT NULL DEFAULT 0, - ADD COLUMN RelayEnabled BOOLEAN NOT NULL DEFAULT FALSE, - ADD COLUMN RelayDefaultServers INTEGER NOT NULL DEFAULT 0, - ADD COLUMN RelayOtherServers INTEGER NOT NULL DEFAULT 0, - ADD COLUMN RateLimitEnabled BOOLEAN NOT NULL DEFAULT FALSE, - ADD COLUMN UpgradeAllowedManual BOOLEAN NOT NULL DEFAULT FALSE, - ADD COLUMN UpgradeAllowedAuto BOOLEAN NOT NULL DEFAULT FALSE, - ADD COLUMN FolderSimpleVersioning INTEGER NOT NULL DEFAULT 0, - ADD COLUMN FolderExternalVersioning INTEGER NOT NULL DEFAULT 0, - ADD COLUMN FolderStaggeredVersioning INTEGER NOT NULL DEFAULT 0, 
- ADD COLUMN FolderTrashcanVersioning INTEGER NOT NULL DEFAULT 0 - `) + // Migrate from old schema to new schema if the table exists. + var count uint64 + if err := db.QueryRow(`SELECT COUNT(1) FROM Reports`).Scan(&count); err == nil && count > 0 { + tx, err := db.Begin() if err != nil { + log.Println("sql:", err) return err } - } + defer tx.Rollback() - row = db.QueryRow(`SELECT 'ReportVersionIndex'::regclass`) - if err := row.Scan(&t); err != nil { - if _, err = db.Exec(`CREATE INDEX ReportVersionIndex ON Reports (ReportVersion)`); err != nil { - return err - } - } - - // V2.5 - - row = db.QueryRow(`SELECT attname FROM pg_attribute WHERE attrelid = (SELECT oid FROM pg_class WHERE relname = 'reports') AND attname = 'upgradeallowedpre'`) - if err := row.Scan(&t); err != nil { - // The ReportVersion column doesn't exist; add the new columns. - _, err = db.Exec(`ALTER TABLE Reports - ADD COLUMN UpgradeAllowedPre BOOLEAN NOT NULL DEFAULT FALSE, - ADD COLUMN RescanIntvs INT[] NOT NULL DEFAULT '{}' - `) + stmt, err := tx.Prepare(pq.CopyIn("reportsjson", "received", "report")) if err != nil { + log.Println("sql:", err) return err } - } - // V3 + var rep contract.Report - row = db.QueryRow(`SELECT attname FROM pg_attribute WHERE attrelid = (SELECT oid FROM pg_class WHERE relname = 'reports') AND attname = 'uptime'`) - if err := row.Scan(&t); err != nil { - // The Uptime column doesn't exist; add the new columns. 
- _, err = db.Exec(`ALTER TABLE Reports - ADD COLUMN Uptime INTEGER NOT NULL DEFAULT 0, - ADD COLUMN NATType VARCHAR(32) NOT NULL DEFAULT '', - ADD COLUMN AlwaysLocalNets BOOLEAN NOT NULL DEFAULT FALSE, - ADD COLUMN CacheIgnoredFiles BOOLEAN NOT NULL DEFAULT FALSE, - ADD COLUMN OverwriteRemoteDeviceNames BOOLEAN NOT NULL DEFAULT FALSE, - ADD COLUMN ProgressEmitterEnabled BOOLEAN NOT NULL DEFAULT FALSE, - ADD COLUMN CustomDefaultFolderPath BOOLEAN NOT NULL DEFAULT FALSE, - ADD COLUMN WeakHashSelection VARCHAR(32) NOT NULL DEFAULT '', - ADD COLUMN CustomTrafficClass BOOLEAN NOT NULL DEFAULT FALSE, - ADD COLUMN CustomTempIndexMinBlocks BOOLEAN NOT NULL DEFAULT FALSE, - ADD COLUMN TemporariesDisabled BOOLEAN NOT NULL DEFAULT FALSE, - ADD COLUMN TemporariesCustom BOOLEAN NOT NULL DEFAULT FALSE, - ADD COLUMN LimitBandwidthInLan BOOLEAN NOT NULL DEFAULT FALSE, - ADD COLUMN CustomReleaseURL BOOLEAN NOT NULL DEFAULT FALSE, - ADD COLUMN RestartOnWakeup BOOLEAN NOT NULL DEFAULT FALSE, - ADD COLUMN CustomStunServers BOOLEAN NOT NULL DEFAULT FALSE, - - ADD COLUMN FolderScanProgressDisabled INTEGER NOT NULL DEFAULT 0, - ADD COLUMN FolderConflictsDisabled INTEGER NOT NULL DEFAULT 0, - ADD COLUMN FolderConflictsUnlimited INTEGER NOT NULL DEFAULT 0, - ADD COLUMN FolderConflictsOther INTEGER NOT NULL DEFAULT 0, - ADD COLUMN FolderDisableSparseFiles INTEGER NOT NULL DEFAULT 0, - ADD COLUMN FolderDisableTempIndexes INTEGER NOT NULL DEFAULT 0, - ADD COLUMN FolderAlwaysWeakHash INTEGER NOT NULL DEFAULT 0, - ADD COLUMN FolderCustomWeakHashThreshold INTEGER NOT NULL DEFAULT 0, - ADD COLUMN FolderFsWatcherEnabled INTEGER NOT NULL DEFAULT 0, - ADD COLUMN FolderPullOrder JSONB NOT NULL DEFAULT '{}', - ADD COLUMN FolderFilesystemType JSONB NOT NULL DEFAULT '{}', - ADD COLUMN FolderFsWatcherDelays INT[] NOT NULL DEFAULT '{}', - - ADD COLUMN GUIEnabled INTEGER NOT NULL DEFAULT 0, - ADD COLUMN GUIUseTLS INTEGER NOT NULL DEFAULT 0, - ADD COLUMN GUIUseAuth INTEGER NOT NULL DEFAULT 0, - ADD COLUMN 
GUIInsecureAdminAccess INTEGER NOT NULL DEFAULT 0, - ADD COLUMN GUIDebugging INTEGER NOT NULL DEFAULT 0, - ADD COLUMN GUIInsecureSkipHostCheck INTEGER NOT NULL DEFAULT 0, - ADD COLUMN GUIInsecureAllowFrameLoading INTEGER NOT NULL DEFAULT 0, - ADD COLUMN GUIListenLocal INTEGER NOT NULL DEFAULT 0, - ADD COLUMN GUIListenUnspecified INTEGER NOT NULL DEFAULT 0, - ADD COLUMN GUITheme JSONB NOT NULL DEFAULT '{}', - - ADD COLUMN BlocksTotal INTEGER NOT NULL DEFAULT 0, - ADD COLUMN BlocksRenamed INTEGER NOT NULL DEFAULT 0, - ADD COLUMN BlocksReused INTEGER NOT NULL DEFAULT 0, - ADD COLUMN BlocksPulled INTEGER NOT NULL DEFAULT 0, - ADD COLUMN BlocksCopyOrigin INTEGER NOT NULL DEFAULT 0, - ADD COLUMN BlocksCopyOriginShifted INTEGER NOT NULL DEFAULT 0, - ADD COLUMN BlocksCopyElsewhere INTEGER NOT NULL DEFAULT 0, - - ADD COLUMN Transport JSONB NOT NULL DEFAULT '{}', - - ADD COLUMN IgnoreLines INTEGER NOT NULL DEFAULT 0, - ADD COLUMN IgnoreInverts INTEGER NOT NULL DEFAULT 0, - ADD COLUMN IgnoreFolded INTEGER NOT NULL DEFAULT 0, - ADD COLUMN IgnoreDeletable INTEGER NOT NULL DEFAULT 0, - ADD COLUMN IgnoreRooted INTEGER NOT NULL DEFAULT 0, - ADD COLUMN IgnoreIncludes INTEGER NOT NULL DEFAULT 0, - ADD COLUMN IgnoreEscapedIncludes INTEGER NOT NULL DEFAULT 0, - ADD COLUMN IgnoreDoubleStars INTEGER NOT NULL DEFAULT 0, - ADD COLUMN IgnoreStars INTEGER NOT NULL DEFAULT 0 - `) + rows, err := tx.Query(`DELETE FROM Reports RETURNING ` + strings.Join(rep.FieldNames(), ", ")) if err != nil { + log.Println("sql:", err) return err } - } + defer rows.Close() - // V3 added late in the RC + var done uint64 + pct := count / 100 - row = db.QueryRow(`SELECT attname FROM pg_attribute WHERE attrelid = (SELECT oid FROM pg_class WHERE relname = 'reports') AND attname = 'weakhashenabled'`) - if err := row.Scan(&t); err != nil { - // The WeakHashEnabled column doesn't exist; add the new columns. 
- _, err = db.Exec(`ALTER TABLE Reports - ADD COLUMN WeakHashEnabled BOOLEAN NOT NULL DEFAULT FALSE - ADD COLUMN Address VARCHAR(45) NOT NULL DEFAULT '' - `) + for rows.Next() { + err := rows.Scan(rep.FieldPointers()...) + if err != nil { + log.Println("sql scan:", err) + return err + } + _, err = stmt.Exec(rep.Received, rep) + if err != nil { + log.Println("sql insert:", err) + return err + } + done++ + if pct != 0 && done%pct == 0 { + log.Printf("Migration progress %d/%d (%d%%)", done, count, (100*done)/count) + } + } + + // Tell the driver bulk copy is finished + _, err = stmt.Exec() if err != nil { + log.Println("sql stmt exec:", err) return err } - } - // Receive only added ad-hoc - - row = db.QueryRow(`SELECT attname FROM pg_attribute WHERE attrelid = (SELECT oid FROM pg_class WHERE relname = 'reports') AND attname = 'folderrecvonly'`) - if err := row.Scan(&t); err != nil { - // The RecvOnly column doesn't exist; add it. - _, err = db.Exec(`ALTER TABLE Reports - ADD COLUMN FolderRecvOnly INTEGER NOT NULL DEFAULT 0 - `) + err = stmt.Close() if err != nil { + log.Println("sql stmt close:", err) + return err + } + + _, err = tx.Exec("DROP TABLE Reports") + if err != nil { + log.Println("sql drop:", err) + return err + } + + err = tx.Commit() + if err != nil { + log.Println("sql commit:", err) return err } } @@ -303,15 +205,7 @@ func setupDB(db *sql.DB) error { } func insertReport(db *sql.DB, r contract.Report) error { - time := time.Now().UTC() - r.Received = &time - fields := r.FieldPointers() - params := make([]string, len(fields)) - for i := range params { - params[i] = fmt.Sprintf("$%d", i+1) - } - query := "INSERT INTO Reports (" + strings.Join(r.FieldNames(), ", ") + ") VALUES (" + strings.Join(params, ", ") + ")" - _, err := db.Exec(query, fields...) 
+ _, err := db.Exec("INSERT INTO ReportsJson (Report, Received) VALUES ($1, $2)", r, time.Now().UTC()) return err } @@ -319,9 +213,9 @@ func insertReport(db *sql.DB, r contract.Report) error { type withDBFunc func(*sql.DB, http.ResponseWriter, *http.Request) func withDB(db *sql.DB, f withDBFunc) http.HandlerFunc { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { f(db, w, r) - }) + } } func main() { @@ -744,7 +638,7 @@ func getReport(db *sql.DB) map[string]interface{} { var rep contract.Report - rows, err := db.Query(`SELECT ` + strings.Join(rep.FieldNames(), ",") + ` FROM Reports WHERE Received > now() - '1 day'::INTERVAL`) + rows, err := db.Query(`SELECT Received, Report FROM ReportsJson WHERE Received > now() - '1 day'::INTERVAL`) if err != nil { log.Println("sql:", err) return nil @@ -752,7 +646,7 @@ func getReport(db *sql.DB) map[string]interface{} { defer rows.Close() for rows.Next() { - err := rows.Scan(rep.FieldPointers()...) 
+ err := rows.Scan(&rep.Received, &rep) if err != nil { log.Println("sql:", err) diff --git a/lib/ur/contract/contract.go b/lib/ur/contract/contract.go index b94426041..7024332a0 100644 --- a/lib/ur/contract/contract.go +++ b/lib/ur/contract/contract.go @@ -42,9 +42,9 @@ func (p *IntMap) Scan(src interface{}) error { type Report struct { // Generated - Received *time.Time `json:"received,omitempty"` // Only from DB - Date string `json:"date,omitempty"` - Address string `json:"address,omitempty"` + Received time.Time `json:"-"` // Only from DB + Date string `json:"date,omitempty"` + Address string `json:"address,omitempty"` // v1 fields @@ -409,6 +409,19 @@ func (r *Report) FieldNames() []string { } } +func (r Report) Value() (driver.Value, error) { + return json.Marshal(r) +} + +func (r *Report) Scan(value interface{}) error { + b, ok := value.([]byte) + if !ok { + return errors.New("type assertion to []byte failed") + } + + return json.Unmarshal(b, r) +} + func clear(v interface{}, since int) error { s := reflect.ValueOf(v).Elem() t := s.Type()