diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/cli/ReadDrainExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/cli/ReadDrainExchange.java new file mode 100644 index 000000000..0c7b8a47a --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/cli/ReadDrainExchange.java @@ -0,0 +1,30 @@ +package io.xpipe.beacon.exchange.cli; + +import io.xpipe.beacon.RequestMessage; +import io.xpipe.beacon.ResponseMessage; +import io.xpipe.beacon.exchange.MessageExchange; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; +import lombok.extern.jackson.Jacksonized; + +public class ReadDrainExchange implements MessageExchange { + + @Override + public String getId() { + return "readDrain"; + } + + @Jacksonized + @Builder + @Value + public static class Request implements RequestMessage { + @NonNull + String name; + } + + @Jacksonized + @Builder + @Value + public static class Response implements ResponseMessage {} +} diff --git a/beacon/src/main/java/module-info.java b/beacon/src/main/java/module-info.java index 5ec300281..0f4997dd2 100644 --- a/beacon/src/main/java/module-info.java +++ b/beacon/src/main/java/module-info.java @@ -41,6 +41,7 @@ module io.xpipe.beacon { RenameStoreExchange, RemoveStoreExchange, StoreAddExchange, + ReadDrainExchange, WritePreparationExchange, WriteExecuteExchange, SelectExchange, diff --git a/core/src/main/java/io/xpipe/core/dialog/Choice.java b/core/src/main/java/io/xpipe/core/dialog/Choice.java index 53ca11b68..55d59c5a7 100644 --- a/core/src/main/java/io/xpipe/core/dialog/Choice.java +++ b/core/src/main/java/io/xpipe/core/dialog/Choice.java @@ -5,7 +5,7 @@ import lombok.AllArgsConstructor; import lombok.Value; @Value -@AllArgsConstructor(onConstructor_ = @JsonCreator) +@AllArgsConstructor(onConstructor=@__({@JsonCreator})) public class Choice { /** diff --git a/core/src/main/java/io/xpipe/core/impl/BufferedTableReadConnection.java b/core/src/main/java/io/xpipe/core/impl/BufferedTableReadConnection.java new file mode 100644 index 000000000..26ebd1942 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/impl/BufferedTableReadConnection.java @@ -0,0 +1,71 @@ +package io.xpipe.core.impl; + +import io.xpipe.core.data.node.ArrayNode; +import io.xpipe.core.data.node.DataStructureNodeAcceptor; +import io.xpipe.core.data.node.TupleNode; +import io.xpipe.core.data.type.TupleType; +import io.xpipe.core.source.TableReadConnection; + +import java.util.OptionalInt; +import java.util.concurrent.atomic.AtomicInteger; + +public class BufferedTableReadConnection implements TableReadConnection { + + private final TableReadConnection connection; + private final int maxCount; + private int count = 0; + private ArrayNode read; + + public BufferedTableReadConnection(TableReadConnection connection, int maxCount) throws Exception { + this.connection = connection; + this.maxCount = maxCount; + read = connection.readRows(maxCount); + } + + private TupleNode get() throws Exception { + if (count == read.size()) { + read = connection.readRows(maxCount); + } + + if (read.size() == 0) { + return null; + } + + return read.at(count++).asTuple(); + } + + @Override + public void close() throws Exception { + connection.close(); + } + + @Override + public TupleType getDataType() { + return connection.getDataType(); + } + + @Override + public OptionalInt getRowCount() throws Exception { + return connection.getRowCount(); + } + + @Override + public int withRows(DataStructureNodeAcceptor lineAcceptor) throws Exception { + AtomicInteger localCounter = new AtomicInteger(); + TupleNode node; + while (((node = get()) != null)) { + var returned = lineAcceptor.accept(node); + if (!returned) { + break; + } + + localCounter.getAndIncrement(); + } + return localCounter.get(); + } + + @Override + public boolean canRead() throws Exception { + return connection.canRead(); + } +} diff --git a/core/src/main/java/io/xpipe/core/impl/DrainStore.java b/core/src/main/java/io/xpipe/core/impl/DrainStore.java new file mode 100644 index 000000000..40dfba578 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/impl/DrainStore.java @@ -0,0 +1,67 @@ +package io.xpipe.core.impl; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonTypeName; +import io.xpipe.core.charsetter.NewLine; +import io.xpipe.core.charsetter.StreamCharset; +import io.xpipe.core.store.KnownFormatStreamDataStore; +import io.xpipe.core.util.JacksonizedValue; +import lombok.Getter; +import lombok.experimental.SuperBuilder; +import lombok.extern.jackson.Jacksonized; + +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.Channels; +import java.nio.channels.Pipe; + +@JsonTypeName("drain") +@SuperBuilder +@Jacksonized +@Getter +public class DrainStore extends JacksonizedValue implements KnownFormatStreamDataStore { + + private final String description; + private final StreamCharset charset; + private final NewLine newLine; + + @JsonIgnore + private boolean open; + @JsonIgnore + private Pipe pipe; + + private boolean used; + + private void waitForOpen() { + while (!open) { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + break; + } + } + } + + @Override + public boolean canOpen() throws Exception { + return false; + } + + @Override + public InputStream openInput() throws Exception { + if (used) { + throw new IllegalStateException("Drain has already been used"); + } + + waitForOpen(); + return Channels.newInputStream(pipe.source()); + } + + @Override + public OutputStream openOutput() throws Exception { + used = true; + pipe = Pipe.open(); + open = true; + return Channels.newOutputStream(pipe.sink()); + } +} diff --git a/core/src/main/java/io/xpipe/core/source/TableDataSource.java b/core/src/main/java/io/xpipe/core/source/TableDataSource.java index ef1c61b8b..3829474a5 100644 --- a/core/src/main/java/io/xpipe/core/source/TableDataSource.java +++ b/core/src/main/java/io/xpipe/core/source/TableDataSource.java @@ -11,14 +11,15 @@ import java.util.Optional; public abstract class TableDataSource extends DataSource { public Optional determineDataType() throws Exception { - try (var readConnection = newReadConnection()) { - var canRead = readConnection != null && readConnection.canRead(); - if (canRead) { + var readConnection = newReadConnection(); + var canRead = readConnection != null && readConnection.canRead(); + if (canRead) { + try (var in = readConnection) { readConnection.init(); return Optional.ofNullable(readConnection.getDataType()); - } else { - return Optional.empty(); } + } else { + return Optional.empty(); } } diff --git a/core/src/main/java/io/xpipe/core/source/TableReadConnection.java b/core/src/main/java/io/xpipe/core/source/TableReadConnection.java index 84581ce91..6fa0ba271 100644 --- a/core/src/main/java/io/xpipe/core/source/TableReadConnection.java +++ b/core/src/main/java/io/xpipe/core/source/TableReadConnection.java @@ -6,6 +6,7 @@ import io.xpipe.core.data.node.DataStructureNodeAcceptor; import io.xpipe.core.data.node.TupleNode; import io.xpipe.core.data.type.TupleType; import io.xpipe.core.data.typed.TypedDataStreamWriter; +import io.xpipe.core.impl.BufferedTableReadConnection; import io.xpipe.core.impl.LimitTableReadConnection; import java.io.OutputStream; @@ -63,6 +64,14 @@ public interface TableReadConnection extends DataSourceReadConnection { return new LimitTableReadConnection(this, limit); } + default TableReadConnection buffered() throws Exception { + return buffered(Integer.MAX_VALUE); + } + + default TableReadConnection buffered(int limit) throws Exception { + return new BufferedTableReadConnection(this, limit); + } + /** * Consumes the table rows until the acceptor returns false. * diff --git a/core/src/main/java/io/xpipe/core/store/KnownFormatStreamDataStore.java b/core/src/main/java/io/xpipe/core/store/KnownFormatStreamDataStore.java new file mode 100644 index 000000000..775b399e1 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/store/KnownFormatStreamDataStore.java @@ -0,0 +1,11 @@ +package io.xpipe.core.store; + +import io.xpipe.core.charsetter.NewLine; +import io.xpipe.core.charsetter.StreamCharset; + +public interface KnownFormatStreamDataStore extends StreamDataStore { + + StreamCharset getCharset(); + + NewLine getNewLine(); +} diff --git a/extension/src/main/java/io/xpipe/extension/DataStoreActionProvider.java b/extension/src/main/java/io/xpipe/extension/DataStoreActionProvider.java index 2cf866a17..3eb0db74b 100644 --- a/extension/src/main/java/io/xpipe/extension/DataStoreActionProvider.java +++ b/extension/src/main/java/io/xpipe/extension/DataStoreActionProvider.java @@ -2,6 +2,7 @@ package io.xpipe.extension; import io.xpipe.core.store.DataStore; import io.xpipe.extension.event.ErrorEvent; +import javafx.beans.value.ObservableValue; import javafx.scene.layout.Region; import java.util.ArrayList; @@ -40,7 +41,7 @@ public interface DataStoreActionProvider { default void applyToRegion(T store, Region region) {} - String getName(T store); + ObservableValue getName(T store); String getIcon(T store); diff --git a/extension/src/main/java/io/xpipe/extension/comp/DataStoreFlowChoiceComp.java b/extension/src/main/java/io/xpipe/extension/comp/DataStoreFlowChoiceComp.java index eb7dc171f..f754d81e5 100644 --- a/extension/src/main/java/io/xpipe/extension/comp/DataStoreFlowChoiceComp.java +++ b/extension/src/main/java/io/xpipe/extension/comp/DataStoreFlowChoiceComp.java @@ -4,6 +4,7 @@ import io.xpipe.core.store.DataFlow; import io.xpipe.extension.I18n; import io.xpipe.fxcomps.SimpleComp; import javafx.beans.property.Property; +import javafx.beans.property.SimpleObjectProperty; import javafx.beans.value.ObservableValue; import javafx.scene.layout.Region; import lombok.EqualsAndHashCode; @@ -24,7 +25,7 @@ public class DataStoreFlowChoiceComp extends SimpleComp { map.put(DataFlow.INPUT, I18n.observable("extension.input")); map.put(DataFlow.OUTPUT, I18n.observable("extension.output")); map.put(DataFlow.INPUT_OUTPUT, I18n.observable("extension.inout")); - return new ToggleGroupComp<>(selected, map) + return new ToggleGroupComp<>(selected, new SimpleObjectProperty<>(map)) .apply(struc -> { new FancyTooltipAugment<>("extension.inputDescription") .augment(struc.get().getChildren().get(0)); diff --git a/extension/src/main/java/io/xpipe/extension/comp/ToggleGroupComp.java b/extension/src/main/java/io/xpipe/extension/comp/ToggleGroupComp.java index a2249186d..5ad336d15 100644 --- a/extension/src/main/java/io/xpipe/extension/comp/ToggleGroupComp.java +++ b/extension/src/main/java/io/xpipe/extension/comp/ToggleGroupComp.java @@ -4,6 +4,7 @@ import io.xpipe.fxcomps.Comp; import io.xpipe.fxcomps.CompStructure; import io.xpipe.fxcomps.SimpleCompStructure; import io.xpipe.fxcomps.util.PlatformThread; +import io.xpipe.fxcomps.util.SimpleChangeListener; import javafx.beans.property.Property; import javafx.beans.value.ObservableValue; import javafx.scene.control.ToggleButton; @@ -15,9 +16,9 @@ import java.util.Map; public class ToggleGroupComp extends Comp> { private final Property value; - private final Map> range; + private final ObservableValue>> range; - public ToggleGroupComp(Property value, Map> range) { + public ToggleGroupComp(Property value, ObservableValue>> range) { this.value = value; this.range = range; } @@ -27,29 +28,36 @@ public class ToggleGroupComp extends Comp> { var box = new HBox(); box.getStyleClass().add("toggle-group-comp"); ToggleGroup group = new ToggleGroup(); - for (var entry : range.entrySet()) { - var b = new ToggleButton(entry.getValue().getValue()); - b.setOnAction(e -> { - value.setValue(entry.getKey()); - e.consume(); - }); - box.getChildren().add(b); - b.setToggleGroup(group); - value.addListener((c, o, n) -> { - PlatformThread.runLaterIfNeeded(() -> b.setSelected(entry.equals(n))); - }); - if (entry.getKey().equals(value.getValue())) { - b.setSelected(true); + SimpleChangeListener.apply(PlatformThread.sync(range), val -> { + if (!val.containsKey(value.getValue())) { + this.value.setValue(null); } - } - if (box.getChildren().size() > 0) { - box.getChildren().get(0).getStyleClass().add("first"); - for (int i = 1; i < box.getChildren().size() - 1; i++) { - box.getChildren().get(i).getStyleClass().add("center"); + box.getChildren().clear(); + for (var entry : val.entrySet()) { + var b = new ToggleButton(entry.getValue().getValue()); + b.setOnAction(e -> { + value.setValue(entry.getKey()); + e.consume(); + }); + box.getChildren().add(b); + b.setToggleGroup(group); + value.addListener((c, o, n) -> { + PlatformThread.runLaterIfNeeded(() -> b.setSelected(entry.equals(n))); + }); + if (entry.getKey().equals(value.getValue())) { + b.setSelected(true); + } } - box.getChildren().get(box.getChildren().size() - 1).getStyleClass().add("last"); - } + + if (box.getChildren().size() > 0) { + box.getChildren().get(0).getStyleClass().add("first"); + for (int i = 1; i < box.getChildren().size() - 1; i++) { + box.getChildren().get(i).getStyleClass().add("center"); + } + box.getChildren().get(box.getChildren().size() - 1).getStyleClass().add("last"); + } + }); group.selectedToggleProperty().addListener((obsVal, oldVal, newVal) -> { if (newVal == null) oldVal.setSelected(true); diff --git a/extension/src/main/java/io/xpipe/extension/comp/WriteModeChoiceComp.java b/extension/src/main/java/io/xpipe/extension/comp/WriteModeChoiceComp.java index 1143b4d0e..4472fc5e7 100644 --- a/extension/src/main/java/io/xpipe/extension/comp/WriteModeChoiceComp.java +++ b/extension/src/main/java/io/xpipe/extension/comp/WriteModeChoiceComp.java @@ -7,26 +7,30 @@ import io.xpipe.extension.util.Validatable; import io.xpipe.extension.util.Validator; import io.xpipe.extension.util.Validators; import io.xpipe.fxcomps.SimpleComp; +import io.xpipe.fxcomps.util.PlatformThread; import javafx.beans.property.Property; +import javafx.beans.property.SimpleObjectProperty; import javafx.beans.value.ObservableValue; +import javafx.collections.ListChangeListener; +import javafx.collections.ObservableList; import javafx.scene.layout.Region; import lombok.EqualsAndHashCode; import lombok.Value; import net.synedra.validatorfx.Check; import java.util.LinkedHashMap; -import java.util.List; +import java.util.Map; @Value @EqualsAndHashCode(callSuper = true) public class WriteModeChoiceComp extends SimpleComp implements Validatable { Property selected; - List available; + ObservableList available; Validator validator = new SimpleValidator(); Check check; - public WriteModeChoiceComp(Property selected, List available) { + public WriteModeChoiceComp(Property selected, ObservableList available) { this.selected = selected; this.available = available; if (available.size() == 1) { @@ -38,11 +42,19 @@ public class WriteModeChoiceComp extends SimpleComp implements Validatable { @Override protected Region createSimple() { var a = available; - var map = new LinkedHashMap>(); + Property>> map = new SimpleObjectProperty<>(new LinkedHashMap>()); for (WriteMode writeMode : a) { - map.put(writeMode,I18n.observable(writeMode.getId())); + map.getValue().put(writeMode,I18n.observable(writeMode.getId())); } + PlatformThread.sync(available).addListener((ListChangeListener) c -> { + var newMap = new LinkedHashMap>(); + for (WriteMode writeMode : a) { + newMap.put(writeMode,I18n.observable(writeMode.getId())); + } + map.setValue(newMap); + }); + return new ToggleGroupComp<>(selected, map) .apply(struc -> { for (int i = 0; i < a.size(); i++) { diff --git a/extension/src/main/java/io/xpipe/extension/util/DynamicOptionsBuilder.java b/extension/src/main/java/io/xpipe/extension/util/DynamicOptionsBuilder.java index 01c0067ac..006f144e9 100644 --- a/extension/src/main/java/io/xpipe/extension/util/DynamicOptionsBuilder.java +++ b/extension/src/main/java/io/xpipe/extension/util/DynamicOptionsBuilder.java @@ -8,6 +8,7 @@ import io.xpipe.extension.comp.*; import io.xpipe.fxcomps.Comp; import io.xpipe.fxcomps.CompStructure; import javafx.beans.property.Property; +import javafx.beans.property.SimpleObjectProperty; import javafx.beans.value.ObservableValue; import javafx.scene.control.Label; import javafx.scene.layout.Region; @@ -95,7 +96,7 @@ public class DynamicOptionsBuilder { public DynamicOptionsBuilder addToggle( Property prop, ObservableValue name, Map> names) { - var comp = new ToggleGroupComp<>(prop, names); + var comp = new ToggleGroupComp<>(prop, new SimpleObjectProperty<>(names)); entries.add(new DynamicOptionsComp.Entry(name, comp)); props.add(prop); return this; diff --git a/extension/src/main/java/io/xpipe/extension/util/ExpectHelper.java b/extension/src/main/java/io/xpipe/extension/util/ExpectHelper.java index dcc691fc0..ec94d7af4 100644 --- a/extension/src/main/java/io/xpipe/extension/util/ExpectHelper.java +++ b/extension/src/main/java/io/xpipe/extension/util/ExpectHelper.java @@ -32,6 +32,7 @@ public class ExpectHelper { return String.format(""" echo(false) if spawn(%s) then + expect(":") sendln("%s") echo(true) end