Rework beacon connection and implement various improvements

This commit is contained in:
Christopher Schnick
2022-03-07 22:59:48 +01:00
parent 46e83ae757
commit f5cccd5687
29 changed files with 386 additions and 117 deletions

View File

@@ -1,7 +1,10 @@
package io.xpipe.api;
import io.xpipe.api.impl.DataTableAccumulatorImpl;
import io.xpipe.core.data.node.DataStructureNode;
import io.xpipe.core.data.node.DataStructureNodeAcceptor;
import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.data.type.TupleType;
import io.xpipe.core.source.DataSourceId;
/**
@@ -13,6 +16,17 @@ import io.xpipe.core.source.DataSourceId;
*/
public interface DataTableAccumulator {
public static DataTableAccumulator create(TupleType type) {
return new DataTableAccumulatorImpl(type);
}
/**
* Wrapper for {@link #finish(DataSourceId)}.
*/
default DataTable finish(String id) {
return finish(DataSourceId.fromString(id));
}
/**
* Finishes the construction process and returns the data source reference.
*
@@ -25,12 +39,12 @@ public interface DataTableAccumulator {
*
* @param row the row to add
*/
void add(TupleNode row);
void add(DataStructureNode row);
/**
* Creates a tuple acceptor that adds all accepted tuples to the table.
*/
DataStructureNodeAcceptor<TupleNode> acceptor();
DataStructureNodeAcceptor<DataStructureNode> acceptor();
/**
* Returns the current amount of rows added to the table.

View File

@@ -17,6 +17,8 @@ public final class XPipeConnection extends BeaconConnection {
try (var con = new XPipeConnection()) {
con.constructSocket();
handler.handle(con);
} catch (BeaconException e) {
throw e;
} catch (Exception e) {
throw new BeaconException(e);
}
@@ -26,6 +28,8 @@ public final class XPipeConnection extends BeaconConnection {
try (var con = new XPipeConnection()) {
con.constructSocket();
return mapper.handle(con);
} catch (BeaconException e) {
throw e;
} catch (Exception e) {
throw new BeaconException(e);
}

View File

@@ -44,7 +44,7 @@ public abstract class DataSourceImpl implements DataSource {
public static DataSource create(DataSourceId id, String type, Map<String,String> config, InputStream in) {
var res = XPipeConnection.execute(con -> {
var req = PreStoreExchange.Request.builder().build();
PreStoreExchange.Response r = con.performOutputExchange(req, in::transferTo);
PreStoreExchange.Response r = con.performOutputExchange(req, out -> in.transferTo(out));
return r;
});

View File

@@ -4,8 +4,10 @@ import io.xpipe.api.DataSource;
import io.xpipe.api.DataTable;
import io.xpipe.api.DataTableAccumulator;
import io.xpipe.api.connector.XPipeConnection;
import io.xpipe.api.util.TypeDescriptor;
import io.xpipe.beacon.exchange.PreStoreExchange;
import io.xpipe.beacon.exchange.ReadExecuteExchange;
import io.xpipe.core.data.node.DataStructureNode;
import io.xpipe.core.data.node.DataStructureNodeAcceptor;
import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.data.type.TupleType;
@@ -14,21 +16,26 @@ import io.xpipe.core.source.DataSourceConfigInstance;
import io.xpipe.core.source.DataSourceId;
import io.xpipe.core.source.DataSourceReference;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
public class DataTableAccumulatorImpl implements DataTableAccumulator {
private final XPipeConnection connection;
private final TupleType type;
private int rows;
private TupleType writtenDescriptor;
public DataTableAccumulatorImpl(TupleType type) {
this.type = type;
connection = XPipeConnection.open();
connection.sendRequest(PreStoreExchange.Request.builder().build());
connection.sendBodyStart();
connection.sendBody();
}
@Override
public synchronized DataTable finish(DataSourceId id) {
connection.withOutputStream(OutputStream::close);
PreStoreExchange.Response res = connection.receiveResponse();
connection.close();
@@ -40,16 +47,29 @@ public class DataTableAccumulatorImpl implements DataTableAccumulator {
return DataSource.get(DataSourceReference.id(id)).asTable();
}
@Override
public synchronized void add(TupleNode row) {
private void writeDescriptor() {
if (writtenDescriptor != null) {
return;
}
writtenDescriptor = TupleType.tableType(type.getNames());
connection.withOutputStream(out -> {
TypedDataStreamWriter.writeStructure(connection.getOutputStream(), row, type);
out.write((TypeDescriptor.create(type.getNames())).getBytes(StandardCharsets.UTF_8));
});
}
@Override
public synchronized void add(DataStructureNode row) {
TupleNode toUse = type.matches(row) ? row.asTuple() : type.convert(row).orElseThrow().asTuple();
connection.withOutputStream(out -> {
writeDescriptor();
TypedDataStreamWriter.writeStructure(out, toUse, writtenDescriptor);
rows++;
});
}
@Override
public synchronized DataStructureNodeAcceptor<TupleNode> acceptor() {
public synchronized DataStructureNodeAcceptor<DataStructureNode> acceptor() {
return node -> {
add(node);
return true;

View File

@@ -0,0 +1,13 @@
package io.xpipe.api.util;
import java.util.List;
import java.util.stream.Collectors;
public class TypeDescriptor {
public static String create(List<String> names) {
return "[" + names.stream()
.map(n -> n != null ? "\"" + n + "\"" : null)
.collect(Collectors.joining(",")) + "]\n";
}
}