Implement drains and small fixes

This commit is contained in:
Christopher Schnick
2022-11-01 08:01:33 +01:00
parent a0c008ab3b
commit 3c72078d2b
14 changed files with 250 additions and 36 deletions

View File

@@ -5,7 +5,7 @@ import lombok.AllArgsConstructor;
import lombok.Value;
@Value
@AllArgsConstructor(onConstructor_ = @JsonCreator)
@AllArgsConstructor(onConstructor=@__({@JsonCreator}))
public class Choice {
/**

View File

@@ -0,0 +1,71 @@
package io.xpipe.core.impl;
import io.xpipe.core.data.node.ArrayNode;
import io.xpipe.core.data.node.DataStructureNodeAcceptor;
import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.data.type.TupleType;
import io.xpipe.core.source.TableReadConnection;
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicInteger;
public class BufferedTableReadConnection implements TableReadConnection {
private final TableReadConnection connection;
private final int maxCount;
private int count = 0;
private ArrayNode read;
public BufferedTableReadConnection(TableReadConnection connection, int maxCount) throws Exception {
this.connection = connection;
this.maxCount = maxCount;
read = connection.readRows(maxCount);
}
private TupleNode get() throws Exception {
if (count == read.size()) {
read = connection.readRows(maxCount);
}
if (read.size() == 0) {
return null;
}
return read.at(count++).asTuple();
}
@Override
public void close() throws Exception {
connection.close();
}
@Override
public TupleType getDataType() {
return connection.getDataType();
}
@Override
public OptionalInt getRowCount() throws Exception {
return connection.getRowCount();
}
@Override
public int withRows(DataStructureNodeAcceptor<TupleNode> lineAcceptor) throws Exception {
AtomicInteger localCounter = new AtomicInteger();
TupleNode node;
while (((node = get()) != null)) {
var returned = lineAcceptor.accept(node);
if (!returned) {
break;
}
localCounter.getAndIncrement();
}
return localCounter.get();
}
@Override
public boolean canRead() throws Exception {
return connection.canRead();
}
}

View File

@@ -0,0 +1,67 @@
package io.xpipe.core.impl;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.xpipe.core.charsetter.NewLine;
import io.xpipe.core.charsetter.StreamCharset;
import io.xpipe.core.store.KnownFormatStreamDataStore;
import io.xpipe.core.util.JacksonizedValue;
import lombok.Getter;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.Pipe;
@JsonTypeName("drain")
@SuperBuilder
@Jacksonized
@Getter
public class DrainStore extends JacksonizedValue implements KnownFormatStreamDataStore {
private final String description;
private final StreamCharset charset;
private final NewLine newLine;
@JsonIgnore
private boolean open;
@JsonIgnore
private Pipe pipe;
private boolean used;
private void waitForOpen() {
while (!open) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
break;
}
}
}
@Override
public boolean canOpen() throws Exception {
return false;
}
@Override
public InputStream openInput() throws Exception {
if (used) {
throw new IllegalStateException("Drain has already been used");
}
waitForOpen();
return Channels.newInputStream(pipe.source());
}
@Override
public OutputStream openOutput() throws Exception {
used = true;
pipe = Pipe.open();
open = true;
return Channels.newOutputStream(pipe.sink());
}
}

View File

@@ -11,14 +11,15 @@ import java.util.Optional;
public abstract class TableDataSource<DS extends DataStore> extends DataSource<DS> {
public Optional<TupleType> determineDataType() throws Exception {
try (var readConnection = newReadConnection()) {
var canRead = readConnection != null && readConnection.canRead();
if (canRead) {
var readConnection = newReadConnection();
var canRead = readConnection != null && readConnection.canRead();
if (canRead) {
try (var in = readConnection) {
readConnection.init();
return Optional.ofNullable(readConnection.getDataType());
} else {
return Optional.empty();
}
} else {
return Optional.empty();
}
}

View File

@@ -6,6 +6,7 @@ import io.xpipe.core.data.node.DataStructureNodeAcceptor;
import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.data.type.TupleType;
import io.xpipe.core.data.typed.TypedDataStreamWriter;
import io.xpipe.core.impl.BufferedTableReadConnection;
import io.xpipe.core.impl.LimitTableReadConnection;
import java.io.OutputStream;
@@ -63,6 +64,14 @@ public interface TableReadConnection extends DataSourceReadConnection {
return new LimitTableReadConnection(this, limit);
}
default TableReadConnection buffered() throws Exception {
return buffered(Integer.MAX_VALUE);
}
default TableReadConnection buffered(int limit) throws Exception {
return new BufferedTableReadConnection(this, limit);
}
/**
* Consumes the table rows until the acceptor returns false.
*

View File

@@ -0,0 +1,11 @@
package io.xpipe.core.store;
import io.xpipe.core.charsetter.NewLine;
import io.xpipe.core.charsetter.StreamCharset;
public interface KnownFormatStreamDataStore extends StreamDataStore {
StreamCharset getCharset();
NewLine getNewLine();
}