This commit is contained in:
crschnick
2025-08-01 17:05:55 +00:00
parent 6e5e8ceb5e
commit be4400b73f
9 changed files with 278 additions and 141 deletions

View File

@@ -159,12 +159,10 @@ public class AppBeaconServer {
});
server.createContext("/mcp", exchange -> {
if (exchange.getRequestMethod().equals("GET")) {
McpServer.HANDLER.doGet(exchange);
} else {
McpServer.HANDLER.doPost(exchange);
var mcpServer = McpServer.get();
if (mcpServer != null) {
mcpServer.createHttpHandler().handle(exchange);
}
exchange.close();
});
server.start();

View File

@@ -65,7 +65,6 @@ public class BaseMode extends OperationMode {
AppJavaOptionsCheck.check();
AppSid.init();
AppBeaconServer.init();
McpServer.init();
AppLayoutModel.init();
if (OperationMode.getStartupMode() == XPipeDaemonMode.GUI) {
@@ -113,6 +112,7 @@ public class BaseMode extends OperationMode {
AppMainWindow.loadingText("loadingConnections");
DataStorage.init();
storageLoaded.countDown();
McpServer.init();
StoreViewState.init();
AppMainWindow.loadingText("loadingSettings");
TrackEvent.info("Connection storage initialization thread completed");

View File

@@ -219,7 +219,7 @@ public class HttpStreamableServerTransportProvider implements McpStreamableServe
if (!badRequestErrors.isEmpty()) {
String combinedMessage = String.join("; ", badRequestErrors);
this.sendMcpError(exchange, 400, new McpError(combinedMessage));
this.sendError(exchange, 400, combinedMessage);
return;
}
@@ -323,7 +323,7 @@ public class HttpStreamableServerTransportProvider implements McpStreamableServe
&& jsonrpcRequest.method().equals(McpSchema.METHOD_INITIALIZE)) {
if (!badRequestErrors.isEmpty()) {
String combinedMessage = String.join("; ", badRequestErrors);
this.sendMcpError(exchange, 400, new McpError(combinedMessage));
this.sendError(exchange, 400, combinedMessage);
return;
}
@@ -350,8 +350,7 @@ public class HttpStreamableServerTransportProvider implements McpStreamableServe
}
catch (Exception e) {
logger.error("Failed to initialize session: {}", e.getMessage());
this.sendMcpError(exchange, 500,
new McpError("Failed to initialize session: " + e.getMessage()));
this.sendError(exchange, 500, "Failed to initialize session: " + e.getMessage());
return;
}
}
@@ -364,15 +363,14 @@ public class HttpStreamableServerTransportProvider implements McpStreamableServe
if (!badRequestErrors.isEmpty()) {
String combinedMessage = String.join("; ", badRequestErrors);
this.sendMcpError(exchange, 400, new McpError(combinedMessage));
this.sendError(exchange, 400, combinedMessage);
return;
}
McpStreamableServerSession session = this.sessions.get(sessionId);
if (session == null) {
this.sendMcpError(exchange, 404,
new McpError("Session not found: " + sessionId));
this.sendError(exchange, 404, "Session not found: " + sessionId + ". Was the session not refreshed?");
return;
}
@@ -413,19 +411,17 @@ public class HttpStreamableServerTransportProvider implements McpStreamableServe
}
}
else {
this.sendMcpError(exchange, 500, new McpError("Unknown message type"));
this.sendError(exchange, 500, "Unknown message type");
}
}
catch (IllegalArgumentException | IOException e) {
logger.error("Failed to deserialize message: {}", e.getMessage());
this.sendMcpError(exchange, 400,
new McpError("Invalid message format: " + e.getMessage()));
this.sendError(exchange, 400, "Invalid message format: " + e.getMessage());
}
catch (Exception e) {
logger.error("Error handling message: {}", e.getMessage());
try {
this.sendMcpError(exchange, 500,
new McpError("Error processing message: " + e.getMessage()));
this.sendError(exchange, 500, "Error processing message: " + e.getMessage());
}
catch (IOException ex) {
logger.error(FAILED_TO_SEND_ERROR_RESPONSE, ex.getMessage());
@@ -433,6 +429,12 @@ public class HttpStreamableServerTransportProvider implements McpStreamableServe
}
}
}
public void doOther(HttpExchange exchange)
throws IOException {
sendError(exchange, 405, "Unsupported HTTP method: " + exchange.getRequestMethod());
}
//
// /**
// * Handles DELETE requests for session deletion.
@@ -495,17 +497,6 @@ public class HttpStreamableServerTransportProvider implements McpStreamableServe
// }
// }
public void sendMcpError(HttpExchange exchange, int httpCode, McpError mcpError) throws IOException {
var jsonError = objectMapper.writeValueAsString(mcpError);
var bytes = jsonError.getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().set("Content-Type", APPLICATION_JSON);
exchange.getResponseHeaders().add("Content-Encoding", UTF_8);
exchange.sendResponseHeaders(httpCode, bytes.length);
try (OutputStream os = exchange.getResponseBody()) {
os.write(bytes);
}
}
/**
* Sends an SSE event to a client with a specific ID.
* @param writer The writer to send the event through

View File

@@ -1,32 +0,0 @@
package io.xpipe.app.mcp;
import io.xpipe.app.ext.ShellStore;
import io.xpipe.app.storage.DataStorage;
import io.xpipe.app.storage.DataStorageQuery;
import io.xpipe.app.storage.DataStoreEntry;
import io.xpipe.app.storage.DataStoreEntryRef;
import io.xpipe.beacon.BeaconClientException;
public interface McpRequestHandler {
default DataStoreEntryRef<ShellStore> getShellStoreRef(String name) throws BeaconClientException {
var found = DataStorageQuery.queryUserInput(name);
if (found.isEmpty()) {
throw new BeaconClientException("No connection found for input " + name);
}
if (found.size() > 1) {
throw new BeaconClientException("Multiple connections found: "
+ found.stream().map(DataStoreEntry::getName).toList());
}
var e = found.getFirst();
var isShell = e.getStore() instanceof ShellStore;
if (!isShell) {
throw new BeaconClientException(
"Connection " + DataStorage.get().getStorePath(e).toString() + " is not a shell connection");
}
return e.ref();
}
}

View File

@@ -0,0 +1,151 @@
package io.xpipe.app.mcp;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import io.modelcontextprotocol.server.McpServerFeatures;
import io.modelcontextprotocol.spec.McpSchema;
import io.xpipe.app.storage.DataStorage;
import io.xpipe.beacon.BeaconClientException;
import io.xpipe.beacon.api.CategoryInfoExchange;
import io.xpipe.core.JacksonMapper;
import io.xpipe.core.StorePath;
import lombok.Builder;
import lombok.NonNull;
import lombok.Value;
import lombok.extern.jackson.Jacksonized;
import org.apache.commons.lang3.ClassUtils;
import java.io.IOException;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
public final class McpResources {
@Jacksonized
@Builder
@Value
public static class ConnectionResource {
@NonNull
StorePath name;
@NonNull
StorePath category;
@NonNull
String type;
@NonNull
Object connectionData;
@NonNull
Object usageCategory;
@NonNull
Instant lastUsed;
@NonNull
Instant lastModified;
@NonNull
Object internalState;
@NonNull
Map<String, Object> internalCache;
}
@Jacksonized
@Builder
@Value
public static class CategoryResource {
@NonNull
StorePath name;
@NonNull
Instant lastUsed;
@NonNull
Instant lastModified;
@NonNull
JsonNode config;
}
public static McpServerFeatures.SyncResourceSpecification connections() throws IOException {
McpSchema.Annotations annotations = new McpSchema.Annotations(List.of(McpSchema.Role.ASSISTANT), 1.0);
var resource = McpSchema.Resource.builder()
.uri("xpipe://connections")
.name("xpipe connections")
.description("Available connections in xpipe")
.mimeType("application/json")
.annotations(annotations)
.build();
return new McpServerFeatures.SyncResourceSpecification(resource, (exchange, request) -> {
var list = new ArrayList<McpSchema.ResourceContents>();
for (var e : DataStorage.get().getStoreEntries()) {
if (!e.getValidity().isUsable()) {
continue;
}
var names = DataStorage.get().getStorePath(DataStorage.get().getStoreCategoryIfPresent(e.getCategoryUuid()).orElseThrow()).getNames();
var cat = new StorePath(names.subList(1, names.size()));
var cache = e.getStoreCache().entrySet().stream().filter(stringObjectEntry -> {
return stringObjectEntry.getValue() != null && (ClassUtils.isPrimitiveOrWrapper(stringObjectEntry.getValue().getClass()) ||
stringObjectEntry.getValue() instanceof String);
}).collect(Collectors.toMap(stringObjectEntry -> stringObjectEntry.getKey(), stringObjectEntry -> stringObjectEntry.getValue()));
var resourceData = ConnectionResource.builder().lastModified(e.getLastModified()).lastUsed(e.getLastUsed())
.category(cat).name(DataStorage.get().getStorePath(e)).connectionData(e.getStore()).usageCategory(
e.getProvider().getUsageCategory()).type(e.getProvider().getId()).internalState(
e.getStorePersistentState() != null ? e.getStorePersistentState() : new Object()).internalCache(cache).build();
McpSchema.TextResourceContents c;
try {
c = new McpSchema.TextResourceContents("xpipe://connections/" + e.getUuid(), "application/json",
JacksonMapper.getDefault().writeValueAsString(resourceData));
} catch (JsonProcessingException ex) {
throw new RuntimeException(ex);
}
list.add(c);
}
return new McpSchema.ReadResourceResult(list);
});
}
public static McpServerFeatures.SyncResourceSpecification categories() throws IOException {
McpSchema.Annotations annotations = new McpSchema.Annotations(List.of(McpSchema.Role.ASSISTANT), 0.3);
var resource = McpSchema.Resource.builder()
.uri("xpipe://categories")
.name("xpipe categories")
.description("Available categories in xpipe")
.mimeType("application/json")
.annotations(annotations)
.build();
return new McpServerFeatures.SyncResourceSpecification(resource, (exchange, request) -> {
var list = new ArrayList<McpSchema.ResourceContents>();
for (var cat : DataStorage.get().getStoreCategories()) {
var name = DataStorage.get().getStorePath(cat);
var jsonData = CategoryResource.builder()
.lastModified(cat.getLastModified())
.lastUsed(cat.getLastUsed())
.name(name)
.config(JacksonMapper.getDefault().valueToTree(cat.getConfig()))
.build();
McpSchema.TextResourceContents c;
try {
c = new McpSchema.TextResourceContents("xpipe://categories/" + cat.getUuid(), "application/json",
JacksonMapper.getDefault().writeValueAsString(jsonData));
} catch (JsonProcessingException ex) {
throw new RuntimeException(ex);
}
list.add(c);
}
return new McpSchema.ReadResourceResult(list);
});
}
}

View File

@@ -1,62 +1,108 @@
package io.xpipe.app.mcp;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.server.McpServerFeatures;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import io.modelcontextprotocol.server.McpSyncServer;
import io.modelcontextprotocol.spec.McpSchema;
import io.xpipe.app.core.AppProperties;
import io.xpipe.app.storage.DataStorage;
import io.xpipe.app.storage.DataStoreCategory;
import io.xpipe.app.storage.DataStoreEntry;
import io.xpipe.app.storage.StorageListener;
import lombok.SneakyThrows;
import lombok.Value;
import java.util.List;
import java.io.IOException;
@Value
public class McpServer {
public static final HttpStreamableServerTransportProvider HANDLER = HttpStreamableServerTransportProvider.builder().mcpEndpoint("/mcp").objectMapper(new ObjectMapper()).build();
private static McpServer INSTANCE;
public static McpServer get() {
return INSTANCE;
}
McpSyncServer mcpSyncServer;
HttpStreamableServerTransportProvider transportProvider;
@SneakyThrows
public static void init() {
var transportProvider = HANDLER;
var transportProvider = HttpStreamableServerTransportProvider.builder().mcpEndpoint("/mcp").objectMapper(new ObjectMapper()).build();
McpSyncServer syncServer = io.modelcontextprotocol.server.McpServer.sync(transportProvider)
.serverInfo("my-server", "1.0.0")
.serverInfo("XPipe", AppProperties.get().getVersion())
.capabilities(McpSchema.ServerCapabilities.builder()
.resources(false, true) // Enable resource support
.resources(true, true) // Enable resource support
.tools(true) // Enable tool support
.prompts(true) // Enable prompt support
.logging() // Enable logging support
.prompts(false) // Enable prompt support
.completions() // Enable completions support
.build())
.build();
syncServer.loggingNotification(McpSchema.LoggingMessageNotification.builder()
.level(McpSchema.LoggingLevel.INFO)
.logger("custom-logger")
.data("Custom log message")
.build());
var syncResourceSpecification = new McpServerFeatures.SyncResourceSpecification(
new McpSchema.Resource("custom://resource", "name", "description", "mime-type", null),
(exchange, request) -> {
// Resource read implementation
return new McpSchema.ReadResourceResult(List.of(new McpSchema.TextResourceContents("custom://resource", "name", "test")));
}
);
// Sync prompt specification
var syncPromptSpecification = new McpServerFeatures.SyncPromptSpecification(
new McpSchema.Prompt("greeting", "description", List.of(
new McpSchema.PromptArgument("name", "description", true)
)),
(exchange, request) -> {
// Prompt implementation
return new McpSchema.GetPromptResult("test", List.of(new McpSchema.PromptMessage(McpSchema.Role.USER, new McpSchema.TextContent("abc"))));
}
);
// Register tools, resources, and prompts
syncServer.addTool(McpTools.readFile());
syncServer.addTool(McpTools.listFiles());
syncServer.addTool(McpTools.getFileInfo());
syncServer.addResource(syncResourceSpecification);
syncServer.addPrompt(syncPromptSpecification);
syncServer.addResource(McpResources.connections());
syncServer.addResource(McpResources.categories());
DataStorage.get().addListener(new StorageListener() {
@Override
public void onStoreListUpdate() {
syncServer.notifyResourcesListChanged();
}
@Override
public void onStoreAdd(DataStoreEntry... entry) {
syncServer.notifyResourcesListChanged();
}
@Override
public void onStoreRemove(DataStoreEntry... entry) {
syncServer.notifyResourcesListChanged();
}
@Override
public void onCategoryAdd(DataStoreCategory category) {
syncServer.notifyResourcesListChanged();
}
@Override
public void onCategoryRemove(DataStoreCategory category) {
syncServer.notifyResourcesListChanged();
}
@Override
public void onEntryCategoryChange() {
syncServer.notifyResourcesListChanged();
}
});
INSTANCE = new McpServer(syncServer, transportProvider);
}
public HttpHandler createHttpHandler() {
return new HttpHandler() {
@Override
public void handle(HttpExchange exchange) throws IOException {
try (exchange) {
if (exchange.getRequestMethod().equals("GET")) {
transportProvider.doGet(exchange);
} else if (exchange.getRequestMethod().equals("POST")) {
transportProvider.doPost(exchange);
} else {
transportProvider.doOther(exchange);
}
}
}
};
}
public static void reset() {
INSTANCE.mcpSyncServer.close();
INSTANCE = null;
}
}

View File

@@ -2,7 +2,12 @@ package io.xpipe.app.mcp;
import io.modelcontextprotocol.server.McpSyncServerExchange;
import io.modelcontextprotocol.spec.McpSchema;
import io.xpipe.app.ext.ShellStore;
import io.xpipe.app.issue.ErrorEventFactory;
import io.xpipe.app.storage.DataStorage;
import io.xpipe.app.storage.DataStorageQuery;
import io.xpipe.app.storage.DataStoreEntry;
import io.xpipe.app.storage.DataStoreEntryRef;
import io.xpipe.beacon.BeaconClientException;
import io.xpipe.core.FilePath;
import lombok.SneakyThrows;
@@ -15,7 +20,7 @@ public interface McpToolHandler extends BiFunction<McpSyncServerExchange, McpSch
return t;
}
class ToolRequest implements McpRequestHandler {
class ToolRequest {
protected final McpSyncServerExchange exchange;
protected final McpSchema.CallToolRequest request;
@@ -58,6 +63,27 @@ public interface McpToolHandler extends BiFunction<McpSyncServerExchange, McpSch
}
return path;
}
public DataStoreEntryRef<ShellStore> getShellStoreRef(String name) throws BeaconClientException {
var found = DataStorageQuery.queryUserInput(name);
if (found.isEmpty()) {
throw new BeaconClientException("No connection found for input " + name);
}
if (found.size() > 1) {
throw new BeaconClientException("Multiple connections found: "
+ found.stream().map(DataStoreEntry::getName).toList());
}
var e = found.getFirst();
var isShell = e.getStore() instanceof ShellStore;
if (!isShell) {
throw new BeaconClientException(
"Connection " + DataStorage.get().getStorePath(e).toString() + " is not a shell connection");
}
return e.ref();
}
}
@Override

View File

@@ -1,22 +0,0 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"start": {
"type": "string",
"minLength": 1,
"maxLength": 256,
"description": "The starting path to search, required."
},
"name": {
"type": "string",
"minLength": 1,
"maxLength": 256,
"description": "The name of the target file or directory to search, supports fuzzy matching, required."
}
},
"required": [
"start",
"name"
]
}

View File

@@ -1,21 +0,0 @@
{
"name": "read_file",
"description": "Reads file",
"input_schema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "The path to read, can be a file or directory, required."
},
"system": {
"type": "string",
"description": "The system identifier"
}
},
"required": [
"path",
"system"
]
}
}