mirror of
https://github.com/xpipe-io/xpipe.git
synced 2026-04-26 01:17:52 -04:00
More fixes
This commit is contained in:
@@ -2,25 +2,16 @@ package io.xpipe.api.impl;
|
||||
|
||||
import io.xpipe.api.DataSourceConfig;
|
||||
import io.xpipe.api.DataTable;
|
||||
import io.xpipe.api.connector.XPipeApiConnection;
|
||||
import io.xpipe.beacon.BeaconConnection;
|
||||
import io.xpipe.beacon.BeaconException;
|
||||
import io.xpipe.beacon.exchange.api.QueryTableDataExchange;
|
||||
import io.xpipe.core.data.node.ArrayNode;
|
||||
import io.xpipe.core.data.node.DataStructureNode;
|
||||
import io.xpipe.core.data.node.TupleNode;
|
||||
import io.xpipe.core.data.typed.TypedAbstractReader;
|
||||
import io.xpipe.core.data.typed.TypedDataStreamParser;
|
||||
import io.xpipe.core.data.typed.TypedDataStructureNodeReader;
|
||||
import io.xpipe.core.source.DataSourceId;
|
||||
import io.xpipe.core.source.DataSourceReference;
|
||||
import io.xpipe.core.source.DataSourceType;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
public class DataTableImpl extends DataSourceImpl implements DataTable {
|
||||
|
||||
@@ -34,9 +25,7 @@ public class DataTableImpl extends DataSourceImpl implements DataTable {
|
||||
}
|
||||
|
||||
public Stream<TupleNode> stream() {
|
||||
var iterator = new TableIterator();
|
||||
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
|
||||
.onClose(iterator::finish);
|
||||
return Stream.of();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -52,71 +41,21 @@ public class DataTableImpl extends DataSourceImpl implements DataTable {
|
||||
@Override
|
||||
public ArrayNode read(int maxRows) {
|
||||
List<DataStructureNode> nodes = new ArrayList<>();
|
||||
XPipeApiConnection.execute(con -> {
|
||||
var req = QueryTableDataExchange.Request.builder()
|
||||
.ref(DataSourceReference.id(getId()))
|
||||
.maxRows(maxRows)
|
||||
.build();
|
||||
con.performInputExchange(req, (QueryTableDataExchange.Response res, InputStream in) -> {
|
||||
var r = new TypedDataStreamParser(res.getDataType());
|
||||
|
||||
r.parseStructures(in, TypedDataStructureNodeReader.of(res.getDataType()), nodes::add);
|
||||
});
|
||||
});
|
||||
return ArrayNode.of(nodes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<TupleNode> iterator() {
|
||||
return new TableIterator();
|
||||
}
|
||||
|
||||
private class TableIterator implements Iterator<TupleNode> {
|
||||
|
||||
private final BeaconConnection connection;
|
||||
private final TypedDataStreamParser parser;
|
||||
private final TypedAbstractReader nodeReader;
|
||||
private TupleNode node;
|
||||
|
||||
{
|
||||
connection = XPipeApiConnection.open();
|
||||
var req = QueryTableDataExchange.Request.builder()
|
||||
.ref(DataSourceReference.id(getId()))
|
||||
.maxRows(Integer.MAX_VALUE)
|
||||
.build();
|
||||
connection.sendRequest(req);
|
||||
QueryTableDataExchange.Response response = connection.receiveResponse();
|
||||
|
||||
nodeReader = TypedDataStructureNodeReader.of(response.getDataType());
|
||||
parser = new TypedDataStreamParser(response.getDataType());
|
||||
|
||||
connection.receiveBody();
|
||||
}
|
||||
|
||||
private void finish() {
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
connection.checkClosed();
|
||||
|
||||
try {
|
||||
node = (TupleNode) parser.parseStructure(connection.getInputStream(), nodeReader);
|
||||
} catch (IOException e) {
|
||||
throw new BeaconException(e);
|
||||
return new Iterator<TupleNode>() {
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return false;
|
||||
}
|
||||
if (node == null) {
|
||||
// finish();
|
||||
|
||||
@Override
|
||||
public TupleNode next() {
|
||||
return null;
|
||||
}
|
||||
return node != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TupleNode next() {
|
||||
connection.checkClosed();
|
||||
|
||||
return node;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,25 +2,12 @@ package io.xpipe.api.impl;
|
||||
|
||||
import io.xpipe.api.DataSourceConfig;
|
||||
import io.xpipe.api.DataText;
|
||||
import io.xpipe.api.connector.XPipeApiConnection;
|
||||
import io.xpipe.beacon.BeaconConnection;
|
||||
import io.xpipe.beacon.BeaconException;
|
||||
import io.xpipe.beacon.exchange.api.QueryTextDataExchange;
|
||||
import io.xpipe.core.source.DataSourceId;
|
||||
import io.xpipe.core.source.DataSourceReference;
|
||||
import io.xpipe.core.source.DataSourceType;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Spliterator;
|
||||
import java.util.Spliterators;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
public class DataTextImpl extends DataSourceImpl implements DataText {
|
||||
|
||||
@@ -53,47 +40,7 @@ public class DataTextImpl extends DataSourceImpl implements DataText {
|
||||
|
||||
@Override
|
||||
public Stream<String> lines() {
|
||||
var iterator = new Iterator<String>() {
|
||||
|
||||
private final BeaconConnection connection;
|
||||
private final BufferedReader reader;
|
||||
private String nextValue;
|
||||
|
||||
{
|
||||
connection = XPipeApiConnection.open();
|
||||
var req = QueryTextDataExchange.Request.builder()
|
||||
.ref(DataSourceReference.id(getId()))
|
||||
.maxLines(-1)
|
||||
.build();
|
||||
connection.sendRequest(req);
|
||||
connection.receiveResponse();
|
||||
reader = new BufferedReader(new InputStreamReader(connection.receiveBody(), StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
private void close() {
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
connection.checkClosed();
|
||||
|
||||
try {
|
||||
nextValue = reader.readLine();
|
||||
} catch (IOException e) {
|
||||
throw new BeaconException(e);
|
||||
}
|
||||
return nextValue != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String next() {
|
||||
return nextValue;
|
||||
}
|
||||
};
|
||||
|
||||
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
|
||||
.onClose(iterator::close);
|
||||
return Stream.of();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user