diff --git a/app/src/main/java/io/xpipe/app/beacon/AppBeaconServer.java b/app/src/main/java/io/xpipe/app/beacon/AppBeaconServer.java index 9be5ecf02..3d3ff3a2e 100644 --- a/app/src/main/java/io/xpipe/app/beacon/AppBeaconServer.java +++ b/app/src/main/java/io/xpipe/app/beacon/AppBeaconServer.java @@ -159,12 +159,10 @@ public class AppBeaconServer { }); server.createContext("/mcp", exchange -> { - if (exchange.getRequestMethod().equals("GET")) { - McpServer.HANDLER.doGet(exchange); - } else { - McpServer.HANDLER.doPost(exchange); + var mcpServer = McpServer.get(); + if (mcpServer != null) { + mcpServer.createHttpHandler().handle(exchange); } - exchange.close(); }); server.start(); diff --git a/app/src/main/java/io/xpipe/app/core/mode/BaseMode.java b/app/src/main/java/io/xpipe/app/core/mode/BaseMode.java index eac386839..4966e6983 100644 --- a/app/src/main/java/io/xpipe/app/core/mode/BaseMode.java +++ b/app/src/main/java/io/xpipe/app/core/mode/BaseMode.java @@ -65,7 +65,6 @@ public class BaseMode extends OperationMode { AppJavaOptionsCheck.check(); AppSid.init(); AppBeaconServer.init(); - McpServer.init(); AppLayoutModel.init(); if (OperationMode.getStartupMode() == XPipeDaemonMode.GUI) { @@ -113,6 +112,7 @@ public class BaseMode extends OperationMode { AppMainWindow.loadingText("loadingConnections"); DataStorage.init(); storageLoaded.countDown(); + McpServer.init(); StoreViewState.init(); AppMainWindow.loadingText("loadingSettings"); TrackEvent.info("Connection storage initialization thread completed"); diff --git a/app/src/main/java/io/xpipe/app/mcp/HttpStreamableServerTransportProvider.java b/app/src/main/java/io/xpipe/app/mcp/HttpStreamableServerTransportProvider.java index 79e7aee72..54ea08ece 100644 --- a/app/src/main/java/io/xpipe/app/mcp/HttpStreamableServerTransportProvider.java +++ b/app/src/main/java/io/xpipe/app/mcp/HttpStreamableServerTransportProvider.java @@ -219,7 +219,7 @@ public class HttpStreamableServerTransportProvider implements McpStreamableServe if (!badRequestErrors.isEmpty()) { String combinedMessage = String.join("; ", badRequestErrors); - this.sendMcpError(exchange, 400, new McpError(combinedMessage)); + this.sendError(exchange, 400, combinedMessage); return; } @@ -323,7 +323,7 @@ public class HttpStreamableServerTransportProvider implements McpStreamableServe && jsonrpcRequest.method().equals(McpSchema.METHOD_INITIALIZE)) { if (!badRequestErrors.isEmpty()) { String combinedMessage = String.join("; ", badRequestErrors); - this.sendMcpError(exchange, 400, new McpError(combinedMessage)); + this.sendError(exchange, 400, combinedMessage); return; } @@ -350,8 +350,7 @@ public class HttpStreamableServerTransportProvider implements McpStreamableServe } catch (Exception e) { logger.error("Failed to initialize session: {}", e.getMessage()); - this.sendMcpError(exchange, 500, - new McpError("Failed to initialize session: " + e.getMessage())); + this.sendError(exchange, 500, "Failed to initialize session: " + e.getMessage()); return; } } @@ -364,15 +363,14 @@ public class HttpStreamableServerTransportProvider implements McpStreamableServe if (!badRequestErrors.isEmpty()) { String combinedMessage = String.join("; ", badRequestErrors); - this.sendMcpError(exchange, 400, new McpError(combinedMessage)); + this.sendError(exchange, 400, combinedMessage); return; } McpStreamableServerSession session = this.sessions.get(sessionId); if (session == null) { - this.sendMcpError(exchange, 404, - new McpError("Session not found: " + sessionId)); + this.sendError(exchange, 404, "Session not found: " + sessionId + ". Was the session not refreshed?"); return; } @@ -413,19 +411,17 @@ public class HttpStreamableServerTransportProvider implements McpStreamableServe } } else { - this.sendMcpError(exchange, 500, new McpError("Unknown message type")); + this.sendError(exchange, 500, "Unknown message type"); } } catch (IllegalArgumentException | IOException e) { logger.error("Failed to deserialize message: {}", e.getMessage()); - this.sendMcpError(exchange, 400, - new McpError("Invalid message format: " + e.getMessage())); + this.sendError(exchange, 400, "Invalid message format: " + e.getMessage()); } catch (Exception e) { logger.error("Error handling message: {}", e.getMessage()); try { - this.sendMcpError(exchange, 500, - new McpError("Error processing message: " + e.getMessage())); + this.sendError(exchange, 500, "Error processing message: " + e.getMessage()); } catch (IOException ex) { logger.error(FAILED_TO_SEND_ERROR_RESPONSE, ex.getMessage()); @@ -433,6 +429,12 @@ public class HttpStreamableServerTransportProvider implements McpStreamableServe } } } + + + public void doOther(HttpExchange exchange) + throws IOException { + sendError(exchange, 405, "Unsupported HTTP method: " + exchange.getRequestMethod()); + } // // /** // * Handles DELETE requests for session deletion. @@ -495,17 +497,6 @@ public class HttpStreamableServerTransportProvider implements McpStreamableServe // } // } - public void sendMcpError(HttpExchange exchange, int httpCode, McpError mcpError) throws IOException { - var jsonError = objectMapper.writeValueAsString(mcpError); - var bytes = jsonError.getBytes(StandardCharsets.UTF_8); - exchange.getResponseHeaders().set("Content-Type", APPLICATION_JSON); - exchange.getResponseHeaders().add("Content-Encoding", UTF_8); - exchange.sendResponseHeaders(httpCode, bytes.length); - try (OutputStream os = exchange.getResponseBody()) { - os.write(bytes); - } - } - /** * Sends an SSE event to a client with a specific ID. * @param writer The writer to send the event through diff --git a/app/src/main/java/io/xpipe/app/mcp/McpRequestHandler.java b/app/src/main/java/io/xpipe/app/mcp/McpRequestHandler.java deleted file mode 100644 index b2f1c4940..000000000 --- a/app/src/main/java/io/xpipe/app/mcp/McpRequestHandler.java +++ /dev/null @@ -1,32 +0,0 @@ -package io.xpipe.app.mcp; - -import io.xpipe.app.ext.ShellStore; -import io.xpipe.app.storage.DataStorage; -import io.xpipe.app.storage.DataStorageQuery; -import io.xpipe.app.storage.DataStoreEntry; -import io.xpipe.app.storage.DataStoreEntryRef; -import io.xpipe.beacon.BeaconClientException; - -public interface McpRequestHandler { - - default DataStoreEntryRef getShellStoreRef(String name) throws BeaconClientException { - var found = DataStorageQuery.queryUserInput(name); - if (found.isEmpty()) { - throw new BeaconClientException("No connection found for input " + name); - } - - if (found.size() > 1) { - throw new BeaconClientException("Multiple connections found: " - + found.stream().map(DataStoreEntry::getName).toList()); - } - - var e = found.getFirst(); - var isShell = e.getStore() instanceof ShellStore; - if (!isShell) { - throw new BeaconClientException( - "Connection " + DataStorage.get().getStorePath(e).toString() + " is not a shell connection"); - } - - return e.ref(); - } -} diff --git a/app/src/main/java/io/xpipe/app/mcp/McpResources.java b/app/src/main/java/io/xpipe/app/mcp/McpResources.java new file mode 100644 index 000000000..3475852ba --- /dev/null +++ b/app/src/main/java/io/xpipe/app/mcp/McpResources.java @@ -0,0 +1,151 @@ +package io.xpipe.app.mcp; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import io.modelcontextprotocol.server.McpServerFeatures; +import io.modelcontextprotocol.spec.McpSchema; +import io.xpipe.app.storage.DataStorage; +import io.xpipe.beacon.BeaconClientException; +import io.xpipe.beacon.api.CategoryInfoExchange; +import io.xpipe.core.JacksonMapper; +import io.xpipe.core.StorePath; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; +import lombok.extern.jackson.Jacksonized; +import org.apache.commons.lang3.ClassUtils; + +import java.io.IOException; +import java.time.Instant; +import java.util.*; +import java.util.stream.Collectors; + +public final class McpResources { + + @Jacksonized + @Builder + @Value + public static class ConnectionResource { + @NonNull + StorePath name; + + @NonNull + StorePath category; + + @NonNull + String type; + + @NonNull + Object connectionData; + + @NonNull + Object usageCategory; + + @NonNull + Instant lastUsed; + + @NonNull + Instant lastModified; + + @NonNull + Object internalState; + + @NonNull + Map internalCache; + } + + + @Jacksonized + @Builder + @Value + public static class CategoryResource { + @NonNull + StorePath name; + + @NonNull + Instant lastUsed; + + @NonNull + Instant lastModified; + + @NonNull + JsonNode config; + } + + public static McpServerFeatures.SyncResourceSpecification connections() throws IOException { + McpSchema.Annotations annotations = new McpSchema.Annotations(List.of(McpSchema.Role.ASSISTANT), 1.0); + var resource = McpSchema.Resource.builder() + .uri("xpipe://connections") + .name("xpipe connections") + .description("Available connections in xpipe") + .mimeType("application/json") + .annotations(annotations) + .build(); + return new McpServerFeatures.SyncResourceSpecification(resource, (exchange, request) -> { + var list = new ArrayList(); + for (var e : DataStorage.get().getStoreEntries()) { + if (!e.getValidity().isUsable()) { + continue; + } + + var names = DataStorage.get().getStorePath(DataStorage.get().getStoreCategoryIfPresent(e.getCategoryUuid()).orElseThrow()).getNames(); + var cat = new StorePath(names.subList(1, names.size())); + var cache = e.getStoreCache().entrySet().stream().filter(stringObjectEntry -> { + return stringObjectEntry.getValue() != null && (ClassUtils.isPrimitiveOrWrapper(stringObjectEntry.getValue().getClass()) || + stringObjectEntry.getValue() instanceof String); + }).collect(Collectors.toMap(stringObjectEntry -> stringObjectEntry.getKey(), stringObjectEntry -> stringObjectEntry.getValue())); + + var resourceData = ConnectionResource.builder().lastModified(e.getLastModified()).lastUsed(e.getLastUsed()) + .category(cat).name(DataStorage.get().getStorePath(e)).connectionData(e.getStore()).usageCategory( + e.getProvider().getUsageCategory()).type(e.getProvider().getId()).internalState( + e.getStorePersistentState() != null ? e.getStorePersistentState() : new Object()).internalCache(cache).build(); + + McpSchema.TextResourceContents c; + try { + c = new McpSchema.TextResourceContents("xpipe://connections/" + e.getUuid(), "application/json", + JacksonMapper.getDefault().writeValueAsString(resourceData)); + } catch (JsonProcessingException ex) { + throw new RuntimeException(ex); + } + list.add(c); + } + + return new McpSchema.ReadResourceResult(list); + }); + } + + + public static McpServerFeatures.SyncResourceSpecification categories() throws IOException { + McpSchema.Annotations annotations = new McpSchema.Annotations(List.of(McpSchema.Role.ASSISTANT), 0.3); + var resource = McpSchema.Resource.builder() + .uri("xpipe://categories") + .name("xpipe categories") + .description("Available categories in xpipe") + .mimeType("application/json") + .annotations(annotations) + .build(); + return new McpServerFeatures.SyncResourceSpecification(resource, (exchange, request) -> { + var list = new ArrayList(); + for (var cat : DataStorage.get().getStoreCategories()) { + var name = DataStorage.get().getStorePath(cat); + var jsonData = CategoryResource.builder() + .lastModified(cat.getLastModified()) + .lastUsed(cat.getLastUsed()) + .name(name) + .config(JacksonMapper.getDefault().valueToTree(cat.getConfig())) + .build(); + + McpSchema.TextResourceContents c; + try { + c = new McpSchema.TextResourceContents("xpipe://categories/" + cat.getUuid(), "application/json", + JacksonMapper.getDefault().writeValueAsString(jsonData)); + } catch (JsonProcessingException ex) { + throw new RuntimeException(ex); + } + list.add(c); + } + + return new McpSchema.ReadResourceResult(list); + }); + } +} diff --git a/app/src/main/java/io/xpipe/app/mcp/McpServer.java b/app/src/main/java/io/xpipe/app/mcp/McpServer.java index 01e971c56..cb5557c46 100644 --- a/app/src/main/java/io/xpipe/app/mcp/McpServer.java +++ b/app/src/main/java/io/xpipe/app/mcp/McpServer.java @@ -1,62 +1,108 @@ package io.xpipe.app.mcp; import com.fasterxml.jackson.databind.ObjectMapper; -import io.modelcontextprotocol.server.McpServerFeatures; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; import io.modelcontextprotocol.server.McpSyncServer; import io.modelcontextprotocol.spec.McpSchema; +import io.xpipe.app.core.AppProperties; +import io.xpipe.app.storage.DataStorage; +import io.xpipe.app.storage.DataStoreCategory; +import io.xpipe.app.storage.DataStoreEntry; +import io.xpipe.app.storage.StorageListener; import lombok.SneakyThrows; +import lombok.Value; -import java.util.List; +import java.io.IOException; +@Value public class McpServer { - public static final HttpStreamableServerTransportProvider HANDLER = HttpStreamableServerTransportProvider.builder().mcpEndpoint("/mcp").objectMapper(new ObjectMapper()).build(); + private static McpServer INSTANCE; + + public static McpServer get() { + return INSTANCE; + } + + McpSyncServer mcpSyncServer; + HttpStreamableServerTransportProvider transportProvider; @SneakyThrows public static void init() { - var transportProvider = HANDLER; + var transportProvider = HttpStreamableServerTransportProvider.builder().mcpEndpoint("/mcp").objectMapper(new ObjectMapper()).build(); McpSyncServer syncServer = io.modelcontextprotocol.server.McpServer.sync(transportProvider) - .serverInfo("my-server", "1.0.0") + .serverInfo("XPipe", AppProperties.get().getVersion()) .capabilities(McpSchema.ServerCapabilities.builder() - .resources(false, true) // Enable resource support + .resources(true, true) // Enable resource support .tools(true) // Enable tool support - .prompts(true) // Enable prompt support - .logging() // Enable logging support + .prompts(false) // Enable prompt support .completions() // Enable completions support .build()) .build(); - syncServer.loggingNotification(McpSchema.LoggingMessageNotification.builder() - .level(McpSchema.LoggingLevel.INFO) - .logger("custom-logger") - .data("Custom log message") - .build()); - - var syncResourceSpecification = new McpServerFeatures.SyncResourceSpecification( - new McpSchema.Resource("custom://resource", "name", "description", "mime-type", null), - (exchange, request) -> { - // Resource read implementation - return new McpSchema.ReadResourceResult(List.of(new McpSchema.TextResourceContents("custom://resource", "name", "test"))); - } - ); - - // Sync prompt specification - var syncPromptSpecification = new McpServerFeatures.SyncPromptSpecification( - new McpSchema.Prompt("greeting", "description", List.of( - new McpSchema.PromptArgument("name", "description", true) - )), - (exchange, request) -> { - // Prompt implementation - return new McpSchema.GetPromptResult("test", List.of(new McpSchema.PromptMessage(McpSchema.Role.USER, new McpSchema.TextContent("abc")))); - } - ); - - // Register tools, resources, and prompts syncServer.addTool(McpTools.readFile()); syncServer.addTool(McpTools.listFiles()); syncServer.addTool(McpTools.getFileInfo()); - syncServer.addResource(syncResourceSpecification); - syncServer.addPrompt(syncPromptSpecification); + + syncServer.addResource(McpResources.connections()); + syncServer.addResource(McpResources.categories()); + + DataStorage.get().addListener(new StorageListener() { + @Override + public void onStoreListUpdate() { + syncServer.notifyResourcesListChanged(); + } + + @Override + public void onStoreAdd(DataStoreEntry... entry) { + syncServer.notifyResourcesListChanged(); + } + + @Override + public void onStoreRemove(DataStoreEntry... entry) { + syncServer.notifyResourcesListChanged(); + } + + @Override + public void onCategoryAdd(DataStoreCategory category) { + syncServer.notifyResourcesListChanged(); + } + + @Override + public void onCategoryRemove(DataStoreCategory category) { + syncServer.notifyResourcesListChanged(); + } + + @Override + public void onEntryCategoryChange() { + syncServer.notifyResourcesListChanged(); + } + }); + + INSTANCE = new McpServer(syncServer, transportProvider); + } + + public HttpHandler createHttpHandler() { + return new HttpHandler() { + + @Override + public void handle(HttpExchange exchange) throws IOException { + try (exchange) { + if (exchange.getRequestMethod().equals("GET")) { + transportProvider.doGet(exchange); + } else if (exchange.getRequestMethod().equals("POST")) { + transportProvider.doPost(exchange); + } else { + transportProvider.doOther(exchange); + } + } + } + }; + } + + public static void reset() { + INSTANCE.mcpSyncServer.close(); + INSTANCE = null; } } diff --git a/app/src/main/java/io/xpipe/app/mcp/McpToolHandler.java b/app/src/main/java/io/xpipe/app/mcp/McpToolHandler.java index c060e2735..5f4771454 100644 --- a/app/src/main/java/io/xpipe/app/mcp/McpToolHandler.java +++ b/app/src/main/java/io/xpipe/app/mcp/McpToolHandler.java @@ -2,7 +2,12 @@ package io.xpipe.app.mcp; import io.modelcontextprotocol.server.McpSyncServerExchange; import io.modelcontextprotocol.spec.McpSchema; +import io.xpipe.app.ext.ShellStore; import io.xpipe.app.issue.ErrorEventFactory; +import io.xpipe.app.storage.DataStorage; +import io.xpipe.app.storage.DataStorageQuery; +import io.xpipe.app.storage.DataStoreEntry; +import io.xpipe.app.storage.DataStoreEntryRef; import io.xpipe.beacon.BeaconClientException; import io.xpipe.core.FilePath; import lombok.SneakyThrows; @@ -15,7 +20,7 @@ public interface McpToolHandler extends BiFunction getShellStoreRef(String name) throws BeaconClientException { + var found = DataStorageQuery.queryUserInput(name); + if (found.isEmpty()) { + throw new BeaconClientException("No connection found for input " + name); + } + + if (found.size() > 1) { + throw new BeaconClientException("Multiple connections found: " + + found.stream().map(DataStoreEntry::getName).toList()); + } + + var e = found.getFirst(); + var isShell = e.getStore() instanceof ShellStore; + if (!isShell) { + throw new BeaconClientException( + "Connection " + DataStorage.get().getStorePath(e).toString() + " is not a shell connection"); + } + + return e.ref(); + } } @Override diff --git a/app/src/main/java/io/xpipe/app/mcp/schema/find.json b/app/src/main/java/io/xpipe/app/mcp/schema/find.json deleted file mode 100644 index d4d586113..000000000 --- a/app/src/main/java/io/xpipe/app/mcp/schema/find.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "start": { - "type": "string", - "minLength": 1, - "maxLength": 256, - "description": "The starting path to search, required." - }, - "name": { - "type": "string", - "minLength": 1, - "maxLength": 256, - "description": "The name of the target file or directory to search, supports fuzzy matching, required." - } - }, - "required": [ - "start", - "name" - ] -} diff --git a/app/src/main/java/io/xpipe/app/mcp/schema/read.json b/app/src/main/java/io/xpipe/app/mcp/schema/read.json deleted file mode 100644 index 6d126934a..000000000 --- a/app/src/main/java/io/xpipe/app/mcp/schema/read.json +++ /dev/null @@ -1,21 +0,0 @@ -{ - "name": "read_file", - "description": "Reads file", - "input_schema": { - "type": "object", - "properties": { - "path": { - "type": "string", - "description": "The path to read, can be a file or directory, required." - }, - "system": { - "type": "string", - "description": "The system identifier" - } - }, - "required": [ - "path", - "system" - ] - } -} \ No newline at end of file