Initial commit

This commit is contained in:
Christopher Schnick
2021-12-01 19:17:54 +01:00
commit 63cdfb40c5
92 changed files with 3882 additions and 0 deletions

View File

@@ -0,0 +1,28 @@
package io.xpipe.api;
import io.xpipe.api.impl.DataTableImpl;
import io.xpipe.core.data.generic.ArrayNode;
import io.xpipe.core.data.generic.TupleNode;
import io.xpipe.core.data.type.DataType;
import io.xpipe.core.source.DataSourceId;
import java.util.OptionalInt;
public interface DataTable extends Iterable<TupleNode> {
static DataTable get(String s) {
return DataTableImpl.get(s);
}
DataSourceId getId();
int getRowCount();
OptionalInt getRowCountIfPresent();
DataType getDataType();
ArrayNode readAll();
ArrayNode read(int maxRows);
}

View File

@@ -0,0 +1,21 @@
package io.xpipe.api;
import java.util.function.IntConsumer;
public class IntConverter {
private IntConsumer consumer;
public IntConverter(IntConsumer consumer) {
this.consumer = consumer;
}
public void onValue(byte[] value) {
if (value.length > 4) {
throw new IllegalArgumentException("Unable to fit " + value.length + " bytes into an integer");
}
int v = value[0] << 24 | (value[1] & 0xFF) << 16 | (value[2] & 0xFF) << 8 | (value[3] & 0xFF);
consumer.accept(v);
}
}

View File

@@ -0,0 +1,39 @@
package io.xpipe.api;
import io.xpipe.beacon.*;
import io.xpipe.beacon.socket.SocketClient;
public abstract class XPipeApiConnector extends XPipeConnector {
public void execute() {
try {
var socket = constructSocket();
handle(socket);
} catch (ConnectorException ce) {
throw new XPipeException("Connection error: " + ce.getMessage());
} catch (ClientException ce) {
throw new XPipeException("Client error: " + ce.getMessage());
} catch (ServerException se) {
throw new XPipeException("Server error: " + se.getMessage());
} catch (Throwable t) {
throw new XPipeException("Unexpected error", t);
}
}
protected abstract void handle(SocketClient sc) throws Exception;
@Override
protected void waitForStartup() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@FunctionalInterface
public static interface Handler {
void handle(SocketClient sc) throws ClientException, ServerException;
}
}

View File

@@ -0,0 +1,8 @@
package io.xpipe.api;
import io.xpipe.core.data.DataStructureNode;
public interface XPipeDataStructureSource {
DataStructureNode read();
}

View File

@@ -0,0 +1,12 @@
package io.xpipe.api;
import io.xpipe.core.source.DataSourceId;
public abstract class XPipeDataTableBuilder {
private DataSourceId id;
public abstract void write();
public abstract void commit();
}

View File

@@ -0,0 +1,23 @@
package io.xpipe.api;
public class XPipeException extends RuntimeException {
public XPipeException() {
}
public XPipeException(String message) {
super(message);
}
public XPipeException(String message, Throwable cause) {
super(message, cause);
}
public XPipeException(Throwable cause) {
super(cause);
}
public XPipeException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@@ -0,0 +1,164 @@
package io.xpipe.api.impl;
import io.xpipe.api.DataTable;
import io.xpipe.api.XPipeApiConnector;
import io.xpipe.beacon.ClientException;
import io.xpipe.beacon.ConnectorException;
import io.xpipe.beacon.ServerException;
import io.xpipe.beacon.socket.SocketClient;
import io.xpipe.beacon.message.impl.ReadTableDataExchange;
import io.xpipe.beacon.message.impl.ReadTableInfoExchange;
import io.xpipe.core.data.DataStructureNode;
import io.xpipe.core.data.generic.ArrayNode;
import io.xpipe.core.data.generic.TupleNode;
import io.xpipe.core.data.type.DataType;
import io.xpipe.core.data.type.TypedDataStreamReader;
import io.xpipe.core.data.type.callback.TypedDataStreamCallback;
import io.xpipe.core.data.type.callback.TypedDataStructureNodeCallback;
import io.xpipe.core.source.DataSourceId;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.*;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class DataTableImpl implements DataTable {
public static DataTable get(String s) {
final DataTable[] table = {null};
var ds = DataSourceId.fromString(s);
new XPipeApiConnector() {
@Override
protected void handle(SocketClient sc) throws ClientException, ServerException, ConnectorException {
var req = new ReadTableInfoExchange.Request(ds);
ReadTableInfoExchange.Response res = performSimpleExchange(sc, req);
table[0] = new DataTableImpl(res.sourceId(), res.rowCount(), res.dataType());
}
}.execute();
return table[0];
}
private final DataSourceId id;
private final int size;
private final DataType dataType;
public DataTableImpl(DataSourceId id, int size, DataType dataType) {
this.id = id;
this.size = size;
this.dataType = dataType;
}
public Stream<TupleNode> stream() {
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(iterator(), Spliterator.ORDERED), false);
}
@Override
public DataSourceId getId() {
return id;
}
@Override
public int getRowCount() {
if (size == -1) {
throw new UnsupportedOperationException("Row count is unknown");
}
return size;
}
@Override
public OptionalInt getRowCountIfPresent() {
return size != -1 ? OptionalInt.of(size) : OptionalInt.empty();
}
@Override
public DataType getDataType() {
return dataType;
}
@Override
public ArrayNode readAll() {
return read(Integer.MAX_VALUE);
}
@Override
public ArrayNode read(int maxRows) {
int maxToRead = size == -1 ? maxRows : Math.min(size, maxRows);
List<DataStructureNode> nodes = new ArrayList<>();
new XPipeApiConnector() {
@Override
protected void handle(SocketClient sc) throws ClientException, ServerException, ConnectorException {
var req = new ReadTableDataExchange.Request(id, maxToRead);
performExchange(sc, req, (ReadTableDataExchange.Response res, InputStream in) -> {
TypedDataStreamReader.readStructures(in, new TypedDataStructureNodeCallback(dataType, nodes::add));
}, false);
}
}.execute();
return ArrayNode.wrap(nodes);
}
@Override
public Iterator<TupleNode> iterator() {
return new Iterator<TupleNode>() {
private InputStream input;
private int read;
private final int toRead = size;
private TypedDataStreamCallback callback;
private TupleNode current;
{
new XPipeApiConnector() {
@Override
protected void handle(SocketClient sc) throws ClientException, ServerException, ConnectorException {
var req = new ReadTableDataExchange.Request(id, Integer.MAX_VALUE);
performExchange(sc, req, (ReadTableDataExchange.Response res, InputStream in) -> {
input = in;
}, false);
}
}.execute();
callback = new TypedDataStructureNodeCallback(dataType, dsn -> {
current = (TupleNode) dsn;
});
}
private boolean hasKnownSize() {
return size != -1;
}
@Override
public boolean hasNext() {
if (hasKnownSize() && read == toRead) {
return false;
}
if (hasKnownSize() && read < toRead) {
return true;
}
try {
return TypedDataStreamReader.hasNext(input);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}
@Override
public TupleNode next() {
try {
TypedDataStreamReader.readStructure(input, callback);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
read++;
return current;
}
};
}
}

View File

@@ -0,0 +1,7 @@
module io.xpipe.api {
requires io.xpipe.core;
requires io.xpipe.beacon;
requires org.apache.commons.lang;
exports io.xpipe.api;
}