mirror of
https://github.com/xpipe-io/xpipe.git
synced 2026-04-23 08:00:56 -04:00
Refactor
This commit is contained in:
@@ -1,9 +1,11 @@
|
||||
package io.xpipe.api;
|
||||
|
||||
import io.xpipe.api.impl.DataSourceImpl;
|
||||
import io.xpipe.core.source.DataSourceConfig;
|
||||
import io.xpipe.core.source.DataSourceId;
|
||||
import io.xpipe.core.source.DataSourceType;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.net.URL;
|
||||
import java.util.Map;
|
||||
|
||||
@@ -34,6 +36,18 @@ public interface DataSource {
|
||||
return DataSourceImpl.get(id);
|
||||
}
|
||||
|
||||
static DataSource wrap(InputStream in, String type, Map<String, String> configOptions) {
|
||||
return DataSourceImpl.wrap(in, type, configOptions);
|
||||
}
|
||||
|
||||
static DataSource wrap(InputStream in, String type) {
|
||||
return DataSourceImpl.wrap(in, type, Map.of());
|
||||
}
|
||||
|
||||
static DataSource wrap(InputStream in) {
|
||||
return DataSourceImpl.wrap(in, null, Map.of());
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves a reference to the given local data source that is specified by a URL.
|
||||
*
|
||||
@@ -41,26 +55,38 @@ public interface DataSource {
|
||||
* i.e. it is not added to the XPipe data source storage.
|
||||
*
|
||||
* @param url the url that points to the data
|
||||
* @param type the data source type
|
||||
* @param configOptions additional configuration options for the specific data source type
|
||||
* @return a reference to the data source that can be used to access the underlying data source
|
||||
*/
|
||||
static DataSource wrap(URL url, Map<String, String> configOptions) {
|
||||
return null;
|
||||
static DataSource wrap(URL url, String type, Map<String, String> configOptions) {
|
||||
return DataSourceImpl.wrap(url, type, configOptions);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper for {@link #wrap(URL, Map)} that passes no configuration options.
|
||||
* Wrapper for {@link #wrap(URL, String, Map)} that passes no configuration options.
|
||||
* As a result, the data source configuration is automatically determined by X-Pipe for the given type.
|
||||
*/
|
||||
static DataSource wrap(URL url, String type) {
|
||||
return wrap(url, type, Map.of());
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper for {@link #wrap(URL, String, Map)} that passes no type and no configuration options.
|
||||
* As a result, the data source type and configuration is automatically determined by X-Pipe.
|
||||
*/
|
||||
static DataSource wrap(URL url) {
|
||||
return wrap(url, Map.of());
|
||||
return wrap(url, null, Map.of());
|
||||
}
|
||||
|
||||
DataSourceId getId();
|
||||
|
||||
DataSourceType getType();
|
||||
|
||||
DataSourceConfig getConfig();
|
||||
|
||||
/**
|
||||
* Attemps to cast this object to a {@link DataTable}.
|
||||
* Attempts to cast this object to a {@link DataTable}.
|
||||
*
|
||||
* @throws UnsupportedOperationException if the data source is not a table
|
||||
*/
|
||||
|
||||
@@ -1,21 +0,0 @@
|
||||
package io.xpipe.api;
|
||||
|
||||
import java.util.function.IntConsumer;
|
||||
|
||||
public class IntConverter {
|
||||
|
||||
private IntConsumer consumer;
|
||||
|
||||
public IntConverter(IntConsumer consumer) {
|
||||
this.consumer = consumer;
|
||||
}
|
||||
|
||||
public void onValue(byte[] value) {
|
||||
if (value.length > 4) {
|
||||
throw new IllegalArgumentException("Unable to fit " + value.length + " bytes into an integer");
|
||||
}
|
||||
|
||||
int v = value[0] << 24 | (value[1] & 0xFF) << 16 | (value[2] & 0xFF) << 8 | (value[3] & 0xFF);
|
||||
consumer.accept(v);
|
||||
}
|
||||
}
|
||||
@@ -1,15 +1,18 @@
|
||||
package io.xpipe.api.impl;
|
||||
|
||||
import io.xpipe.api.DataSource;
|
||||
import io.xpipe.api.DataTable;
|
||||
import io.xpipe.api.XPipeApiConnector;
|
||||
import io.xpipe.beacon.BeaconClient;
|
||||
import io.xpipe.beacon.ClientException;
|
||||
import io.xpipe.beacon.ConnectorException;
|
||||
import io.xpipe.beacon.ServerException;
|
||||
import io.xpipe.beacon.exchange.ReadInfoExchange;
|
||||
import io.xpipe.beacon.exchange.StoreResourceExchange;
|
||||
import io.xpipe.beacon.exchange.StoreStreamExchange;
|
||||
import io.xpipe.core.source.DataSourceConfig;
|
||||
import io.xpipe.core.source.DataSourceId;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.net.URL;
|
||||
import java.util.Map;
|
||||
|
||||
@@ -25,7 +28,7 @@ public abstract class DataSourceImpl implements DataSource {
|
||||
switch (res.getType()) {
|
||||
case TABLE -> {
|
||||
var data = res.getTableData();
|
||||
source[0] = new DataTableImpl(res.getSourceId(), data.getRowCount(), data.getDataType());
|
||||
source[0] = new DataTableImpl(res.getSourceId(), res.getConfig(), data.getRowCount(), data.getDataType());
|
||||
}
|
||||
case STRUCTURE -> {
|
||||
}
|
||||
@@ -42,12 +45,35 @@ public abstract class DataSourceImpl implements DataSource {
|
||||
new XPipeApiConnector() {
|
||||
@Override
|
||||
protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException {
|
||||
var req = ReadInfoExchange.Request.builder().sourceId(ds).build();
|
||||
ReadInfoExchange.Response res = performSimpleExchange(sc, req);
|
||||
switch (res.getType()) {
|
||||
var req = StoreResourceExchange.Request.builder()
|
||||
.url(url).type(type).build();
|
||||
StoreResourceExchange.Response res = performSimpleExchange(sc, req);
|
||||
switch (res.getSourceType()) {
|
||||
case TABLE -> {
|
||||
var data = res.getTableData();
|
||||
source[0] = new DataTableImpl(res.getSourceId(), data.getRowCount(), data.getDataType());
|
||||
source[0] = new DataTableImpl(res.getSourceId(), res.getConfig(), data.getRowCount(), data.getDataType());
|
||||
}
|
||||
case STRUCTURE -> {
|
||||
}
|
||||
case RAW -> {
|
||||
}
|
||||
}
|
||||
}
|
||||
}.execute();
|
||||
return source[0];
|
||||
}
|
||||
|
||||
public static DataSource wrap(InputStream in, String type, Map<String,String> config) {
|
||||
final DataSource[] source = new DataSource[1];
|
||||
new XPipeApiConnector() {
|
||||
@Override
|
||||
protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException {
|
||||
var req = StoreStreamExchange.Request.builder().type(type).build();
|
||||
StoreStreamExchange.Response res = performOutputExchange(sc, req, in::transferTo);
|
||||
switch (res.getSourceType()) {
|
||||
case TABLE -> {
|
||||
var data = res.getTableData();
|
||||
source[0] = new DataTableImpl(res.getSourceId(), res.getConfig(), data.getRowCount(), data.getDataType());
|
||||
}
|
||||
case STRUCTURE -> {
|
||||
}
|
||||
@@ -60,13 +86,20 @@ public abstract class DataSourceImpl implements DataSource {
|
||||
}
|
||||
|
||||
private final DataSourceId sourceId;
|
||||
private final DataSourceConfig sourceConfig;
|
||||
|
||||
public DataSourceImpl(DataSourceId sourceId) {
|
||||
public DataSourceImpl(DataSourceId sourceId, DataSourceConfig sourceConfig) {
|
||||
this.sourceId = sourceId;
|
||||
this.sourceConfig = sourceConfig;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataSourceId getId() {
|
||||
return sourceId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataSourceConfig getConfig() {
|
||||
return sourceConfig;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ import io.xpipe.core.data.typed.TypedAbstractReader;
|
||||
import io.xpipe.core.data.typed.TypedDataStreamParser;
|
||||
import io.xpipe.core.data.typed.TypedDataStructureNodeReader;
|
||||
import io.xpipe.core.data.typed.TypedReusableDataStructureNodeReader;
|
||||
import io.xpipe.core.source.DataSourceConfig;
|
||||
import io.xpipe.core.source.DataSourceId;
|
||||
import io.xpipe.core.source.DataSourceType;
|
||||
|
||||
@@ -31,8 +32,8 @@ public class DataTableImpl extends DataSourceImpl implements DataTable {
|
||||
private final int size;
|
||||
private final DataType dataType;
|
||||
|
||||
public DataTableImpl(DataSourceId id, int size, DataType dataType) {
|
||||
super(id);
|
||||
public DataTableImpl(DataSourceId id, DataSourceConfig sourceConfig, int size, DataType dataType) {
|
||||
super(id, sourceConfig);
|
||||
this.id = id;
|
||||
this.size = size;
|
||||
this.dataType = dataType;
|
||||
|
||||
Reference in New Issue
Block a user