Refactor exchanges, add dialog APIs, add store providers

This commit is contained in:
Christopher Schnick
2022-06-12 02:40:28 +02:00
parent f4f9c1d978
commit 696568d5bc
50 changed files with 791 additions and 206 deletions

View File

@@ -0,0 +1,21 @@
package io.xpipe.core.config;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonTypeName;
import lombok.Getter;
@JsonTypeName("query")
@Getter
public class BaseQueryElement extends DialogElement {
private final String description;
private final boolean required;
protected String value;
@JsonCreator
public BaseQueryElement(String description, boolean required, String value) {
this.description = description;
this.required = required;
this.value = value;
}
}

View File

@@ -0,0 +1,67 @@
package io.xpipe.core.config;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonTypeName;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Value;
import lombok.extern.jackson.Jacksonized;
import java.util.List;
@JsonTypeName("choice")
public class ChoiceElement extends DialogElement {
private final List<Element> elements;
private int selected;
@Override
public boolean apply(String value) {
if (value == null) {
return true;
}
if (value.length() != 1) {
return true;
}
var c = value.charAt(0);
if (Character.isDigit(c)) {
selected = Integer.parseInt(value) - 1;
return true;
}
for (int i = 0; i < elements.size(); i++) {
if (elements.get(i).getCharacter() != null && elements.get(i).getCharacter().equals(c)) {
selected = i;
return true;
}
}
return false;
}
@Value
@Builder
@Jacksonized
@AllArgsConstructor
public static class Element {
Character character;
String description;
}
@JsonCreator
public ChoiceElement(List<Element> elements, int selected) {
this.elements = elements;
this.selected = selected;
}
public List<Element> getElements() {
return elements;
}
public int getSelected() {
return selected;
}
}

View File

@@ -17,10 +17,10 @@ public class ConfigParameter {
}
@JsonIgnore
ConfigConverter<?> converter;
QueryConverter<?> converter;
@SuppressWarnings("unchecked")
public <T> ConfigConverter<T> getConverter() {
return (ConfigConverter<T>) converter;
public <T> QueryConverter<T> getConverter() {
return (QueryConverter<T>) converter;
}
}

View File

@@ -0,0 +1,176 @@
package io.xpipe.core.config;
import java.util.function.Function;
import java.util.function.Supplier;
public abstract class Dialog {
private static class Sequence extends Dialog {
private int index = 0;
private final DialogElement[] es;
public Sequence(DialogElement... es) {
this.es = es;
}
@Override
public DialogElement start() {
index = 0;
return es[0];
}
@Override
public DialogElement receive(String answer) {
if (es[index].apply(answer)) {
if (index == es.length - 1) {
complete();
return null;
} else {
return es[++index];
}
}
return es[index];
}
}
public static Dialog chain(DialogElement... es) {
return new Dialog.Sequence(es);
}
public static Dialog chain(Dialog... ds) {
return new Dialog() {
private int current = 0;
@Override
public DialogElement start() {
current = 0;
return ds[0].start();
}
@Override
public DialogElement receive(String answer) {
DialogElement currentElement = ds[current].receive(answer);
if (currentElement == null) {
ds[current].complete();
if (current == ds.length - 1) {
complete();
return null;
} else {
return ds[++current].start();
}
}
return currentElement;
}
};
}
public static Dialog of(DialogElement e) {
return new Dialog() {
@Override
public DialogElement start() {
return e;
}
@Override
public DialogElement receive(String answer) {
if (e.apply(answer)) {
complete();
return null;
}
return e;
}
};
}
public static Dialog retryIf(Dialog d, Supplier<String> msg) {
return new Dialog() {
private boolean retry;
@Override
public DialogElement start() {
return d.start();
}
@Override
public DialogElement receive(String answer) {
if (retry) {
retry = false;
return d.start();
}
var next = d.receive(answer);
if (next == null) {
var s = msg.get();
if (s != null) {
retry = true;
return new HeaderElement(s);
}
}
return next;
}
}.evaluateTo(d.onCompletion);
}
public static Dialog choice(ChoiceElement choice, Function<Integer, Dialog> c) {
return new Dialog() {
private Dialog choiceMade;
@Override
public DialogElement start() {
choiceMade = null;
return choice;
}
@Override
public DialogElement receive(String answer) {
if (choiceMade != null) {
var r = choiceMade.receive(answer);
if (r == null) {
complete();
}
return r;
}
if (choice.apply(answer)) {
choiceMade = c.apply(choice.getSelected());
return choiceMade.start();
}
return choice;
}
};
}
private Object eval;
private Supplier<?> onCompletion;
public abstract DialogElement start();
public Dialog evaluateTo(Supplier<?> s) {
onCompletion = s;
return this;
}
@SuppressWarnings("unchecked")
public <T> T getResult() {
return (T) eval;
}
public void complete() {
if (onCompletion != null) {
eval = onCompletion.get();
}
}
public abstract DialogElement receive(String answer);
}

