Add more data sources and refactor some parts

This commit is contained in:
Christopher Schnick
2022-03-03 16:44:14 +01:00
parent bcc581c0bd
commit 2c041ecb0e
42 changed files with 775 additions and 196 deletions

View File

@@ -0,0 +1,16 @@
package io.xpipe.api;
import io.xpipe.core.source.DataSourceInfo;
import java.io.InputStream;
public interface DataRaw extends DataSource {
DataSourceInfo.Raw getInfo();
InputStream open();
byte[] readAll();
byte[] read(int maxBytes);
}

View File

@@ -3,6 +3,7 @@ package io.xpipe.api;
import io.xpipe.api.impl.DataSourceImpl;
import io.xpipe.core.source.DataSourceConfig;
import io.xpipe.core.source.DataSourceId;
import io.xpipe.core.source.DataSourceReference;
import io.xpipe.core.source.DataSourceType;
import java.io.InputStream;
@@ -10,14 +11,22 @@ import java.net.URL;
import java.util.Map;
/**
* Represents a reference to an XPipe data source.
* Represents a reference to a data source that is managed by X-Pipe.
*
* The actual data is only queried when required and is not cached.
* Therefore, the queried data is always up-to-date at the point of calling a method that queries the data.
*
* As soon a data source reference is created, the data source is locked
* within X-Pipe to prevent concurrent modification and the problems that can arise from it.
* By default, the lock is held until the calling program terminates and prevents
* other applications from modifying the data source in any way.
* To unlock the data source earlier, you can make use the {@link #unlock()} method.
*/
public interface DataSource {
/**
* NOT YET IMPLEMENTED!
*
* Creates a new supplier data source that will be interpreted as the generated data source.
* In case this program should be a data source generator, this method has to be called at
* least once to register that it actually generates a data source.
@@ -29,28 +38,49 @@ public interface DataSource {
*
* @return the generator data source
*/
@Deprecated
static DataSource supplySource() {
return null;
}
/**
* Wrapper for {@link #get(DataSourceId)}.
* Wrapper for {@link #get(DataSourceReference)}.
*
* @throws IllegalArgumentException if {@code id} is not a valid data source id
*/
static DataSource get(String id) {
return get(DataSourceId.fromString(id));
static DataSource getById(String id) {
return get(DataSourceReference.id(id));
}
/**
* Wrapper for {@link #get(DataSourceReference)} using the latest reference.
*/
static DataSource getLatest() {
return get(DataSourceReference.latest());
}
/**
* Wrapper for {@link #get(DataSourceReference)} using a name reference.
*/
static DataSource getByName(String name) {
return get(DataSourceReference.name(name));
}
/**
* Retrieves a reference to the given data source.
*
* @param id the data source id
* @return a reference to the data source that can be used to access the underlying data source
* @param ref the data source reference
*/
static DataSource get(DataSourceId id) {
return null;
//return DataSourceImpl.get(id);
static DataSource get(DataSourceReference ref) {
return DataSourceImpl.get(ref);
}
/**
* Releases the lock held by this program for this data source such
* that other applications can modify the data source again.
*/
static void unlock() {
throw new UnsupportedOperationException();
}
static DataSource wrap(InputStream in, String type, Map<String, String> configOptions) {
@@ -110,4 +140,31 @@ public interface DataSource {
default DataTable asTable() {
throw new UnsupportedOperationException("Data source is not a table");
}
/**
* Attempts to cast this object to a {@link DataStructure}.
*
* @throws UnsupportedOperationException if the data source is not a structure
*/
default DataStructure asStructure() {
throw new UnsupportedOperationException("Data source is not a structure");
}
/**
* Attempts to cast this object to a {@link DataText}.
*
* @throws UnsupportedOperationException if the data source is not a text
*/
default DataText asText() {
throw new UnsupportedOperationException("Data source is not a text");
}
/**
* Attempts to cast this object to a {@link DataRaw}.
*
* @throws UnsupportedOperationException if the data source is not raw
*/
default DataRaw asRaw() {
throw new UnsupportedOperationException("Data source is not raw");
}
}

View File

@@ -0,0 +1,11 @@
package io.xpipe.api;
import io.xpipe.core.data.node.DataStructureNode;
import io.xpipe.core.source.DataSourceInfo;
public interface DataStructure extends DataSource {
DataSourceInfo.Structure getInfo();
DataStructureNode read();
}

View File

@@ -2,28 +2,16 @@ package io.xpipe.api;
import io.xpipe.core.data.node.ArrayNode;
import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.data.type.TupleType;
import io.xpipe.core.source.DataSourceInfo;
import java.util.OptionalInt;
import java.util.stream.Stream;
public interface DataTable extends Iterable<TupleNode>, DataSource {
/**
* @see DataSource#supplySource()
*/
static DataTable supplySource() {
return null;
}
DataSourceInfo.Table getInfo();
Stream<TupleNode> stream();
int getRowCount();
OptionalInt getRowCountIfPresent();
TupleType getDataType();
ArrayNode readAll();
ArrayNode read(int maxRows);

View File

@@ -0,0 +1,28 @@
package io.xpipe.api;
import io.xpipe.core.data.node.DataStructureNodeAcceptor;
import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.source.DataSourceId;
/**
* An accumulator for table data.
*
* This class can be used to construct new table data sources by
* accumulating the rows using {@link #add(TupleNode)} or {@link #acceptor()} and then calling
* {@link #finish(DataSourceId)} to complete the construction process and create a new data source.
*/
public interface DataTableAccumulator {
/**
* Finishes the construction process and returns the data source reference.
*
* @param id the data source id to assign
*/
DataTable finish(DataSourceId id);
void add(TupleNode row);
DataStructureNodeAcceptor<TupleNode> acceptor();
int getCurrentRows();
}

View File

@@ -0,0 +1,18 @@
package io.xpipe.api;
import io.xpipe.core.source.DataSourceInfo;
import java.util.List;
public interface DataText extends DataSource, Iterable<String> {
DataSourceInfo.Text getInfo();
List<String> readAllLines();
List<String> readLines(int maxLines);
String readAll();
String read(int maxCharacters);
}

View File

@@ -1,19 +0,0 @@
package io.xpipe.api;
public class XPipeClientException extends RuntimeException {
public XPipeClientException() {
}
public XPipeClientException(String message) {
super(message);
}
public XPipeClientException(String message, Throwable cause) {
super(message, cause);
}
public XPipeClientException(Throwable cause) {
super(cause);
}
}

View File

@@ -1,19 +0,0 @@
package io.xpipe.api;
public class XPipeConnectException extends RuntimeException {
public XPipeConnectException() {
}
public XPipeConnectException(String message) {
super(message);
}
public XPipeConnectException(String message, Throwable cause) {
super(message, cause);
}
public XPipeConnectException(Throwable cause) {
super(cause);
}
}

View File

@@ -1,8 +0,0 @@
package io.xpipe.api;
import io.xpipe.core.data.node.DataStructureNode;
public interface XPipeDataStructureSource {
DataStructureNode read();
}

View File

@@ -1,12 +0,0 @@
package io.xpipe.api;
import io.xpipe.core.source.DataSourceId;
public abstract class XPipeDataTableBuilder {
private DataSourceId id;
public abstract void write();
public abstract void commit();
}

View File

@@ -1,23 +0,0 @@
package io.xpipe.api;
public class XPipeException extends RuntimeException {
public XPipeException() {
}
public XPipeException(String message) {
super(message);
}
public XPipeException(String message, Throwable cause) {
super(message, cause);
}
public XPipeException(Throwable cause) {
super(cause);
}
public XPipeException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@@ -1,19 +0,0 @@
package io.xpipe.api;
public class XPipeServerException extends RuntimeException {
public XPipeServerException() {
}
public XPipeServerException(String message) {
super(message);
}
public XPipeServerException(String message, Throwable cause) {
super(message, cause);
}
public XPipeServerException(Throwable cause) {
super(cause);
}
}

View File

@@ -1,4 +1,4 @@
package io.xpipe.api;
package io.xpipe.api.connector;
import io.xpipe.beacon.*;
import io.xpipe.core.util.JacksonHelper;
@@ -12,7 +12,7 @@ public abstract class XPipeApiConnector extends BeaconConnector {
var socket = constructSocket();
handle(socket);
} catch (Throwable ce) {
throw new XPipeException(ce);
throw new RuntimeException(ce);
}
}

View File

@@ -0,0 +1,49 @@
package io.xpipe.api.impl;
import io.xpipe.api.DataRaw;
import io.xpipe.core.source.DataSourceConfig;
import io.xpipe.core.source.DataSourceId;
import io.xpipe.core.source.DataSourceInfo;
import io.xpipe.core.source.DataSourceType;
import java.io.InputStream;
public class DataRawImpl extends DataSourceImpl implements DataRaw {
private final DataSourceInfo.Raw info;
public DataRawImpl(DataSourceId sourceId, DataSourceConfig sourceConfig, DataSourceInfo.Raw info) {
super(sourceId, sourceConfig);
this.info = info;
}
@Override
public DataSourceInfo.Raw getInfo() {
return info;
}
@Override
public InputStream open() {
return null;
}
@Override
public byte[] readAll() {
return new byte[0];
}
@Override
public byte[] read(int maxBytes) {
return new byte[0];
}
@Override
public DataSourceType getType() {
return DataSourceType.RAW;
}
@Override
public DataRaw asRaw() {
return this;
}
}

View File

@@ -1,12 +1,12 @@
package io.xpipe.api.impl;
import io.xpipe.api.DataSource;
import io.xpipe.api.XPipeApiConnector;
import io.xpipe.api.connector.XPipeApiConnector;
import io.xpipe.beacon.BeaconClient;
import io.xpipe.beacon.ClientException;
import io.xpipe.beacon.ConnectorException;
import io.xpipe.beacon.ServerException;
import io.xpipe.beacon.exchange.InfoExchange;
import io.xpipe.beacon.exchange.QueryDataSourceExchange;
import io.xpipe.beacon.exchange.StoreResourceExchange;
import io.xpipe.beacon.exchange.StoreStreamExchange;
import io.xpipe.core.source.DataSourceConfig;
@@ -24,9 +24,26 @@ public abstract class DataSourceImpl implements DataSource {
new XPipeApiConnector() {
@Override
protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException {
var req = InfoExchange.Request.builder().ref(ds).build();
InfoExchange.Response res = performSimpleExchange(sc, req);
var req = QueryDataSourceExchange.Request.builder().ref(ds).build();
QueryDataSourceExchange.Response res = performSimpleExchange(sc, req);
switch (res.getInfo().getType()) {
case TABLE -> {
var data = res.getInfo().asTable();
source[0] = new DataTableImpl(res.getId(), res.getConfig().getConfig(), data);
}
case STRUCTURE -> {
var info = res.getInfo().asStructure();
source[0] = new DataStructureImpl(res.getId(), res.getConfig().getConfig(), info);
}
case TEXT -> {
var info = res.getInfo().asText();
source[0] = new DataTextImpl(res.getId(), res.getConfig().getConfig(), info);
}
case RAW -> {
var info = res.getInfo().asRaw();
source[0] = new DataRawImpl(res.getId(), res.getConfig().getConfig(), info);
}
}
}
}.execute();
return source[0];

View File

@@ -0,0 +1,38 @@
package io.xpipe.api.impl;
import io.xpipe.api.DataStructure;
import io.xpipe.core.data.node.DataStructureNode;
import io.xpipe.core.source.DataSourceConfig;
import io.xpipe.core.source.DataSourceId;
import io.xpipe.core.source.DataSourceInfo;
import io.xpipe.core.source.DataSourceType;
public class DataStructureImpl extends DataSourceImpl implements DataStructure {
private final DataSourceInfo.Structure info;
public DataStructureImpl(DataSourceId sourceId, DataSourceConfig sourceConfig, DataSourceInfo.Structure info) {
super(sourceId, sourceConfig);
this.info = info;
}
@Override
public DataSourceType getType() {
return DataSourceType.STRUCTURE;
}
@Override
public DataStructure asStructure() {
return this;
}
@Override
public DataSourceInfo.Structure getInfo() {
return info;
}
@Override
public DataStructureNode read() {
return null;
}
}

View File

@@ -0,0 +1,30 @@
package io.xpipe.api.impl;
import io.xpipe.api.DataTable;
import io.xpipe.api.DataTableAccumulator;
import io.xpipe.core.data.node.DataStructureNodeAcceptor;
import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.source.DataSourceId;
public class DataTableAccumulatorImpl implements DataTableAccumulator {
@Override
public DataTable finish(DataSourceId id) {
return null;
}
@Override
public void add(TupleNode row) {
}
@Override
public DataStructureNodeAcceptor<TupleNode> acceptor() {
return null;
}
@Override
public int getCurrentRows() {
return 0;
}
}

View File

@@ -1,7 +1,7 @@
package io.xpipe.api.impl;
import io.xpipe.api.DataTable;
import io.xpipe.api.XPipeApiConnector;
import io.xpipe.api.connector.XPipeApiConnector;
import io.xpipe.beacon.BeaconClient;
import io.xpipe.beacon.ClientException;
import io.xpipe.beacon.ConnectorException;
@@ -9,7 +9,6 @@ import io.xpipe.beacon.ServerException;
import io.xpipe.core.data.node.ArrayNode;
import io.xpipe.core.data.node.DataStructureNode;
import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.data.type.TupleType;
import io.xpipe.core.data.typed.TypedAbstractReader;
import io.xpipe.core.data.typed.TypedDataStreamParser;
import io.xpipe.core.data.typed.TypedReusableDataStructureNodeReader;
@@ -27,12 +26,10 @@ import java.util.stream.StreamSupport;
public class DataTableImpl extends DataSourceImpl implements DataTable {
private final DataSourceId id;
private final DataSourceInfo.Table info;
public DataTableImpl(DataSourceId id, DataSourceConfig sourceConfig, DataSourceInfo.Table info) {
DataTableImpl(DataSourceId id, DataSourceConfig sourceConfig, DataSourceInfo.Table info) {
super(id, sourceConfig);
this.id = id;
this.info = info;
}
@@ -41,40 +38,21 @@ public class DataTableImpl extends DataSourceImpl implements DataTable {
return this;
}
@Override
public DataSourceInfo.Table getInfo() {
return info;
}
public Stream<TupleNode> stream() {
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(iterator(), Spliterator.ORDERED), false);
}
@Override
public DataSourceId getId() {
return id;
}
@Override
public DataSourceType getType() {
return DataSourceType.TABLE;
}
@Override
public int getRowCount() {
if (info.getRowCount() == -1) {
throw new UnsupportedOperationException("Row count is unknown");
}
return info.getRowCount();
}
@Override
public OptionalInt getRowCountIfPresent() {
return info.getRowCount() != -1 ? OptionalInt.of(info.getRowCount()) : OptionalInt.empty();
}
@Override
public TupleType getDataType() {
return info.getDataType();
}
@Override
public ArrayNode readAll() {
return read(Integer.MAX_VALUE);

View File

@@ -0,0 +1,60 @@
package io.xpipe.api.impl;
import io.xpipe.api.DataText;
import io.xpipe.core.source.DataSourceConfig;
import io.xpipe.core.source.DataSourceId;
import io.xpipe.core.source.DataSourceInfo;
import io.xpipe.core.source.DataSourceType;
import java.util.Iterator;
import java.util.List;
public class DataTextImpl extends DataSourceImpl implements DataText {
private final DataSourceInfo.Text info;
public DataTextImpl(DataSourceId sourceId, DataSourceConfig sourceConfig, DataSourceInfo.Text info) {
super(sourceId, sourceConfig);
this.info = info;
}
@Override
public DataSourceType getType() {
return DataSourceType.TEXT;
}
@Override
public DataText asText() {
return this;
}
@Override
public DataSourceInfo.Text getInfo() {
return info;
}
@Override
public List<String> readAllLines() {
return null;
}
@Override
public List<String> readLines(int maxLines) {
return null;
}
@Override
public String readAll() {
return null;
}
@Override
public String read(int maxCharacters) {
return null;
}
@Override
public Iterator<String> iterator() {
return null;
}
}

View File

@@ -3,4 +3,5 @@ module io.xpipe.api {
requires io.xpipe.beacon;
exports io.xpipe.api;
exports io.xpipe.api.connector;
}