mirror of
https://github.com/xpipe-io/xpipe.git
synced 2026-05-19 05:45:58 -04:00
Implement more data types and various fixes
This commit is contained in:
@@ -17,12 +17,18 @@ public abstract class DataStructureNode implements Iterable<DataStructureNode> {
|
||||
public static final Integer INTEGER_VALUE = 6;
|
||||
public static final Integer IS_NULL = 7;
|
||||
public static final Integer IS_INTEGER = 9;
|
||||
public static final Integer IS_FLOATING_POINT = 10;
|
||||
public static final Integer FLOATING_POINT_VALUE = 11;
|
||||
public static final Integer IS_DECIMAL = 10;
|
||||
public static final Integer DECIMAL_VALUE = 11;
|
||||
public static final Integer IS_TEXT = 12;
|
||||
public static final Integer IS_INSTANT = 13;
|
||||
public static final Integer IS_BINARY = 14;
|
||||
|
||||
public static final Integer IS_DATE = 15;
|
||||
public static final Integer DATE_VALUE = 16;
|
||||
|
||||
public static final Integer IS_CURRENCY = 17;
|
||||
public static final Integer CURRENCY_CODE = 18;
|
||||
|
||||
private Map<Integer, String> metaAttributes;
|
||||
|
||||
public void clearMetaAttributes() {
|
||||
@@ -58,12 +64,12 @@ public abstract class DataStructureNode implements Iterable<DataStructureNode> {
|
||||
return this;
|
||||
}
|
||||
|
||||
public DataStructureNode tag(Integer key, String value) {
|
||||
public DataStructureNode tag(Integer key, Object value) {
|
||||
if (metaAttributes == null) {
|
||||
metaAttributes = new HashMap<>();
|
||||
}
|
||||
|
||||
metaAttributes.put(key, value);
|
||||
metaAttributes.put(key, value.toString());
|
||||
return this;
|
||||
}
|
||||
|
||||
@@ -124,6 +130,7 @@ public abstract class DataStructureNode implements Iterable<DataStructureNode> {
|
||||
return "("
|
||||
+ (metaAttributes != null
|
||||
? metaAttributes.entrySet().stream()
|
||||
.sorted(Comparator.comparingInt(entry -> entry.getKey()))
|
||||
.map(e -> e.getValue() != null
|
||||
? e.getKey() + ":" + e.getValue()
|
||||
: e.getKey().toString())
|
||||
|
||||
@@ -6,7 +6,9 @@ import io.xpipe.core.data.type.ValueType;
|
||||
import java.math.BigDecimal;
|
||||
import java.math.BigInteger;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Instant;
|
||||
import java.util.Arrays;
|
||||
import java.util.Currency;
|
||||
import java.util.Objects;
|
||||
|
||||
public abstract class ValueNode extends DataStructureNode {
|
||||
@@ -25,6 +27,42 @@ public abstract class ValueNode extends DataStructureNode {
|
||||
return new SimpleValueNode(data);
|
||||
}
|
||||
|
||||
public static ValueNode ofDate(String raw, Instant instant) {
|
||||
var created = of(raw);
|
||||
created.tag(IS_DATE);
|
||||
created.tag(DATE_VALUE, instant.toString());
|
||||
return created;
|
||||
}
|
||||
|
||||
public static ValueNode ofDecimal(String raw, double decimal) {
|
||||
return ofDecimal(raw, String.valueOf(decimal));
|
||||
}
|
||||
|
||||
public static ValueNode ofDecimal(String raw, String decimal) {
|
||||
var created = of(raw);
|
||||
created.tag(IS_DECIMAL);
|
||||
created.tag(DECIMAL_VALUE, decimal);
|
||||
return created;
|
||||
}
|
||||
|
||||
public static ValueNode ofInteger(String raw, long integer) {
|
||||
return ofInteger(raw, String.valueOf(integer));
|
||||
}
|
||||
|
||||
public static ValueNode ofInteger(String raw, String integer) {
|
||||
var created = of(raw);
|
||||
created.tag(IS_INTEGER);
|
||||
created.tag(INTEGER_VALUE, integer);
|
||||
return created;
|
||||
}
|
||||
|
||||
public static ValueNode ofCurrency(String raw, String decimal, Currency currency) {
|
||||
var created = ofDecimal(raw, decimal);
|
||||
created.tag(IS_CURRENCY);
|
||||
created.tag(CURRENCY_CODE, currency.getCurrencyCode());
|
||||
return created;
|
||||
}
|
||||
|
||||
public static ValueNode ofBytes(byte[] data) {
|
||||
var created = of(data);
|
||||
created.tag(IS_BINARY);
|
||||
@@ -49,9 +87,15 @@ public abstract class ValueNode extends DataStructureNode {
|
||||
return created;
|
||||
}
|
||||
|
||||
public static ValueNode ofDecimal(double decimal) {
|
||||
var created = of(decimal);
|
||||
created.tag(IS_DECIMAL);
|
||||
return created;
|
||||
}
|
||||
|
||||
public static ValueNode ofDecimal(BigDecimal decimal) {
|
||||
var created = of(decimal);
|
||||
created.tag(IS_FLOATING_POINT);
|
||||
created.tag(IS_DECIMAL);
|
||||
return created;
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ package io.xpipe.core.dialog;
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.ToString;
|
||||
|
||||
import java.util.List;
|
||||
@@ -10,16 +11,18 @@ import java.util.List;
|
||||
@JsonTypeName("choice")
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@ToString
|
||||
@Getter
|
||||
public class ChoiceElement extends DialogElement {
|
||||
|
||||
private final String description;
|
||||
private final List<Choice> elements;
|
||||
private final boolean required;
|
||||
private final boolean quiet;
|
||||
|
||||
private int selected;
|
||||
|
||||
@JsonCreator
|
||||
public ChoiceElement(String description, List<Choice> elements, boolean required, int selected) {
|
||||
public ChoiceElement(String description, List<Choice> elements, boolean required, boolean quiet, int selected) {
|
||||
if (elements.stream().allMatch(Choice::isDisabled)) {
|
||||
throw new IllegalArgumentException("All choices are disabled");
|
||||
}
|
||||
@@ -28,6 +31,7 @@ public class ChoiceElement extends DialogElement {
|
||||
this.elements = elements;
|
||||
this.required = required;
|
||||
this.selected = selected;
|
||||
this.quiet = quiet;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
package io.xpipe.core.dialog;
|
||||
|
||||
import io.xpipe.core.charsetter.Charsetter;
|
||||
import io.xpipe.core.util.SecretValue;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
@@ -22,11 +22,11 @@ import java.util.function.Supplier;
|
||||
* The evaluation function can be set with {@link #evaluateTo(Supplier)}.
|
||||
* Alternatively, a dialogue can also copy the evaluation function of another dialogue with {@link #evaluateTo(Dialog)}.
|
||||
* An evaluation result can also be mapped to another type with {@link #map(Function)}.
|
||||
* It is also possible to listen for the completion of this dialogue with {@link #onCompletion(Consumer)}.
|
||||
* It is also possible to listen for the completion of this dialogue with {@link #onCompletion(Charsetter.FailableConsumer)} )}.
|
||||
*/
|
||||
public abstract class Dialog {
|
||||
|
||||
private final List<Consumer<?>> completion = new ArrayList<>();
|
||||
private final List<Charsetter.FailableConsumer<?, Exception>> completion = new ArrayList<>();
|
||||
protected Object eval;
|
||||
private Supplier<?> evaluation;
|
||||
|
||||
@@ -65,8 +65,8 @@ public abstract class Dialog {
|
||||
* @param selected the selected element index
|
||||
*/
|
||||
public static Dialog.Choice choice(
|
||||
String description, List<io.xpipe.core.dialog.Choice> elements, boolean required, int selected) {
|
||||
Dialog.Choice c = new Dialog.Choice(description, elements, required, selected);
|
||||
String description, List<io.xpipe.core.dialog.Choice> elements, boolean required, boolean quiet, int selected) {
|
||||
Dialog.Choice c = new Dialog.Choice(description, elements, required, quiet, selected);
|
||||
c.evaluateTo(c::getSelected);
|
||||
return c;
|
||||
}
|
||||
@@ -77,12 +77,13 @@ public abstract class Dialog {
|
||||
* @param description the shown question description
|
||||
* @param toString a function that maps the objects to a string
|
||||
* @param required signals whether choices required or can be left empty
|
||||
* @param quiet
|
||||
* @param def the element which is selected by default
|
||||
* @param vals the range of possible elements
|
||||
*/
|
||||
@SafeVarargs
|
||||
public static <T> Dialog.Choice choice(
|
||||
String description, Function<T, String> toString, boolean required, T def, T... vals) {
|
||||
String description, Function<T, String> toString, boolean required, boolean quiet, T def, T... vals) {
|
||||
var elements = Arrays.stream(vals)
|
||||
.map(v -> new io.xpipe.core.dialog.Choice(null, toString.apply(v)))
|
||||
.toList();
|
||||
@@ -91,7 +92,7 @@ public abstract class Dialog {
|
||||
throw new IllegalArgumentException("Default value " + def.toString() + " is not in possible values");
|
||||
}
|
||||
|
||||
var c = choice(description, elements, required, index);
|
||||
var c = choice(description, elements, required, quiet, index);
|
||||
c.evaluateTo(() -> {
|
||||
if (c.getSelected() == -1) {
|
||||
return null;
|
||||
@@ -249,6 +250,9 @@ public abstract class Dialog {
|
||||
dialog = d.get();
|
||||
var start = dialog.start();
|
||||
evaluateTo(dialog);
|
||||
if (start == null) {
|
||||
complete();
|
||||
}
|
||||
return start;
|
||||
}
|
||||
|
||||
@@ -354,7 +358,7 @@ public abstract class Dialog {
|
||||
boolean required,
|
||||
int selected,
|
||||
Function<Integer, Dialog> c) {
|
||||
var choice = new ChoiceElement(description, elements, required, selected);
|
||||
var choice = new ChoiceElement(description, elements, required, false, selected);
|
||||
return new Dialog() {
|
||||
|
||||
private Dialog choiceMade;
|
||||
@@ -409,7 +413,7 @@ public abstract class Dialog {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Dialog onCompletion(Consumer<?> s) {
|
||||
public Dialog onCompletion(Charsetter.FailableConsumer<?, Exception> s) {
|
||||
completion.add(s);
|
||||
return this;
|
||||
}
|
||||
@@ -419,7 +423,7 @@ public abstract class Dialog {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Dialog onCompletion(List<Consumer<?>> s) {
|
||||
public Dialog onCompletion(List<Charsetter.FailableConsumer<?, Exception>> s) {
|
||||
completion.addAll(s);
|
||||
return this;
|
||||
}
|
||||
@@ -430,13 +434,13 @@ public abstract class Dialog {
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T> void complete() {
|
||||
public <T> void complete() throws Exception {
|
||||
if (evaluation != null) {
|
||||
eval = evaluation.get();
|
||||
completion.forEach(c -> {
|
||||
Consumer<T> ct = (Consumer<T>) c;
|
||||
for (Charsetter.FailableConsumer<?, Exception> c : completion) {
|
||||
Charsetter.FailableConsumer<T, Exception> ct = (Charsetter.FailableConsumer<T, Exception>) c;
|
||||
ct.accept((T) eval);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -459,8 +463,8 @@ public abstract class Dialog {
|
||||
|
||||
private final ChoiceElement element;
|
||||
|
||||
private Choice(String description, List<io.xpipe.core.dialog.Choice> elements, boolean required, int selected) {
|
||||
this.element = new ChoiceElement(description, elements, required, selected);
|
||||
private Choice(String description, List<io.xpipe.core.dialog.Choice> elements, boolean required, boolean quiet, int selected) {
|
||||
this.element = new ChoiceElement(description, elements, required, quiet, selected);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -7,7 +7,6 @@ import io.xpipe.core.data.type.TupleType;
|
||||
import io.xpipe.core.source.TableReadConnection;
|
||||
|
||||
import java.util.OptionalInt;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class BufferedTableReadConnection implements TableReadConnection {
|
||||
|
||||
@@ -50,18 +49,14 @@ public class BufferedTableReadConnection implements TableReadConnection {
|
||||
}
|
||||
|
||||
@Override
|
||||
public int withRows(DataStructureNodeAcceptor<TupleNode> lineAcceptor) throws Exception {
|
||||
AtomicInteger localCounter = new AtomicInteger();
|
||||
public void withRows(DataStructureNodeAcceptor<TupleNode> lineAcceptor) throws Exception {
|
||||
TupleNode node;
|
||||
while (((node = get()) != null)) {
|
||||
var returned = lineAcceptor.accept(node);
|
||||
if (!returned) {
|
||||
break;
|
||||
}
|
||||
|
||||
localCounter.getAndIncrement();
|
||||
}
|
||||
return localCounter.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -6,7 +6,6 @@ import io.xpipe.core.data.type.TupleType;
|
||||
import io.xpipe.core.source.TableReadConnection;
|
||||
|
||||
import java.util.OptionalInt;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class LimitTableReadConnection implements TableReadConnection {
|
||||
|
||||
@@ -40,20 +39,15 @@ public class LimitTableReadConnection implements TableReadConnection {
|
||||
}
|
||||
|
||||
@Override
|
||||
public int withRows(DataStructureNodeAcceptor<TupleNode> lineAcceptor) throws Exception {
|
||||
AtomicInteger localCounter = new AtomicInteger();
|
||||
public void withRows(DataStructureNodeAcceptor<TupleNode> lineAcceptor) throws Exception {
|
||||
connection.withRows(node -> {
|
||||
if (count == maxCount) {
|
||||
return false;
|
||||
}
|
||||
count++;
|
||||
|
||||
var returned = lineAcceptor.accept(node);
|
||||
localCounter.getAndIncrement();
|
||||
|
||||
return returned;
|
||||
return lineAcceptor.accept(node);
|
||||
});
|
||||
return localCounter.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -60,15 +60,14 @@ public class XpbtReadConnection extends StreamReadConnection implements TableRea
|
||||
}
|
||||
|
||||
@Override
|
||||
public int withRows(DataStructureNodeAcceptor<TupleNode> lineAcceptor) throws Exception {
|
||||
public void withRows(DataStructureNodeAcceptor<TupleNode> lineAcceptor) throws Exception {
|
||||
if (empty) {
|
||||
return 0;
|
||||
return;
|
||||
}
|
||||
|
||||
var reader = TypedDataStructureNodeReader.of(dataType);
|
||||
AtomicBoolean quit = new AtomicBoolean(false);
|
||||
AtomicReference<Exception> exception = new AtomicReference<>();
|
||||
var counter = 0;
|
||||
while (!quit.get()) {
|
||||
var node = parser.parseStructure(inputStream, reader);
|
||||
if (node == null) {
|
||||
@@ -80,7 +79,6 @@ public class XpbtReadConnection extends StreamReadConnection implements TableRea
|
||||
if (!lineAcceptor.accept(node.asTuple())) {
|
||||
quit.set(true);
|
||||
}
|
||||
counter++;
|
||||
} catch (Exception ex) {
|
||||
quit.set(true);
|
||||
exception.set(ex);
|
||||
@@ -90,7 +88,6 @@ public class XpbtReadConnection extends StreamReadConnection implements TableRea
|
||||
if (exception.get() != null) {
|
||||
throw exception.get();
|
||||
}
|
||||
return counter;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -58,6 +58,10 @@ public abstract class DataSource<DS extends DataStore> extends JacksonizedValue
|
||||
store.checkComplete();
|
||||
}
|
||||
|
||||
public void validate() throws Exception {
|
||||
store.validate();
|
||||
}
|
||||
|
||||
public List<WriteMode> getAvailableWriteModes() {
|
||||
if (getFlow() != null && !getFlow().hasOutput()) {
|
||||
return List.of();
|
||||
|
||||
@@ -25,7 +25,13 @@ public abstract class TableDataSource<DS extends DataStore> extends DataSource<D
|
||||
|
||||
@Override
|
||||
public final DataSourceInfo determineInfo() throws Exception {
|
||||
if (!getFlow().hasInput()) {
|
||||
if (!getFlow().hasInput() || !getStore().canOpen()) {
|
||||
return new DataSourceInfo.Table(null, -1);
|
||||
}
|
||||
|
||||
try {
|
||||
checkComplete();
|
||||
} catch (Exception e) {
|
||||
return new DataSourceInfo.Table(null, -1);
|
||||
}
|
||||
|
||||
|
||||
@@ -37,8 +37,7 @@ public interface TableReadConnection extends DataSourceReadConnection {
|
||||
}
|
||||
|
||||
@Override
|
||||
public int withRows(DataStructureNodeAcceptor<TupleNode> lineAcceptor) throws Exception {
|
||||
return 0;
|
||||
public void withRows(DataStructureNodeAcceptor<TupleNode> lineAcceptor) throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -77,7 +76,7 @@ public interface TableReadConnection extends DataSourceReadConnection {
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
int withRows(DataStructureNodeAcceptor<TupleNode> lineAcceptor) throws Exception;
|
||||
void withRows(DataStructureNodeAcceptor<TupleNode> lineAcceptor) throws Exception;
|
||||
|
||||
/**
|
||||
* Reads multiple rows in bulk.
|
||||
@@ -119,6 +118,17 @@ public interface TableReadConnection extends DataSourceReadConnection {
|
||||
var inputType = getDataType();
|
||||
var tCon = (TableWriteConnection) con;
|
||||
var mapping = tCon.createMapping(inputType);
|
||||
return withRows(tCon.writeLinesAcceptor(mapping.orElseThrow()));
|
||||
var acceptor = tCon.writeLinesAcceptor(mapping.orElseThrow());
|
||||
|
||||
AtomicInteger counter = new AtomicInteger();
|
||||
withRows(acc -> {
|
||||
if (!acceptor.accept(acc)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
counter.getAndIncrement();
|
||||
return true;
|
||||
});
|
||||
return counter.get();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
import io.xpipe.core.charsetter.NewLine;
|
||||
import io.xpipe.core.util.JacksonizedValue;
|
||||
import io.xpipe.core.util.SecretValue;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.io.*;
|
||||
import java.nio.charset.Charset;
|
||||
@@ -69,6 +70,8 @@ public class LocalStore extends JacksonizedValue implements MachineFileStore, St
|
||||
|
||||
private final List<SecretValue> input;
|
||||
private final Integer timeout;
|
||||
|
||||
@Getter
|
||||
private final List<String> command;
|
||||
private final Charset charset;
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@@ -125,4 +126,6 @@ public abstract class ProcessControl {
|
||||
public abstract InputStream getStderr();
|
||||
|
||||
public abstract Charset getCharset();
|
||||
|
||||
public abstract List<String> getCommand();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user