Various small fixes

This commit is contained in:
Christopher Schnick
2022-12-23 10:29:08 +01:00
parent 71b5d2d716
commit 9a2a1a8745
18 changed files with 415 additions and 137 deletions

View File

@@ -42,10 +42,8 @@ public abstract class DataSourceImpl implements DataSource {
case RAW -> {
yield new DataRawImpl(res.getId(), config, res.getInternalSource());
}
case COLLECTION -> throw new UnsupportedOperationException(
"Unimplemented case: " + res.getType());
default -> throw new IllegalArgumentException(
"Unexpected value: " + res.getType());
case COLLECTION -> throw new UnsupportedOperationException("Unimplemented case: " + res.getType());
default -> throw new IllegalArgumentException("Unexpected value: " + res.getType());
};
});
}
@@ -64,17 +62,18 @@ public abstract class DataSourceImpl implements DataSource {
public static DataSource create(DataSourceId id, String type, DataStore store) {
if (store instanceof StreamDataStore s && s.isContentExclusivelyAccessible()) {
var res = XPipeApiConnection.execute(con -> {
var req = StoreStreamExchange.Request.builder().build();
StoreStreamExchange.Response r = con.performOutputExchange(req, out -> {
store = XPipeApiConnection.execute(con -> {
var internal = con.createInternalStreamStore();
var req = WriteStreamExchange.Request.builder()
.name(internal.getUuid().toString())
.build();
con.performOutputExchange(req, out -> {
try (InputStream inputStream = s.openInput()) {
inputStream.transferTo(out);
}
});
return r;
return internal;
});
store = res.getStore();
}
var startReq = ReadExchange.Request.builder()
@@ -96,14 +95,17 @@ public abstract class DataSourceImpl implements DataSource {
}
public static DataSource create(DataSourceId id, String type, InputStream in) {
var res = XPipeApiConnection.execute(con -> {
var req = StoreStreamExchange.Request.builder().build();
StoreStreamExchange.Response r = con.performOutputExchange(req, out -> in.transferTo(out));
return r;
var store = XPipeApiConnection.execute(con -> {
var internal = con.createInternalStreamStore();
var req = WriteStreamExchange.Request.builder()
.name(internal.getUuid().toString())
.build();
con.performOutputExchange(req, out -> {
in.transferTo(out);
});
return internal;
});
var store = res.getStore();
var startReq = ReadExchange.Request.builder()
.provider(type)
.store(store)

View File

@@ -7,12 +7,15 @@ import io.xpipe.api.connector.XPipeApiConnection;
import io.xpipe.api.util.TypeDescriptor;
import io.xpipe.beacon.BeaconException;
import io.xpipe.beacon.exchange.ReadExchange;
import io.xpipe.beacon.exchange.StoreStreamExchange;
import io.xpipe.beacon.exchange.WriteStreamExchange;
import io.xpipe.beacon.exchange.cli.StoreAddExchange;
import io.xpipe.beacon.util.QuietDialogHandler;
import io.xpipe.core.data.node.DataStructureNode;
import io.xpipe.core.data.node.DataStructureNodeAcceptor;
import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.data.type.TupleType;
import io.xpipe.core.data.typed.TypedDataStreamWriter;
import io.xpipe.core.impl.InternalStreamStore;
import io.xpipe.core.source.DataSourceId;
import io.xpipe.core.source.DataSourceReference;
@@ -25,13 +28,20 @@ public class DataTableAccumulatorImpl implements DataTableAccumulator {
private final XPipeApiConnection connection;
private final TupleType type;
private int rows;
private InternalStreamStore store;
private TupleType writtenDescriptor;
private OutputStream bodyOutput;
public DataTableAccumulatorImpl(TupleType type) {
this.type = type;
connection = XPipeApiConnection.open();
connection.sendRequest(StoreStreamExchange.Request.builder().build());
store = new InternalStreamStore();
var addReq = StoreAddExchange.Request.builder().storeInput(store).name(store.getUuid().toString()).build();
StoreAddExchange.Response addRes = connection.performSimpleExchange(addReq);
QuietDialogHandler.handle(addRes.getConfig(), connection);
connection.sendRequest(WriteStreamExchange.Request.builder().name(store.getUuid().toString()).build());
bodyOutput = connection.sendBody();
}
@@ -43,12 +53,12 @@ public class DataTableAccumulatorImpl implements DataTableAccumulator {
throw new BeaconException(e);
}
StoreStreamExchange.Response res = connection.receiveResponse();
WriteStreamExchange.Response res = connection.receiveResponse();
connection.close();
var req = ReadExchange.Request.builder()
.target(id)
.store(res.getStore())
.store(store)
.provider("xpbt")
.configureAll(false)
.build();