package p2p import ( "context" "errors" "fmt" "io" "net" "github.com/mudler/LocalAI/core/schema" "github.com/mudler/edgevpn/pkg/node" "github.com/mudler/xlog" ) func (f *FederatedServer) Start(ctx context.Context) error { n, err := NewNode(f.p2ptoken) if err != nil { return fmt.Errorf("creating a new node: %w", err) } err = n.Start(ctx) if err != nil { return fmt.Errorf("creating a new node: %w", err) } if err := ServiceDiscoverer(ctx, n, f.p2ptoken, f.service, func(servicesID string, tunnel schema.NodeData) { xlog.Debug("Discovered node", "node", tunnel.ID) }, false); err != nil { return err } return f.proxy(ctx, n) } func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error { xlog.Info("Allocating service", "service", fs.service, "address", fs.listenAddr) // Open local port for listening l, err := net.Listen("tcp", fs.listenAddr) if err != nil { xlog.Error("Error listening", "error", err) return err } go func() { <-ctx.Done() l.Close() }() nodeAnnounce(ctx, node) defer l.Close() for { select { case <-ctx.Done(): return errors.New("context canceled") default: xlog.Debug("New connection", "address", l.Addr().String()) // Listen for an incoming connection. conn, err := l.Accept() if err != nil { fmt.Println("Error accepting: ", err.Error()) continue } // Handle connections in a new goroutine, forwarding to the p2p service go func() { workerID := "" if fs.workerTarget != "" { workerID = fs.workerTarget } else if fs.loadBalanced { xlog.Debug("Load balancing request") workerID = fs.SelectLeastUsedServer() if workerID == "" { xlog.Debug("Least used server not found, selecting random") workerID = fs.RandomServer() } } else { workerID = fs.RandomServer() } if workerID == "" { xlog.Error("No available nodes yet") fs.sendHTMLResponse(conn, 503, "Sorry, waiting for nodes to connect") return } xlog.Debug("Selected node", "node", workerID) nodeData, exists := GetNode(fs.service, workerID) if !exists { xlog.Error("Node not found", "node", workerID) fs.sendHTMLResponse(conn, 404, "Node not found") return } proxyP2PConnection(ctx, node, nodeData.ServiceID, conn) if fs.loadBalanced { fs.RecordRequest(workerID) } }() } } } // sendHTMLResponse sends a basic HTML response with a status code and a message. // This is extracted to make the HTML content maintainable. func (fs *FederatedServer) sendHTMLResponse(conn net.Conn, statusCode int, message string) { defer conn.Close() // Define the HTML content separately for easier maintenance. htmlContent := fmt.Sprintf("

%s

\r\n", message) // Create the HTTP response with dynamic status code and content. response := fmt.Sprintf( "HTTP/1.1 %d %s\r\n"+ "Content-Type: text/html\r\n"+ "Connection: close\r\n"+ "\r\n"+ "%s", statusCode, getHTTPStatusText(statusCode), htmlContent, ) // Write the response to the client connection. _, writeErr := io.WriteString(conn, response) if writeErr != nil { xlog.Error("Error writing response to client", "error", writeErr) } } // getHTTPStatusText returns a textual representation of HTTP status codes. func getHTTPStatusText(statusCode int) string { switch statusCode { case 503: return "Service Unavailable" case 404: return "Not Found" case 200: return "OK" default: return "Unknown Status" } }