mirror of
https://github.com/xpipe-io/xpipe.git
synced 2026-04-26 17:39:37 -04:00
Merge branch 1.7 into master
This commit is contained in:
@@ -1,12 +0,0 @@
|
||||
package io.xpipe.api;
|
||||
|
||||
import java.io.InputStream;
|
||||
|
||||
public interface DataRaw extends DataSource {
|
||||
|
||||
InputStream open();
|
||||
|
||||
byte[] readAll();
|
||||
|
||||
byte[] read(int maxBytes);
|
||||
}
|
||||
@@ -1,229 +0,0 @@
|
||||
package io.xpipe.api;
|
||||
|
||||
import io.xpipe.api.impl.DataSourceImpl;
|
||||
import io.xpipe.core.source.DataStoreId;
|
||||
import io.xpipe.core.source.DataSourceReference;
|
||||
import io.xpipe.core.source.DataSourceType;
|
||||
import io.xpipe.core.store.DataStore;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.net.URL;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
|
||||
/**
|
||||
* Represents a reference to a data source that is managed by XPipe.
|
||||
* <p>
|
||||
* The actual data is only queried when required and is not cached.
|
||||
* Therefore, the queried data is always up-to-date at the point of calling a method that queries the data.
|
||||
* <p>
|
||||
* As soon a data source reference is created, the data source is locked
|
||||
* within XPipe to prevent concurrent modification and the problems that can arise from it.
|
||||
* By default, the lock is held until the calling program terminates and prevents
|
||||
* other applications from modifying the data source in any way.
|
||||
* To unlock the data source earlier, you can make use the {@link #unlock()} method.
|
||||
*/
|
||||
public interface DataSource {
|
||||
|
||||
/**
|
||||
* NOT YET IMPLEMENTED!
|
||||
* <p>
|
||||
* Creates a new supplier data source that will be interpreted as the generated data source.
|
||||
* In case this program should be a data source generator, this method has to be called at
|
||||
* least once to register that it actually generates a data source.
|
||||
* <p>
|
||||
* All content that is written to this data source until the generator program terminates is
|
||||
* will be available later on when the data source is used as a supplier later on.
|
||||
* <p>
|
||||
* In case this method is called multiple times, the same data source is returned.
|
||||
*
|
||||
* @return the generator data source
|
||||
*/
|
||||
static DataSource drain() {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* NOT YET IMPLEMENTED!
|
||||
* <p>
|
||||
* Creates a data source sink that will block with any read operations
|
||||
* until an external data producer routes the output into this sink.
|
||||
*/
|
||||
static DataSource sink() {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper for {@link #get(DataSourceReference)}.
|
||||
*
|
||||
* @throws IllegalArgumentException if {@code id} is not a valid data source id
|
||||
*/
|
||||
static DataSource getById(String id) {
|
||||
return get(DataSourceReference.id(id));
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper for {@link #get(DataSourceReference)} using the latest reference.
|
||||
*/
|
||||
static DataSource getLatest() {
|
||||
return get(DataSourceReference.latest());
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper for {@link #get(DataSourceReference)} using a name reference.
|
||||
*/
|
||||
static DataSource getByName(String name) {
|
||||
return get(DataSourceReference.name(name));
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the data source for a given reference.
|
||||
*
|
||||
* @param ref the data source reference
|
||||
*/
|
||||
static DataSource get(DataSourceReference ref) {
|
||||
return DataSourceImpl.get(ref);
|
||||
}
|
||||
|
||||
/**
|
||||
* Releases the lock held by this program for this data source such
|
||||
* that other applications can modify the data source again.
|
||||
*/
|
||||
static void unlock() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper for {@link #create(DataStoreId, String, InputStream)} that creates an anonymous data source.
|
||||
*/
|
||||
static DataSource createAnonymous(String type, Path path) {
|
||||
return create(null, type, path);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper for {@link #create(DataStoreId, String, InputStream)}.
|
||||
*/
|
||||
static DataSource create(DataStoreId id, String type, Path path) {
|
||||
try (var in = Files.newInputStream(path)) {
|
||||
return create(id, type, in);
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper for {@link #create(DataStoreId, String, InputStream)} that creates an anonymous data source.
|
||||
*/
|
||||
static DataSource createAnonymous(String type, URL url) {
|
||||
return create(null, type, url);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper for {@link #create(DataStoreId, String, InputStream)}.
|
||||
*/
|
||||
static DataSource create(DataStoreId id, String type, URL url) {
|
||||
try (var in = url.openStream()) {
|
||||
return create(id, type, in);
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper for {@link #create(DataStoreId, String, InputStream)} that creates an anonymous data source.
|
||||
*/
|
||||
static DataSource createAnonymous(String type, InputStream in) {
|
||||
return create(null, type, in);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new data source from an input stream.
|
||||
*
|
||||
* @param id the data source id
|
||||
* @param type the data source type
|
||||
* @param in the input stream to read
|
||||
* @return a {@link DataSource} instances that can be used to access the underlying data
|
||||
*/
|
||||
static DataSource create(DataStoreId id, String type, InputStream in) {
|
||||
return DataSourceImpl.create(id, type, in);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new data source from an input stream.
|
||||
*
|
||||
* @param id the data source id
|
||||
* @return a {@link DataSource} instances that can be used to access the underlying data
|
||||
*/
|
||||
static DataSource create(DataStoreId id, io.xpipe.core.source.DataSource<?> source) {
|
||||
return DataSourceImpl.create(id, source);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new data source from an input stream.
|
||||
* 1
|
||||
*
|
||||
* @param id the data source id
|
||||
* @param type the data source type
|
||||
* @param in the data store to add
|
||||
* @return a {@link DataSource} instances that can be used to access the underlying data
|
||||
*/
|
||||
static DataSource create(DataStoreId id, String type, DataStore in) {
|
||||
return DataSourceImpl.create(id, type, in);
|
||||
}
|
||||
|
||||
void forwardTo(DataSource target);
|
||||
|
||||
void appendTo(DataSource target);
|
||||
|
||||
io.xpipe.core.source.DataSource<?> getInternalSource();
|
||||
|
||||
/**
|
||||
* Returns the id of this data source.
|
||||
*/
|
||||
DataStoreId getId();
|
||||
|
||||
/**
|
||||
* Returns the type of this data source.
|
||||
*/
|
||||
DataSourceType getType();
|
||||
|
||||
DataSourceConfig getConfig();
|
||||
|
||||
/**
|
||||
* Attempts to cast this object to a {@link DataTable}.
|
||||
*
|
||||
* @throws UnsupportedOperationException if the data source is not a table
|
||||
*/
|
||||
default DataTable asTable() {
|
||||
throw new UnsupportedOperationException("Data source is not a table");
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to cast this object to a {@link DataStructure}.
|
||||
*
|
||||
* @throws UnsupportedOperationException if the data source is not a structure
|
||||
*/
|
||||
default DataStructure asStructure() {
|
||||
throw new UnsupportedOperationException("Data source is not a structure");
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to cast this object to a {@link DataText}.
|
||||
*
|
||||
* @throws UnsupportedOperationException if the data source is not a text
|
||||
*/
|
||||
default DataText asText() {
|
||||
throw new UnsupportedOperationException("Data source is not a text");
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to cast this object to a {@link DataRaw}.
|
||||
*
|
||||
* @throws UnsupportedOperationException if the data source is not raw
|
||||
*/
|
||||
default DataRaw asRaw() {
|
||||
throw new UnsupportedOperationException("Data source is not raw");
|
||||
}
|
||||
}
|
||||
@@ -1,32 +0,0 @@
|
||||
package io.xpipe.api;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Represents the current configuration of a data source.
|
||||
*/
|
||||
public final class DataSourceConfig {
|
||||
|
||||
/**
|
||||
* The data source provider id.
|
||||
*/
|
||||
private final String provider;
|
||||
|
||||
/**
|
||||
* The set configuration parameters.
|
||||
*/
|
||||
private final Map<String, String> configInstance;
|
||||
|
||||
public DataSourceConfig(String provider, Map<String, String> configInstance) {
|
||||
this.provider = provider;
|
||||
this.configInstance = configInstance;
|
||||
}
|
||||
|
||||
public String getProvider() {
|
||||
return provider;
|
||||
}
|
||||
|
||||
public Map<String, String> getConfig() {
|
||||
return configInstance;
|
||||
}
|
||||
}
|
||||
@@ -1,7 +0,0 @@
|
||||
package io.xpipe.api;
|
||||
|
||||
import io.xpipe.core.data.node.DataStructureNode;
|
||||
|
||||
public interface DataStructure extends DataSource {
|
||||
DataStructureNode read();
|
||||
}
|
||||
@@ -1,26 +0,0 @@
|
||||
package io.xpipe.api;
|
||||
|
||||
import io.xpipe.core.data.node.ArrayNode;
|
||||
import io.xpipe.core.data.node.TupleNode;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public interface DataTable extends Iterable<TupleNode>, DataSource {
|
||||
|
||||
Stream<TupleNode> stream();
|
||||
|
||||
ArrayNode readAll();
|
||||
|
||||
ArrayNode read(int maxRows);
|
||||
|
||||
default int countAndDiscard() {
|
||||
AtomicInteger count = new AtomicInteger();
|
||||
try (var stream = stream()) {
|
||||
stream.forEach(dataStructureNodes -> {
|
||||
count.getAndIncrement();
|
||||
});
|
||||
}
|
||||
return count.get();
|
||||
}
|
||||
}
|
||||
@@ -1,52 +0,0 @@
|
||||
package io.xpipe.api;
|
||||
|
||||
import io.xpipe.api.impl.DataTableAccumulatorImpl;
|
||||
import io.xpipe.core.data.node.DataStructureNode;
|
||||
import io.xpipe.core.data.node.DataStructureNodeAcceptor;
|
||||
import io.xpipe.core.data.type.TupleType;
|
||||
import io.xpipe.core.source.DataStoreId;
|
||||
|
||||
/**
|
||||
* An accumulator for table data.
|
||||
* <p>
|
||||
* This class can be used to construct new table data sources by
|
||||
* accumulating the rows using {@link #add(DataStructureNode)} or {@link #acceptor()} and then calling
|
||||
* {@link #finish(DataStoreId)} to complete the construction process and create a new data source.
|
||||
*/
|
||||
public interface DataTableAccumulator {
|
||||
|
||||
static DataTableAccumulator create(TupleType type) {
|
||||
return new DataTableAccumulatorImpl(type);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper for {@link #finish(DataStoreId)}.
|
||||
*/
|
||||
default DataTable finish(String id) {
|
||||
return finish(DataStoreId.fromString(id));
|
||||
}
|
||||
|
||||
/**
|
||||
* Finishes the construction process and returns the data source reference.
|
||||
*
|
||||
* @param id the data source id to assign
|
||||
*/
|
||||
DataTable finish(DataStoreId id);
|
||||
|
||||
/**
|
||||
* Adds a row to the table.
|
||||
*
|
||||
* @param row the row to add
|
||||
*/
|
||||
void add(DataStructureNode row);
|
||||
|
||||
/**
|
||||
* Creates a tuple acceptor that adds all accepted tuples to the table.
|
||||
*/
|
||||
DataStructureNodeAcceptor<DataStructureNode> acceptor();
|
||||
|
||||
/**
|
||||
* Returns the current amount of rows added to the table.
|
||||
*/
|
||||
int getCurrentRows();
|
||||
}
|
||||
@@ -1,17 +0,0 @@
|
||||
package io.xpipe.api;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public interface DataText extends DataSource {
|
||||
|
||||
List<String> readAllLines();
|
||||
|
||||
List<String> readLines(int maxLines);
|
||||
|
||||
Stream<String> lines();
|
||||
|
||||
String readAll();
|
||||
|
||||
String read(int maxCharacters);
|
||||
}
|
||||
@@ -1,41 +0,0 @@
|
||||
package io.xpipe.api.impl;
|
||||
|
||||
import io.xpipe.api.DataRaw;
|
||||
import io.xpipe.api.DataSourceConfig;
|
||||
import io.xpipe.core.source.DataStoreId;
|
||||
import io.xpipe.core.source.DataSourceType;
|
||||
|
||||
import java.io.InputStream;
|
||||
|
||||
public class DataRawImpl extends DataSourceImpl implements DataRaw {
|
||||
|
||||
public DataRawImpl(
|
||||
DataStoreId sourceId, DataSourceConfig sourceConfig, io.xpipe.core.source.DataSource<?> internalSource) {
|
||||
super(sourceId, sourceConfig, internalSource);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream open() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] readAll() {
|
||||
return new byte[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] read(int maxBytes) {
|
||||
return new byte[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataSourceType getType() {
|
||||
return DataSourceType.RAW;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataRaw asRaw() {
|
||||
return this;
|
||||
}
|
||||
}
|
||||
@@ -1,161 +0,0 @@
|
||||
package io.xpipe.api.impl;
|
||||
|
||||
import io.xpipe.api.DataSource;
|
||||
import io.xpipe.api.DataSourceConfig;
|
||||
import io.xpipe.api.connector.XPipeApiConnection;
|
||||
import io.xpipe.beacon.exchange.*;
|
||||
import io.xpipe.core.source.DataStoreId;
|
||||
import io.xpipe.core.source.DataSourceReference;
|
||||
import io.xpipe.core.store.DataStore;
|
||||
import io.xpipe.core.store.StreamDataStore;
|
||||
|
||||
import java.io.InputStream;
|
||||
|
||||
public abstract class DataSourceImpl implements DataSource {
|
||||
|
||||
private final DataStoreId sourceId;
|
||||
private final DataSourceConfig config;
|
||||
private final io.xpipe.core.source.DataSource<?> internalSource;
|
||||
|
||||
public DataSourceImpl(
|
||||
DataStoreId sourceId, DataSourceConfig config, io.xpipe.core.source.DataSource<?> internalSource) {
|
||||
this.sourceId = sourceId;
|
||||
this.config = config;
|
||||
this.internalSource = internalSource;
|
||||
}
|
||||
|
||||
public static DataSource get(DataSourceReference ds) {
|
||||
return XPipeApiConnection.execute(con -> {
|
||||
var req = QueryDataSourceExchange.Request.builder().ref(ds).build();
|
||||
QueryDataSourceExchange.Response res = con.performSimpleExchange(req);
|
||||
var config = new DataSourceConfig(res.getProvider(), res.getConfig());
|
||||
return switch (res.getType()) {
|
||||
case TABLE -> {
|
||||
yield new DataTableImpl(res.getId(), config, res.getInternalSource());
|
||||
}
|
||||
case STRUCTURE -> {
|
||||
yield new DataStructureImpl(res.getId(), config, res.getInternalSource());
|
||||
}
|
||||
case TEXT -> {
|
||||
yield new DataTextImpl(res.getId(), config, res.getInternalSource());
|
||||
}
|
||||
case RAW -> {
|
||||
yield new DataRawImpl(res.getId(), config, res.getInternalSource());
|
||||
}
|
||||
case COLLECTION -> throw new UnsupportedOperationException("Unimplemented case: " + res.getType());
|
||||
default -> throw new IllegalArgumentException("Unexpected value: " + res.getType());
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
public static DataSource create(DataStoreId id, io.xpipe.core.source.DataSource<?> source) {
|
||||
var startReq =
|
||||
AddSourceExchange.Request.builder().source(source).target(id).build();
|
||||
var returnedId = XPipeApiConnection.execute(con -> {
|
||||
AddSourceExchange.Response r = con.performSimpleExchange(startReq);
|
||||
return r.getId();
|
||||
});
|
||||
|
||||
var ref = DataSourceReference.id(returnedId);
|
||||
return get(ref);
|
||||
}
|
||||
|
||||
public static DataSource create(DataStoreId id, String type, DataStore store) {
|
||||
if (store instanceof StreamDataStore s && s.isContentExclusivelyAccessible()) {
|
||||
store = XPipeApiConnection.execute(con -> {
|
||||
var internal = con.createInternalStreamStore();
|
||||
var req = WriteStreamExchange.Request.builder()
|
||||
.name(internal.getUuid().toString())
|
||||
.build();
|
||||
con.performOutputExchange(req, out -> {
|
||||
try (InputStream inputStream = s.openInput()) {
|
||||
inputStream.transferTo(out);
|
||||
}
|
||||
});
|
||||
return internal;
|
||||
});
|
||||
}
|
||||
|
||||
var startReq = ReadExchange.Request.builder()
|
||||
.provider(type)
|
||||
.store(store)
|
||||
.target(id)
|
||||
.configureAll(false)
|
||||
.build();
|
||||
var startRes = XPipeApiConnection.execute(con -> {
|
||||
return con.<ReadExchange.Request, ReadExchange.Response>performSimpleExchange(startReq);
|
||||
});
|
||||
|
||||
var configInstance = startRes.getConfig();
|
||||
XPipeApiConnection.finishDialog(configInstance);
|
||||
|
||||
var ref = id != null ? DataSourceReference.id(id) : DataSourceReference.latest();
|
||||
return get(ref);
|
||||
}
|
||||
|
||||
public static DataSource create(DataStoreId id, String type, InputStream in) {
|
||||
var store = XPipeApiConnection.execute(con -> {
|
||||
var internal = con.createInternalStreamStore();
|
||||
var req = WriteStreamExchange.Request.builder()
|
||||
.name(internal.getUuid().toString())
|
||||
.build();
|
||||
con.performOutputExchange(req, out -> {
|
||||
in.transferTo(out);
|
||||
});
|
||||
return internal;
|
||||
});
|
||||
|
||||
var startReq = ReadExchange.Request.builder()
|
||||
.provider(type)
|
||||
.store(store)
|
||||
.target(id)
|
||||
.configureAll(false)
|
||||
.build();
|
||||
var startRes = XPipeApiConnection.execute(con -> {
|
||||
return con.<ReadExchange.Request, ReadExchange.Response>performSimpleExchange(startReq);
|
||||
});
|
||||
|
||||
var configInstance = startRes.getConfig();
|
||||
XPipeApiConnection.finishDialog(configInstance);
|
||||
|
||||
var ref = id != null ? DataSourceReference.id(id) : DataSourceReference.latest();
|
||||
return get(ref);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forwardTo(DataSource target) {
|
||||
XPipeApiConnection.execute(con -> {
|
||||
var req = ForwardExchange.Request.builder()
|
||||
.source(DataSourceReference.id(sourceId))
|
||||
.target(DataSourceReference.id(target.getId()))
|
||||
.build();
|
||||
ForwardExchange.Response res = con.performSimpleExchange(req);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void appendTo(DataSource target) {
|
||||
XPipeApiConnection.execute(con -> {
|
||||
var req = ForwardExchange.Request.builder()
|
||||
.source(DataSourceReference.id(sourceId))
|
||||
.target(DataSourceReference.id(target.getId()))
|
||||
.append(true)
|
||||
.build();
|
||||
ForwardExchange.Response res = con.performSimpleExchange(req);
|
||||
});
|
||||
}
|
||||
|
||||
public io.xpipe.core.source.DataSource<?> getInternalSource() {
|
||||
return internalSource;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataStoreId getId() {
|
||||
return sourceId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataSourceConfig getConfig() {
|
||||
return config;
|
||||
}
|
||||
}
|
||||
@@ -1,30 +0,0 @@
|
||||
package io.xpipe.api.impl;
|
||||
|
||||
import io.xpipe.api.DataSourceConfig;
|
||||
import io.xpipe.api.DataStructure;
|
||||
import io.xpipe.core.data.node.DataStructureNode;
|
||||
import io.xpipe.core.source.DataStoreId;
|
||||
import io.xpipe.core.source.DataSourceType;
|
||||
|
||||
public class DataStructureImpl extends DataSourceImpl implements DataStructure {
|
||||
|
||||
DataStructureImpl(
|
||||
DataStoreId sourceId, DataSourceConfig sourceConfig, io.xpipe.core.source.DataSource<?> internalSource) {
|
||||
super(sourceId, sourceConfig, internalSource);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataSourceType getType() {
|
||||
return DataSourceType.STRUCTURE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataStructure asStructure() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataStructureNode read() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -1,115 +0,0 @@
|
||||
package io.xpipe.api.impl;
|
||||
|
||||
import io.xpipe.api.DataSource;
|
||||
import io.xpipe.api.DataTable;
|
||||
import io.xpipe.api.DataTableAccumulator;
|
||||
import io.xpipe.api.connector.XPipeApiConnection;
|
||||
import io.xpipe.api.util.TypeDescriptor;
|
||||
import io.xpipe.beacon.BeaconException;
|
||||
import io.xpipe.beacon.exchange.ReadExchange;
|
||||
import io.xpipe.beacon.exchange.WriteStreamExchange;
|
||||
import io.xpipe.beacon.exchange.cli.StoreAddExchange;
|
||||
import io.xpipe.beacon.util.QuietDialogHandler;
|
||||
import io.xpipe.core.data.node.DataStructureNode;
|
||||
import io.xpipe.core.data.node.DataStructureNodeAcceptor;
|
||||
import io.xpipe.core.data.node.TupleNode;
|
||||
import io.xpipe.core.data.type.TupleType;
|
||||
import io.xpipe.core.data.typed.TypedDataStreamWriter;
|
||||
import io.xpipe.core.impl.InternalStreamStore;
|
||||
import io.xpipe.core.source.DataStoreId;
|
||||
import io.xpipe.core.source.DataSourceReference;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
public class DataTableAccumulatorImpl implements DataTableAccumulator {
|
||||
|
||||
private final XPipeApiConnection connection;
|
||||
private final TupleType type;
|
||||
private int rows;
|
||||
private final InternalStreamStore store;
|
||||
private TupleType writtenDescriptor;
|
||||
private final OutputStream bodyOutput;
|
||||
|
||||
public DataTableAccumulatorImpl(TupleType type) {
|
||||
this.type = type;
|
||||
connection = XPipeApiConnection.open();
|
||||
|
||||
store = new InternalStreamStore();
|
||||
var addReq = StoreAddExchange.Request.builder()
|
||||
.storeInput(store)
|
||||
.name(store.getUuid().toString())
|
||||
.build();
|
||||
StoreAddExchange.Response addRes = connection.performSimpleExchange(addReq);
|
||||
QuietDialogHandler.handle(addRes.getConfig(), connection);
|
||||
|
||||
connection.sendRequest(WriteStreamExchange.Request.builder()
|
||||
.name(store.getUuid().toString())
|
||||
.build());
|
||||
bodyOutput = connection.sendBody();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized DataTable finish(DataStoreId id) {
|
||||
try {
|
||||
bodyOutput.close();
|
||||
} catch (IOException e) {
|
||||
throw new BeaconException(e);
|
||||
}
|
||||
|
||||
WriteStreamExchange.Response res = connection.receiveResponse();
|
||||
connection.close();
|
||||
|
||||
var req = ReadExchange.Request.builder()
|
||||
.target(id)
|
||||
.store(store)
|
||||
.provider("xpbt")
|
||||
.configureAll(false)
|
||||
.build();
|
||||
ReadExchange.Response response = XPipeApiConnection.execute(con -> {
|
||||
return con.performSimpleExchange(req);
|
||||
});
|
||||
|
||||
var configInstance = response.getConfig();
|
||||
XPipeApiConnection.finishDialog(configInstance);
|
||||
|
||||
return DataSource.get(DataSourceReference.id(id)).asTable();
|
||||
}
|
||||
|
||||
private void writeDescriptor() {
|
||||
if (writtenDescriptor != null) {
|
||||
return;
|
||||
}
|
||||
writtenDescriptor = TupleType.tableType(type.getNames());
|
||||
|
||||
connection.withOutputStream(out -> {
|
||||
out.write((TypeDescriptor.create(type.getNames())).getBytes(StandardCharsets.UTF_8));
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void add(DataStructureNode row) {
|
||||
TupleNode toUse = type.matches(row)
|
||||
? row.asTuple()
|
||||
: type.convert(row).orElseThrow().asTuple();
|
||||
connection.withOutputStream(out -> {
|
||||
writeDescriptor();
|
||||
TypedDataStreamWriter.writeStructure(out, toUse, writtenDescriptor);
|
||||
rows++;
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized DataStructureNodeAcceptor<DataStructureNode> acceptor() {
|
||||
return node -> {
|
||||
add(node);
|
||||
return true;
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int getCurrentRows() {
|
||||
return rows;
|
||||
}
|
||||
}
|
||||
@@ -1,61 +0,0 @@
|
||||
package io.xpipe.api.impl;
|
||||
|
||||
import io.xpipe.api.DataSourceConfig;
|
||||
import io.xpipe.api.DataTable;
|
||||
import io.xpipe.core.data.node.ArrayNode;
|
||||
import io.xpipe.core.data.node.DataStructureNode;
|
||||
import io.xpipe.core.data.node.TupleNode;
|
||||
import io.xpipe.core.source.DataStoreId;
|
||||
import io.xpipe.core.source.DataSourceType;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class DataTableImpl extends DataSourceImpl implements DataTable {
|
||||
|
||||
DataTableImpl(DataStoreId id, DataSourceConfig sourceConfig, io.xpipe.core.source.DataSource<?> internalSource) {
|
||||
super(id, sourceConfig, internalSource);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataTable asTable() {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Stream<TupleNode> stream() {
|
||||
return Stream.of();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataSourceType getType() {
|
||||
return DataSourceType.TABLE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ArrayNode readAll() {
|
||||
return read(Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ArrayNode read(int maxRows) {
|
||||
List<DataStructureNode> nodes = new ArrayList<>();
|
||||
return ArrayNode.of(nodes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<TupleNode> iterator() {
|
||||
return new Iterator<>() {
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TupleNode next() {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -1,66 +0,0 @@
|
||||
package io.xpipe.api.impl;
|
||||
|
||||
import io.xpipe.api.DataSourceConfig;
|
||||
import io.xpipe.api.DataText;
|
||||
import io.xpipe.core.source.DataStoreId;
|
||||
import io.xpipe.core.source.DataSourceType;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class DataTextImpl extends DataSourceImpl implements DataText {
|
||||
|
||||
DataTextImpl(
|
||||
DataStoreId sourceId, DataSourceConfig sourceConfig, io.xpipe.core.source.DataSource<?> internalSource) {
|
||||
super(sourceId, sourceConfig, internalSource);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataSourceType getType() {
|
||||
return DataSourceType.TEXT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataText asText() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> readAllLines() {
|
||||
return readLines(Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> readLines(int maxLines) {
|
||||
try (Stream<String> lines = lines()) {
|
||||
return lines.limit(maxLines).toList();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<String> lines() {
|
||||
return Stream.of();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String readAll() {
|
||||
try (Stream<String> lines = lines()) {
|
||||
return lines.collect(Collectors.joining("\n"));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String read(int maxCharacters) {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
lines().takeWhile(s -> {
|
||||
if (builder.length() > maxCharacters) {
|
||||
return false;
|
||||
}
|
||||
|
||||
builder.append(s);
|
||||
return true;
|
||||
});
|
||||
return builder.toString();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user