mirror of
https://github.com/xpipe-io/xpipe.git
synced 2026-04-24 08:31:39 -04:00
Large refactor
This commit is contained in:
@@ -152,7 +152,7 @@ public interface DataSource {
|
||||
DataSourceType getType();
|
||||
|
||||
|
||||
DataSourceConfigInstance getConfig();
|
||||
DataSourceConfig getConfig();
|
||||
|
||||
/**
|
||||
* Attempts to cast this object to a {@link DataTable}.
|
||||
|
||||
32
api/src/main/java/io/xpipe/api/DataSourceConfig.java
Normal file
32
api/src/main/java/io/xpipe/api/DataSourceConfig.java
Normal file
@@ -0,0 +1,32 @@
|
||||
package io.xpipe.api;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Represents the current configuration of a data source.
|
||||
*/
|
||||
public final class DataSourceConfig {
|
||||
|
||||
/**
|
||||
* The data source provider id.
|
||||
*/
|
||||
private final String provider;
|
||||
|
||||
/**
|
||||
* The set configuration parameters.
|
||||
*/
|
||||
private final Map<String, String> configInstance;
|
||||
|
||||
public DataSourceConfig(String provider, Map<String, String> configInstance) {
|
||||
this.provider = provider;
|
||||
this.configInstance = configInstance;
|
||||
}
|
||||
|
||||
public String getProvider() {
|
||||
return provider;
|
||||
}
|
||||
|
||||
public Map<String, String> getConfigInstance() {
|
||||
return configInstance;
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.xpipe.api.impl;
|
||||
|
||||
import io.xpipe.api.DataRaw;
|
||||
import io.xpipe.api.DataSourceConfig;
|
||||
import io.xpipe.core.source.*;
|
||||
|
||||
import java.io.InputStream;
|
||||
@@ -9,7 +10,7 @@ public class DataRawImpl extends DataSourceImpl implements DataRaw {
|
||||
|
||||
private final DataSourceInfo.Raw info;
|
||||
|
||||
public DataRawImpl(DataSourceId sourceId, DataSourceConfigInstance sourceConfig, DataSourceInfo.Raw info) {
|
||||
public DataRawImpl(DataSourceId sourceId, DataSourceConfig sourceConfig, DataSourceInfo.Raw info) {
|
||||
super(sourceId, sourceConfig);
|
||||
this.info = info;
|
||||
}
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
package io.xpipe.api.impl;
|
||||
|
||||
import io.xpipe.api.DataSource;
|
||||
import io.xpipe.api.DataSourceConfig;
|
||||
import io.xpipe.api.connector.XPipeConnection;
|
||||
import io.xpipe.beacon.exchange.StoreStreamExchange;
|
||||
import io.xpipe.beacon.exchange.QueryDataSourceExchange;
|
||||
import io.xpipe.beacon.exchange.ReadExecuteExchange;
|
||||
import io.xpipe.beacon.exchange.ReadPreparationExchange;
|
||||
import io.xpipe.core.source.DataSourceConfigInstance;
|
||||
import io.xpipe.beacon.exchange.StoreStreamExchange;
|
||||
import io.xpipe.core.source.DataSourceId;
|
||||
import io.xpipe.core.source.DataSourceReference;
|
||||
|
||||
@@ -19,22 +18,23 @@ public abstract class DataSourceImpl implements DataSource {
|
||||
return XPipeConnection.execute(con -> {
|
||||
var req = QueryDataSourceExchange.Request.builder().ref(ds).build();
|
||||
QueryDataSourceExchange.Response res = con.performSimpleExchange(req);
|
||||
var config = new DataSourceConfig(res.getProvider(), res.getConfig());
|
||||
switch (res.getInfo().getType()) {
|
||||
case TABLE -> {
|
||||
var data = res.getInfo().asTable();
|
||||
return new DataTableImpl(res.getId(), res.getConfig(), data);
|
||||
return new DataTableImpl(res.getId(), config, data);
|
||||
}
|
||||
case STRUCTURE -> {
|
||||
var info = res.getInfo().asStructure();
|
||||
return new DataStructureImpl(res.getId(), res.getConfig(), info);
|
||||
return new DataStructureImpl(res.getId(), config, info);
|
||||
}
|
||||
case TEXT -> {
|
||||
var info = res.getInfo().asText();
|
||||
return new DataTextImpl(res.getId(), res.getConfig(), info);
|
||||
return new DataTextImpl(res.getId(), config, info);
|
||||
}
|
||||
case RAW -> {
|
||||
var info = res.getInfo().asRaw();
|
||||
return new DataRawImpl(res.getId(), res.getConfig(), info);
|
||||
return new DataRawImpl(res.getId(), config, info);
|
||||
}
|
||||
}
|
||||
throw new AssertionError();
|
||||
@@ -60,20 +60,21 @@ public abstract class DataSourceImpl implements DataSource {
|
||||
});
|
||||
|
||||
var configInstance = startRes.getConfig();
|
||||
configInstance.getConfigInstance().getCurrentValues().putAll(config);
|
||||
var endReq = ReadExecuteExchange.Request.builder()
|
||||
.target(id).dataStore(store).config(configInstance).build();
|
||||
XPipeConnection.execute(con -> {
|
||||
con.performSimpleExchange(endReq);
|
||||
});
|
||||
//TODO
|
||||
// configInstance.getConfigInstance().getCurrentValues().putAll(config);
|
||||
// var endReq = ReadExecuteExchange.Request.builder()
|
||||
// .target(id).dataStore(store).config(configInstance).build();
|
||||
// XPipeConnection.execute(con -> {
|
||||
// con.performSimpleExchange(endReq);
|
||||
// });
|
||||
var ref = id != null ? DataSourceReference.id(id) : DataSourceReference.latest();
|
||||
return get(ref);
|
||||
}
|
||||
|
||||
private final DataSourceId sourceId;
|
||||
private final DataSourceConfigInstance config;
|
||||
private final DataSourceConfig config;
|
||||
|
||||
public DataSourceImpl(DataSourceId sourceId, DataSourceConfigInstance config) {
|
||||
public DataSourceImpl(DataSourceId sourceId, DataSourceConfig config) {
|
||||
this.sourceId = sourceId;
|
||||
this.config = config;
|
||||
}
|
||||
@@ -84,7 +85,7 @@ public abstract class DataSourceImpl implements DataSource {
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataSourceConfigInstance getConfig() {
|
||||
public DataSourceConfig getConfig() {
|
||||
return config;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.xpipe.api.impl;
|
||||
|
||||
import io.xpipe.api.DataSourceConfig;
|
||||
import io.xpipe.api.DataStructure;
|
||||
import io.xpipe.core.data.node.DataStructureNode;
|
||||
import io.xpipe.core.source.*;
|
||||
@@ -8,7 +9,7 @@ public class DataStructureImpl extends DataSourceImpl implements DataStructure {
|
||||
|
||||
private final DataSourceInfo.Structure info;
|
||||
|
||||
public DataStructureImpl(DataSourceId sourceId, DataSourceConfigInstance sourceConfig, DataSourceInfo.Structure info) {
|
||||
public DataStructureImpl(DataSourceId sourceId, DataSourceConfig sourceConfig, DataSourceInfo.Structure info) {
|
||||
super(sourceId, sourceConfig);
|
||||
this.info = info;
|
||||
}
|
||||
|
||||
@@ -5,14 +5,13 @@ import io.xpipe.api.DataTable;
|
||||
import io.xpipe.api.DataTableAccumulator;
|
||||
import io.xpipe.api.connector.XPipeConnection;
|
||||
import io.xpipe.api.util.TypeDescriptor;
|
||||
import io.xpipe.beacon.exchange.StoreStreamExchange;
|
||||
import io.xpipe.beacon.exchange.ReadExecuteExchange;
|
||||
import io.xpipe.beacon.exchange.StoreStreamExchange;
|
||||
import io.xpipe.core.data.node.DataStructureNode;
|
||||
import io.xpipe.core.data.node.DataStructureNodeAcceptor;
|
||||
import io.xpipe.core.data.node.TupleNode;
|
||||
import io.xpipe.core.data.type.TupleType;
|
||||
import io.xpipe.core.data.typed.TypedDataStreamWriter;
|
||||
import io.xpipe.core.source.DataSourceConfigInstance;
|
||||
import io.xpipe.core.source.DataSourceId;
|
||||
import io.xpipe.core.source.DataSourceReference;
|
||||
|
||||
@@ -40,7 +39,7 @@ public class DataTableAccumulatorImpl implements DataTableAccumulator {
|
||||
connection.close();
|
||||
|
||||
var req = ReadExecuteExchange.Request.builder()
|
||||
.target(id).dataStore(res.getStore()).config(DataSourceConfigInstance.xpbt()).build();
|
||||
.target(id).dataStore(res.getStore()).build();
|
||||
XPipeConnection.execute(con -> {
|
||||
con.performSimpleExchange(req);
|
||||
});
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.xpipe.api.impl;
|
||||
|
||||
import io.xpipe.api.DataSourceConfig;
|
||||
import io.xpipe.api.DataTable;
|
||||
import io.xpipe.api.connector.XPipeConnection;
|
||||
import io.xpipe.beacon.BeaconConnection;
|
||||
@@ -24,7 +25,7 @@ public class DataTableImpl extends DataSourceImpl implements DataTable {
|
||||
|
||||
private final DataSourceInfo.Table info;
|
||||
|
||||
DataTableImpl(DataSourceId id, DataSourceConfigInstance sourceConfig, DataSourceInfo.Table info) {
|
||||
DataTableImpl(DataSourceId id, DataSourceConfig sourceConfig, DataSourceInfo.Table info) {
|
||||
super(id, sourceConfig);
|
||||
this.info = info;
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.xpipe.api.impl;
|
||||
|
||||
import io.xpipe.api.DataSourceConfig;
|
||||
import io.xpipe.api.DataText;
|
||||
import io.xpipe.core.source.*;
|
||||
|
||||
@@ -10,7 +11,7 @@ public class DataTextImpl extends DataSourceImpl implements DataText {
|
||||
|
||||
private final DataSourceInfo.Text info;
|
||||
|
||||
public DataTextImpl(DataSourceId sourceId, DataSourceConfigInstance sourceConfig, DataSourceInfo.Text info) {
|
||||
public DataTextImpl(DataSourceId sourceId, DataSourceConfig sourceConfig, DataSourceInfo.Text info) {
|
||||
super(sourceId, sourceConfig);
|
||||
this.info = info;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user