This commit is contained in:
Christopher Schnick
2022-10-15 16:06:02 +02:00
parent 27843ae0fd
commit 55da767dab
186 changed files with 1541 additions and 1606 deletions

View File

@@ -27,10 +27,6 @@ import java.nio.file.Path;
*/
public interface DataSource {
void forwardTo(DataSource target);
void appendTo(DataSource target);
/**
* NOT YET IMPLEMENTED!
*
@@ -126,7 +122,6 @@ public interface DataSource {
}
}
/**
* Wrapper for {@link #create(DataSourceId, String, InputStream)} that creates an anonymous data source.
*/
@@ -168,6 +163,10 @@ public interface DataSource {
return DataSourceImpl.create(id, type, in);
}
void forwardTo(DataSource target);
void appendTo(DataSource target);
public io.xpipe.core.source.DataSource<?> getInternalSource();
/**
@@ -180,7 +179,6 @@ public interface DataSource {
*/
DataSourceType getType();
DataSourceConfig getConfig();
/**

View File

@@ -12,7 +12,9 @@ public class DataStores {
public static void addNamedStore(DataStore store, String name) {
XPipeConnection.execute(con -> {
var req = StoreAddExchange.Request.builder()
.storeInput(store).name(name).build();
.storeInput(store)
.name(name)
.build();
StoreAddExchange.Response res = con.performSimpleExchange(req);
new QuietDialogHandler(res.getConfig(), con, Map.of()).handle();

View File

@@ -8,6 +8,8 @@ import java.util.Optional;
public final class XPipeConnection extends BeaconConnection {
private XPipeConnection() {}
public static XPipeConnection open() {
var con = new XPipeConnection();
con.constructSocket();
@@ -23,7 +25,9 @@ public final class XPipeConnection extends BeaconConnection {
throw new IllegalStateException();
}
DialogExchange.Response response = con.performSimpleExchange(DialogExchange.Request.builder().dialogKey(reference.getDialogId()).build());
DialogExchange.Response response = con.performSimpleExchange(DialogExchange.Request.builder()
.dialogKey(reference.getDialogId())
.build());
element = response.getElement();
if (response.getElement() == null) {
break;
@@ -58,40 +62,6 @@ public final class XPipeConnection extends BeaconConnection {
}
}
private XPipeConnection() {
}
@Override
protected void constructSocket() {
if (!BeaconServer.isRunning()) {
try {
start();
} catch (Exception ex) {
throw new BeaconException("Unable to start xpipe daemon", ex);
}
var r = waitForStartup(null);
if (r.isEmpty()) {
throw new BeaconException("Wait for xpipe daemon timed out");
} else {
beaconClient = r.get();
return;
}
}
try {
beaconClient = new BeaconClient();
} catch (Exception ex) {
throw new BeaconException("Unable to connect to running xpipe daemon", ex);
}
}
private void start() throws Exception {
if (BeaconServer.tryStart() == null) {
throw new UnsupportedOperationException("Unable to determine xpipe daemon launch command");
};
}
public static Optional<BeaconClient> waitForStartup(Process process) {
for (int i = 0; i < 160; i++) {
if (process != null && !process.isAlive()) {
@@ -125,6 +95,38 @@ public final class XPipeConnection extends BeaconConnection {
}
}
@Override
protected void constructSocket() {
if (!BeaconServer.isRunning()) {
try {
start();
} catch (Exception ex) {
throw new BeaconException("Unable to start xpipe daemon", ex);
}
var r = waitForStartup(null);
if (r.isEmpty()) {
throw new BeaconException("Wait for xpipe daemon timed out");
} else {
beaconClient = r.get();
return;
}
}
try {
beaconClient = new BeaconClient();
} catch (Exception ex) {
throw new BeaconException("Unable to connect to running xpipe daemon", ex);
}
}
private void start() throws Exception {
if (BeaconServer.tryStart() == null) {
throw new UnsupportedOperationException("Unable to determine xpipe daemon launch command");
}
;
}
@FunctionalInterface
public static interface Handler {

View File

@@ -2,7 +2,9 @@ package io.xpipe.api.impl;
import io.xpipe.api.DataRaw;
import io.xpipe.api.DataSourceConfig;
import io.xpipe.core.source.*;
import io.xpipe.core.source.DataSourceId;
import io.xpipe.core.source.DataSourceInfo;
import io.xpipe.core.source.DataSourceType;
import java.io.InputStream;
@@ -10,7 +12,11 @@ public class DataRawImpl extends DataSourceImpl implements DataRaw {
private final DataSourceInfo.Raw info;
public DataRawImpl(DataSourceId sourceId, DataSourceConfig sourceConfig, DataSourceInfo.Raw info, io.xpipe.core.source.DataSource<?> internalSource) {
public DataRawImpl(
DataSourceId sourceId,
DataSourceConfig sourceConfig,
DataSourceInfo.Raw info,
io.xpipe.core.source.DataSource<?> internalSource) {
super(sourceId, sourceConfig, internalSource);
this.info = info;
}

View File

@@ -13,28 +13,15 @@ import java.io.InputStream;
public abstract class DataSourceImpl implements DataSource {
@Override
public void forwardTo(DataSource target) {
XPipeConnection.execute(con -> {
var req = ForwardExchange.Request.builder()
.source(DataSourceReference.id(sourceId))
.target(DataSourceReference.id(target.getId()))
.build();
ForwardExchange.Response res = con.performSimpleExchange(req);
});
}
@Override
public void appendTo(DataSource target) {
XPipeConnection.execute(con -> {
var req = ForwardExchange.Request.builder()
.source(DataSourceReference.id(sourceId))
.target(DataSourceReference.id(target.getId()))
.append(true)
.build();
ForwardExchange.Response res = con.performSimpleExchange(req);
});
private final DataSourceId sourceId;
private final DataSourceConfig config;
private final io.xpipe.core.source.DataSource<?> internalSource;
public DataSourceImpl(
DataSourceId sourceId, DataSourceConfig config, io.xpipe.core.source.DataSource<?> internalSource) {
this.sourceId = sourceId;
this.config = config;
this.internalSource = internalSource;
}
public static DataSource get(DataSourceReference ds) {
@@ -59,17 +46,17 @@ public abstract class DataSourceImpl implements DataSource {
var info = res.getInfo().asRaw();
yield new DataRawImpl(res.getId(), config, info, res.getInternalSource());
}
case COLLECTION -> throw new UnsupportedOperationException("Unimplemented case: " + res.getInfo().getType());
default -> throw new IllegalArgumentException("Unexpected value: " + res.getInfo().getType());
case COLLECTION -> throw new UnsupportedOperationException(
"Unimplemented case: " + res.getInfo().getType());
default -> throw new IllegalArgumentException(
"Unexpected value: " + res.getInfo().getType());
};
});
}
public static DataSource create(DataSourceId id, io.xpipe.core.source.DataSource<?> source) {
var startReq = AddSourceExchange.Request.builder()
.source(source)
.target(id)
.build();
var startReq =
AddSourceExchange.Request.builder().source(source).target(id).build();
var returnedId = XPipeConnection.execute(con -> {
AddSourceExchange.Response r = con.performSimpleExchange(startReq);
return r.getId();
@@ -108,9 +95,7 @@ public abstract class DataSourceImpl implements DataSource {
var configInstance = startRes.getConfig();
XPipeConnection.finishDialog(configInstance);
var ref = id != null ?
DataSourceReference.id(id) :
DataSourceReference.latest();
var ref = id != null ? DataSourceReference.id(id) : DataSourceReference.latest();
return get(ref);
}
@@ -137,20 +122,31 @@ public abstract class DataSourceImpl implements DataSource {
var configInstance = startRes.getConfig();
XPipeConnection.finishDialog(configInstance);
var ref = id != null ?
DataSourceReference.id(id) :
DataSourceReference.latest();
var ref = id != null ? DataSourceReference.id(id) : DataSourceReference.latest();
return get(ref);
}
private final DataSourceId sourceId;
private final DataSourceConfig config;
private final io.xpipe.core.source.DataSource<?> internalSource;
@Override
public void forwardTo(DataSource target) {
XPipeConnection.execute(con -> {
var req = ForwardExchange.Request.builder()
.source(DataSourceReference.id(sourceId))
.target(DataSourceReference.id(target.getId()))
.build();
ForwardExchange.Response res = con.performSimpleExchange(req);
});
}
public DataSourceImpl(DataSourceId sourceId, DataSourceConfig config, io.xpipe.core.source.DataSource<?> internalSource) {
this.sourceId = sourceId;
this.config = config;
this.internalSource = internalSource;
@Override
public void appendTo(DataSource target) {
XPipeConnection.execute(con -> {
var req = ForwardExchange.Request.builder()
.source(DataSourceReference.id(sourceId))
.target(DataSourceReference.id(target.getId()))
.append(true)
.build();
ForwardExchange.Response res = con.performSimpleExchange(req);
});
}
public io.xpipe.core.source.DataSource<?> getInternalSource() {

View File

@@ -3,13 +3,19 @@ package io.xpipe.api.impl;
import io.xpipe.api.DataSourceConfig;
import io.xpipe.api.DataStructure;
import io.xpipe.core.data.node.DataStructureNode;
import io.xpipe.core.source.*;
import io.xpipe.core.source.DataSourceId;
import io.xpipe.core.source.DataSourceInfo;
import io.xpipe.core.source.DataSourceType;
public class DataStructureImpl extends DataSourceImpl implements DataStructure {
private final DataSourceInfo.Structure info;
public DataStructureImpl(DataSourceId sourceId, DataSourceConfig sourceConfig, DataSourceInfo.Structure info, io.xpipe.core.source.DataSource<?> internalSource) {
public DataStructureImpl(
DataSourceId sourceId,
DataSourceConfig sourceConfig,
DataSourceInfo.Structure info,
io.xpipe.core.source.DataSource<?> internalSource) {
super(sourceId, sourceConfig, internalSource);
this.info = info;
}

View File

@@ -47,7 +47,11 @@ public class DataTableAccumulatorImpl implements DataTableAccumulator {
connection.close();
var req = ReadExchange.Request.builder()
.target(id).store(res.getStore()).provider("xpbt").configureAll(false).build();
.target(id)
.store(res.getStore())
.provider("xpbt")
.configureAll(false)
.build();
ReadExchange.Response response = XPipeConnection.execute(con -> {
return con.performSimpleExchange(req);
});
@@ -71,7 +75,9 @@ public class DataTableAccumulatorImpl implements DataTableAccumulator {
@Override
public synchronized void add(DataStructureNode row) {
TupleNode toUse = type.matches(row) ? row.asTuple() : type.convert(row).orElseThrow().asTuple();
TupleNode toUse = type.matches(row)
? row.asTuple()
: type.convert(row).orElseThrow().asTuple();
connection.withOutputStream(out -> {
writeDescriptor();
TypedDataStreamWriter.writeStructure(out, toUse, writtenDescriptor);

View File

@@ -27,7 +27,11 @@ public class DataTableImpl extends DataSourceImpl implements DataTable {
private final DataSourceInfo.Table info;
DataTableImpl(DataSourceId id, DataSourceConfig sourceConfig, DataSourceInfo.Table info, io.xpipe.core.source.DataSource<?> internalSource) {
DataTableImpl(
DataSourceId id,
DataSourceConfig sourceConfig,
DataSourceInfo.Table info,
io.xpipe.core.source.DataSource<?> internalSource) {
super(id, sourceConfig, internalSource);
this.info = info;
}
@@ -44,8 +48,8 @@ public class DataTableImpl extends DataSourceImpl implements DataTable {
public Stream<TupleNode> stream() {
var iterator = new TableIterator();
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false).onClose(iterator::finish);
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
.onClose(iterator::finish);
}
@Override
@@ -63,15 +67,23 @@ public class DataTableImpl extends DataSourceImpl implements DataTable {
List<DataStructureNode> nodes = new ArrayList<>();
XPipeConnection.execute(con -> {
var req = QueryTableDataExchange.Request.builder()
.ref(DataSourceReference.id(getId())).maxRows(maxRows).build();
.ref(DataSourceReference.id(getId()))
.maxRows(maxRows)
.build();
con.performInputExchange(req, (QueryTableDataExchange.Response res, InputStream in) -> {
var r = new TypedDataStreamParser(info.getDataType());
r.parseStructures(in, TypedDataStructureNodeReader.of(info.getDataType()), nodes::add);
var r = new TypedDataStreamParser(info.getDataType());
r.parseStructures(in, TypedDataStructureNodeReader.of(info.getDataType()), nodes::add);
});
});
return ArrayNode.of(nodes);
}
@Override
public Iterator<TupleNode> iterator() {
return new TableIterator();
}
;
private class TableIterator implements Iterator<TupleNode> {
private final BeaconConnection connection;
@@ -85,7 +97,9 @@ public class DataTableImpl extends DataSourceImpl implements DataTable {
connection = XPipeConnection.open();
var req = QueryTableDataExchange.Request.builder()
.ref(DataSourceReference.id(getId())).maxRows(Integer.MAX_VALUE).build();
.ref(DataSourceReference.id(getId()))
.maxRows(Integer.MAX_VALUE)
.build();
connection.sendRequest(req);
connection.receiveResponse();
connection.receiveBody();
@@ -116,10 +130,5 @@ public class DataTableImpl extends DataSourceImpl implements DataTable {
return node;
}
};
@Override
public Iterator<TupleNode> iterator() {
return new TableIterator();
}
}

View File

@@ -27,7 +27,11 @@ public class DataTextImpl extends DataSourceImpl implements DataText {
private final DataSourceInfo.Text info;
public DataTextImpl(DataSourceId sourceId, DataSourceConfig sourceConfig, DataSourceInfo.Text info, io.xpipe.core.source.DataSource<?> internalSource) {
public DataTextImpl(
DataSourceId sourceId,
DataSourceConfig sourceConfig,
DataSourceInfo.Text info,
io.xpipe.core.source.DataSource<?> internalSource) {
super(sourceId, sourceConfig, internalSource);
this.info = info;
}
@@ -70,7 +74,9 @@ public class DataTextImpl extends DataSourceImpl implements DataText {
{
connection = XPipeConnection.open();
var req = QueryTextDataExchange.Request.builder()
.ref(DataSourceReference.id(getId())).maxLines(-1).build();
.ref(DataSourceReference.id(getId()))
.maxLines(-1)
.build();
connection.sendRequest(req);
connection.receiveResponse();
reader = new BufferedReader(new InputStreamReader(connection.receiveBody(), StandardCharsets.UTF_8));
@@ -98,8 +104,7 @@ public class DataTextImpl extends DataSourceImpl implements DataText {
}
};
return StreamSupport
.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
.onClose(iterator::close);
}
@@ -123,6 +128,4 @@ public class DataTextImpl extends DataSourceImpl implements DataText {
});
return builder.toString();
}
}

View File

@@ -14,9 +14,9 @@ import java.util.UUID;
public class QuietDialogHandler {
private final UUID dialogKey;
private DialogElement element;
private final BeaconConnection connection;
private final Map<String, String> overrides;
private DialogElement element;
public QuietDialogHandler(DialogReference ref, BeaconConnection connection, Map<String, String> overrides) {
this.dialogKey = ref.getDialogId();
@@ -36,10 +36,13 @@ public class QuietDialogHandler {
response = handleQuery(q);
}
DialogExchange.Response res = connection.performSimpleExchange(
DialogExchange.Request.builder().dialogKey(dialogKey).value(response).build());
DialogExchange.Response res = connection.performSimpleExchange(DialogExchange.Request.builder()
.dialogKey(dialogKey)
.value(response)
.build());
if (res.getElement() != null && element.equals(res.getElement())) {
throw new ClientException("Invalid value for key " + res.getElement().toDisplayString());
throw new ClientException(
"Invalid value for key " + res.getElement().toDisplayString());
}
element = res.getElement();

View File

@@ -6,8 +6,7 @@ import java.util.stream.Collectors;
public class TypeDescriptor {
public static String create(List<String> names) {
return "[" + names.stream()
.map(n -> n != null ? "\"" + n + "\"" : null)
.collect(Collectors.joining(",")) + "]\n";
return "[" + names.stream().map(n -> n != null ? "\"" + n + "\"" : null).collect(Collectors.joining(","))
+ "]\n";
}
}

View File

@@ -5,4 +5,4 @@ module io.xpipe.api {
requires transitive io.xpipe.core;
requires io.xpipe.beacon;
}
}