View File

@@ -0,0 +1,25 @@
package io.xpipe.core.config;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.EqualsAndHashCode;
import java.util.UUID;
@EqualsAndHashCode
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
public abstract class DialogElement {
protected String id;
public DialogElement() {
this.id = UUID.randomUUID().toString();
}
public boolean apply(String value) {
throw new UnsupportedOperationException();
}
public String getId() {
return id;
}
}

View File

@@ -0,0 +1,24 @@
package io.xpipe.core.config;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonTypeName;
@JsonTypeName("header")
public class HeaderElement extends DialogElement {
protected String header;
@JsonCreator
public HeaderElement(String header) {
this.header = header;
}
@Override
public boolean apply(String value) {
return true;
}
public String getHeader() {
return header;
}
}

View File

@@ -2,9 +2,9 @@ package io.xpipe.core.config;
import java.nio.charset.Charset;
public abstract class ConfigConverter<T> {
public abstract class QueryConverter<T> {
public static final ConfigConverter<Charset> CHARSET = new ConfigConverter<Charset>() {
public static final QueryConverter<Charset> CHARSET = new QueryConverter<Charset>() {
@Override
protected Charset fromString(String s) {
return Charset.forName(s);
@@ -16,7 +16,7 @@ public abstract class ConfigConverter<T> {
}
};
public static final ConfigConverter<String> STRING = new ConfigConverter<String>() {
public static final QueryConverter<String> STRING = new QueryConverter<String>() {
@Override
protected String fromString(String s) {
return s;
@@ -28,7 +28,19 @@ public abstract class ConfigConverter<T> {
}
};
public static final ConfigConverter<Character> CHARACTER = new ConfigConverter<Character>() {
public static final QueryConverter<Integer> INTEGER = new QueryConverter<Integer>() {
@Override
protected Integer fromString(String s) {
return Integer.parseInt(s);
}
@Override
protected String toString(Integer value) {
return value.toString();
}
};
public static final QueryConverter<Character> CHARACTER = new QueryConverter<Character>() {
@Override
protected Character fromString(String s) {
if (s.length() != 1) {
@@ -44,7 +56,7 @@ public abstract class ConfigConverter<T> {
}
};
public static final ConfigConverter<Boolean> BOOLEAN = new ConfigConverter<Boolean>() {
public static final QueryConverter<Boolean> BOOLEAN = new QueryConverter<Boolean>() {
@Override
protected Boolean fromString(String s) {
if (s.equalsIgnoreCase("y") || s.equalsIgnoreCase("yes") || s.equalsIgnoreCase("true")) {

View File

@@ -0,0 +1,47 @@
package io.xpipe.core.config;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
@JsonSerialize(as = BaseQueryElement.class)
public class QueryElement extends BaseQueryElement {
@JsonIgnore
private final QueryConverter<?> converter;
public QueryElement(String description, boolean required, String value, QueryConverter<?> converter) {
super(description, required, value);
this.converter = converter;
}
public QueryElement(String description, boolean required, Object value, QueryConverter<?> converter) {
super(description, required, value != null ? value.toString() : null);
this.converter = converter;
}
@Override
public boolean apply(String value) {
if (value == null && this.value != null) {
if (isRequired() && this.value == null) {
return false;
}
this.value = null;
return true;
}
try {
converter.convertFromString(value);
} catch (Exception ex) {
return false;
}
this.value = value;
return true;
}
@SuppressWarnings("unchecked")
public <T> T getConvertedValue() {
return (T) converter.convertFromString(value);
}
}

View File

@@ -38,8 +38,12 @@ public abstract class DataSource<DS extends DataStore> {
return c;
}
public boolean isComplete() {
return true;
protected boolean supportsRead() {
return false;
}
protected boolean supportsWrite() {
return false;
}
/**
@@ -65,9 +69,13 @@ public abstract class DataSource<DS extends DataStore> {
*/
public abstract DataSourceInfo determineInfo() throws Exception;
public abstract DataSourceReadConnection openReadConnection() throws Exception;
public DataSourceReadConnection openReadConnection() throws Exception {
throw new UnsupportedOperationException();
}
public abstract DataSourceConnection openWriteConnection() throws Exception;
public DataSourceConnection openWriteConnection() throws Exception {
throw new UnsupportedOperationException();
}
public DataSourceConnection openAppendingWriteConnection() throws Exception {
throw new UnsupportedOperationException("Appending write is not supported");

View File

@@ -0,0 +1,94 @@
package io.xpipe.core.source;
import io.xpipe.core.data.node.*;
import io.xpipe.core.data.type.TupleType;
import io.xpipe.core.data.type.ValueType;
import io.xpipe.core.store.JdbcStore;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
@Data
@EqualsAndHashCode(callSuper = true)
public abstract class JdbcQuerySource extends TableDataSource<JdbcStore> {
JdbcStore store;
protected abstract String createQuery();
public Connection createConnection() throws SQLException {
return store.createConnection();
}
@Override
protected boolean supportsRead() {
return true;
}
@Override
protected TableReadConnection newReadConnection() {
return new TableReadConnection() {
private Connection connection;
private Statement statement;
private TupleType dataType;
private ResultSet resultSet;
@Override
public void init() throws Exception {
connection = createConnection();
statement = connection.createStatement();
resultSet = statement.executeQuery(createQuery());
var meta = resultSet.getMetaData();
var names = new ArrayList<String>();
for (int i = 0; i < meta.getColumnCount(); i++) {
names.add(meta.getColumnName(i + 1));
}
dataType = TupleType.of(names, Collections.nCopies(names.size(), ValueType.of()));
}
@Override
public void close() throws Exception {
statement.close();
connection.close();
}
@Override
public TupleType getDataType() {
return dataType;
}
@Override
public int getRowCount() throws Exception {
return resultSet.getFetchSize();
}
@Override
public void withRows(DataStructureNodeAcceptor<TupleNode> lineAcceptor) throws Exception {
while (resultSet.next()) {
var vals = new ArrayList<DataStructureNode>();
for (int i = 0; i < dataType.getSize(); i++) {
vals.add(ValueNode.of(resultSet.getString(i)));
}
var node = TupleNode.of(dataType.getNames(), vals);
if (!lineAcceptor.accept(node)) {
break;
}
}
}
@Override
public ArrayNode readRows(int maxLines) throws Exception {
return null;
}
};
}
}

View File

@@ -18,7 +18,7 @@ public interface TableReadConnection extends DataSourceReadConnection {
public static TableReadConnection empty() {
return new TableReadConnection() {
@Override
public TupleType getDataType() throws Exception {
public TupleType getDataType() {
return TupleType.empty();
}
@@ -42,7 +42,7 @@ public interface TableReadConnection extends DataSourceReadConnection {
/**
* Returns the data type of the table data.
*/
TupleType getDataType() throws Exception;
TupleType getDataType();
/**
* Returns the amount of rows to be read or -1 if the amount is unknown.

View File

@@ -0,0 +1,19 @@
package io.xpipe.core.store;
import com.fasterxml.jackson.annotation.JsonTypeName;
import lombok.Value;
import java.io.InputStream;
@Value
@JsonTypeName("commandInput")
public class CommandInputStore implements StreamDataStore {
String cmd;
@Override
public InputStream openInput() throws Exception {
var proc = Runtime.getRuntime().exec(cmd);
return proc.getInputStream();
}
}

View File

@@ -0,0 +1,52 @@
package io.xpipe.core.store;
import com.fasterxml.jackson.annotation.JsonTypeName;
import lombok.Value;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Map;
import java.util.Optional;
@Value
@JsonTypeName("httpRequest")
public class HttpRequestStore implements StreamDataStore {
public static boolean isHttpRequest(String s) {
return s.startsWith("http:") || s.startsWith("https:");
}
public static Optional<HttpRequestStore> fromString(String s) {
try {
var uri = new URI(s);
return Optional.of(new HttpRequestStore(uri, Map.of()));
} catch (URISyntaxException e) {
return Optional.empty();
}
}
URI uri;
Map<String, String> headers;
@Override
public InputStream openInput() throws Exception {
var b = HttpRequest.newBuilder().uri(uri);
headers.forEach(b::setHeader);
var req = b.GET().build();
var client = HttpClient.newHttpClient();
var res = client.send(req, HttpResponse.BodyHandlers.ofByteArray());
return new ByteArrayInputStream(res.body());
}
@Override
public boolean exists() {
return false;
}
}

View File

@@ -0,0 +1,34 @@
package io.xpipe.core.store;
import lombok.*;
import lombok.experimental.FieldDefaults;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
@Getter
@EqualsAndHashCode
@ToString
@AllArgsConstructor
public abstract class JdbcStore implements DataStore {
String hostname;
int port;
public void checkConnect() throws Exception {
try (Connection con = createConnection()) {
return;
}
}
public Connection createConnection() throws SQLException {
return DriverManager.getConnection(toUrl(), toProperties());
}
public abstract String toUrl();
public abstract Properties toProperties();
}

View File

@@ -49,7 +49,9 @@ public interface StreamDataStore extends DataStore {
throw new UnsupportedOperationException("Can't open store output");
}
boolean exists();
default boolean exists() {
return false;
}
default boolean persistent() {
return false;

View File

@@ -11,6 +11,9 @@ import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import io.xpipe.core.config.BaseQueryElement;
import io.xpipe.core.config.ChoiceElement;
import io.xpipe.core.config.HeaderElement;
import io.xpipe.core.data.type.ArrayType;
import io.xpipe.core.data.type.TupleType;
import io.xpipe.core.data.type.ValueType;
@@ -18,6 +21,7 @@ import io.xpipe.core.data.type.WildcardType;
import io.xpipe.core.source.DataSourceInfo;
import io.xpipe.core.source.DataSourceReference;
import io.xpipe.core.store.CollectionEntryDataStore;
import io.xpipe.core.store.HttpRequestStore;
import io.xpipe.core.store.LocalDirectoryDataStore;
import io.xpipe.core.store.LocalFileDataStore;
@@ -33,6 +37,7 @@ public class CoreJacksonModule extends SimpleModule {
new NamedType(LocalFileDataStore.class),
new NamedType(LocalDirectoryDataStore.class),
new NamedType(CollectionEntryDataStore.class),
new NamedType(HttpRequestStore.class),
new NamedType(ValueType.class),
new NamedType(TupleType.class),
new NamedType(ArrayType.class),
@@ -41,7 +46,10 @@ public class CoreJacksonModule extends SimpleModule {
new NamedType(DataSourceInfo.Structure.class),
new NamedType(DataSourceInfo.Text.class),
new NamedType(DataSourceInfo.Collection.class),
new NamedType(DataSourceInfo.Raw.class)
new NamedType(DataSourceInfo.Raw.class),
new NamedType(BaseQueryElement.class),
new NamedType(ChoiceElement.class),
new NamedType(HeaderElement.class)
);
addSerializer(Charset.class, new CharsetSerializer());

View File

@@ -22,7 +22,9 @@ module io.xpipe.core {
requires com.fasterxml.jackson.core;
requires com.fasterxml.jackson.databind;
requires java.net.http;
requires static lombok;
requires java.sql;
uses com.fasterxml.jackson.databind.Module;
provides com.fasterxml.jackson.databind.Module with CoreJacksonModule;