Refactor various components

This commit is contained in:
Christopher Schnick
2022-01-01 20:11:13 +01:00
parent b54d8ad362
commit d63882c5ff
45 changed files with 771 additions and 190 deletions

View File

@@ -0,0 +1,70 @@
package io.xpipe.api;
import io.xpipe.api.impl.DataSourceImpl;
import io.xpipe.core.source.DataSourceId;
import io.xpipe.core.source.DataSourceType;
import java.net.URL;
import java.util.Map;
/**
* Represents a reference to an XPipe data source.
*
* The actual data is only queried when required and is not cached.
* Therefore, the queried data is always up-to-date at the point of calling a method that queries the data.
*/
public interface DataSource {
/**
* Wrapper for {@link #get(DataSourceId)}.
*
* @throws IllegalArgumentException if {@code id} is not a valid data source id
*/
static DataSource get(String id) {
return get(DataSourceId.fromString(id));
}
/**
* Retrieves a reference to the given data source.
*
* @param id the data source id
* @return a reference to the data source that can be used to access the underlying data source
*/
static DataSource get(DataSourceId id) {
return DataSourceImpl.get(id);
}
/**
* Retrieves a reference to the given local data source that is specified by a URL.
*
* This wrapped data source is only available temporarily and locally,
* i.e. it is not added to the XPipe data source storage.
*
* @param url the url that points to the data
* @param configOptions additional configuration options for the specific data source type
* @return a reference to the data source that can be used to access the underlying data source
*/
static DataSource wrap(URL url, Map<String, String> configOptions) {
return null;
}
/**
* Wrapper for {@link #wrap(URL, Map)} that passes no configuration options.
*/
static DataSource wrap(URL url) {
return wrap(url, Map.of());
}
DataSourceId getId();
DataSourceType getType();
/**
* Attemps to cast this object to a {@link DataTable}.
*
* @throws UnsupportedOperationException if the data source is not a table
*/
default DataTable asTable() {
throw new UnsupportedOperationException("Data source is not a table");
}
}

View File

