Rework many parts and move some components into the core module

This commit is contained in:
Christopher Schnick
2022-01-08 04:30:16 +01:00
parent e5a84a83db
commit b192fd5b09
29 changed files with 644 additions and 67 deletions

View File

@@ -2,7 +2,7 @@ package io.xpipe.api;
import io.xpipe.core.data.node.ArrayNode;
import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.data.type.DataType;
import io.xpipe.core.data.type.TupleType;
import java.util.OptionalInt;
import java.util.stream.Stream;
@@ -15,7 +15,7 @@ public interface DataTable extends Iterable<TupleNode>, DataSource {
OptionalInt getRowCountIfPresent();
DataType getDataType();
TupleType getDataType();
ArrayNode readAll();

View File

@@ -25,16 +25,7 @@ public abstract class DataSourceImpl implements DataSource {
protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException {
var req = ReadInfoExchange.Request.builder().sourceId(ds).build();
ReadInfoExchange.Response res = performSimpleExchange(sc, req);
switch (res.getType()) {
case TABLE -> {
var data = res.getTableData();
source[0] = new DataTableImpl(res.getSourceId(), res.getConfig(), data.getRowCount(), data.getDataType());
}
case STRUCTURE -> {
}
case RAW -> {
}
}
}
}.execute();
return source[0];
@@ -53,10 +44,10 @@ public abstract class DataSourceImpl implements DataSource {
s.transferTo(out);
}
});
switch (res.getSourceType()) {
switch (res.getInfo().getType()) {
case TABLE -> {
var data = res.getTableData();
source[0] = new DataTableImpl(res.getSourceId(), res.getConfig(), data.getRowCount(), data.getDataType());
var data = res.getInfo().asTable();
source[0] = new DataTableImpl(res.getSourceId(), res.getConfig(), data);
}
case STRUCTURE -> {
}
@@ -75,16 +66,7 @@ public abstract class DataSourceImpl implements DataSource {
protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException {
var req = StoreStreamExchange.Request.builder().type(type).build();
StoreStreamExchange.Response res = performOutputExchange(sc, req, in::transferTo);
switch (res.getSourceType()) {
case TABLE -> {
var data = res.getTableData();
source[0] = new DataTableImpl(res.getSourceId(), res.getConfig(), data.getRowCount(), data.getDataType());
}
case STRUCTURE -> {
}
case RAW -> {
}
}
}
}.execute();
return source[0];

View File

@@ -10,13 +10,14 @@ import io.xpipe.beacon.exchange.ReadTableDataExchange;
import io.xpipe.core.data.node.ArrayNode;
import io.xpipe.core.data.node.DataStructureNode;
import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.data.type.DataType;
import io.xpipe.core.data.type.TupleType;
import io.xpipe.core.data.typed.TypedAbstractReader;
import io.xpipe.core.data.typed.TypedDataStreamParser;
import io.xpipe.core.data.typed.TypedDataStructureNodeReader;
import io.xpipe.core.data.typed.TypedReusableDataStructureNodeReader;
import io.xpipe.core.source.DataSourceConfig;
import io.xpipe.core.source.DataSourceId;
import io.xpipe.core.source.DataSourceInfo;
import io.xpipe.core.source.DataSourceType;
import java.io.IOException;
@@ -29,14 +30,17 @@ import java.util.stream.StreamSupport;
public class DataTableImpl extends DataSourceImpl implements DataTable {
private final DataSourceId id;
private final int size;
private final DataType dataType;
private final DataSourceInfo.Table info;
public DataTableImpl(DataSourceId id, DataSourceConfig sourceConfig, int size, DataType dataType) {
public DataTableImpl(DataSourceId id, DataSourceConfig sourceConfig, DataSourceInfo.Table info) {
super(id, sourceConfig);
this.id = id;
this.size = size;
this.dataType = dataType;
this.info = info;
}
@Override
public DataTable asTable() {
return this;
}
public Stream<TupleNode> stream() {
@@ -56,21 +60,21 @@ public class DataTableImpl extends DataSourceImpl implements DataTable {
@Override
public int getRowCount() {
if (size == -1) {
if (info.getRowCount() == -1) {
throw new UnsupportedOperationException("Row count is unknown");
}
return size;
return info.getRowCount();
}
@Override
public OptionalInt getRowCountIfPresent() {
return size != -1 ? OptionalInt.of(size) : OptionalInt.empty();
return info.getRowCount() != -1 ? OptionalInt.of(info.getRowCount()) : OptionalInt.empty();
}
@Override
public DataType getDataType() {
return dataType;
public TupleType getDataType() {
return info.getDataType();
}
@Override
@@ -80,7 +84,7 @@ public class DataTableImpl extends DataSourceImpl implements DataTable {
@Override
public ArrayNode read(int maxRows) {
int maxToRead = size == -1 ? maxRows : Math.min(size, maxRows);
int maxToRead = info.getRowCount() == -1 ? maxRows : Math.min(info.getRowCount(), maxRows);
List<DataStructureNode> nodes = new ArrayList<>();
new XPipeApiConnector() {
@@ -89,8 +93,9 @@ public class DataTableImpl extends DataSourceImpl implements DataTable {
var req = ReadTableDataExchange.Request.builder()
.sourceId(id).maxRows(maxToRead).build();
performInputExchange(sc, req, (ReadTableDataExchange.Response res, InputStream in) -> {
var r = new TypedDataStreamParser(dataType);
r.parseStructures(in, TypedDataStructureNodeReader.immutable(dataType), nodes::add);
var r = new TypedDataStreamParser(info.getDataType());
r.parseStructures(in, TypedDataStructureNodeReader.immutable(info.getDataType()), nodes::add);
return true;
});
}
}.execute();
@@ -103,7 +108,7 @@ public class DataTableImpl extends DataSourceImpl implements DataTable {
private InputStream input;
private int read;
private final int toRead = size;
private final int toRead = info.getRowCount();
private final TypedDataStreamParser parser;
private final TypedAbstractReader nodeReader;
@@ -114,21 +119,33 @@ public class DataTableImpl extends DataSourceImpl implements DataTable {
var req = ReadTableDataExchange.Request.builder()
.sourceId(id).maxRows(Integer.MAX_VALUE).build();
performInputExchange(sc, req,
(ReadTableDataExchange.Response res, InputStream in) -> input = in);
(ReadTableDataExchange.Response res, InputStream in) -> {
input = in;
return false;
});
}
}.execute();
nodeReader = TypedReusableDataStructureNodeReader.create(dataType);
parser = new TypedDataStreamParser(dataType);
nodeReader = TypedReusableDataStructureNodeReader.create(info.getDataType());
parser = new TypedDataStreamParser(info.getDataType());
}
private void finish() {
try {
input.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private boolean hasKnownSize() {
return size != -1;
return info.getRowCount() != -1;
}
@Override
public boolean hasNext() {
if (hasKnownSize() && read == toRead) {
finish();
return false;
}
@@ -137,8 +154,13 @@ public class DataTableImpl extends DataSourceImpl implements DataTable {
}
try {
return parser.hasNext(input);
var hasNext = parser.hasNext(input);
if (!hasNext) {
finish();
}
return hasNext;
} catch (IOException ex) {
finish();
throw new UncheckedIOException(ex);
}
}