More beacon additions and small fixes

This commit is contained in:
Christopher Schnick
2022-05-28 23:52:08 +02:00
parent 628e322ca3
commit 948dcf33ef
25 changed files with 449 additions and 74 deletions

View File

@@ -14,6 +14,11 @@ public class MutableValueNode extends ValueNode {
this.textual = textual;
}
@Override
public String asString() {
return new String(data);
}
@Override
public String toString(int indent) {
return (textual ? "\"" : "") + new String(data) + (textual ? "\"" : "") + " (M)";

View File

@@ -1,6 +1,11 @@
package io.xpipe.core.source;
import com.fasterxml.jackson.databind.util.TokenBuffer;
import io.xpipe.core.store.DataStore;
import io.xpipe.core.util.JacksonHelper;
import lombok.AllArgsConstructor;
import lombok.NonNull;
import lombok.SneakyThrows;
import java.util.Optional;
@@ -10,16 +15,25 @@ import java.util.Optional;
*
* This instance is only valid in combination with its associated data store instance.
*/
@AllArgsConstructor
public abstract class DataSource<DS extends DataStore> {
@NonNull
protected DS store;
public DataSource(DS store) {
this.store = store;
@SneakyThrows
@SuppressWarnings("unchecked")
public DataSource<DS> copy() {
var mapper = JacksonHelper.newMapper();
TokenBuffer tb = new TokenBuffer(mapper, false);
mapper.writeValue(tb, this);
return mapper.readValue(tb.asParser(), getClass());
}
public DataSource<DS> withStore(DS newStore) {
return null;
public DataSource<DS> withStore(DS store) {
var c = copy();
c.store = store;
return c;
}
/**

View File

@@ -86,10 +86,12 @@ public abstract class DataSourceInfo {
@Value
@JsonTypeName("text")
public static class Text extends DataSourceInfo {
int characters;
int lineCount;
@JsonCreator
public Text(int lineCount) {
public Text(int characters, int lineCount) {
this.characters = characters;
this.lineCount = lineCount;
}

View File

@@ -1,11 +1,21 @@
package io.xpipe.core.source;
import java.io.OutputStream;
public interface RawReadConnection extends DataSourceReadConnection {
byte[] readBytes(int max) throws Exception;
int BUFFER_SIZE = 8192;
default void forwardBytes(OutputStream out, int maxBytes) throws Exception {
if (maxBytes == 0) {
return;
}
out.write(readBytes(maxBytes));
}
default void forward(DataSourceConnection con) throws Exception {
try (var tCon = (RawWriteConnection) con) {
tCon.init();

View File

@@ -2,6 +2,8 @@ package io.xpipe.core.source;
import io.xpipe.core.store.DataStore;
import java.util.concurrent.atomic.AtomicInteger;
public abstract class TextDataSource<DS extends DataStore> extends DataSource<DS> {
private static final int MAX_LINE_READ = 1000;
@@ -13,9 +15,14 @@ public abstract class TextDataSource<DS extends DataStore> extends DataSource<DS
@Override
public final DataSourceInfo determineInfo() throws Exception {
try (var con = openReadConnection()) {
int count = (int) con.lines().limit(MAX_LINE_READ).count();
int usedCount = count == MAX_LINE_READ ? -1 : count;
return new DataSourceInfo.Text(usedCount);
AtomicInteger lineCount = new AtomicInteger();
AtomicInteger charCount = new AtomicInteger();
con.lines().limit(MAX_LINE_READ).forEach(s -> {
lineCount.getAndIncrement();
charCount.addAndGet(s.length());
});
boolean limitHit = lineCount.get() == MAX_LINE_READ;
return new DataSourceInfo.Text(limitHit ? -1 : charCount.get(), limitHit ? -1 : lineCount.get());
}
}

View File

@@ -0,0 +1,93 @@
package io.xpipe.core.store;
import lombok.EqualsAndHashCode;
import lombok.Value;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@EqualsAndHashCode
@Value
public class StdinDataStore implements StreamDataStore {
@Override
public InputStream openInput() throws Exception {
var in = System.in;
return new InputStream() {
@Override
public int read() throws IOException {
return in.read();
}
@Override
public int read(byte[] b) throws IOException {
return in.read(b);
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
return in.read(b, off, len);
}
@Override
public byte[] readAllBytes() throws IOException {
return in.readAllBytes();
}
@Override
public byte[] readNBytes(int len) throws IOException {
return in.readNBytes(len);
}
@Override
public int readNBytes(byte[] b, int off, int len) throws IOException {
return in.readNBytes(b, off, len);
}
@Override
public long skip(long n) throws IOException {
return in.skip(n);
}
@Override
public void skipNBytes(long n) throws IOException {
in.skipNBytes(n);
}
@Override
public int available() throws IOException {
return in.available();
}
@Override
public void close() throws IOException {
}
@Override
public synchronized void mark(int readlimit) {
in.mark(readlimit);
}
@Override
public synchronized void reset() throws IOException {
in.reset();
}
@Override
public boolean markSupported() {
return in.markSupported();
}
@Override
public long transferTo(OutputStream out) throws IOException {
return in.transferTo(out);
}
};
}
@Override
public boolean exists() {
return false;
}
}

View File

@@ -0,0 +1,46 @@
package io.xpipe.core.store;
import lombok.EqualsAndHashCode;
import lombok.Value;
import java.io.IOException;
import java.io.OutputStream;
@EqualsAndHashCode
@Value
public class StdoutDataStore implements StreamDataStore {
@Override
public OutputStream openOutput() throws Exception {
return new OutputStream() {
@Override
public void write(int b) throws IOException {
System.out.write(b);
}
@Override
public void write(byte[] b) throws IOException {
System.out.write(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
System.out.write(b, off, len);
}
@Override
public void flush() throws IOException {
System.out.flush();
}
@Override
public void close() throws IOException {
}
};
}
@Override
public boolean exists() {
return false;
}
}