From 71b5d2d716deb455a1d7edddb1024f42ec27dfdf Mon Sep 17 00:00:00 2001 From: Christopher Schnick Date: Thu, 22 Dec 2022 03:57:15 +0100 Subject: [PATCH] Implement various fixes for sink drains --- .../main/java/io/xpipe/api/DataStores.java | 2 +- .../beacon/exchange/ReadStreamExchange.java | 32 +++++ .../beacon/exchange/WriteStreamExchange.java | 32 +++++ .../beacon}/util/QuietDialogHandler.java | 6 +- beacon/src/main/java/module-info.java | 6 +- .../io/xpipe/core/charsetter/NewLine.java | 2 +- .../io/xpipe/core/dialog/QueryConverter.java | 2 +- .../io/xpipe/core/impl/SinkDrainStore.java | 113 ++++++++++-------- .../java/io/xpipe/core/store/DataStore.java | 8 ++ .../xpipe/core/store/StatefulDataStore.java | 19 +++ .../io/xpipe/core/util/CoreJacksonModule.java | 40 +++++++ .../io/xpipe/core/util/DataStateProvider.java | 25 ++++ core/src/main/java/module-info.java | 1 + .../extension/util/DataStoreFormatter.java | 5 + .../extension/util/ExecScriptHelper.java | 12 +- .../io/xpipe/extension/util/ThreadHelper.java | 14 +++ 16 files changed, 264 insertions(+), 55 deletions(-) create mode 100644 beacon/src/main/java/io/xpipe/beacon/exchange/ReadStreamExchange.java create mode 100644 beacon/src/main/java/io/xpipe/beacon/exchange/WriteStreamExchange.java rename {api/src/main/java/io/xpipe/api => beacon/src/main/java/io/xpipe/beacon}/util/QuietDialogHandler.java (91%) create mode 100644 core/src/main/java/io/xpipe/core/store/StatefulDataStore.java create mode 100644 core/src/main/java/io/xpipe/core/util/DataStateProvider.java diff --git a/api/src/main/java/io/xpipe/api/DataStores.java b/api/src/main/java/io/xpipe/api/DataStores.java index c8720401f..68feb5a5d 100644 --- a/api/src/main/java/io/xpipe/api/DataStores.java +++ b/api/src/main/java/io/xpipe/api/DataStores.java @@ -1,7 +1,7 @@ package io.xpipe.api; import io.xpipe.api.connector.XPipeApiConnection; -import io.xpipe.api.util.QuietDialogHandler; +import io.xpipe.beacon.util.QuietDialogHandler; import io.xpipe.beacon.exchange.cli.StoreAddExchange; import io.xpipe.core.store.DataStore; diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/ReadStreamExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/ReadStreamExchange.java new file mode 100644 index 000000000..58d8ec1ea --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/ReadStreamExchange.java @@ -0,0 +1,32 @@ +package io.xpipe.beacon.exchange; + +import io.xpipe.beacon.RequestMessage; +import io.xpipe.beacon.ResponseMessage; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; +import lombok.extern.jackson.Jacksonized; + +/** + * Stores a stream of data in a storage. + */ +public class ReadStreamExchange implements MessageExchange { + + @Override + public String getId() { + return "readStream"; + } + + @Jacksonized + @Builder + @Value + public static class Request implements RequestMessage { + @NonNull String name; + } + + @Jacksonized + @Builder + @Value + public static class Response implements ResponseMessage { + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/WriteStreamExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/WriteStreamExchange.java new file mode 100644 index 000000000..1f3b0acc4 --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/WriteStreamExchange.java @@ -0,0 +1,32 @@ +package io.xpipe.beacon.exchange; + +import io.xpipe.beacon.RequestMessage; +import io.xpipe.beacon.ResponseMessage; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; +import lombok.extern.jackson.Jacksonized; + +/** + * Stores a stream of data in a storage. + */ +public class WriteStreamExchange implements MessageExchange { + + @Override + public String getId() { + return "writeStream"; + } + + @Jacksonized + @Builder + @Value + public static class Request implements RequestMessage { + @NonNull String name; + } + + @Jacksonized + @Builder + @Value + public static class Response implements ResponseMessage { + } +} diff --git a/api/src/main/java/io/xpipe/api/util/QuietDialogHandler.java b/beacon/src/main/java/io/xpipe/beacon/util/QuietDialogHandler.java similarity index 91% rename from api/src/main/java/io/xpipe/api/util/QuietDialogHandler.java rename to beacon/src/main/java/io/xpipe/beacon/util/QuietDialogHandler.java index 55cefef84..898f1f9f8 100644 --- a/api/src/main/java/io/xpipe/api/util/QuietDialogHandler.java +++ b/beacon/src/main/java/io/xpipe/beacon/util/QuietDialogHandler.java @@ -1,4 +1,4 @@ -package io.xpipe.api.util; +package io.xpipe.beacon.util; import io.xpipe.beacon.BeaconConnection; import io.xpipe.beacon.ClientException; @@ -13,6 +13,10 @@ import java.util.UUID; public class QuietDialogHandler { + public static void handle(DialogReference ref, BeaconConnection connection) throws ClientException { + new QuietDialogHandler(ref, connection, Map.of()).handle(); + } + private final UUID dialogKey; private final BeaconConnection connection; private final Map overrides; diff --git a/beacon/src/main/java/module-info.java b/beacon/src/main/java/module-info.java index e2609b803..b35b06a93 100644 --- a/beacon/src/main/java/module-info.java +++ b/beacon/src/main/java/module-info.java @@ -1,12 +1,12 @@ import com.fasterxml.jackson.databind.Module; import io.xpipe.beacon.BeaconJacksonModule; import io.xpipe.beacon.BeaconProxyImpl; -import io.xpipe.core.util.ProxyFunction; import io.xpipe.beacon.exchange.*; import io.xpipe.beacon.exchange.api.QueryRawDataExchange; import io.xpipe.beacon.exchange.api.QueryTableDataExchange; import io.xpipe.beacon.exchange.api.QueryTextDataExchange; import io.xpipe.beacon.exchange.cli.*; +import io.xpipe.core.util.ProxyFunction; import io.xpipe.core.util.ProxyProvider; module io.xpipe.beacon { @@ -21,6 +21,8 @@ module io.xpipe.beacon { opens io.xpipe.beacon.exchange.api; opens io.xpipe.beacon.exchange.data; opens io.xpipe.beacon.exchange.cli; + exports io.xpipe.beacon.util; + opens io.xpipe.beacon.util; requires static com.fasterxml.jackson.core; requires static com.fasterxml.jackson.databind; @@ -37,6 +39,8 @@ module io.xpipe.beacon { InstanceExchange, EditStoreExchange, AddSourceExchange, + WriteStreamExchange, + ReadStreamExchange, StoreProviderListExchange, ListCollectionsExchange, ListEntriesExchange, diff --git a/core/src/main/java/io/xpipe/core/charsetter/NewLine.java b/core/src/main/java/io/xpipe/core/charsetter/NewLine.java index ef4f6bf66..da1b4b446 100644 --- a/core/src/main/java/io/xpipe/core/charsetter/NewLine.java +++ b/core/src/main/java/io/xpipe/core/charsetter/NewLine.java @@ -25,7 +25,7 @@ public enum NewLine { .orElseThrow(); } - public static NewLine id(String id) { + public static NewLine byId(String id) { return Arrays.stream(values()) .filter(n -> n.getId().equalsIgnoreCase(id)) .findFirst() diff --git a/core/src/main/java/io/xpipe/core/dialog/QueryConverter.java b/core/src/main/java/io/xpipe/core/dialog/QueryConverter.java index 12f166a6c..3aba5104a 100644 --- a/core/src/main/java/io/xpipe/core/dialog/QueryConverter.java +++ b/core/src/main/java/io/xpipe/core/dialog/QueryConverter.java @@ -14,7 +14,7 @@ public abstract class QueryConverter { public static final QueryConverter NEW_LINE = new QueryConverter() { @Override protected NewLine fromString(String s) { - return NewLine.id(s); + return NewLine.byId(s); } @Override diff --git a/core/src/main/java/io/xpipe/core/impl/SinkDrainStore.java b/core/src/main/java/io/xpipe/core/impl/SinkDrainStore.java index 8f8dc6687..04e1dc675 100644 --- a/core/src/main/java/io/xpipe/core/impl/SinkDrainStore.java +++ b/core/src/main/java/io/xpipe/core/impl/SinkDrainStore.java @@ -1,33 +1,25 @@ package io.xpipe.core.impl; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonTypeName; import io.xpipe.core.charsetter.NewLine; import io.xpipe.core.charsetter.StreamCharset; import io.xpipe.core.store.DataFlow; import io.xpipe.core.store.KnownFormatStreamDataStore; +import io.xpipe.core.store.StatefulDataStore; import io.xpipe.core.util.JacksonizedValue; -import lombok.Builder; import lombok.Getter; -import lombok.Setter; import lombok.experimental.SuperBuilder; import lombok.extern.jackson.Jacksonized; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +import java.io.*; import java.nio.channels.Channels; import java.nio.channels.Pipe; -/* -TODO: Properly enter closed State - */ - @JsonTypeName("drain") @SuperBuilder @Jacksonized @Getter -public class SinkDrainStore extends JacksonizedValue implements KnownFormatStreamDataStore { +public class SinkDrainStore extends JacksonizedValue implements KnownFormatStreamDataStore, StatefulDataStore { public static enum State { NONE_CONNECTED, @@ -37,29 +29,38 @@ public class SinkDrainStore extends JacksonizedValue implements KnownFormatStrea CLOSED } - private final String description; private final StreamCharset charset; private final NewLine newLine; - @JsonIgnore - @Setter - @Builder.Default - private State state = State.NONE_CONNECTED; + public State getState() { + return getState("state", State.class, State.NONE_CONNECTED); + } - @JsonIgnore - private Pipe pipe; + private void setState(State n) { + setState("state", n); + } + + public Pipe getOrOpenPipe() { + return getOrComputeState("pipe", Pipe.class, () -> { + try { + return Pipe.open(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } @Override public DataFlow getFlow() { - if (state == State.NONE_CONNECTED) { + if (getState() == State.NONE_CONNECTED) { return DataFlow.INPUT_OR_OUTPUT; } - if (state == State.PRODUCER_CONNECTED) { + if (getState() == State.PRODUCER_CONNECTED) { return DataFlow.INPUT; } - if (state == State.CONSUMER_CONNECTED) { + if (getState() == State.CONSUMER_CONNECTED) { return DataFlow.OUTPUT; } @@ -67,7 +68,7 @@ public class SinkDrainStore extends JacksonizedValue implements KnownFormatStrea } private void waitForOpen() { - while (state != State.OPEN) { + while (getState() != State.OPEN) { try { Thread.sleep(200); } catch (InterruptedException e) { @@ -76,29 +77,44 @@ public class SinkDrainStore extends JacksonizedValue implements KnownFormatStrea } } + @Override + public boolean shouldPersist() { + return getState() != State.CLOSED; + } + + @Override + public boolean shouldSave() { + return false; + } + @Override public boolean canOpen() throws Exception { - return state == State.PRODUCER_CONNECTED; + return getState() == State.PRODUCER_CONNECTED; } @Override public InputStream openInput() throws Exception { checkState(false); - if (state == State.PRODUCER_CONNECTED) { - state = State.OPEN; + if (getState() == State.PRODUCER_CONNECTED) { + setState(State.OPEN); } - if (state == State.NONE_CONNECTED) { - state = State.CONSUMER_CONNECTED; - waitForOpen(); + if (getState() == State.NONE_CONNECTED) { + setState(State.CONSUMER_CONNECTED); + //waitForOpen(); } try { - openPipe(); - return Channels.newInputStream(pipe.source()); + return new FilterInputStream(Channels.newInputStream(getOrOpenPipe().source())) { + @Override + public void close() throws IOException { + super.close(); + setState(State.CLOSED); + } + }; } catch (Exception ex) { - state = State.CLOSED; + setState(State.CLOSED); throw ex; } } @@ -107,44 +123,43 @@ public class SinkDrainStore extends JacksonizedValue implements KnownFormatStrea public OutputStream openOutput() throws Exception { checkState(true); - if (state == State.CONSUMER_CONNECTED) { - state = State.OPEN; + if (getState() == State.CONSUMER_CONNECTED) { + setState(State.OPEN); } - if (state == State.NONE_CONNECTED) { - state = State.PRODUCER_CONNECTED; - waitForOpen(); + if (getState() == State.NONE_CONNECTED) { + setState(State.PRODUCER_CONNECTED); + //waitForOpen(); } try { - openPipe(); - return Channels.newOutputStream(pipe.sink()); + return new FilterOutputStream(Channels.newOutputStream(getOrOpenPipe().sink())) { + @Override + public void close() throws IOException { + super.close(); + setState(State.CLOSED); + } + }; } catch (Exception ex) { - state = State.CLOSED; + setState(State.CLOSED); throw ex; } } - private void openPipe() throws IOException { - if (pipe == null) { - pipe = Pipe.open(); - } - } - private void checkState(boolean isProducer) { - if (state == State.CLOSED) { + if (getState() == State.CLOSED) { throw new IllegalStateException("Drain has already been closed"); } - if (state == State.OPEN) { + if (getState() == State.OPEN) { throw new IllegalStateException("Drain is already open"); } - if (state == State.PRODUCER_CONNECTED && isProducer) { + if (getState() == State.PRODUCER_CONNECTED && isProducer) { throw new IllegalStateException("Producer is already connected"); } - if (state == State.CONSUMER_CONNECTED && !isProducer) { + if (getState() == State.CONSUMER_CONNECTED && !isProducer) { throw new IllegalStateException("Consumer is already connected"); } } diff --git a/core/src/main/java/io/xpipe/core/store/DataStore.java b/core/src/main/java/io/xpipe/core/store/DataStore.java index 688b48d8e..1fa1a139b 100644 --- a/core/src/main/java/io/xpipe/core/store/DataStore.java +++ b/core/src/main/java/io/xpipe/core/store/DataStore.java @@ -18,6 +18,14 @@ import java.util.Optional; @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") public interface DataStore { + default boolean shouldPersist() { + return true; + } + + default boolean shouldSave() { + return true; + } + default boolean isComplete() { try { checkComplete(); diff --git a/core/src/main/java/io/xpipe/core/store/StatefulDataStore.java b/core/src/main/java/io/xpipe/core/store/StatefulDataStore.java new file mode 100644 index 000000000..70ab1793d --- /dev/null +++ b/core/src/main/java/io/xpipe/core/store/StatefulDataStore.java @@ -0,0 +1,19 @@ +package io.xpipe.core.store; + +import io.xpipe.core.util.DataStateProvider; + +import java.util.function.Supplier; + +public interface StatefulDataStore extends DataStore { + + default T getState(String key, Class c, T def) { + return DataStateProvider.get().getState(this, key, c, () -> def); + } + default T getOrComputeState(String key, Class c, Supplier def) { + return DataStateProvider.get().getState(this, key, c, def); + } + + default void setState(String key, Object val) { + DataStateProvider.get().putState(this, key, val); + } +} diff --git a/core/src/main/java/io/xpipe/core/util/CoreJacksonModule.java b/core/src/main/java/io/xpipe/core/util/CoreJacksonModule.java index 5d65b9ea5..e9fa6c426 100644 --- a/core/src/main/java/io/xpipe/core/util/CoreJacksonModule.java +++ b/core/src/main/java/io/xpipe/core/util/CoreJacksonModule.java @@ -12,6 +12,8 @@ import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; +import io.xpipe.core.charsetter.NewLine; +import io.xpipe.core.charsetter.StreamCharset; import io.xpipe.core.data.type.ArrayType; import io.xpipe.core.data.type.TupleType; import io.xpipe.core.data.type.ValueType; @@ -55,6 +57,12 @@ public class CoreJacksonModule extends SimpleModule { addSerializer(Charset.class, new CharsetSerializer()); addDeserializer(Charset.class, new CharsetDeserializer()); + addSerializer(StreamCharset.class, new StreamCharsetSerializer()); + addDeserializer(StreamCharset.class, new StreamCharsetDeserializer()); + + addSerializer(NewLine.class, new NewLineSerializer()); + addDeserializer(NewLine.class, new NewLineDeserializer()); + addSerializer(Path.class, new LocalPathSerializer()); addDeserializer(Path.class, new LocalPathDeserializer()); @@ -122,6 +130,38 @@ public class CoreJacksonModule extends SimpleModule { } } + public static class NewLineSerializer extends JsonSerializer { + + @Override + public void serialize(NewLine value, JsonGenerator jgen, SerializerProvider provider) throws IOException { + jgen.writeString(value.getId()); + } + } + + public static class NewLineDeserializer extends JsonDeserializer { + + @Override + public NewLine deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + return NewLine.byId(p.getValueAsString()); + } + } + + public static class StreamCharsetSerializer extends JsonSerializer { + + @Override + public void serialize(StreamCharset value, JsonGenerator jgen, SerializerProvider provider) throws IOException { + jgen.writeString(value.toString()); + } + } + + public static class StreamCharsetDeserializer extends JsonDeserializer { + + @Override + public StreamCharset deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + return StreamCharset.get(p.getValueAsString()); + } + } + public static class LocalPathSerializer extends JsonSerializer { @Override diff --git a/core/src/main/java/io/xpipe/core/util/DataStateProvider.java b/core/src/main/java/io/xpipe/core/util/DataStateProvider.java new file mode 100644 index 000000000..24d8e400f --- /dev/null +++ b/core/src/main/java/io/xpipe/core/util/DataStateProvider.java @@ -0,0 +1,25 @@ +package io.xpipe.core.util; + +import io.xpipe.core.store.DataStore; + +import java.util.ServiceLoader; +import java.util.function.Supplier; + +public abstract class DataStateProvider { + + private static DataStateProvider INSTANCE; + + public static DataStateProvider get() { + if (INSTANCE == null) { + INSTANCE = ServiceLoader.load(ModuleLayer.boot(), DataStateProvider.class) + .findFirst() + .orElseThrow(); + } + + return INSTANCE; + } + + public abstract void putState(DataStore store, String key, Object value); + + public abstract T getState(DataStore store, String key, Class c, Supplier def); +} diff --git a/core/src/main/java/module-info.java b/core/src/main/java/module-info.java index f3a2aec04..8f90f2761 100644 --- a/core/src/main/java/module-info.java +++ b/core/src/main/java/module-info.java @@ -27,6 +27,7 @@ open module io.xpipe.core { uses LocalProcessControlProvider; uses io.xpipe.core.util.ProxyProvider; uses io.xpipe.core.util.ProxyManagerProvider; + uses io.xpipe.core.util.DataStateProvider; provides WriteMode with WriteMode.Replace, WriteMode.Append, WriteMode.Prepend; provides com.fasterxml.jackson.databind.Module with diff --git a/extension/src/main/java/io/xpipe/extension/util/DataStoreFormatter.java b/extension/src/main/java/io/xpipe/extension/util/DataStoreFormatter.java index b251a79c0..9f9de2d54 100644 --- a/extension/src/main/java/io/xpipe/extension/util/DataStoreFormatter.java +++ b/extension/src/main/java/io/xpipe/extension/util/DataStoreFormatter.java @@ -47,7 +47,12 @@ public class DataStoreFormatter { public static String toName(DataStore input) { return toName(input, Integer.MAX_VALUE); } + public static String toName(DataStore input, int length) { + if (input == null) { + return "?"; + } + var named = XPipeDaemon.getInstance().getStoreName(input); if (named.isPresent()) { return cut(named.get(), length); diff --git a/extension/src/main/java/io/xpipe/extension/util/ExecScriptHelper.java b/extension/src/main/java/io/xpipe/extension/util/ExecScriptHelper.java index 011466f94..3f6ae46f8 100644 --- a/extension/src/main/java/io/xpipe/extension/util/ExecScriptHelper.java +++ b/extension/src/main/java/io/xpipe/extension/util/ExecScriptHelper.java @@ -2,6 +2,7 @@ package io.xpipe.extension.util; import io.xpipe.core.impl.FileNames; import io.xpipe.core.process.ShellProcessControl; +import io.xpipe.core.store.ShellStore; import io.xpipe.core.util.XPipeTempDirectory; import lombok.SneakyThrows; @@ -10,9 +11,15 @@ import java.util.Objects; public class ExecScriptHelper { public static int getConnectionHash(String command) { - return Objects.hash(command); + return Math.abs(Objects.hash(command)); } + @SneakyThrows + public static String createLocalExecScript(String content) { + try (var l = ShellStore.local().create().start()) { + return createExecScript(l, content); + } + } @SneakyThrows public static String createExecScript(ShellProcessControl processControl, String content) { var fileName = "exec-" + getConnectionHash(content); @@ -35,6 +42,9 @@ public class ExecScriptHelper { c.getStdin().write(content.getBytes(processControl.getCharset())); c.closeStdin(); } + + processControl.restart(); + return file; } } diff --git a/extension/src/main/java/io/xpipe/extension/util/ThreadHelper.java b/extension/src/main/java/io/xpipe/extension/util/ThreadHelper.java index bd9d5fe8e..62ebd32ec 100644 --- a/extension/src/main/java/io/xpipe/extension/util/ThreadHelper.java +++ b/extension/src/main/java/io/xpipe/extension/util/ThreadHelper.java @@ -1,5 +1,6 @@ package io.xpipe.extension.util; +import io.xpipe.extension.event.ErrorEvent; import org.apache.commons.lang3.function.FailableRunnable; public class ThreadHelper { @@ -18,6 +19,19 @@ public class ThreadHelper { return t; } + public static Thread runFailableAsync(FailableRunnable r) { + var t = new Thread(() -> { + try { + r.run(); + } catch (Throwable e) { + ErrorEvent.fromThrowable(e).handle(); + } + }); + t.setDaemon(true); + t.start(); + return t; + } + public static Thread create(String name, boolean daemon, Runnable r) { var t = new Thread(r); t.setDaemon(daemon);