mirror of
https://github.com/KDE/kde-linux.git
synced 2026-04-18 05:29:01 -04:00
Because we fed the calcs channel from the main thread, we'd eventually get stuck: the calcs workers would wait for space in the results channel, but there'd never be any, because results are processed on the main thread and that thread was busy feeding calcs. Instead, buffer the channels and also make sure to feed them from a goroutine, so we start processing results while still producing calcs. I feel like this may be an anti-pattern; maybe one should put everything in a goroutine and synchronize them all in the main thread — that would prevent this sort of nonsense at least.
199 lines
5.0 KiB
Go
199 lines
5.0 KiB
Go
// SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
|
|
// SPDX-FileCopyrightText: 2024-2025 Harald Sitter <sitter@kde.org>
|
|
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"errors"
|
|
"flag"
|
|
"io"
|
|
"log"
|
|
"net/url"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/minio/minio-go/v7"
|
|
"github.com/minio/minio-go/v7/pkg/credentials"
|
|
)
|
|
|
|
func connectToMinIO(endpoint string) *minio.Client {
|
|
awsSection, err := readConfigAWS("default")
|
|
if err != nil {
|
|
log.Fatalln("Failed to read AWS config:", err)
|
|
}
|
|
accessKeyID := awsSection.AccessKeyId
|
|
if accessKeyID == "" {
|
|
log.Fatalln("AWS access key ID is empty")
|
|
}
|
|
secretAccessKey := awsSection.SecretKey
|
|
if secretAccessKey == "" {
|
|
log.Fatalln("AWS secret access key is empty")
|
|
}
|
|
sessionToken := awsSection.SessionToken
|
|
if secretAccessKey == "" {
|
|
log.Fatalln("AWS session token is empty")
|
|
}
|
|
useSSL := true
|
|
|
|
// Initialize minio client object.
|
|
minioClient, err := minio.New(endpoint, &minio.Options{
|
|
Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, sessionToken),
|
|
Secure: useSSL,
|
|
TrailingHeaders: true,
|
|
})
|
|
if err != nil {
|
|
log.Fatalln("Failed to create MinIO client:", err)
|
|
}
|
|
|
|
buckets, err := minioClient.ListBuckets(context.Background())
|
|
if err != nil {
|
|
log.Fatalln("Failed to list buckets:", err)
|
|
}
|
|
for _, bucket := range buckets {
|
|
log.Println(bucket)
|
|
}
|
|
|
|
return minioClient
|
|
}
|
|
|
|
func sha256File(path string) string {
|
|
file, err := os.Open(path)
|
|
if err != nil {
|
|
log.Fatalf("unable to open file %s: %v", path, err)
|
|
}
|
|
defer file.Close()
|
|
|
|
hasher := sha256.New()
|
|
if _, err := io.Copy(hasher, file); err != nil {
|
|
log.Fatalf("unable to hash file %s: %v", path, err)
|
|
}
|
|
return hex.EncodeToString(hasher.Sum(nil))
|
|
}
|
|
|
|
// UploadObject describes one file to push to the bucket: its local location,
// the key it gets in the bucket, and its content hash.
type UploadObject struct {
	// ObjectName is the key under which the file is stored in the bucket.
	ObjectName string
	// Path is the local filesystem path of the file.
	Path string
	// SHA256 is the hex-encoded SHA256 of the file contents. It starts out
	// empty and is filled in by the hashing workers in upload().
	SHA256 string
}
|
|
|
|
// upload mirrors the local ../upload-tree directory into the given bucket.
// Every regular file becomes an object named objectNamePrefix/<relative path>,
// with its SHA256 attached as X-KDE-SHA256 user metadata. Any walk or upload
// error is fatal. The actual uploads happen sequentially on this goroutine;
// only the hashing is parallelized.
func upload(client *minio.Client, bucket string, objectNamePrefix string) {
	dir := "../upload-tree"

	// Somewhat involved producer-consumer going on here.
	// - We start N sha256 calculation workers.
	// - We then feed the calcs channel with partial UploadObjects.
	// - Close the calcs channel to eventually let the workers drain and finish.
	// - The workers calculate the sha and then push the complete objects into the results channel.
	// - Once the calcs waitgroup is done we close the results channel as well to let the uploader drain and finish.

	calcs := make(chan UploadObject, 32) // mind that the size causes this channel to buffer which prevents deadlocks
	results := make(chan UploadObject, 32)

	calcsWg := sync.WaitGroup{}

	// The sha256 calculation workers.
	for i := 0; i < 4; i++ {
		calcsWg.Add(1)
		go func() {
			defer calcsWg.Done()
			for obj := range calcs {
				obj.SHA256 = sha256File(obj.Path)
				results <- obj
			}
		}()
	}

	// Once they are all done we can close the results channel.
	// This has to happen on a separate goroutine because Wait() blocks and
	// the main goroutine below needs to keep draining results.
	go func() {
		calcsWg.Wait()
		log.Println("All SHA256 calculations done, closing results channel")
		close(results)
	}()

	// The calcs channel gets fed by the walking of the upload tree.
	// Feeding from a goroutine (rather than the main goroutine) lets results
	// processing start while the tree is still being walked.
	go func() {
		err := filepath.WalkDir(dir, func(path string, d os.DirEntry, err error) error {
			if err != nil {
				return err
			}
			objectName, err := filepath.Rel(dir, path)
			if err != nil {
				return err
			}
			objectName = filepath.Join(objectNamePrefix, objectName)
			if objectName == "" {
				return errors.New("object name cannot be empty")
			}

			// Directories are not objects; only files get uploaded.
			if d.IsDir() {
				return nil
			}

			calcs <- UploadObject{
				ObjectName: objectName,
				Path:       path,
			}

			return nil
		})
		// Close calcs before handling the walk error so the workers can
		// always drain and finish.
		log.Println("Finished walking upload tree, closing calcs channel")
		close(calcs)
		if err != nil {
			log.Fatalln(err)
		}
	}()

	// Upload. This loop only ends when results is closed and drained.
	for res := range results {
		log.Println("Uploading", res.ObjectName, "from", res.Path)
		_, err := client.FPutObject(context.Background(), bucket, res.ObjectName, res.Path, minio.PutObjectOptions{
			UserMetadata: map[string]string{
				"X-KDE-SHA256": res.SHA256,
			},
		})
		if err != nil {
			log.Fatalln(err)
		}
	}
}
|
|
|
|
func main() {
|
|
remote := flag.String("remote", "", "remote url to upload to, e.g. s3+https://storage.kde.org/kde-linux/sysupdate/v2/store")
|
|
flag.Parse()
|
|
|
|
remoteURI, err := url.Parse(*remote)
|
|
if err != nil {
|
|
log.Fatalln("Failed to parse remote URL:", err)
|
|
}
|
|
if remoteURI.Scheme != "s3+https" {
|
|
log.Fatalln("Unsupported remote scheme:", remoteURI.Scheme)
|
|
}
|
|
if remoteURI.Host != "storage.kde.org" {
|
|
log.Fatalln("Unsupported remote host:", remoteURI.Host)
|
|
}
|
|
parts := strings.SplitN(remoteURI.Path[1:], "/", 2)
|
|
if len(parts) != 2 {
|
|
log.Fatalln("Invalid remote path, expected format: /bucket/path")
|
|
}
|
|
bucket := parts[0]
|
|
path := parts[1]
|
|
if bucket == "" {
|
|
log.Fatalln("Invalid remote path, expected format: /bucket/path")
|
|
}
|
|
if path == "" {
|
|
log.Println("Warning: path is empty, uploading to bucket root")
|
|
path = "/"
|
|
}
|
|
|
|
log.Println("Connecting to MinIO at", remoteURI.Host)
|
|
minioClient := connectToMinIO(remoteURI.Host)
|
|
|
|
log.Println("Uploading to bucket", bucket, "with path prefix", path)
|
|
upload(minioClient, bucket, path)
|
|
}
|