mirror of
https://github.com/xpipe-io/xpipe.git
synced 2026-04-24 08:31:39 -04:00
Rework beacon connections and flesh out API more
This commit is contained in:
@@ -1,13 +1,14 @@
|
||||
package io.xpipe.api;
|
||||
|
||||
import io.xpipe.api.impl.DataSourceImpl;
|
||||
import io.xpipe.core.source.DataSourceConfig;
|
||||
import io.xpipe.core.source.DataSourceId;
|
||||
import io.xpipe.core.source.DataSourceReference;
|
||||
import io.xpipe.core.source.DataSourceType;
|
||||
import io.xpipe.core.source.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.net.URL;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
@@ -67,7 +68,7 @@ public interface DataSource {
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves a reference to the given data source.
|
||||
* Retrieves the data source for a given reference.
|
||||
*
|
||||
* @param ref the data source reference
|
||||
*/
|
||||
@@ -83,54 +84,75 @@ public interface DataSource {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
static DataSource wrap(InputStream in, String type, Map<String, String> configOptions) {
|
||||
return DataSourceImpl.wrap(in, type, configOptions);
|
||||
}
|
||||
|
||||
static DataSource wrap(InputStream in, String type) {
|
||||
return DataSourceImpl.wrap(in, type, Map.of());
|
||||
}
|
||||
|
||||
static DataSource wrap(InputStream in) {
|
||||
return DataSourceImpl.wrap(in, null, Map.of());
|
||||
/**
|
||||
* Wrapper for {@link #create(DataSourceId, String, Map, InputStream)} that creates an anonymous data source.
|
||||
*/
|
||||
public static DataSource createAnonymous(String type, Map<String,String> config, Path path) {
|
||||
return create(null, type, config, path);
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves a reference to the given local data source that is specified by a URL.
|
||||
* Wrapper for {@link #create(DataSourceId, String, Map, InputStream)}.
|
||||
*/
|
||||
public static DataSource create(DataSourceId id, String type, Map<String,String> config, Path path) {
|
||||
try (var in = Files.newInputStream(path)) {
|
||||
return create(id, type, config, in);
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper for {@link #create(DataSourceId, String, Map, InputStream)} that creates an anonymous data source.
|
||||
*/
|
||||
public static DataSource createAnonymous(String type, Map<String,String> config, URL url) {
|
||||
return create(null, type, config, url);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper for {@link #create(DataSourceId, String, Map, InputStream)}.
|
||||
*/
|
||||
public static DataSource create(DataSourceId id, String type, Map<String,String> config, URL url) {
|
||||
try (var in = url.openStream()) {
|
||||
return create(id, type, config, in);
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Wrapper for {@link #create(DataSourceId, String, Map, InputStream)} that creates an anonymous data source.
|
||||
*/
|
||||
public static DataSource createAnonymous(String type, Map<String,String> config, InputStream in) {
|
||||
return create(null, type, config, in);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new data source from an input stream.
|
||||
*
|
||||
* This wrapped data source is only available temporarily and locally,
|
||||
* i.e. it is not added to the XPipe data source storage.
|
||||
*
|
||||
* @param url the url that points to the data
|
||||
* @param id the data source id
|
||||
* @param type the data source type
|
||||
* @param configOptions additional configuration options for the specific data source type
|
||||
* @return a reference to the data source that can be used to access the underlying data source
|
||||
* @param config additional configuration options for the specific data source type
|
||||
* @param in the input stream to read
|
||||
* @return a {@link DataSource} instances that can be used to access the underlying data
|
||||
*/
|
||||
static DataSource wrap(URL url, String type, Map<String, String> configOptions) {
|
||||
return DataSourceImpl.wrap(url, type, configOptions);
|
||||
public static DataSource create(DataSourceId id, String type, Map<String,String> config, InputStream in) {
|
||||
return DataSourceImpl.create(id, type, config, in);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper for {@link #wrap(URL, String, Map)} that passes no configuration options.
|
||||
* As a result, the data source configuration is automatically determined by X-Pipe for the given type.
|
||||
* Returns the id of this data source.
|
||||
*/
|
||||
static DataSource wrap(URL url, String type) {
|
||||
return wrap(url, type, Map.of());
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper for {@link #wrap(URL, String, Map)} that passes no type and no configuration options.
|
||||
* As a result, the data source type and configuration is automatically determined by X-Pipe.
|
||||
*/
|
||||
static DataSource wrap(URL url) {
|
||||
return wrap(url, null, Map.of());
|
||||
}
|
||||
|
||||
DataSourceId getId();
|
||||
|
||||
/**
|
||||
* Returns the type of this data source.
|
||||
*/
|
||||
DataSourceType getType();
|
||||
|
||||
DataSourceConfig getConfig();
|
||||
|
||||
DataSourceConfigInstance getConfig();
|
||||
|
||||
/**
|
||||
* Attempts to cast this object to a {@link DataTable}.
|
||||
|
||||
@@ -20,9 +20,20 @@ public interface DataTableAccumulator {
|
||||
*/
|
||||
DataTable finish(DataSourceId id);
|
||||
|
||||
/**
|
||||
* Adds a row to the table.
|
||||
*
|
||||
* @param row the row to add
|
||||
*/
|
||||
void add(TupleNode row);
|
||||
|
||||
/**
|
||||
* Creates a tuple acceptor that adds all accepted tuples to the table.
|
||||
*/
|
||||
DataStructureNodeAcceptor<TupleNode> acceptor();
|
||||
|
||||
/**
|
||||
* Returns the current amount of rows added to the table.
|
||||
*/
|
||||
int getCurrentRows();
|
||||
}
|
||||
|
||||
@@ -1,75 +0,0 @@
|
||||
package io.xpipe.api.connector;
|
||||
|
||||
import io.xpipe.beacon.*;
|
||||
import io.xpipe.core.util.JacksonHelper;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public abstract class XPipeApiConnector extends BeaconConnector {
|
||||
|
||||
public void execute() {
|
||||
try {
|
||||
var socket = constructSocket();
|
||||
handle(socket);
|
||||
} catch (Throwable ce) {
|
||||
throw new RuntimeException(ce);
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract void handle(BeaconClient sc) throws Exception;
|
||||
|
||||
@Override
|
||||
protected BeaconClient constructSocket() throws ConnectorException {
|
||||
if (!JacksonHelper.isInit()) {
|
||||
JacksonHelper.initModularized(ModuleLayer.boot());
|
||||
}
|
||||
|
||||
if (!BeaconServer.isRunning()) {
|
||||
try {
|
||||
start();
|
||||
} catch (Exception ex) {
|
||||
throw new ConnectorException("Unable to start xpipe daemon", ex);
|
||||
}
|
||||
|
||||
var r = waitForStartup();
|
||||
if (r.isEmpty()) {
|
||||
throw new ConnectorException("Wait for xpipe daemon timed out");
|
||||
} else {
|
||||
return r.get();
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
return new BeaconClient();
|
||||
} catch (Exception ex) {
|
||||
throw new ConnectorException("Unable to connect to running xpipe daemon", ex);
|
||||
}
|
||||
}
|
||||
|
||||
private void start() throws Exception {
|
||||
if (!BeaconServer.tryStart()) {
|
||||
throw new UnsupportedOperationException("Unable to determine xpipe daemon launch command");
|
||||
};
|
||||
}
|
||||
|
||||
private Optional<BeaconClient> waitForStartup() {
|
||||
for (int i = 0; i < 40; i++) {
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
|
||||
var s = BeaconClient.tryConnect();
|
||||
if (s.isPresent()) {
|
||||
return s;
|
||||
}
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
public static interface Handler {
|
||||
|
||||
void handle(BeaconClient sc) throws ClientException, ServerException;
|
||||
}
|
||||
}
|
||||
112
api/src/main/java/io/xpipe/api/connector/XPipeConnection.java
Normal file
112
api/src/main/java/io/xpipe/api/connector/XPipeConnection.java
Normal file
@@ -0,0 +1,112 @@
|
||||
package io.xpipe.api.connector;
|
||||
|
||||
import io.xpipe.beacon.*;
|
||||
import io.xpipe.core.util.JacksonHelper;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public final class XPipeConnection extends BeaconConnection {
|
||||
|
||||
public static XPipeConnection open() {
|
||||
var con = new XPipeConnection();
|
||||
con.constructSocket();
|
||||
return con;
|
||||
}
|
||||
|
||||
public static void execute(Handler handler) {
|
||||
try (var con = new XPipeConnection()) {
|
||||
con.constructSocket();
|
||||
handler.handle(con);
|
||||
} catch (Exception e) {
|
||||
throw new BeaconException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static <T> T execute(Mapper<T> mapper) {
|
||||
try (var con = new XPipeConnection()) {
|
||||
con.constructSocket();
|
||||
return mapper.handle(con);
|
||||
} catch (Exception e) {
|
||||
throw new BeaconException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private XPipeConnection() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void constructSocket() {
|
||||
if (!JacksonHelper.isInit()) {
|
||||
JacksonHelper.initModularized(ModuleLayer.boot());
|
||||
}
|
||||
|
||||
if (!BeaconServer.isRunning()) {
|
||||
try {
|
||||
start();
|
||||
} catch (Exception ex) {
|
||||
throw new BeaconException("Unable to start xpipe daemon", ex);
|
||||
}
|
||||
|
||||
var r = waitForStartup();
|
||||
if (r.isEmpty()) {
|
||||
throw new BeaconException("Wait for xpipe daemon timed out");
|
||||
} else {
|
||||
socket = r.get();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
socket = new BeaconClient();
|
||||
} catch (Exception ex) {
|
||||
throw new BeaconException("Unable to connect to running xpipe daemon", ex);
|
||||
}
|
||||
}
|
||||
|
||||
private void start() throws Exception {
|
||||
if (!BeaconServer.tryStart()) {
|
||||
throw new UnsupportedOperationException("Unable to determine xpipe daemon launch command");
|
||||
};
|
||||
}
|
||||
|
||||
public static Optional<BeaconClient> waitForStartup() {
|
||||
for (int i = 0; i < 40; i++) {
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
|
||||
var s = BeaconClient.tryConnect();
|
||||
if (s.isPresent()) {
|
||||
return s;
|
||||
}
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
public static void waitForShutdown() {
|
||||
for (int i = 0; i < 40; i++) {
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
|
||||
var r = BeaconServer.isRunning();
|
||||
if (!r) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
public static interface Handler {
|
||||
|
||||
void handle(BeaconConnection con) throws ClientException, ServerException, ConnectorException;
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
public static interface Mapper<T> {
|
||||
|
||||
T handle(BeaconConnection con) throws ClientException, ServerException, ConnectorException;
|
||||
}
|
||||
}
|
||||
@@ -1,10 +1,7 @@
|
||||
package io.xpipe.api.impl;
|
||||
|
||||
import io.xpipe.api.DataRaw;
|
||||
import io.xpipe.core.source.DataSourceConfig;
|
||||
import io.xpipe.core.source.DataSourceId;
|
||||
import io.xpipe.core.source.DataSourceInfo;
|
||||
import io.xpipe.core.source.DataSourceType;
|
||||
import io.xpipe.core.source.*;
|
||||
|
||||
import java.io.InputStream;
|
||||
|
||||
@@ -12,7 +9,7 @@ public class DataRawImpl extends DataSourceImpl implements DataRaw {
|
||||
|
||||
private final DataSourceInfo.Raw info;
|
||||
|
||||
public DataRawImpl(DataSourceId sourceId, DataSourceConfig sourceConfig, DataSourceInfo.Raw info) {
|
||||
public DataRawImpl(DataSourceId sourceId, DataSourceConfigInstance sourceConfig, DataSourceInfo.Raw info) {
|
||||
super(sourceId, sourceConfig);
|
||||
this.info = info;
|
||||
}
|
||||
|
||||
@@ -1,101 +1,81 @@
|
||||
package io.xpipe.api.impl;
|
||||
|
||||
import io.xpipe.api.DataSource;
|
||||
import io.xpipe.api.connector.XPipeApiConnector;
|
||||
import io.xpipe.beacon.BeaconClient;
|
||||
import io.xpipe.beacon.ClientException;
|
||||
import io.xpipe.beacon.ConnectorException;
|
||||
import io.xpipe.beacon.ServerException;
|
||||
import io.xpipe.api.connector.XPipeConnection;
|
||||
import io.xpipe.beacon.exchange.PreStoreExchange;
|
||||
import io.xpipe.beacon.exchange.QueryDataSourceExchange;
|
||||
import io.xpipe.beacon.exchange.StoreResourceExchange;
|
||||
import io.xpipe.beacon.exchange.StoreStreamExchange;
|
||||
import io.xpipe.core.source.DataSourceConfig;
|
||||
import io.xpipe.beacon.exchange.ReadExecuteExchange;
|
||||
import io.xpipe.beacon.exchange.ReadPreparationExchange;
|
||||
import io.xpipe.core.source.DataSourceConfigInstance;
|
||||
import io.xpipe.core.source.DataSourceId;
|
||||
import io.xpipe.core.source.DataSourceReference;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.net.URL;
|
||||
import java.util.Map;
|
||||
|
||||
public abstract class DataSourceImpl implements DataSource {
|
||||
|
||||
public static DataSource get(DataSourceReference ds) {
|
||||
final DataSource[] source = new DataSource[1];
|
||||
new XPipeApiConnector() {
|
||||
@Override
|
||||
protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException {
|
||||
var req = QueryDataSourceExchange.Request.builder().ref(ds).build();
|
||||
QueryDataSourceExchange.Response res = performSimpleExchange(sc, req);
|
||||
switch (res.getInfo().getType()) {
|
||||
case TABLE -> {
|
||||
var data = res.getInfo().asTable();
|
||||
source[0] = new DataTableImpl(res.getId(), res.getConfig().getConfig(), data);
|
||||
}
|
||||
case STRUCTURE -> {
|
||||
var info = res.getInfo().asStructure();
|
||||
source[0] = new DataStructureImpl(res.getId(), res.getConfig().getConfig(), info);
|
||||
}
|
||||
case TEXT -> {
|
||||
var info = res.getInfo().asText();
|
||||
source[0] = new DataTextImpl(res.getId(), res.getConfig().getConfig(), info);
|
||||
}
|
||||
case RAW -> {
|
||||
var info = res.getInfo().asRaw();
|
||||
source[0] = new DataRawImpl(res.getId(), res.getConfig().getConfig(), info);
|
||||
}
|
||||
return XPipeConnection.execute(con -> {
|
||||
var req = QueryDataSourceExchange.Request.builder().ref(ds).build();
|
||||
QueryDataSourceExchange.Response res = con.performSimpleExchange(req);
|
||||
switch (res.getInfo().getType()) {
|
||||
case TABLE -> {
|
||||
var data = res.getInfo().asTable();
|
||||
return new DataTableImpl(res.getId(), res.getConfig(), data);
|
||||
}
|
||||
case STRUCTURE -> {
|
||||
var info = res.getInfo().asStructure();
|
||||
return new DataStructureImpl(res.getId(), res.getConfig(), info);
|
||||
}
|
||||
case TEXT -> {
|
||||
var info = res.getInfo().asText();
|
||||
return new DataTextImpl(res.getId(), res.getConfig(), info);
|
||||
}
|
||||
case RAW -> {
|
||||
var info = res.getInfo().asRaw();
|
||||
return new DataRawImpl(res.getId(), res.getConfig(), info);
|
||||
}
|
||||
}
|
||||
}.execute();
|
||||
return source[0];
|
||||
throw new AssertionError();
|
||||
});
|
||||
}
|
||||
|
||||
public static DataSource wrap(URL url, String type, Map<String,String> config) {
|
||||
final DataSource[] source = new DataSource[1];
|
||||
new XPipeApiConnector() {
|
||||
@Override
|
||||
protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException {
|
||||
var req = StoreResourceExchange.Request.builder()
|
||||
.url(url).providerId(type).build();
|
||||
StoreResourceExchange.Response res = performOutputExchange(sc, req, out -> {
|
||||
try (var s = url.openStream()) {
|
||||
writeLength(sc, s.available());
|
||||
s.transferTo(out);
|
||||
}
|
||||
});
|
||||
switch (res.getInfo().getType()) {
|
||||
case TABLE -> {
|
||||
var data = res.getInfo().asTable();
|
||||
source[0] = new DataTableImpl(res.getSourceId(), res.getConfig(), data);
|
||||
}
|
||||
case STRUCTURE -> {
|
||||
}
|
||||
case RAW -> {
|
||||
}
|
||||
}
|
||||
}
|
||||
}.execute();
|
||||
return source[0];
|
||||
}
|
||||
public static DataSource create(DataSourceId id, String type, Map<String,String> config, InputStream in) {
|
||||
var res = XPipeConnection.execute(con -> {
|
||||
var req = PreStoreExchange.Request.builder().build();
|
||||
PreStoreExchange.Response r = con.performOutputExchange(req, in::transferTo);
|
||||
return r;
|
||||
});
|
||||
|
||||
public static DataSource wrap(InputStream in, String type, Map<String,String> config) {
|
||||
final DataSource[] source = new DataSource[1];
|
||||
new XPipeApiConnector() {
|
||||
@Override
|
||||
protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException {
|
||||
var req = StoreStreamExchange.Request.builder().type(type).build();
|
||||
StoreStreamExchange.Response res = performOutputExchange(sc, req, in::transferTo);
|
||||
var store = res.getStore();
|
||||
|
||||
}
|
||||
}.execute();
|
||||
return source[0];
|
||||
var startReq = ReadPreparationExchange.Request.builder()
|
||||
.provider(type)
|
||||
.store(store)
|
||||
.build();
|
||||
var startRes = XPipeConnection.execute(con -> {
|
||||
ReadPreparationExchange.Response r = con.performSimpleExchange(startReq);
|
||||
return r;
|
||||
});
|
||||
|
||||
var configInstance = startRes.getConfig();
|
||||
configInstance.getCurrentValues().putAll(config);
|
||||
var endReq = ReadExecuteExchange.Request.builder()
|
||||
.target(id).dataStore(store).config(configInstance).build();
|
||||
XPipeConnection.execute(con -> {
|
||||
con.performSimpleExchange(endReq);
|
||||
});
|
||||
var ref = id != null ? DataSourceReference.id(id) : DataSourceReference.latest();
|
||||
return get(ref);
|
||||
}
|
||||
|
||||
private final DataSourceId sourceId;
|
||||
private final DataSourceConfig sourceConfig;
|
||||
private final DataSourceConfigInstance config;
|
||||
|
||||
public DataSourceImpl(DataSourceId sourceId, DataSourceConfig sourceConfig) {
|
||||
public DataSourceImpl(DataSourceId sourceId, DataSourceConfigInstance config) {
|
||||
this.sourceId = sourceId;
|
||||
this.sourceConfig = sourceConfig;
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -104,7 +84,7 @@ public abstract class DataSourceImpl implements DataSource {
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataSourceConfig getConfig() {
|
||||
return sourceConfig;
|
||||
public DataSourceConfigInstance getConfig() {
|
||||
return config;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,16 +2,13 @@ package io.xpipe.api.impl;
|
||||
|
||||
import io.xpipe.api.DataStructure;
|
||||
import io.xpipe.core.data.node.DataStructureNode;
|
||||
import io.xpipe.core.source.DataSourceConfig;
|
||||
import io.xpipe.core.source.DataSourceId;
|
||||
import io.xpipe.core.source.DataSourceInfo;
|
||||
import io.xpipe.core.source.DataSourceType;
|
||||
import io.xpipe.core.source.*;
|
||||
|
||||
public class DataStructureImpl extends DataSourceImpl implements DataStructure {
|
||||
|
||||
private final DataSourceInfo.Structure info;
|
||||
|
||||
public DataStructureImpl(DataSourceId sourceId, DataSourceConfig sourceConfig, DataSourceInfo.Structure info) {
|
||||
public DataStructureImpl(DataSourceId sourceId, DataSourceConfigInstance sourceConfig, DataSourceInfo.Structure info) {
|
||||
super(sourceId, sourceConfig);
|
||||
this.info = info;
|
||||
}
|
||||
|
||||
@@ -1,30 +1,63 @@
|
||||
package io.xpipe.api.impl;
|
||||
|
||||
import io.xpipe.api.DataSource;
|
||||
import io.xpipe.api.DataTable;
|
||||
import io.xpipe.api.DataTableAccumulator;
|
||||
import io.xpipe.api.connector.XPipeConnection;
|
||||
import io.xpipe.beacon.exchange.PreStoreExchange;
|
||||
import io.xpipe.beacon.exchange.ReadExecuteExchange;
|
||||
import io.xpipe.core.data.node.DataStructureNodeAcceptor;
|
||||
import io.xpipe.core.data.node.TupleNode;
|
||||
import io.xpipe.core.data.type.TupleType;
|
||||
import io.xpipe.core.data.typed.TypedDataStreamWriter;
|
||||
import io.xpipe.core.source.DataSourceConfigInstance;
|
||||
import io.xpipe.core.source.DataSourceId;
|
||||
import io.xpipe.core.source.DataSourceReference;
|
||||
|
||||
public class DataTableAccumulatorImpl implements DataTableAccumulator {
|
||||
|
||||
@Override
|
||||
public DataTable finish(DataSourceId id) {
|
||||
return null;
|
||||
private final XPipeConnection connection;
|
||||
private final TupleType type;
|
||||
private int rows;
|
||||
|
||||
public DataTableAccumulatorImpl(TupleType type) {
|
||||
this.type = type;
|
||||
connection = XPipeConnection.open();
|
||||
connection.sendRequest(PreStoreExchange.Request.builder().build());
|
||||
connection.sendBodyStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void add(TupleNode row) {
|
||||
public synchronized DataTable finish(DataSourceId id) {
|
||||
PreStoreExchange.Response res = connection.receiveResponse();
|
||||
connection.close();
|
||||
|
||||
var req = ReadExecuteExchange.Request.builder()
|
||||
.target(id).dataStore(res.getStore()).config(DataSourceConfigInstance.xpbt()).build();
|
||||
XPipeConnection.execute(con -> {
|
||||
con.performSimpleExchange(req);
|
||||
});
|
||||
return DataSource.get(DataSourceReference.id(id)).asTable();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataStructureNodeAcceptor<TupleNode> acceptor() {
|
||||
return null;
|
||||
public synchronized void add(TupleNode row) {
|
||||
connection.withOutputStream(out -> {
|
||||
TypedDataStreamWriter.writeStructure(connection.getOutputStream(), row, type);
|
||||
rows++;
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCurrentRows() {
|
||||
return 0;
|
||||
public synchronized DataStructureNodeAcceptor<TupleNode> acceptor() {
|
||||
return node -> {
|
||||
add(node);
|
||||
return true;
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int getCurrentRows() {
|
||||
return rows;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,26 +1,22 @@
|
||||
package io.xpipe.api.impl;
|
||||
|
||||
import io.xpipe.api.DataTable;
|
||||
import io.xpipe.api.connector.XPipeApiConnector;
|
||||
import io.xpipe.beacon.BeaconClient;
|
||||
import io.xpipe.beacon.ClientException;
|
||||
import io.xpipe.beacon.ConnectorException;
|
||||
import io.xpipe.beacon.ServerException;
|
||||
import io.xpipe.api.connector.XPipeConnection;
|
||||
import io.xpipe.beacon.BeaconConnection;
|
||||
import io.xpipe.beacon.exchange.api.QueryTableDataExchange;
|
||||
import io.xpipe.core.data.node.ArrayNode;
|
||||
import io.xpipe.core.data.node.DataStructureNode;
|
||||
import io.xpipe.core.data.node.TupleNode;
|
||||
import io.xpipe.core.data.typed.TypedAbstractReader;
|
||||
import io.xpipe.core.data.typed.TypedDataStreamParser;
|
||||
import io.xpipe.core.data.typed.TypedDataStructureNodeReader;
|
||||
import io.xpipe.core.data.typed.TypedReusableDataStructureNodeReader;
|
||||
import io.xpipe.core.source.DataSourceConfig;
|
||||
import io.xpipe.core.source.DataSourceId;
|
||||
import io.xpipe.core.source.DataSourceInfo;
|
||||
import io.xpipe.core.source.DataSourceType;
|
||||
import io.xpipe.core.source.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
@@ -28,7 +24,7 @@ public class DataTableImpl extends DataSourceImpl implements DataTable {
|
||||
|
||||
private final DataSourceInfo.Table info;
|
||||
|
||||
DataTableImpl(DataSourceId id, DataSourceConfig sourceConfig, DataSourceInfo.Table info) {
|
||||
DataTableImpl(DataSourceId id, DataSourceConfigInstance sourceConfig, DataSourceInfo.Table info) {
|
||||
super(id, sourceConfig);
|
||||
this.info = info;
|
||||
}
|
||||
@@ -60,20 +56,15 @@ public class DataTableImpl extends DataSourceImpl implements DataTable {
|
||||
|
||||
@Override
|
||||
public ArrayNode read(int maxRows) {
|
||||
int maxToRead = info.getRowCount() == -1 ? maxRows : Math.min(info.getRowCount(), maxRows);
|
||||
|
||||
List<DataStructureNode> nodes = new ArrayList<>();
|
||||
new XPipeApiConnector() {
|
||||
@Override
|
||||
protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException {
|
||||
// var req = ReadTableDataExchange.Request.builder()
|
||||
// .sourceId(id).maxRows(maxToRead).build();
|
||||
// performInputExchange(sc, req, (ReadTableDataExchange.Response res, InputStream in) -> {
|
||||
// var r = new TypedDataStreamParser(info.getDataType());
|
||||
// r.parseStructures(in, TypedDataStructureNodeReader.immutable(info.getDataType()), nodes::add);
|
||||
// });
|
||||
}
|
||||
}.execute();
|
||||
XPipeConnection.execute(con -> {
|
||||
var req = QueryTableDataExchange.Request.builder()
|
||||
.id(getId()).maxRows(maxRows).build();
|
||||
con.performInputExchange(req, (QueryTableDataExchange.Response res, InputStream in) -> {
|
||||
var r = new TypedDataStreamParser(info.getDataType());
|
||||
r.parseStructures(in, TypedDataStructureNodeReader.immutable(info.getDataType()), nodes::add);
|
||||
});
|
||||
});
|
||||
return ArrayNode.of(nodes);
|
||||
}
|
||||
|
||||
@@ -81,74 +72,49 @@ public class DataTableImpl extends DataSourceImpl implements DataTable {
|
||||
public Iterator<TupleNode> iterator() {
|
||||
return new Iterator<>() {
|
||||
|
||||
private InputStream input;
|
||||
private int read;
|
||||
private final int toRead = info.getRowCount();
|
||||
private final BeaconConnection connection;
|
||||
private final TypedDataStreamParser parser;
|
||||
private final TypedAbstractReader nodeReader;
|
||||
|
||||
{
|
||||
new XPipeApiConnector() {
|
||||
@Override
|
||||
protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException {
|
||||
// var req = ReadTableDataExchange.Request.builder()
|
||||
// .sourceId(id).maxRows(Integer.MAX_VALUE).build();
|
||||
// performInputExchange(sc, req,
|
||||
// (ReadTableDataExchange.Response res, InputStream in) -> {
|
||||
// input = in;
|
||||
// });
|
||||
}
|
||||
}.execute();
|
||||
|
||||
nodeReader = TypedReusableDataStructureNodeReader.create(info.getDataType());
|
||||
parser = new TypedDataStreamParser(info.getDataType());
|
||||
|
||||
connection = XPipeConnection.open();
|
||||
var req = QueryTableDataExchange.Request.builder()
|
||||
.id(getId()).build();
|
||||
connection.sendRequest(req);
|
||||
connection.receiveResponse();
|
||||
connection.receiveBody();
|
||||
}
|
||||
|
||||
private void finish() {
|
||||
try {
|
||||
input.close();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean hasKnownSize() {
|
||||
return info.getRowCount() != -1;
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
if (hasKnownSize() && read == toRead) {
|
||||
finish();
|
||||
return false;
|
||||
}
|
||||
connection.checkClosed();
|
||||
|
||||
if (hasKnownSize() && read < toRead) {
|
||||
return true;
|
||||
}
|
||||
|
||||
try {
|
||||
var hasNext = parser.hasNext(input);
|
||||
if (!hasNext) {
|
||||
finish();
|
||||
}
|
||||
return hasNext;
|
||||
} catch (IOException ex) {
|
||||
AtomicBoolean hasNext = new AtomicBoolean(false);
|
||||
connection.withInputStream(in -> {
|
||||
hasNext.set(parser.hasNext(in));
|
||||
});
|
||||
if (!hasNext.get()) {
|
||||
finish();
|
||||
throw new UncheckedIOException(ex);
|
||||
}
|
||||
return hasNext.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TupleNode next() {
|
||||
TupleNode current;
|
||||
try {
|
||||
current = (TupleNode) parser.parseStructure(input, nodeReader);
|
||||
} catch (IOException ex) {
|
||||
throw new UncheckedIOException(ex);
|
||||
}
|
||||
read++;
|
||||
return current;
|
||||
connection.checkClosed();
|
||||
|
||||
AtomicReference<TupleNode> current = new AtomicReference<>();
|
||||
connection.withInputStream(in -> {
|
||||
current.set((TupleNode) parser.parseStructure(connection.getInputStream(), nodeReader));
|
||||
});
|
||||
return current.get();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -1,10 +1,7 @@
|
||||
package io.xpipe.api.impl;
|
||||
|
||||
import io.xpipe.api.DataText;
|
||||
import io.xpipe.core.source.DataSourceConfig;
|
||||
import io.xpipe.core.source.DataSourceId;
|
||||
import io.xpipe.core.source.DataSourceInfo;
|
||||
import io.xpipe.core.source.DataSourceType;
|
||||
import io.xpipe.core.source.*;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
@@ -13,7 +10,7 @@ public class DataTextImpl extends DataSourceImpl implements DataText {
|
||||
|
||||
private final DataSourceInfo.Text info;
|
||||
|
||||
public DataTextImpl(DataSourceId sourceId, DataSourceConfig sourceConfig, DataSourceInfo.Text info) {
|
||||
public DataTextImpl(DataSourceId sourceId, DataSourceConfigInstance sourceConfig, DataSourceInfo.Text info) {
|
||||
super(sourceId, sourceConfig);
|
||||
this.info = info;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user