mirror of
https://github.com/xpipe-io/xpipe.git
synced 2026-04-26 09:28:26 -04:00
Make drains more robust
This commit is contained in:
@@ -1,67 +0,0 @@
|
||||
package io.xpipe.core.impl;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
import io.xpipe.core.charsetter.NewLine;
|
||||
import io.xpipe.core.charsetter.StreamCharset;
|
||||
import io.xpipe.core.store.KnownFormatStreamDataStore;
|
||||
import io.xpipe.core.util.JacksonizedValue;
|
||||
import lombok.Getter;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import lombok.extern.jackson.Jacksonized;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.channels.Channels;
|
||||
import java.nio.channels.Pipe;
|
||||
|
||||
@JsonTypeName("drain")
|
||||
@SuperBuilder
|
||||
@Jacksonized
|
||||
@Getter
|
||||
public class DrainStore extends JacksonizedValue implements KnownFormatStreamDataStore {
|
||||
|
||||
private final String description;
|
||||
private final StreamCharset charset;
|
||||
private final NewLine newLine;
|
||||
|
||||
@JsonIgnore
|
||||
private boolean open;
|
||||
@JsonIgnore
|
||||
private Pipe pipe;
|
||||
|
||||
private boolean used;
|
||||
|
||||
private void waitForOpen() {
|
||||
while (!open) {
|
||||
try {
|
||||
Thread.sleep(200);
|
||||
} catch (InterruptedException e) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canOpen() throws Exception {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream openInput() throws Exception {
|
||||
if (used) {
|
||||
throw new IllegalStateException("Drain has already been used");
|
||||
}
|
||||
|
||||
waitForOpen();
|
||||
return Channels.newInputStream(pipe.source());
|
||||
}
|
||||
|
||||
@Override
|
||||
public OutputStream openOutput() throws Exception {
|
||||
used = true;
|
||||
pipe = Pipe.open();
|
||||
open = true;
|
||||
return Channels.newOutputStream(pipe.sink());
|
||||
}
|
||||
}
|
||||
151
core/src/main/java/io/xpipe/core/impl/SinkDrainStore.java
Normal file
151
core/src/main/java/io/xpipe/core/impl/SinkDrainStore.java
Normal file
@@ -0,0 +1,151 @@
|
||||
package io.xpipe.core.impl;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
import io.xpipe.core.charsetter.NewLine;
|
||||
import io.xpipe.core.charsetter.StreamCharset;
|
||||
import io.xpipe.core.store.DataFlow;
|
||||
import io.xpipe.core.store.KnownFormatStreamDataStore;
|
||||
import io.xpipe.core.util.JacksonizedValue;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import lombok.extern.jackson.Jacksonized;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.channels.Channels;
|
||||
import java.nio.channels.Pipe;
|
||||
|
||||
/*
|
||||
TODO: Properly enter closed State
|
||||
*/
|
||||
|
||||
@JsonTypeName("drain")
|
||||
@SuperBuilder
|
||||
@Jacksonized
|
||||
@Getter
|
||||
public class SinkDrainStore extends JacksonizedValue implements KnownFormatStreamDataStore {
|
||||
|
||||
public static enum State {
|
||||
NONE_CONNECTED,
|
||||
PRODUCER_CONNECTED,
|
||||
CONSUMER_CONNECTED,
|
||||
OPEN,
|
||||
CLOSED
|
||||
}
|
||||
|
||||
private final String description;
|
||||
private final StreamCharset charset;
|
||||
private final NewLine newLine;
|
||||
|
||||
@JsonIgnore
|
||||
@Setter
|
||||
@Builder.Default
|
||||
private State state = State.NONE_CONNECTED;
|
||||
|
||||
@JsonIgnore
|
||||
private Pipe pipe;
|
||||
|
||||
@Override
|
||||
public DataFlow getFlow() {
|
||||
if (state == State.NONE_CONNECTED) {
|
||||
return DataFlow.INPUT_OR_OUTPUT;
|
||||
}
|
||||
|
||||
if (state == State.PRODUCER_CONNECTED) {
|
||||
return DataFlow.INPUT;
|
||||
}
|
||||
|
||||
if (state == State.CONSUMER_CONNECTED) {
|
||||
return DataFlow.OUTPUT;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private void waitForOpen() {
|
||||
while (state != State.OPEN) {
|
||||
try {
|
||||
Thread.sleep(200);
|
||||
} catch (InterruptedException e) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canOpen() throws Exception {
|
||||
return state == State.PRODUCER_CONNECTED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream openInput() throws Exception {
|
||||
checkState(false);
|
||||
|
||||
if (state == State.PRODUCER_CONNECTED) {
|
||||
state = State.OPEN;
|
||||
}
|
||||
|
||||
if (state == State.NONE_CONNECTED) {
|
||||
state = State.CONSUMER_CONNECTED;
|
||||
waitForOpen();
|
||||
}
|
||||
|
||||
try {
|
||||
openPipe();
|
||||
return Channels.newInputStream(pipe.source());
|
||||
} catch (Exception ex) {
|
||||
state = State.CLOSED;
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public OutputStream openOutput() throws Exception {
|
||||
checkState(true);
|
||||
|
||||
if (state == State.CONSUMER_CONNECTED) {
|
||||
state = State.OPEN;
|
||||
}
|
||||
|
||||
if (state == State.NONE_CONNECTED) {
|
||||
state = State.PRODUCER_CONNECTED;
|
||||
waitForOpen();
|
||||
}
|
||||
|
||||
try {
|
||||
openPipe();
|
||||
return Channels.newOutputStream(pipe.sink());
|
||||
} catch (Exception ex) {
|
||||
state = State.CLOSED;
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
private void openPipe() throws IOException {
|
||||
if (pipe == null) {
|
||||
pipe = Pipe.open();
|
||||
}
|
||||
}
|
||||
|
||||
private void checkState(boolean isProducer) {
|
||||
if (state == State.CLOSED) {
|
||||
throw new IllegalStateException("Drain has already been closed");
|
||||
}
|
||||
|
||||
if (state == State.OPEN) {
|
||||
throw new IllegalStateException("Drain is already open");
|
||||
}
|
||||
|
||||
if (state == State.PRODUCER_CONNECTED && isProducer) {
|
||||
throw new IllegalStateException("Producer is already connected");
|
||||
}
|
||||
|
||||
if (state == State.CONSUMER_CONNECTED && !isProducer) {
|
||||
throw new IllegalStateException("Consumer is already connected");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -63,10 +63,18 @@ public abstract class DataSource<DS extends DataStore> extends JacksonizedValue
|
||||
}
|
||||
|
||||
public List<WriteMode> getAvailableWriteModes() {
|
||||
if (getFlow() != null && !getFlow().hasOutput()) {
|
||||
if (getFlow() == null) {
|
||||
return List.of();
|
||||
}
|
||||
|
||||
if (getFlow() != null && (getFlow() == DataFlow.TRANSFORMER || getFlow() == DataFlow.INPUT)) {
|
||||
return List.of();
|
||||
}
|
||||
|
||||
if (getFlow() != null && (getFlow() == DataFlow.OUTPUT || getFlow() == DataFlow.INPUT_OR_OUTPUT)) {
|
||||
return List.of(WriteMode.REPLACE);
|
||||
}
|
||||
|
||||
return List.of(WriteMode.REPLACE, WriteMode.APPEND, WriteMode.PREPEND);
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,8 @@ public enum DataFlow {
|
||||
INPUT("Input"),
|
||||
@JsonProperty("output")
|
||||
OUTPUT("Output"),
|
||||
@JsonProperty("inputOrOutput")
|
||||
INPUT_OR_OUTPUT("Input or Output"),
|
||||
@JsonProperty("inputOutput")
|
||||
INPUT_OUTPUT("Input/Output"),
|
||||
@JsonProperty("transformer")
|
||||
|
||||
Reference in New Issue
Block a user