@@ -1,20 +1,15 @@
package io.xpipe.api;
import io.xpipe.api.impl.DataTableImpl;
import io.xpipe.core.data.node.ArrayNode;
import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.data.type.DataType;
import io.xpipe.core.source.DataSourceId;
import java.util.OptionalInt;
import java.util.stream.Stream;
public interface DataTable extends Iterable<TupleNode> {
public interface DataTable extends Iterable<TupleNode>, DataSource {
static DataTable get(String s) {
return DataTableImpl.get(s);
}
DataSourceId getId();
Stream<TupleNode> stream();
int getRowCount();

View File

@@ -1,7 +1,8 @@
package io.xpipe.api;
import io.xpipe.beacon.*;
import io.xpipe.beacon.BeaconClient;
import java.util.Optional;
public abstract class XPipeApiConnector extends BeaconConnector {
@@ -23,12 +24,48 @@ public abstract class XPipeApiConnector extends BeaconConnector {
protected abstract void handle(BeaconClient sc) throws Exception;
@Override
protected void waitForStartup() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
protected BeaconClient constructSocket() throws ConnectorException {
if (!BeaconServer.isRunning()) {
try {
start();
} catch (Exception ex) {
throw new ConnectorException("Unable to start xpipe daemon", ex);
}
var r = waitForStartup();
if (r.isEmpty()) {
throw new ConnectorException("Wait for xpipe daemon timed out");
} else {
return r.get();
}
}
try {
return new BeaconClient();
} catch (Exception ex) {
throw new ConnectorException("Unable to connect to running xpipe daemon", ex);
}
}
private void start() throws Exception {
if (!BeaconServer.tryStart()) {
throw new UnsupportedOperationException("Unable to determine xpipe daemon launch command");
};
}
private Optional<BeaconClient> waitForStartup() {
for (int i = 0; i < 40; i++) {
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
}
var s = BeaconClient.tryConnect();
if (s.isPresent()) {
return s;
}
}
return Optional.empty();
}
@FunctionalInterface

View File

@@ -0,0 +1,47 @@
package io.xpipe.api.impl;
import io.xpipe.api.DataSource;
import io.xpipe.api.DataTable;
import io.xpipe.api.XPipeApiConnector;
import io.xpipe.beacon.BeaconClient;
import io.xpipe.beacon.ClientException;
import io.xpipe.beacon.ConnectorException;
import io.xpipe.beacon.ServerException;
import io.xpipe.beacon.exchange.ReadInfoExchange;
import io.xpipe.core.source.DataSourceId;
public abstract class DataSourceImpl implements DataSource {
public static DataSource get(DataSourceId ds) {
final DataSource[] source = new DataSource[1];
new XPipeApiConnector() {
@Override
protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException {
var req = ReadInfoExchange.Request.builder().sourceId(ds).build();
ReadInfoExchange.Response res = performSimpleExchange(sc, req);
switch (res.getType()) {
case TABLE -> {
var data = res.getTableData();
source[0] = new DataTableImpl(res.getSourceId(), data.getRowCount(), data.getDataType());
}
case STRUCTURE -> {
}
case RAW -> {
}
}
}
}.execute();
return source[0];
}
private final DataSourceId sourceId;
public DataSourceImpl(DataSourceId sourceId) {
this.sourceId = sourceId;
}
@Override
public DataSourceId getId() {
return sourceId;
}
}

View File

@@ -7,9 +7,8 @@ import io.xpipe.beacon.ClientException;
import io.xpipe.beacon.ConnectorException;
import io.xpipe.beacon.ServerException;
import io.xpipe.beacon.exchange.ReadTableDataExchange;
import io.xpipe.beacon.exchange.ReadTableInfoExchange;
import io.xpipe.core.data.node.DataStructureNode;
import io.xpipe.core.data.node.ArrayNode;
import io.xpipe.core.data.node.DataStructureNode;
import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.data.type.DataType;
import io.xpipe.core.data.typed.TypedAbstractReader;
@@ -17,6 +16,7 @@ import io.xpipe.core.data.typed.TypedDataStreamParser;
import io.xpipe.core.data.typed.TypedDataStructureNodeReader;
import io.xpipe.core.data.typed.TypedReusableDataStructureNodeReader;
import io.xpipe.core.source.DataSourceId;
import io.xpipe.core.source.DataSourceType;
import java.io.IOException;
import java.io.InputStream;
@@ -25,30 +25,14 @@ import java.util.*;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class DataTableImpl implements DataTable {
public static DataTable get(String s) {
return get(DataSourceId.fromString(s));
}
public static DataTable get(DataSourceId ds) {
final DataTable[] table = {null};
new XPipeApiConnector() {
@Override
protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException {
var req = new ReadTableInfoExchange.Request(ds);
ReadTableInfoExchange.Response res = performSimpleExchange(sc, req);
table[0] = new DataTableImpl(res.sourceId(), res.rowCount(), res.dataType());
}
}.execute();
return table[0];
}
public class DataTableImpl extends DataSourceImpl implements DataTable {
private final DataSourceId id;
private final int size;
private final DataType dataType;
public DataTableImpl(DataSourceId id, int size, DataType dataType) {
super(id);
this.id = id;
this.size = size;
this.dataType = dataType;
@@ -64,6 +48,11 @@ public class DataTableImpl implements DataTable {
return id;
}
@Override
public DataSourceType getType() {
return DataSourceType.TABLE;
}
@Override
public int getRowCount() {
if (size == -1) {
@@ -96,11 +85,12 @@ public class DataTableImpl implements DataTable {
new XPipeApiConnector() {
@Override
protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException {
var req = new ReadTableDataExchange.Request(id, maxToRead);
performExchange(sc, req, (ReadTableDataExchange.Response res, InputStream in) -> {
var req = ReadTableDataExchange.Request.builder()
.sourceId(id).maxRows(maxToRead).build();
performInputExchange(sc, req, (ReadTableDataExchange.Response res, InputStream in) -> {
var r = new TypedDataStreamParser(dataType);
r.parseStructures(in, TypedDataStructureNodeReader.immutable(dataType), nodes::add);
}, false);
});
}
}.execute();
return ArrayNode.of(nodes);
@@ -120,9 +110,10 @@ public class DataTableImpl implements DataTable {
new XPipeApiConnector() {
@Override
protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException {
var req = new ReadTableDataExchange.Request(id, Integer.MAX_VALUE);
performExchange(sc, req,
(ReadTableDataExchange.Response res, InputStream in) -> input = in, false);
var req = ReadTableDataExchange.Request.builder()
.sourceId(id).maxRows(Integer.MAX_VALUE).build();
performInputExchange(sc, req,
(ReadTableDataExchange.Response res, InputStream in) -> input = in);
}
}.execute();

View File

@@ -1,7 +1,6 @@
module io.xpipe.api {
requires io.xpipe.core;
requires io.xpipe.beacon;
requires org.apache.commons.lang;
requires transitive io.xpipe.core;
requires transitive io.xpipe.beacon;
exports io.xpipe.api;
}