diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/cli/ReadDrainExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/cli/ReadDrainExchange.java index 0c7b8a47a..939baf2a1 100644 --- a/beacon/src/main/java/io/xpipe/beacon/exchange/cli/ReadDrainExchange.java +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/cli/ReadDrainExchange.java @@ -3,6 +3,7 @@ package io.xpipe.beacon.exchange.cli; import io.xpipe.beacon.RequestMessage; import io.xpipe.beacon.ResponseMessage; import io.xpipe.beacon.exchange.MessageExchange; +import io.xpipe.core.impl.SinkDrainStore; import lombok.Builder; import lombok.NonNull; import lombok.Value; @@ -21,6 +22,9 @@ public class ReadDrainExchange implements MessageExchange { public static class Request implements RequestMessage { @NonNull String name; + + @NonNull + SinkDrainStore store; } @Jacksonized diff --git a/core/src/main/java/io/xpipe/core/impl/DrainStore.java b/core/src/main/java/io/xpipe/core/impl/DrainStore.java deleted file mode 100644 index 40dfba578..000000000 --- a/core/src/main/java/io/xpipe/core/impl/DrainStore.java +++ /dev/null @@ -1,67 +0,0 @@ -package io.xpipe.core.impl; - -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonTypeName; -import io.xpipe.core.charsetter.NewLine; -import io.xpipe.core.charsetter.StreamCharset; -import io.xpipe.core.store.KnownFormatStreamDataStore; -import io.xpipe.core.util.JacksonizedValue; -import lombok.Getter; -import lombok.experimental.SuperBuilder; -import lombok.extern.jackson.Jacksonized; - -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.channels.Channels; -import java.nio.channels.Pipe; - -@JsonTypeName("drain") -@SuperBuilder -@Jacksonized -@Getter -public class DrainStore extends JacksonizedValue implements KnownFormatStreamDataStore { - - private final String description; - private final StreamCharset charset; - private final NewLine newLine; - - @JsonIgnore - private boolean open; - @JsonIgnore - private Pipe pipe; - - private boolean used; - - private void waitForOpen() { - while (!open) { - try { - Thread.sleep(200); - } catch (InterruptedException e) { - break; - } - } - } - - @Override - public boolean canOpen() throws Exception { - return false; - } - - @Override - public InputStream openInput() throws Exception { - if (used) { - throw new IllegalStateException("Drain has already been used"); - } - - waitForOpen(); - return Channels.newInputStream(pipe.source()); - } - - @Override - public OutputStream openOutput() throws Exception { - used = true; - pipe = Pipe.open(); - open = true; - return Channels.newOutputStream(pipe.sink()); - } -} diff --git a/core/src/main/java/io/xpipe/core/impl/SinkDrainStore.java b/core/src/main/java/io/xpipe/core/impl/SinkDrainStore.java new file mode 100644 index 000000000..8f8dc6687 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/impl/SinkDrainStore.java @@ -0,0 +1,151 @@ +package io.xpipe.core.impl; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonTypeName; +import io.xpipe.core.charsetter.NewLine; +import io.xpipe.core.charsetter.StreamCharset; +import io.xpipe.core.store.DataFlow; +import io.xpipe.core.store.KnownFormatStreamDataStore; +import io.xpipe.core.util.JacksonizedValue; +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.SuperBuilder; +import lombok.extern.jackson.Jacksonized; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.Channels; +import java.nio.channels.Pipe; + +/* +TODO: Properly enter closed State + */ + +@JsonTypeName("drain") +@SuperBuilder +@Jacksonized +@Getter +public class SinkDrainStore extends JacksonizedValue implements KnownFormatStreamDataStore { + + public static enum State { + NONE_CONNECTED, + PRODUCER_CONNECTED, + CONSUMER_CONNECTED, + OPEN, + CLOSED + } + + private final String description; + private final StreamCharset charset; + private final NewLine newLine; + + @JsonIgnore + @Setter + @Builder.Default + private State state = State.NONE_CONNECTED; + + @JsonIgnore + private Pipe pipe; + + @Override + public DataFlow getFlow() { + if (state == State.NONE_CONNECTED) { + return DataFlow.INPUT_OR_OUTPUT; + } + + if (state == State.PRODUCER_CONNECTED) { + return DataFlow.INPUT; + } + + if (state == State.CONSUMER_CONNECTED) { + return DataFlow.OUTPUT; + } + + return null; + } + + private void waitForOpen() { + while (state != State.OPEN) { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + break; + } + } + } + + @Override + public boolean canOpen() throws Exception { + return state == State.PRODUCER_CONNECTED; + } + + @Override + public InputStream openInput() throws Exception { + checkState(false); + + if (state == State.PRODUCER_CONNECTED) { + state = State.OPEN; + } + + if (state == State.NONE_CONNECTED) { + state = State.CONSUMER_CONNECTED; + waitForOpen(); + } + + try { + openPipe(); + return Channels.newInputStream(pipe.source()); + } catch (Exception ex) { + state = State.CLOSED; + throw ex; + } + } + + @Override + public OutputStream openOutput() throws Exception { + checkState(true); + + if (state == State.CONSUMER_CONNECTED) { + state = State.OPEN; + } + + if (state == State.NONE_CONNECTED) { + state = State.PRODUCER_CONNECTED; + waitForOpen(); + } + + try { + openPipe(); + return Channels.newOutputStream(pipe.sink()); + } catch (Exception ex) { + state = State.CLOSED; + throw ex; + } + } + + private void openPipe() throws IOException { + if (pipe == null) { + pipe = Pipe.open(); + } + } + + private void checkState(boolean isProducer) { + if (state == State.CLOSED) { + throw new IllegalStateException("Drain has already been closed"); + } + + if (state == State.OPEN) { + throw new IllegalStateException("Drain is already open"); + } + + if (state == State.PRODUCER_CONNECTED && isProducer) { + throw new IllegalStateException("Producer is already connected"); + } + + if (state == State.CONSUMER_CONNECTED && !isProducer) { + throw new IllegalStateException("Consumer is already connected"); + } + } +} diff --git a/core/src/main/java/io/xpipe/core/source/DataSource.java b/core/src/main/java/io/xpipe/core/source/DataSource.java index ddf849f81..a12f63ae6 100644 --- a/core/src/main/java/io/xpipe/core/source/DataSource.java +++ b/core/src/main/java/io/xpipe/core/source/DataSource.java @@ -63,10 +63,18 @@ public abstract class DataSource extends JacksonizedValue } public List getAvailableWriteModes() { - if (getFlow() != null && !getFlow().hasOutput()) { + if (getFlow() == null) { return List.of(); } + if (getFlow() != null && (getFlow() == DataFlow.TRANSFORMER || getFlow() == DataFlow.INPUT)) { + return List.of(); + } + + if (getFlow() != null && (getFlow() == DataFlow.OUTPUT || getFlow() == DataFlow.INPUT_OR_OUTPUT)) { + return List.of(WriteMode.REPLACE); + } + return List.of(WriteMode.REPLACE, WriteMode.APPEND, WriteMode.PREPEND); } diff --git a/core/src/main/java/io/xpipe/core/store/DataFlow.java b/core/src/main/java/io/xpipe/core/store/DataFlow.java index ff116688d..3e542df56 100644 --- a/core/src/main/java/io/xpipe/core/store/DataFlow.java +++ b/core/src/main/java/io/xpipe/core/store/DataFlow.java @@ -7,6 +7,8 @@ public enum DataFlow { INPUT("Input"), @JsonProperty("output") OUTPUT("Output"), + @JsonProperty("inputOrOutput") + INPUT_OR_OUTPUT("Input or Output"), @JsonProperty("inputOutput") INPUT_OUTPUT("Input/Output"), @JsonProperty("transformer") diff --git a/extension/src/main/java/io/xpipe/extension/comp/TextFieldComp.java b/extension/src/main/java/io/xpipe/extension/comp/TextFieldComp.java index 2b1c1c806..3c7cbc361 100644 --- a/extension/src/main/java/io/xpipe/extension/comp/TextFieldComp.java +++ b/extension/src/main/java/io/xpipe/extension/comp/TextFieldComp.java @@ -59,7 +59,7 @@ public class TextFieldComp extends Comp> { }); text.focusedProperty().addListener((observable, oldValue, newValue) -> { - if (!newValue) { + if (!newValue && lazy) { lastAppliedValue.setValue(currentValue.getValue()); } });